Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

2035 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7General utility functions. 

8""" 

9 

10 

11from decimal import Decimal 

12from io import StringIO 

13from itertools import zip_longest 

14from uuid import UUID 

15 

16import argparse 

17import array 

18import base64 

19import collections 

20import decimal 

21import difflib 

22import enum 

23import gzip 

24import inspect 

25import locale 

26import math 

27import os 

28import pickle 

29import random 

30import re 

31import shutil 

32import socket 

33import struct 

34import subprocess 

35import sys 

36import tempfile 

37import threading 

38import time 

39import traceback 

40import warnings 

41 

42from scapy.config import conf 

43from scapy.consts import DARWIN, OPENBSD, WINDOWS 

44from scapy.data import MTU, DLT_EN10MB, DLT_RAW 

45from scapy.compat import ( 

46 orb, 

47 plain_str, 

48 chb, 

49 hex_bytes, 

50 bytes_encode, 

51) 

52from scapy.error import ( 

53 log_interactive, 

54 log_runtime, 

55 Scapy_Exception, 

56 warning, 

57) 

58from scapy.pton_ntop import inet_pton 

59 

60# Typing imports 

61from typing import ( 

62 cast, 

63 Any, 

64 AnyStr, 

65 Callable, 

66 Dict, 

67 IO, 

68 Iterator, 

69 List, 

70 Optional, 

71 TYPE_CHECKING, 

72 Tuple, 

73 Type, 

74 Union, 

75 overload, 

76) 

77from scapy.compat import ( 

78 DecoratorCallable, 

79 Literal, 

80) 

81 

82if TYPE_CHECKING: 

83 from scapy.packet import Packet 

84 from scapy.plist import _PacketIterable, PacketList 

85 from scapy.supersocket import SuperSocket 

86 import prompt_toolkit 

87 

88_ByteStream = Union[IO[bytes], gzip.GzipFile] 

89 

90########### 

91# Tools # 

92########### 

93 

94 

95def issubtype(x, # type: Any 

96 t, # type: Union[type, str] 

97 ): 

98 # type: (...) -> bool 

99 """issubtype(C, B) -> bool 

100 

101 Return whether C is a class and if it is a subclass of class B. 

102 When using a tuple as the second argument issubtype(X, (A, B, ...)), 

103 is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.). 

104 """ 

105 if isinstance(t, str): 

106 return t in (z.__name__ for z in x.__bases__) 

107 if isinstance(x, type) and issubclass(x, t): 

108 return True 

109 return False 

110 

111 

112_Decimal = Union[Decimal, int] 

113 

114 

115class EDecimal(Decimal): 

116 """Extended Decimal 

117 

118 This implements arithmetic and comparison with float for 

119 backward compatibility 

120 """ 

121 

122 def __add__(self, other, context=None): 

123 # type: (_Decimal, Any) -> EDecimal 

124 return EDecimal(Decimal.__add__(self, Decimal(other))) 

125 

126 def __radd__(self, other): 

127 # type: (_Decimal) -> EDecimal 

128 return EDecimal(Decimal.__add__(self, Decimal(other))) 

129 

130 def __sub__(self, other): 

131 # type: (_Decimal) -> EDecimal 

132 return EDecimal(Decimal.__sub__(self, Decimal(other))) 

133 

134 def __rsub__(self, other): 

135 # type: (_Decimal) -> EDecimal 

136 return EDecimal(Decimal.__rsub__(self, Decimal(other))) 

137 

138 def __mul__(self, other): 

139 # type: (_Decimal) -> EDecimal 

140 return EDecimal(Decimal.__mul__(self, Decimal(other))) 

141 

142 def __rmul__(self, other): 

143 # type: (_Decimal) -> EDecimal 

144 return EDecimal(Decimal.__mul__(self, Decimal(other))) 

145 

146 def __truediv__(self, other): 

147 # type: (_Decimal) -> EDecimal 

148 return EDecimal(Decimal.__truediv__(self, Decimal(other))) 

149 

150 def __floordiv__(self, other): 

151 # type: (_Decimal) -> EDecimal 

152 return EDecimal(Decimal.__floordiv__(self, Decimal(other))) 

153 

154 def __divmod__(self, other): 

155 # type: (_Decimal) -> Tuple[EDecimal, EDecimal] 

156 r = Decimal.__divmod__(self, Decimal(other)) 

157 return EDecimal(r[0]), EDecimal(r[1]) 

158 

159 def __mod__(self, other): 

160 # type: (_Decimal) -> EDecimal 

161 return EDecimal(Decimal.__mod__(self, Decimal(other))) 

162 

163 def __rmod__(self, other): 

164 # type: (_Decimal) -> EDecimal 

165 return EDecimal(Decimal.__rmod__(self, Decimal(other))) 

166 

167 def __pow__(self, other, modulo=None): 

168 # type: (_Decimal, Optional[_Decimal]) -> EDecimal 

169 return EDecimal(Decimal.__pow__(self, Decimal(other), modulo)) 

170 

171 def __eq__(self, other): 

172 # type: (Any) -> bool 

173 if isinstance(other, Decimal): 

174 return super(EDecimal, self).__eq__(other) 

175 else: 

176 return bool(float(self) == other) 

177 

178 def normalize(self, precision): # type: ignore 

179 # type: (int) -> EDecimal 

180 with decimal.localcontext() as ctx: 

181 ctx.prec = precision 

182 return EDecimal(super(EDecimal, self).normalize(ctx)) 

183 

184 

185@overload 

186def get_temp_file(keep, autoext, fd): 

187 # type: (bool, str, Literal[True]) -> IO[bytes] 

188 pass 

189 

190 

191@overload 

192def get_temp_file(keep=False, autoext="", fd=False): 

193 # type: (bool, str, Literal[False]) -> str 

194 pass 

195 

196 

197def get_temp_file(keep=False, autoext="", fd=False): 

198 # type: (bool, str, bool) -> Union[IO[bytes], str] 

199 """Creates a temporary file. 

200 

201 :param keep: If False, automatically delete the file when Scapy exits. 

202 :param autoext: Suffix to add to the generated file name. 

203 :param fd: If True, this returns a file-like object with the temporary 

204 file opened. If False (default), this returns a file path. 

205 """ 

206 f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext, 

207 delete=False) 

208 if not keep: 

209 conf.temp_files.append(f.name) 

210 

211 if fd: 

212 return f 

213 else: 

214 # Close the file so something else can take it. 

215 f.close() 

216 return f.name 

217 

218 

219def get_temp_dir(keep=False): 

220 # type: (bool) -> str 

221 """Creates a temporary file, and returns its name. 

222 

223 :param keep: If False (default), the directory will be recursively 

224 deleted when Scapy exits. 

225 :return: A full path to a temporary directory. 

226 """ 

227 

228 dname = tempfile.mkdtemp(prefix="scapy") 

229 

230 if not keep: 

231 conf.temp_files.append(dname) 

232 

233 return dname 

234 

235 

236def _create_fifo() -> Tuple[str, Any]: 

237 """Creates a temporary fifo. 

238 

239 You must then use open_fifo() on the server_fd once 

240 the client is connected to use it. 

241 

242 :returns: (client_file, server_fd) 

243 """ 

244 if WINDOWS: 

245 from scapy.arch.windows.structures import _get_win_fifo 

246 return _get_win_fifo() 

247 else: 

248 f = get_temp_file() 

249 os.unlink(f) 

250 os.mkfifo(f) 

251 return f, f 

252 

253 

254def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]: 

255 """Open the server_fd (see create_fifo) 

256 """ 

257 if WINDOWS: 

258 from scapy.arch.windows.structures import _win_fifo_open 

259 return _win_fifo_open(fd) 

260 else: 

261 return open(fd, mode) 

262 

263 

264def sane(x, color=False): 

265 # type: (AnyStr, bool) -> str 

266 r = "" 

267 for i in x: 

268 j = orb(i) 

269 if (j < 32) or (j >= 127): 

270 if color: 

271 r += conf.color_theme.not_printable(".") 

272 else: 

273 r += "." 

274 else: 

275 r += chr(j) 

276 return r 

277 

278 

279@conf.commands.register 

280def restart(): 

281 # type: () -> None 

282 """Restarts scapy""" 

283 if not conf.interactive or not os.path.isfile(sys.argv[0]): 

284 raise OSError("Scapy was not started from console") 

285 if WINDOWS: 

286 res_code = 1 

287 try: 

288 res_code = subprocess.call([sys.executable] + sys.argv) 

289 finally: 

290 os._exit(res_code) 

291 os.execv(sys.executable, [sys.executable] + sys.argv) 

292 

293 

294def lhex(x): 

295 # type: (Any) -> str 

296 from scapy.volatile import VolatileValue 

297 if isinstance(x, VolatileValue): 

298 return repr(x) 

299 if isinstance(x, int): 

300 return hex(x) 

301 if isinstance(x, tuple): 

302 return "(%s)" % ", ".join(lhex(v) for v in x) 

303 if isinstance(x, list): 

304 return "[%s]" % ", ".join(lhex(v) for v in x) 

305 return str(x) 

306 

307 

308@conf.commands.register 

309def hexdump(p, dump=False): 

310 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 

311 """Build a tcpdump like hexadecimal view 

312 

313 :param p: a Packet 

314 :param dump: define if the result must be printed or returned in a variable 

315 :return: a String only when dump=True 

316 """ 

317 s = "" 

318 x = bytes_encode(p) 

319 x_len = len(x) 

320 i = 0 

321 while i < x_len: 

322 s += "%04x " % i 

323 for j in range(16): 

324 if i + j < x_len: 

325 s += "%02X " % orb(x[i + j]) 

326 else: 

327 s += " " 

328 s += " %s\n" % sane(x[i:i + 16], color=True) 

329 i += 16 

330 # remove trailing \n 

331 s = s[:-1] if s.endswith("\n") else s 

332 if dump: 

333 return s 

334 else: 

335 print(s) 

336 return None 

337 

338 

339@conf.commands.register 

340def linehexdump(p, onlyasc=0, onlyhex=0, dump=False): 

341 # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str] 

342 """Build an equivalent view of hexdump() on a single line 

343 

344 Note that setting both onlyasc and onlyhex to 1 results in a empty output 

345 

346 :param p: a Packet 

347 :param onlyasc: 1 to display only the ascii view 

348 :param onlyhex: 1 to display only the hexadecimal view 

349 :param dump: print the view if False 

350 :return: a String only when dump=True 

351 """ 

352 s = "" 

353 s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump) 

354 if dump: 

355 return s 

356 else: 

357 print(s) 

358 return None 

359 

360 

361@conf.commands.register 

362def chexdump(p, dump=False): 

363 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 

364 """Build a per byte hexadecimal representation 

365 

366 Example: 

367 >>> chexdump(IP()) 

368 0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01 # noqa: E501 

369 

370 :param p: a Packet 

371 :param dump: print the view if False 

372 :return: a String only if dump=True 

373 """ 

374 x = bytes_encode(p) 

375 s = ", ".join("%#04x" % orb(x) for x in x) 

376 if dump: 

377 return s 

378 else: 

379 print(s) 

380 return None 

381 

382 

383@conf.commands.register 

384def hexstr(p, onlyasc=0, onlyhex=0, color=False): 

385 # type: (Union[Packet, AnyStr], int, int, bool) -> str 

386 """Build a fancy tcpdump like hex from bytes.""" 

387 x = bytes_encode(p) 

388 s = [] 

389 if not onlyasc: 

390 s.append(" ".join("%02X" % orb(b) for b in x)) 

391 if not onlyhex: 

392 s.append(sane(x, color=color)) 

393 return " ".join(s) 

394 

395 

396def repr_hex(s): 

397 # type: (bytes) -> str 

398 """ Convert provided bitstring to a simple string of hex digits """ 

399 return "".join("%02x" % orb(x) for x in s) 

400 

401 

402@conf.commands.register 

403def hexdiff( 

404 a: Union['Packet', AnyStr], 

405 b: Union['Packet', AnyStr], 

406 algo: Optional[str] = None, 

407 autojunk: bool = False, 

408) -> None: 

409 """ 

410 Show differences between 2 binary strings, Packets... 

411 

412 Available algorithms: 

413 - wagnerfischer: Use the Wagner and Fischer algorithm to compute the 

414 Levenstein distance between the strings then backtrack. 

415 - difflib: Use the difflib.SequenceMatcher implementation. This based on a 

416 modified version of the Ratcliff and Obershelp algorithm. 

417 This is much faster, but far less accurate. 

418 https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher 

419 

420 :param a: 

421 :param b: The binary strings, packets... to compare 

422 :param algo: Force the algo to be 'wagnerfischer' or 'difflib'. 

423 By default, this is chosen depending on the complexity, optimistically 

424 preferring wagnerfischer unless really necessary. 

425 :param autojunk: (difflib only) See difflib documentation. 

426 """ 

427 xb = bytes_encode(a) 

428 yb = bytes_encode(b) 

429 

430 if algo is None: 

431 # Choose the best algorithm 

432 complexity = len(xb) * len(yb) 

433 if complexity < 1e7: 

434 # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable. 

435 # Anything much larger than this shouldn't be attempted by default. 

436 algo = "wagnerfischer" 

437 if complexity > 1e6: 

438 log_interactive.info( 

439 "Complexity is a bit high. hexdiff will take a few seconds." 

440 ) 

441 else: 

442 algo = "difflib" 

443 

444 backtrackx = [] 

445 backtracky = [] 

446 

447 if algo == "wagnerfischer": 

448 xb = xb[::-1] 

449 yb = yb[::-1] 

450 

451 # costs for the 3 operations 

452 INSERT = 1 

453 DELETE = 1 

454 SUBST = 1 

455 

456 # Typically, d[i,j] will hold the distance between 

457 # the first i characters of xb and the first j characters of yb. 

458 # We change the Wagner Fischer to also store pointers to all 

459 # the intermediate steps taken while calculating the Levenstein distance. 

460 d = {(-1, -1): (0, (-1, -1))} 

461 for j in range(len(yb)): 

462 d[-1, j] = (j + 1) * INSERT, (-1, j - 1) 

463 for i in range(len(xb)): 

464 d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1) 

465 

466 # Compute the Levenstein distance between the two strings, but 

467 # store all the steps to be able to backtrack at the end. 

468 for j in range(len(yb)): 

469 for i in range(len(xb)): 

470 d[i, j] = min( 

471 (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)), 

472 (d[i - 1, j][0] + DELETE, (i - 1, j)), 

473 (d[i, j - 1][0] + INSERT, (i, j - 1)), 

474 ) 

475 

476 # Iterate through the steps backwards to create the diff 

477 i = len(xb) - 1 

478 j = len(yb) - 1 

479 while not (i == j == -1): 

480 i2, j2 = d[i, j][1] 

481 backtrackx.append(xb[i2 + 1:i + 1]) 

482 backtracky.append(yb[j2 + 1:j + 1]) 

483 i, j = i2, j2 

484 elif algo == "difflib": 

485 sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk) 

486 xarr = [xb[i:i + 1] for i in range(len(xb))] 

487 yarr = [yb[i:i + 1] for i in range(len(yb))] 

488 # Iterate through opcodes to build the backtrack 

489 for opcode in sm.get_opcodes(): 

490 typ, x0, x1, y0, y1 = opcode 

491 if typ == 'delete': 

492 backtrackx += xarr[x0:x1] 

493 backtracky += [b''] * (x1 - x0) 

494 elif typ == 'insert': 

495 backtrackx += [b''] * (y1 - y0) 

496 backtracky += yarr[y0:y1] 

497 elif typ in ['equal', 'replace']: 

498 backtrackx += xarr[x0:x1] 

499 backtracky += yarr[y0:y1] 

500 # Some lines may have been considered as junk. Check the sizes 

501 if autojunk: 

502 lbx = len(backtrackx) 

503 lby = len(backtracky) 

504 backtrackx += [b''] * (max(lbx, lby) - lbx) 

505 backtracky += [b''] * (max(lbx, lby) - lby) 

506 else: 

507 raise ValueError("Unknown algorithm '%s'" % algo) 

508 

509 # Print the diff 

510 

511 x = y = i = 0 

512 colorize: Dict[int, Callable[[str], str]] = { 

513 0: lambda x: x, 

514 -1: conf.color_theme.left, 

515 1: conf.color_theme.right 

516 } 

517 

518 dox = 1 

519 doy = 0 

520 btx_len = len(backtrackx) 

521 while i < btx_len: 

522 linex = backtrackx[i:i + 16] 

523 liney = backtracky[i:i + 16] 

524 xx = sum(len(k) for k in linex) 

525 yy = sum(len(k) for k in liney) 

526 if dox and not xx: 

527 dox = 0 

528 doy = 1 

529 if dox and linex == liney: 

530 doy = 1 

531 

532 if dox: 

533 xd = y 

534 j = 0 

535 while not linex[j]: 

536 j += 1 

537 xd -= 1 

538 print(colorize[doy - dox]("%04x" % xd), end=' ') 

539 x += xx 

540 line = linex 

541 else: 

542 print(" ", end=' ') 

543 if doy: 

544 yd = y 

545 j = 0 

546 while not liney[j]: 

547 j += 1 

548 yd -= 1 

549 print(colorize[doy - dox]("%04x" % yd), end=' ') 

550 y += yy 

551 line = liney 

552 else: 

553 print(" ", end=' ') 

554 

555 print(" ", end=' ') 

556 

557 cl = "" 

558 for j in range(16): 

559 if i + j < min(len(backtrackx), len(backtracky)): 

560 if line[j]: 

561 col = colorize[(linex[j] != liney[j]) * (doy - dox)] 

562 print(col("%02X" % orb(line[j])), end=' ') 

563 if linex[j] == liney[j]: 

564 cl += sane(line[j], color=True) 

565 else: 

566 cl += col(sane(line[j])) 

567 else: 

568 print(" ", end=' ') 

569 cl += " " 

570 else: 

571 print(" ", end=' ') 

572 if j == 7: 

573 print("", end=' ') 

574 

575 print(" ", cl) 

576 

577 if doy or not yy: 

578 doy = 0 

579 dox = 1 

580 i += 16 

581 else: 

582 if yy: 

583 dox = 0 

584 doy = 1 

585 else: 

586 i += 16 

587 

588 

589if struct.pack("H", 1) == b"\x00\x01": # big endian 

590 checksum_endian_transform = lambda chk: chk # type: Callable[[int], int] 

591else: 

592 checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8 

593 

594 

595def checksum(pkt): 

596 # type: (bytes) -> int 

597 if len(pkt) % 2 == 1: 

598 pkt += b"\0" 

599 s = sum(array.array("H", pkt)) 

600 s = (s >> 16) + (s & 0xffff) 

601 s += s >> 16 

602 s = ~s 

603 return checksum_endian_transform(s) & 0xffff 

604 

605 

606def _fletcher16(charbuf): 

607 # type: (bytes) -> Tuple[int, int] 

608 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 

609 c0 = c1 = 0 

610 for char in charbuf: 

611 c0 += char 

612 c1 += c0 

613 

614 c0 %= 255 

615 c1 %= 255 

616 return (c0, c1) 

617 

618 

619@conf.commands.register 

620def fletcher16_checksum(binbuf): 

621 # type: (bytes) -> int 

622 """Calculates Fletcher-16 checksum of the given buffer. 

623 

624 Note: 

625 If the buffer contains the two checkbytes derived from the Fletcher-16 checksum # noqa: E501 

626 the result of this function has to be 0. Otherwise the buffer has been corrupted. # noqa: E501 

627 """ 

628 (c0, c1) = _fletcher16(binbuf) 

629 return (c1 << 8) | c0 

630 

631 

632@conf.commands.register 

633def fletcher16_checkbytes(binbuf, offset): 

634 # type: (bytes, int) -> bytes 

635 """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string. 

636 

637 Including the bytes into the buffer (at the position marked by offset) the # noqa: E501 

638 global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify # noqa: E501 

639 the integrity of the buffer on the receiver side. 

640 

641 For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B. # noqa: E501 

642 """ 

643 

644 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 

645 if len(binbuf) < offset: 

646 raise Exception("Packet too short for checkbytes %d" % len(binbuf)) 

647 

648 binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:] 

649 (c0, c1) = _fletcher16(binbuf) 

650 

651 x = ((len(binbuf) - offset - 1) * c0 - c1) % 255 

652 

653 if (x <= 0): 

654 x += 255 

655 

656 y = 510 - c0 - x 

657 

658 if (y > 255): 

659 y -= 255 

660 return chb(x) + chb(y) 

661 

662 

663def mac2str(mac): 

664 # type: (str) -> bytes 

665 return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':')) 

666 

667 

668def valid_mac(mac): 

669 # type: (str) -> bool 

670 try: 

671 return len(mac2str(mac)) == 6 

672 except ValueError: 

673 pass 

674 return False 

675 

676 

677def str2mac(s): 

678 # type: (bytes) -> str 

679 if isinstance(s, str): 

680 return ("%02x:" * len(s))[:-1] % tuple(map(ord, s)) 

681 return ("%02x:" * len(s))[:-1] % tuple(s) 

682 

683 

684def randstring(length): 

685 # type: (int) -> bytes 

686 """ 

687 Returns a random string of length (length >= 0) 

688 """ 

689 return b"".join(struct.pack('B', random.randint(0, 255)) 

690 for _ in range(length)) 

691 

692 

693def zerofree_randstring(length): 

694 # type: (int) -> bytes 

695 """ 

696 Returns a random string of length (length >= 0) without zero in it. 

697 """ 

698 return b"".join(struct.pack('B', random.randint(1, 255)) 

699 for _ in range(length)) 

700 

701 

702def stror(s1, s2): 

703 # type: (bytes, bytes) -> bytes 

704 """ 

705 Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2 

706 must be of same length. 

707 """ 

708 return b"".join(map(lambda x, y: struct.pack("!B", x | y), s1, s2)) 

709 

710 

711def strxor(s1, s2): 

712 # type: (bytes, bytes) -> bytes 

713 """ 

714 Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2 

715 must be of same length. 

716 """ 

717 return b"".join(map(lambda x, y: struct.pack("!B", x ^ y), s1, s2)) 

718 

719 

720def strand(s1, s2): 

721 # type: (bytes, bytes) -> bytes 

722 """ 

723 Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2 

724 must be of same length. 

725 """ 

726 return b"".join(map(lambda x, y: struct.pack("!B", x & y), s1, s2)) 

727 

728 

729def strrot(s1, count, right=True): 

730 # type: (bytes, int, bool) -> bytes 

731 """ 

732 Rotate the binary by 'count' bytes 

733 """ 

734 off = count % len(s1) 

735 if right: 

736 return s1[-off:] + s1[:-off] 

737 else: 

738 return s1[off:] + s1[:off] 

739 

740 

741# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 # noqa: E501 

742try: 

743 socket.inet_aton("255.255.255.255") 

744except socket.error: 

745 def inet_aton(ip_string): 

746 # type: (str) -> bytes 

747 if ip_string == "255.255.255.255": 

748 return b"\xff" * 4 

749 else: 

750 return socket.inet_aton(ip_string) 

751else: 

752 inet_aton = socket.inet_aton # type: ignore 

753 

754inet_ntoa = socket.inet_ntoa 

755 

756 

757def atol(x): 

758 # type: (str) -> int 

759 try: 

760 ip = inet_aton(x) 

761 except socket.error: 

762 raise ValueError("Bad IP format: %s" % x) 

763 return cast(int, struct.unpack("!I", ip)[0]) 

764 

765 

766def valid_ip(addr): 

767 # type: (str) -> bool 

768 try: 

769 addr = plain_str(addr) 

770 except UnicodeDecodeError: 

771 return False 

772 try: 

773 atol(addr) 

774 except (OSError, ValueError, socket.error): 

775 return False 

776 return True 

777 

778 

779def valid_net(addr): 

780 # type: (str) -> bool 

781 try: 

782 addr = plain_str(addr) 

783 except UnicodeDecodeError: 

784 return False 

785 if '/' in addr: 

786 ip, mask = addr.split('/', 1) 

787 return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32 

788 return valid_ip(addr) 

789 

790 

791def valid_ip6(addr): 

792 # type: (str) -> bool 

793 try: 

794 addr = plain_str(addr) 

795 except UnicodeDecodeError: 

796 return False 

797 try: 

798 inet_pton(socket.AF_INET6, addr) 

799 except socket.error: 

800 return False 

801 return True 

802 

803 

804def valid_net6(addr): 

805 # type: (str) -> bool 

806 try: 

807 addr = plain_str(addr) 

808 except UnicodeDecodeError: 

809 return False 

810 if '/' in addr: 

811 ip, mask = addr.split('/', 1) 

812 return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128 

813 return valid_ip6(addr) 

814 

815 

816def ltoa(x): 

817 # type: (int) -> str 

818 return inet_ntoa(struct.pack("!I", x & 0xffffffff)) 

819 

820 

821def itom(x): 

822 # type: (int) -> int 

823 return (0xffffffff00000000 >> x) & 0xffffffff 

824 

825 

826def in4_cidr2mask(m): 

827 # type: (int) -> bytes 

828 """ 

829 Return the mask (bitstring) associated with provided length 

830 value. For instance if function is called on 20, return value is 

831 b'\xff\xff\xf0\x00'. 

832 """ 

833 if m > 32 or m < 0: 

834 raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m) # noqa: E501 

835 

836 return strxor( 

837 b"\xff" * 4, 

838 struct.pack(">I", 2**(32 - m) - 1) 

839 ) 

840 

841 

842def in4_isincluded(addr, prefix, mask): 

843 # type: (str, str, int) -> bool 

844 """ 

845 Returns True when 'addr' belongs to prefix/mask. False otherwise. 

846 """ 

847 temp = inet_pton(socket.AF_INET, addr) 

848 pref = in4_cidr2mask(mask) 

849 zero = inet_pton(socket.AF_INET, prefix) 

850 return zero == strand(temp, pref) 

851 

852 

853def in4_ismaddr(str): 

854 # type: (str) -> bool 

855 """ 

856 Returns True if provided address in printable format belongs to 

857 allocated Multicast address space (224.0.0.0/4). 

858 """ 

859 return in4_isincluded(str, "224.0.0.0", 4) 

860 

861 

862def in4_ismlladdr(str): 

863 # type: (str) -> bool 

864 """ 

865 Returns True if address belongs to link-local multicast address 

866 space (224.0.0.0/24) 

867 """ 

868 return in4_isincluded(str, "224.0.0.0", 24) 

869 

870 

871def in4_ismgladdr(str): 

872 # type: (str) -> bool 

873 """ 

874 Returns True if address belongs to global multicast address 

875 space (224.0.1.0-238.255.255.255). 

876 """ 

877 return ( 

878 in4_isincluded(str, "224.0.0.0", 4) and 

879 not in4_isincluded(str, "224.0.0.0", 24) and 

880 not in4_isincluded(str, "239.0.0.0", 8) 

881 ) 

882 

883 

884def in4_ismlsaddr(str): 

885 # type: (str) -> bool 

886 """ 

887 Returns True if address belongs to limited scope multicast address 

888 space (239.0.0.0/8). 

889 """ 

890 return in4_isincluded(str, "239.0.0.0", 8) 

891 

892 

893def in4_isaddrllallnodes(str): 

894 # type: (str) -> bool 

895 """ 

896 Returns True if address is the link-local all-nodes multicast 

897 address (224.0.0.1). 

898 """ 

899 return (inet_pton(socket.AF_INET, "224.0.0.1") == 

900 inet_pton(socket.AF_INET, str)) 

901 

902 

903def in4_getnsmac(a): 

904 # type: (bytes) -> str 

905 """ 

906 Return the multicast mac address associated with provided 

907 IPv4 address. Passed address must be in network format. 

908 """ 

909 

910 return "01:00:5e:%.2x:%.2x:%.2x" % (a[1] & 0x7f, a[2], a[3]) 

911 

912 

913def decode_locale_str(x): 

914 # type: (bytes) -> str 

915 """ 

916 Decode bytes into a string using the system locale. 

917 Useful on Windows where it can be unusual (e.g. cp1252) 

918 """ 

919 return x.decode(encoding=locale.getlocale()[1] or "utf-8", errors="replace") 

920 

921 

922class ContextManagerSubprocess(object): 

923 """ 

924 Context manager that eases checking for unknown command, without 

925 crashing. 

926 

927 Example: 

928 >>> with ContextManagerSubprocess("tcpdump"): 

929 >>> subprocess.Popen(["tcpdump", "--version"]) 

930 ERROR: Could not execute tcpdump, is it installed? 

931 

932 """ 

933 

934 def __init__(self, prog, suppress=True): 

935 # type: (str, bool) -> None 

936 self.prog = prog 

937 self.suppress = suppress 

938 

939 def __enter__(self): 

940 # type: () -> None 

941 pass 

942 

943 def __exit__(self, 

944 exc_type, # type: Optional[type] 

945 exc_value, # type: Optional[Exception] 

946 traceback, # type: Optional[Any] 

947 ): 

948 # type: (...) -> Optional[bool] 

949 if exc_value is None or exc_type is None: 

950 return None 

951 # Errored 

952 if isinstance(exc_value, EnvironmentError): 

953 msg = "Could not execute %s, is it installed?" % self.prog 

954 else: 

955 msg = "%s: execution failed (%s)" % ( 

956 self.prog, 

957 exc_type.__class__.__name__ 

958 ) 

959 if not self.suppress: 

960 raise exc_type(msg) 

961 log_runtime.error(msg, exc_info=True) 

962 return True # Suppress the exception 

963 

964 

965class ContextManagerCaptureOutput(object): 

966 """ 

967 Context manager that intercept the console's output. 

968 

969 Example: 

970 >>> with ContextManagerCaptureOutput() as cmco: 

971 ... print("hey") 

972 ... assert cmco.get_output() == "hey" 

973 """ 

974 

975 def __init__(self): 

976 # type: () -> None 

977 self.result_export_object = "" 

978 

979 def __enter__(self): 

980 # type: () -> ContextManagerCaptureOutput 

981 from unittest import mock 

982 

983 def write(s, decorator=self): 

984 # type: (str, ContextManagerCaptureOutput) -> None 

985 decorator.result_export_object += s 

986 mock_stdout = mock.Mock() 

987 mock_stdout.write = write 

988 self.bck_stdout = sys.stdout 

989 sys.stdout = mock_stdout 

990 return self 

991 

992 def __exit__(self, *exc): 

993 # type: (*Any) -> Literal[False] 

994 sys.stdout = self.bck_stdout 

995 return False 

996 

997 def get_output(self, eval_bytes=False): 

998 # type: (bool) -> str 

999 if self.result_export_object.startswith("b'") and eval_bytes: 

1000 return plain_str(eval(self.result_export_object)) 

1001 return self.result_export_object 

1002 

1003 

1004def do_graph( 

1005 graph, # type: str 

1006 prog=None, # type: Optional[str] 

1007 format=None, # type: Optional[str] 

1008 target=None, # type: Optional[Union[IO[bytes], str]] 

1009 type=None, # type: Optional[str] 

1010 string=None, # type: Optional[bool] 

1011 options=None # type: Optional[List[str]] 

1012): 

1013 # type: (...) -> Optional[str] 

1014 """Processes graph description using an external software. 

1015 This method is used to convert a graphviz format to an image. 

1016 

1017 :param graph: GraphViz graph description 

1018 :param prog: which graphviz program to use 

1019 :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" 

1020 option 

1021 :param string: if not None, simply return the graph string 

1022 :param target: filename or redirect. Defaults pipe to Imagemagick's 

1023 display program 

1024 :param options: options to be passed to prog 

1025 """ 

1026 

1027 if format is None: 

1028 format = "svg" 

1029 if string: 

1030 return graph 

1031 if type is not None: 

1032 warnings.warn( 

1033 "type is deprecated, and was renamed format", 

1034 DeprecationWarning 

1035 ) 

1036 format = type 

1037 if prog is None: 

1038 prog = conf.prog.dot 

1039 start_viewer = False 

1040 if target is None: 

1041 if WINDOWS: 

1042 target = get_temp_file(autoext="." + format) 

1043 start_viewer = True 

1044 else: 

1045 with ContextManagerSubprocess(conf.prog.display): 

1046 target = subprocess.Popen([conf.prog.display], 

1047 stdin=subprocess.PIPE).stdin 

1048 if format is not None: 

1049 format = "-T%s" % format 

1050 if isinstance(target, str): 

1051 if target.startswith('|'): 

1052 target = subprocess.Popen(target[1:].lstrip(), shell=True, 

1053 stdin=subprocess.PIPE).stdin 

1054 elif target.startswith('>'): 

1055 target = open(target[1:].lstrip(), "wb") 

1056 else: 

1057 target = open(os.path.abspath(target), "wb") 

1058 target = cast(IO[bytes], target) 

1059 proc = subprocess.Popen( 

1060 "\"%s\" %s %s" % (prog, options or "", format or ""), 

1061 shell=True, stdin=subprocess.PIPE, stdout=target, 

1062 stderr=subprocess.PIPE 

1063 ) 

1064 _, stderr = proc.communicate(bytes_encode(graph)) 

1065 if proc.returncode != 0: 

1066 raise OSError( 

1067 "GraphViz call failed (is it installed?):\n" + 

1068 plain_str(stderr) 

1069 ) 

1070 try: 

1071 target.close() 

1072 except Exception: 

1073 pass 

1074 if start_viewer: 

1075 # Workaround for file not found error: We wait until tempfile is written. # noqa: E501 

1076 waiting_start = time.time() 

1077 while not os.path.exists(target.name): 

1078 time.sleep(0.1) 

1079 if time.time() - waiting_start > 3: 

1080 warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile) # noqa: E501 

1081 break 

1082 else: 

1083 if WINDOWS and conf.prog.display == conf.prog._default: 

1084 os.startfile(target.name) 

1085 else: 

1086 with ContextManagerSubprocess(conf.prog.display): 

1087 subprocess.Popen([conf.prog.display, target.name]) 

1088 return None 

1089 

1090 

1091_TEX_TR = { 

1092 "{": "{\\tt\\char123}", 

1093 "}": "{\\tt\\char125}", 

1094 "\\": "{\\tt\\char92}", 

1095 "^": "\\^{}", 

1096 "$": "\\$", 

1097 "#": "\\#", 

1098 "_": "\\_", 

1099 "&": "\\&", 

1100 "%": "\\%", 

1101 "|": "{\\tt\\char124}", 

1102 "~": "{\\tt\\char126}", 

1103 "<": "{\\tt\\char60}", 

1104 ">": "{\\tt\\char62}", 

1105} 

1106 

1107 

1108def tex_escape(x): 

1109 # type: (str) -> str 

1110 s = "" 

1111 for c in x: 

1112 s += _TEX_TR.get(c, c) 

1113 return s 

1114 

1115 

1116def colgen(*lstcol, # type: Any 

1117 **kargs # type: Any 

1118 ): 

1119 # type: (...) -> Iterator[Any] 

1120 """Returns a generator that mixes provided quantities forever 

1121 trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" # noqa: E501 

1122 if len(lstcol) < 2: 

1123 lstcol *= 2 

1124 trans = kargs.get("trans", lambda x, y, z: (x, y, z)) 

1125 while True: 

1126 for i in range(len(lstcol)): 

1127 for j in range(len(lstcol)): 

1128 for k in range(len(lstcol)): 

1129 if i != j or j != k or k != i: 

1130 yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)]) # noqa: E501 

1131 

1132 

1133def incremental_label(label="tag%05i", start=0): 

1134 # type: (str, int) -> Iterator[str] 

1135 while True: 

1136 yield label % start 

1137 start += 1 

1138 

1139 

1140def binrepr(val): 

1141 # type: (int) -> str 

1142 return bin(val)[2:] 

1143 

1144 

1145def long_converter(s): 

1146 # type: (str) -> int 

1147 return int(s.replace('\n', '').replace(' ', ''), 16) 

1148 

1149######################### 

1150# Enum management # 

1151######################### 

1152 

1153 

1154class EnumElement: 

1155 def __init__(self, key, value): 

1156 # type: (str, int) -> None 

1157 self._key = key 

1158 self._value = value 

1159 

1160 def __repr__(self): 

1161 # type: () -> str 

1162 return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) # noqa: E501 

1163 

1164 def __getattr__(self, attr): 

1165 # type: (str) -> Any 

1166 return getattr(self._value, attr) 

1167 

1168 def __str__(self): 

1169 # type: () -> str 

1170 return self._key 

1171 

1172 def __bytes__(self): 

1173 # type: () -> bytes 

1174 return bytes_encode(self.__str__()) 

1175 

1176 def __hash__(self): 

1177 # type: () -> int 

1178 return self._value 

1179 

1180 def __int__(self): 

1181 # type: () -> int 

1182 return int(self._value) 

1183 

1184 def __eq__(self, other): 

1185 # type: (Any) -> bool 

1186 return self._value == int(other) 

1187 

1188 def __neq__(self, other): 

1189 # type: (Any) -> bool 

1190 return not self.__eq__(other) 

1191 

1192 

1193class Enum_metaclass(type): 

1194 element_class = EnumElement 

1195 

1196 def __new__(cls, name, bases, dct): 

1197 # type: (Any, str, Any, Dict[str, Any]) -> Any 

1198 rdict = {} 

1199 for k, v in dct.items(): 

1200 if isinstance(v, int): 

1201 v = cls.element_class(k, v) 

1202 dct[k] = v 

1203 rdict[v] = k 

1204 dct["__rdict__"] = rdict 

1205 return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) 

1206 

1207 def __getitem__(self, attr): 

1208 # type: (int) -> Any 

1209 return self.__rdict__[attr] # type: ignore 

1210 

1211 def __contains__(self, val): 

1212 # type: (int) -> bool 

1213 return val in self.__rdict__ # type: ignore 

1214 

1215 def get(self, attr, val=None): 

1216 # type: (str, Optional[Any]) -> Any 

1217 return self.__rdict__.get(attr, val) # type: ignore 

1218 

1219 def __repr__(self): 

1220 # type: () -> str 

1221 return "<%s>" % self.__dict__.get("name", self.__name__) 

1222 

1223 

1224################### 

1225# Object saving # 

1226################### 

1227 

1228 

1229def export_object(obj): 

1230 # type: (Any) -> None 

1231 import zlib 

1232 print(base64.b64encode(zlib.compress(pickle.dumps(obj, 2), 9)).decode()) 

1233 

1234 

1235def import_object(obj=None): 

1236 # type: (Optional[str]) -> Any 

1237 import zlib 

1238 if obj is None: 

1239 obj = sys.stdin.read() 

1240 return pickle.loads(zlib.decompress(base64.b64decode(obj.strip()))) 

1241 

1242 

1243def save_object(fname, obj): 

1244 # type: (str, Any) -> None 

1245 """Pickle a Python object""" 

1246 

1247 fd = gzip.open(fname, "wb") 

1248 pickle.dump(obj, fd) 

1249 fd.close() 

1250 

1251 

1252def load_object(fname): 

1253 # type: (str) -> Any 

1254 """unpickle a Python object""" 

1255 return pickle.load(gzip.open(fname, "rb")) 

1256 

1257 

1258@conf.commands.register 

1259def corrupt_bytes(data, p=0.01, n=None): 

1260 # type: (str, float, Optional[int]) -> bytes 

1261 """ 

1262 Corrupt a given percentage (at least one byte) or number of bytes 

1263 from a string 

1264 """ 

1265 s = array.array("B", bytes_encode(data)) 

1266 s_len = len(s) 

1267 if n is None: 

1268 n = max(1, int(s_len * p)) 

1269 for i in random.sample(range(s_len), n): 

1270 s[i] = (s[i] + random.randint(1, 255)) % 256 

1271 return s.tobytes() 

1272 

1273 

1274@conf.commands.register 

1275def corrupt_bits(data, p=0.01, n=None): 

1276 # type: (str, float, Optional[int]) -> bytes 

1277 """ 

1278 Flip a given percentage (at least one bit) or number of bits 

1279 from a string 

1280 """ 

1281 s = array.array("B", bytes_encode(data)) 

1282 s_len = len(s) * 8 

1283 if n is None: 

1284 n = max(1, int(s_len * p)) 

1285 for i in random.sample(range(s_len), n): 

1286 s[i // 8] ^= 1 << (i % 8) 

1287 return s.tobytes() 

1288 

1289 

1290############################# 

1291# pcap capture file stuff # 

1292############################# 

1293 

1294@conf.commands.register 

1295def wrpcap(filename, # type: Union[IO[bytes], str] 

1296 pkt, # type: _PacketIterable 

1297 *args, # type: Any 

1298 **kargs # type: Any 

1299 ): 

1300 # type: (...) -> None 

1301 """Write a list of packets to a pcap file 

1302 

1303 :param filename: the name of the file to write packets to, or an open, 

1304 writable file-like object. The file descriptor will be 

1305 closed at the end of the call, so do not use an object you 

1306 do not want to close (e.g., running wrpcap(sys.stdout, []) 

1307 in interactive mode will crash Scapy). 

1308 :param gz: set to 1 to save a gzipped capture 

1309 :param linktype: force linktype value 

1310 :param endianness: "<" or ">", force endianness 

1311 :param sync: do not bufferize writes to the capture file 

1312 """ 

1313 with PcapWriter(filename, *args, **kargs) as fdesc: 

1314 fdesc.write(pkt) 

1315 

1316 

1317@conf.commands.register 

1318def wrpcapng(filename, # type: str 

1319 pkt, # type: _PacketIterable 

1320 ): 

1321 # type: (...) -> None 

1322 """Write a list of packets to a pcapng file 

1323 

1324 :param filename: the name of the file to write packets to, or an open, 

1325 writable file-like object. The file descriptor will be 

1326 closed at the end of the call, so do not use an object you 

1327 do not want to close (e.g., running wrpcapng(sys.stdout, []) 

1328 in interactive mode will crash Scapy). 

1329 :param pkt: packets to write 

1330 """ 

1331 with PcapNgWriter(filename) as fdesc: 

1332 fdesc.write(pkt) 

1333 

1334 

1335@conf.commands.register 

1336def rdpcap(filename, count=-1): 

1337 # type: (Union[IO[bytes], str], int) -> PacketList 

1338 """Read a pcap or pcapng file and return a packet list 

1339 

1340 :param count: read only <count> packets 

1341 """ 

1342 # Rant: Our complicated use of metaclasses and especially the 

1343 # __call__ function is, of course, not supported by MyPy. 

1344 # One day we should simplify this mess and use a much simpler 

1345 # layout that will actually be supported and properly dissected. 

1346 with PcapReader(filename) as fdesc: # type: ignore 

1347 return fdesc.read_all(count=count) 

1348 

1349 

1350# NOTE: Type hinting 

1351# Mypy doesn't understand the following metaclass, and thinks each 

1352# constructor (PcapReader...) needs 3 arguments each. To avoid this, 

1353# we add a fake (=None) to the last 2 arguments then force the value 

1354# to not be None in the signature and pack the whole thing in an ignore. 

1355# This allows to not have # type: ignore every time we call those 

1356# constructors. 

1357 

1358class PcapReader_metaclass(type): 

1359 """Metaclass for (Raw)Pcap(Ng)Readers""" 

1360 

1361 def __new__(cls, name, bases, dct): 

1362 # type: (Any, str, Any, Dict[str, Any]) -> Any 

1363 """The `alternative` class attribute is declared in the PcapNg 

1364 variant, and set here to the Pcap variant. 

1365 

1366 """ 

1367 newcls = super(PcapReader_metaclass, cls).__new__( 

1368 cls, name, bases, dct 

1369 ) 

1370 if 'alternative' in dct: 

1371 dct['alternative'].alternative = newcls 

1372 return newcls 

1373 

1374 def __call__(cls, filename): 

1375 # type: (Union[IO[bytes], str]) -> Any 

1376 """Creates a cls instance, use the `alternative` if that 

1377 fails. 

1378 

1379 """ 

1380 i = cls.__new__( 

1381 cls, 

1382 cls.__name__, 

1383 cls.__bases__, 

1384 cls.__dict__ # type: ignore 

1385 ) 

1386 filename, fdesc, magic = cls.open(filename) 

1387 if not magic: 

1388 raise Scapy_Exception( 

1389 "No data could be read!" 

1390 ) 

1391 try: 

1392 i.__init__(filename, fdesc, magic) 

1393 return i 

1394 except (Scapy_Exception, EOFError): 

1395 pass 

1396 

1397 if "alternative" in cls.__dict__: 

1398 cls = cls.__dict__["alternative"] 

1399 i = cls.__new__( 

1400 cls, 

1401 cls.__name__, 

1402 cls.__bases__, 

1403 cls.__dict__ # type: ignore 

1404 ) 

1405 try: 

1406 i.__init__(filename, fdesc, magic) 

1407 return i 

1408 except (Scapy_Exception, EOFError): 

1409 pass 

1410 

1411 raise Scapy_Exception("Not a supported capture file") 

1412 

1413 @staticmethod 

1414 def open(fname # type: Union[IO[bytes], str] 

1415 ): 

1416 # type: (...) -> Tuple[str, _ByteStream, bytes] 

1417 """Open (if necessary) filename, and read the magic.""" 

1418 if isinstance(fname, str): 

1419 filename = fname 

1420 fdesc = open(filename, "rb") # type: _ByteStream 

1421 magic = fdesc.read(2) 

1422 if magic == b"\x1f\x8b": 

1423 # GZIP header detected. 

1424 fdesc.seek(0) 

1425 fdesc = gzip.GzipFile(fileobj=fdesc) 

1426 magic = fdesc.read(2) 

1427 magic += fdesc.read(2) 

1428 else: 

1429 fdesc = fname 

1430 filename = getattr(fdesc, "name", "No name") 

1431 magic = fdesc.read(4) 

1432 return filename, fdesc, magic 

1433 

1434 

1435class RawPcapReader(metaclass=PcapReader_metaclass): 

1436 """A stateful pcap reader. Each packet is returned as a string""" 

1437 

1438 # TODO: use Generics to properly type the various readers. 

1439 # As of right now, RawPcapReader is typed as if it returned packets 

1440 # because all of its child do. Fix that 

1441 

1442 nonblocking_socket = True 

1443 PacketMetadata = collections.namedtuple("PacketMetadata", 

1444 ["sec", "usec", "wirelen", "caplen"]) # noqa: E501 

1445 

1446 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

1447 # type: (str, _ByteStream, bytes) -> None 

1448 self.filename = filename 

1449 self.f = fdesc 

1450 if magic == b"\xa1\xb2\xc3\xd4": # big endian 

1451 self.endian = ">" 

1452 self.nano = False 

1453 elif magic == b"\xd4\xc3\xb2\xa1": # little endian 

1454 self.endian = "<" 

1455 self.nano = False 

1456 elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision 

1457 self.endian = ">" 

1458 self.nano = True 

1459 elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision # noqa: E501 

1460 self.endian = "<" 

1461 self.nano = True 

1462 else: 

1463 raise Scapy_Exception( 

1464 "Not a pcap capture file (bad magic: %r)" % magic 

1465 ) 

1466 hdr = self.f.read(20) 

1467 if len(hdr) < 20: 

1468 raise Scapy_Exception("Invalid pcap file (too short)") 

1469 vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack( 

1470 self.endian + "HHIIII", hdr 

1471 ) 

1472 self.linktype = linktype 

1473 self.snaplen = snaplen 

1474 

1475 def __enter__(self): 

1476 # type: () -> RawPcapReader 

1477 return self 

1478 

1479 def __iter__(self): 

1480 # type: () -> RawPcapReader 

1481 return self 

1482 

1483 def __next__(self): 

1484 # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata] 

1485 """ 

1486 implement the iterator protocol on a set of packets in a pcap file 

1487 """ 

1488 try: 

1489 return self._read_packet() 

1490 except EOFError: 

1491 raise StopIteration 

1492 

1493 def _read_packet(self, size=MTU): 

1494 # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata] 

1495 """return a single packet read from the file as a tuple containing 

1496 (pkt_data, pkt_metadata) 

1497 

1498 raise EOFError when no more packets are available 

1499 """ 

1500 hdr = self.f.read(16) 

1501 if len(hdr) < 16: 

1502 raise EOFError 

1503 sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr) 

1504 

1505 try: 

1506 data = self.f.read(caplen)[:size] 

1507 except OverflowError as e: 

1508 warning(f"Pcap: {e}") 

1509 raise EOFError 

1510 

1511 return (data, 

1512 RawPcapReader.PacketMetadata(sec=sec, usec=usec, 

1513 wirelen=wirelen, caplen=caplen)) 

1514 

1515 def read_packet(self, size=MTU): 

1516 # type: (int) -> Packet 

1517 raise Exception( 

1518 "Cannot call read_packet() in RawPcapReader. Use " 

1519 "_read_packet()" 

1520 ) 

1521 

1522 def dispatch(self, 

1523 callback # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any] # noqa: E501 

1524 ): 

1525 # type: (...) -> None 

1526 """call the specified callback routine for each packet read 

1527 

1528 This is just a convenience function for the main loop 

1529 that allows for easy launching of packet processing in a 

1530 thread. 

1531 """ 

1532 for p in self: 

1533 callback(p) 

1534 

1535 def _read_all(self, count=-1): 

1536 # type: (int) -> List[Packet] 

1537 """return a list of all packets in the pcap file 

1538 """ 

1539 res = [] # type: List[Packet] 

1540 while count != 0: 

1541 count -= 1 

1542 try: 

1543 p = self.read_packet() # type: Packet 

1544 except EOFError: 

1545 break 

1546 res.append(p) 

1547 return res 

1548 

1549 def recv(self, size=MTU): 

1550 # type: (int) -> bytes 

1551 """ Emulate a socket 

1552 """ 

1553 return self._read_packet(size=size)[0] 

1554 

1555 def fileno(self): 

1556 # type: () -> int 

1557 return -1 if WINDOWS else self.f.fileno() 

1558 

1559 def close(self): 

1560 # type: () -> None 

1561 if isinstance(self.f, gzip.GzipFile): 

1562 self.f.fileobj.close() # type: ignore 

1563 self.f.close() 

1564 

1565 def __exit__(self, exc_type, exc_value, tracback): 

1566 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 

1567 self.close() 

1568 

1569 # emulate SuperSocket 

1570 @staticmethod 

1571 def select(sockets, # type: List[SuperSocket] 

1572 remain=None, # type: Optional[float] 

1573 ): 

1574 # type: (...) -> List[SuperSocket] 

1575 return sockets 

1576 

1577 

1578class PcapReader(RawPcapReader): 

1579 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

1580 # type: (str, IO[bytes], bytes) -> None 

1581 RawPcapReader.__init__(self, filename, fdesc, magic) 

1582 try: 

1583 self.LLcls = conf.l2types.num2layer[ 

1584 self.linktype 

1585 ] # type: Type[Packet] 

1586 except KeyError: 

1587 warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype)) # noqa: E501 

1588 if conf.raw_layer is None: 

1589 # conf.raw_layer is set on import 

1590 import scapy.packet # noqa: F401 

1591 self.LLcls = conf.raw_layer 

1592 

1593 def __enter__(self): 

1594 # type: () -> PcapReader 

1595 return self 

1596 

1597 def read_packet(self, size=MTU, **kwargs): 

1598 # type: (int, **Any) -> Packet 

1599 rp = super(PcapReader, self)._read_packet(size=size) 

1600 if rp is None: 

1601 raise EOFError 

1602 s, pkt_info = rp 

1603 

1604 try: 

1605 p = self.LLcls(s, **kwargs) # type: Packet 

1606 except KeyboardInterrupt: 

1607 raise 

1608 except Exception: 

1609 if conf.debug_dissector: 

1610 from scapy.sendrecv import debug 

1611 debug.crashed_on = (self.LLcls, s) 

1612 raise 

1613 if conf.raw_layer is None: 

1614 # conf.raw_layer is set on import 

1615 import scapy.packet # noqa: F401 

1616 p = conf.raw_layer(s) 

1617 power = Decimal(10) ** Decimal(-9 if self.nano else -6) 

1618 p.time = EDecimal(pkt_info.sec + power * pkt_info.usec) 

1619 p.wirelen = pkt_info.wirelen 

1620 return p 

1621 

1622 def recv(self, size=MTU, **kwargs): # type: ignore 

1623 # type: (int, **Any) -> Packet 

1624 return self.read_packet(size=size, **kwargs) 

1625 

1626 def __iter__(self): 

1627 # type: () -> PcapReader 

1628 return self 

1629 

1630 def __next__(self): # type: ignore 

1631 # type: () -> Packet 

1632 try: 

1633 return self.read_packet() 

1634 except EOFError: 

1635 raise StopIteration 

1636 

1637 def read_all(self, count=-1): 

1638 # type: (int) -> PacketList 

1639 res = self._read_all(count) 

1640 from scapy import plist 

1641 return plist.PacketList(res, name=os.path.basename(self.filename)) 

1642 

1643 

1644class RawPcapNgReader(RawPcapReader): 

1645 """A stateful pcapng reader. Each packet is returned as 

1646 bytes. 

1647 

1648 """ 

1649 

1650 alternative = RawPcapReader # type: Type[Any] 

1651 

1652 PacketMetadata = collections.namedtuple("PacketMetadataNg", # type: ignore 

1653 ["linktype", "tsresol", 

1654 "tshigh", "tslow", "wirelen", 

1655 "comments", "ifname", "direction", 

1656 "process_information"]) 

1657 

1658 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

1659 # type: (str, IO[bytes], bytes) -> None 

1660 self.filename = filename 

1661 self.f = fdesc 

1662 # A list of (linktype, snaplen, tsresol); will be populated by IDBs. 

1663 self.interfaces = [] # type: List[Tuple[int, int, Dict[str, Any]]] 

1664 self.default_options = { 

1665 "tsresol": 1000000 

1666 } 

1667 self.blocktypes: Dict[ 

1668 int, 

1669 Callable[ 

1670 [bytes, int], 

1671 Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] 

1672 ]] = { 

1673 1: self._read_block_idb, 

1674 2: self._read_block_pkt, 

1675 3: self._read_block_spb, 

1676 6: self._read_block_epb, 

1677 10: self._read_block_dsb, 

1678 0x80000001: self._read_block_pib, 

1679 } 

1680 self.endian = "!" # Will be overwritten by first SHB 

1681 self.process_information = [] # type: List[Dict[str, Any]] 

1682 

1683 if magic != b"\x0a\x0d\x0d\x0a": # PcapNg: 

1684 raise Scapy_Exception( 

1685 "Not a pcapng capture file (bad magic: %r)" % magic 

1686 ) 

1687 

1688 try: 

1689 self._read_block_shb() 

1690 except EOFError: 

1691 raise Scapy_Exception( 

1692 "The first SHB of the pcapng file is malformed !" 

1693 ) 

1694 

1695 def _read_block(self, size=MTU): 

1696 # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] # noqa: E501 

1697 try: 

1698 blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0] 

1699 except struct.error: 

1700 raise EOFError 

1701 if blocktype == 0x0A0D0D0A: 

1702 # This function updates the endianness based on the block content. 

1703 self._read_block_shb() 

1704 return None 

1705 try: 

1706 blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0] 

1707 except struct.error: 

1708 warning("PcapNg: Error reading blocklen before block body") 

1709 raise EOFError 

1710 if blocklen < 12: 

1711 warning("PcapNg: Invalid block length !") 

1712 raise EOFError 

1713 

1714 _block_body_length = blocklen - 12 

1715 block = self.f.read(_block_body_length) 

1716 if len(block) != _block_body_length: 

1717 raise Scapy_Exception("PcapNg: Invalid Block body length " 

1718 "(too short)") 

1719 self._read_block_tail(blocklen) 

1720 if blocktype in self.blocktypes: 

1721 return self.blocktypes[blocktype](block, size) 

1722 return None 

1723 

1724 def _read_block_tail(self, blocklen): 

1725 # type: (int) -> None 

1726 if blocklen % 4: 

1727 pad = self.f.read(-blocklen % 4) 

1728 warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. " 

1729 "Ignored padding %r" % (blocklen, pad)) 

1730 try: 

1731 if blocklen != struct.unpack(self.endian + 'I', 

1732 self.f.read(4))[0]: 

1733 raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)") 

1734 except struct.error: 

1735 warning("PcapNg: Could not read blocklen after block body") 

1736 raise EOFError 

1737 

1738 def _read_block_shb(self): 

1739 # type: () -> None 

1740 """Section Header Block""" 

1741 _blocklen = self.f.read(4) 

1742 endian = self.f.read(4) 

1743 if endian == b"\x1a\x2b\x3c\x4d": 

1744 self.endian = ">" 

1745 elif endian == b"\x4d\x3c\x2b\x1a": 

1746 self.endian = "<" 

1747 else: 

1748 warning("PcapNg: Bad magic in Section Header Block" 

1749 " (not a pcapng file?)") 

1750 raise EOFError 

1751 

1752 try: 

1753 blocklen = struct.unpack(self.endian + "I", _blocklen)[0] 

1754 except struct.error: 

1755 warning("PcapNg: Could not read blocklen") 

1756 raise EOFError 

1757 if blocklen < 28: 

1758 warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!") # noqa: E501 

1759 raise EOFError 

1760 

1761 # Major version must be 1 

1762 _major = self.f.read(2) 

1763 try: 

1764 major = struct.unpack(self.endian + "H", _major)[0] 

1765 except struct.error: 

1766 warning("PcapNg: Could not read major value") 

1767 raise EOFError 

1768 if major != 1: 

1769 warning(f"PcapNg: SHB Major version {major} unsupported !") 

1770 raise EOFError 

1771 

1772 # Skip minor version & section length 

1773 skipped = self.f.read(10) 

1774 if len(skipped) != 10: 

1775 warning("PcapNg: Could not read minor value & section length") 

1776 raise EOFError 

1777 

1778 _options_len = blocklen - 28 

1779 options = self.f.read(_options_len) 

1780 if len(options) != _options_len: 

1781 raise Scapy_Exception("PcapNg: Invalid Section Header Block " 

1782 " options (too short)") 

1783 self._read_block_tail(blocklen) 

1784 self._read_options(options) 

1785 

1786 def _read_packet(self, size=MTU): # type: ignore 

1787 # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1788 """Read blocks until it reaches either EOF or a packet, and 

1789 returns None or (packet, (linktype, sec, usec, wirelen)), 

1790 where packet is a string. 

1791 

1792 """ 

1793 while True: 

1794 res = self._read_block(size=size) 

1795 if res is not None: 

1796 return res 

1797 

1798 def _read_options(self, options): 

1799 # type: (bytes) -> Dict[int, Union[bytes, List[bytes]]] 

1800 opts = dict() # type: Dict[int, Union[bytes, List[bytes]]] 

1801 while len(options) >= 4: 

1802 try: 

1803 code, length = struct.unpack(self.endian + "HH", options[:4]) 

1804 except struct.error: 

1805 warning("PcapNg: options header is too small " 

1806 "%d !" % len(options)) 

1807 raise EOFError 

1808 if code != 0 and 4 + length <= len(options): 

1809 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#name-options-format 

1810 if code in [1, 2988, 2989, 19372, 19373]: 

1811 if code not in opts: 

1812 opts[code] = [] 

1813 opts[code].append(options[4:4 + length]) # type: ignore 

1814 else: 

1815 opts[code] = options[4:4 + length] 

1816 if code == 0: 

1817 if length != 0: 

1818 warning("PcapNg: invalid option " 

1819 "length %d for end-of-option" % length) 

1820 break 

1821 if length % 4: 

1822 length += (4 - (length % 4)) 

1823 options = options[4 + length:] 

1824 return opts 

1825 

1826 def _read_block_idb(self, block, _): 

1827 # type: (bytes, int) -> None 

1828 """Interface Description Block""" 

1829 # 2 bytes LinkType + 2 bytes Reserved 

1830 # 4 bytes Snaplen 

1831 options_raw = self._read_options(block[8:]) 

1832 options = self.default_options.copy() # type: Dict[str, Any] 

1833 for c, v in options_raw.items(): 

1834 if isinstance(v, list): 

1835 # Spec allows multiple occurrences (see 

1836 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#section-4.2-8.6) 

1837 # but does not define which to use. We take the first for 

1838 # backward compatibility. 

1839 v = v[0] 

1840 if c == 9: 

1841 length = len(v) 

1842 if length == 1: 

1843 tsresol = orb(v) 

1844 options["tsresol"] = (2 if tsresol & 128 else 10) ** ( 

1845 tsresol & 127 

1846 ) 

1847 else: 

1848 warning("PcapNg: invalid options " 

1849 "length %d for IDB tsresol" % length) 

1850 elif c == 2: 

1851 options["name"] = v 

1852 elif c == 1: 

1853 options["comment"] = v 

1854 try: 

1855 interface: Tuple[int, int, Dict[str, Any]] = struct.unpack( 

1856 self.endian + "HxxI", 

1857 block[:8] 

1858 ) + (options,) 

1859 except struct.error: 

1860 warning("PcapNg: IDB is too small %d/8 !" % len(block)) 

1861 raise EOFError 

1862 self.interfaces.append(interface) 

1863 

1864 def _check_interface_id(self, intid): 

1865 # type: (int) -> None 

1866 """Check the interface id value and raise EOFError if invalid.""" 

1867 tmp_len = len(self.interfaces) 

1868 if intid >= tmp_len: 

1869 warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len)) 

1870 raise EOFError 

1871 

1872 def _read_block_epb(self, block, size): 

1873 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1874 """Enhanced Packet Block""" 

1875 try: 

1876 intid, tshigh, tslow, caplen, wirelen = struct.unpack( 

1877 self.endian + "5I", 

1878 block[:20], 

1879 ) 

1880 except struct.error: 

1881 warning("PcapNg: EPB is too small %d/20 !" % len(block)) 

1882 raise EOFError 

1883 

1884 # Compute the options offset taking padding into account 

1885 if caplen % 4: 

1886 opt_offset = 20 + caplen + (-caplen) % 4 

1887 else: 

1888 opt_offset = 20 + caplen 

1889 

1890 # Parse options 

1891 options = self._read_options(block[opt_offset:]) 

1892 

1893 process_information = {} 

1894 for code, value in options.items(): 

1895 # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX 

1896 if code in [0x8001, 0x8003]: 

1897 try: 

1898 proc_index = struct.unpack( 

1899 self.endian + "I", value)[0] # type: ignore 

1900 except struct.error: 

1901 warning("PcapNg: EPB invalid proc index " 

1902 "(expected 4 bytes, got %d) !" % len(value)) 

1903 raise EOFError 

1904 if proc_index < len(self.process_information): 

1905 key = "proc" if code == 0x8001 else "eproc" 

1906 process_information[key] = self.process_information[proc_index] 

1907 else: 

1908 warning("PcapNg: EPB invalid process information index " 

1909 "(%d/%d) !" % (proc_index, len(self.process_information))) 

1910 

1911 comments = options.get(1, None) 

1912 epb_flags_raw = options.get(2, None) 

1913 if epb_flags_raw and isinstance(epb_flags_raw, bytes): 

1914 try: 

1915 epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw) 

1916 except struct.error: 

1917 warning("PcapNg: EPB invalid flags size" 

1918 "(expected 4 bytes, got %d) !" % len(epb_flags_raw)) 

1919 raise EOFError 

1920 direction = epb_flags & 3 

1921 

1922 else: 

1923 direction = None 

1924 

1925 self._check_interface_id(intid) 

1926 ifname = self.interfaces[intid][2].get('name', None) 

1927 

1928 return (block[20:20 + caplen][:size], 

1929 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 

1930 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 

1931 tshigh=tshigh, 

1932 tslow=tslow, 

1933 wirelen=wirelen, 

1934 ifname=ifname, 

1935 direction=direction, 

1936 process_information=process_information, 

1937 comments=comments)) 

1938 

1939 def _read_block_spb(self, block, size): 

1940 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1941 """Simple Packet Block""" 

1942 # "it MUST be assumed that all the Simple Packet Blocks have 

1943 # been captured on the interface previously specified in the 

1944 # first Interface Description Block." 

1945 intid = 0 

1946 self._check_interface_id(intid) 

1947 

1948 try: 

1949 wirelen, = struct.unpack(self.endian + "I", block[:4]) 

1950 except struct.error: 

1951 warning("PcapNg: SPB is too small %d/4 !" % len(block)) 

1952 raise EOFError 

1953 

1954 caplen = min(wirelen, self.interfaces[intid][1]) 

1955 return (block[4:4 + caplen][:size], 

1956 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 

1957 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 

1958 tshigh=None, 

1959 tslow=None, 

1960 wirelen=wirelen, 

1961 ifname=None, 

1962 direction=None, 

1963 process_information={}, 

1964 comments=None)) 

1965 

1966 def _read_block_pkt(self, block, size): 

1967 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1968 """(Obsolete) Packet Block""" 

1969 try: 

1970 intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack( 

1971 self.endian + "HH4I", 

1972 block[:20], 

1973 ) 

1974 except struct.error: 

1975 warning("PcapNg: PKT is too small %d/20 !" % len(block)) 

1976 raise EOFError 

1977 

1978 self._check_interface_id(intid) 

1979 return (block[20:20 + caplen][:size], 

1980 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 

1981 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 

1982 tshigh=tshigh, 

1983 tslow=tslow, 

1984 wirelen=wirelen, 

1985 ifname=None, 

1986 direction=None, 

1987 process_information={}, 

1988 comments=None)) 

1989 

1990 def _read_block_dsb(self, block, size): 

1991 # type: (bytes, int) -> None 

1992 """Decryption Secrets Block""" 

1993 

1994 # Parse the secrets type and length fields 

1995 try: 

1996 secrets_type, secrets_length = struct.unpack( 

1997 self.endian + "II", 

1998 block[:8], 

1999 ) 

2000 block = block[8:] 

2001 except struct.error: 

2002 warning("PcapNg: DSB is too small %d!", len(block)) 

2003 raise EOFError 

2004 

2005 # Compute the secrets length including the padding 

2006 padded_secrets_length = secrets_length + (-secrets_length) % 4 

2007 if len(block) < padded_secrets_length: 

2008 warning("PcapNg: invalid DSB secrets length!") 

2009 raise EOFError 

2010 

2011 # Extract secrets data and options 

2012 secrets_data = block[:padded_secrets_length][:secrets_length] 

2013 if block[padded_secrets_length:]: 

2014 warning("PcapNg: DSB options are not supported!") 

2015 

2016 # TLS Key Log 

2017 if secrets_type == 0x544c534b: 

2018 if getattr(conf, "tls_sessions", False) is False: 

2019 warning("PcapNg: TLS Key Log available, but " 

2020 "the TLS layer is not loaded! Scapy won't be able " 

2021 "to decrypt the packets.") 

2022 else: 

2023 from scapy.layers.tls.session import load_nss_keys 

2024 

2025 # Write Key Log to a file and parse it 

2026 filename = get_temp_file() 

2027 with open(filename, "wb") as fd: 

2028 fd.write(secrets_data) 

2029 fd.close() 

2030 

2031 keys = load_nss_keys(filename) 

2032 if not keys: 

2033 warning("PcapNg: invalid TLS Key Log in DSB!") 

2034 else: 

2035 # Note: these attributes are only available when the TLS 

2036 # layer is loaded. 

2037 conf.tls_nss_keys = keys 

2038 conf.tls_session_enable = True 

2039 else: 

2040 warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type) 

2041 

2042 def _read_block_pib(self, block, _): 

2043 # type: (bytes, int) -> None 

2044 """Apple Process Information Block""" 

2045 

2046 # Get the Process ID 

2047 try: 

2048 dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0] 

2049 process_information = {"id": dpeb_pid} 

2050 block = block[4:] 

2051 except struct.error: 

2052 warning("PcapNg: DPEB is too small (%d). Cannot get PID!", 

2053 len(block)) 

2054 raise EOFError 

2055 

2056 # Get Options 

2057 options = self._read_options(block) 

2058 for code, value in options.items(): 

2059 if code == 2: 

2060 process_information["name"] = value.decode( # type: ignore 

2061 "ascii", "backslashreplace") 

2062 elif code == 4: 

2063 if len(value) == 16: 

2064 process_information["uuid"] = str(UUID(bytes=value)) # type: ignore 

2065 else: 

2066 warning("PcapNg: DPEB UUID length is invalid (%d)!", 

2067 len(value)) 

2068 

2069 # Store process information 

2070 self.process_information.append(process_information) 

2071 

2072 

2073class PcapNgReader(RawPcapNgReader, PcapReader): 

2074 

2075 alternative = PcapReader 

2076 

2077 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

2078 # type: (str, IO[bytes], bytes) -> None 

2079 RawPcapNgReader.__init__(self, filename, fdesc, magic) 

2080 

2081 def __enter__(self): 

2082 # type: () -> PcapNgReader 

2083 return self 

2084 

2085 def read_packet(self, size=MTU, **kwargs): 

2086 # type: (int, **Any) -> Packet 

2087 rp = super(PcapNgReader, self)._read_packet(size=size) 

2088 if rp is None: 

2089 raise EOFError 

2090 s, (linktype, tsresol, tshigh, tslow, wirelen, comments, ifname, direction, process_information) = rp # noqa: E501 

2091 try: 

2092 cls = conf.l2types.num2layer[linktype] # type: Type[Packet] 

2093 p = cls(s, **kwargs) # type: Packet 

2094 except KeyboardInterrupt: 

2095 raise 

2096 except Exception: 

2097 if conf.debug_dissector: 

2098 raise 

2099 if conf.raw_layer is None: 

2100 # conf.raw_layer is set on import 

2101 import scapy.packet # noqa: F401 

2102 p = conf.raw_layer(s) 

2103 if tshigh is not None: 

2104 p.time = EDecimal((tshigh << 32) + tslow) / tsresol 

2105 p.wirelen = wirelen 

2106 p.comments = comments 

2107 p.direction = direction 

2108 p.process_information = process_information.copy() 

2109 if ifname is not None: 

2110 p.sniffed_on = ifname.decode('utf-8', 'backslashreplace') 

2111 return p 

2112 

2113 def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet': # type: ignore 

2114 return self.read_packet(size=size, **kwargs) 

2115 

2116 

2117class GenericPcapWriter(object): 

2118 nano = False 

2119 linktype: int 

2120 

2121 def _write_header(self, pkt): 

2122 # type: (Optional[Union[Packet, bytes]]) -> None 

2123 raise NotImplementedError 

2124 

2125 def _write_packet(self, 

2126 packet, # type: Union[bytes, Packet] 

2127 linktype, # type: int 

2128 sec=None, # type: Optional[float] 

2129 usec=None, # type: Optional[int] 

2130 caplen=None, # type: Optional[int] 

2131 wirelen=None, # type: Optional[int] 

2132 ifname=None, # type: Optional[bytes] 

2133 direction=None, # type: Optional[int] 

2134 comments=None, # type: Optional[List[bytes]] 

2135 ): 

2136 # type: (...) -> None 

2137 raise NotImplementedError 

2138 

2139 def _get_time(self, 

2140 packet, # type: Union[bytes, Packet] 

2141 sec, # type: Optional[float] 

2142 usec # type: Optional[int] 

2143 ): 

2144 # type: (...) -> Tuple[float, int] 

2145 if hasattr(packet, "time"): 

2146 if sec is None: 

2147 packet_time = packet.time 

2148 tmp = int(packet_time) 

2149 usec = int(round((packet_time - tmp) * 

2150 (1000000000 if self.nano else 1000000))) 

2151 sec = float(packet_time) 

2152 if sec is not None and usec is None: 

2153 usec = 0 

2154 return sec, usec # type: ignore 

2155 

2156 def write_header(self, pkt): 

2157 # type: (Optional[Union[Packet, bytes]]) -> None 

2158 if not hasattr(self, 'linktype'): 

2159 try: 

2160 if pkt is None or isinstance(pkt, bytes): 

2161 # Can't guess LL 

2162 raise KeyError 

2163 self.linktype = conf.l2types.layer2num[ 

2164 pkt.__class__ 

2165 ] 

2166 except KeyError: 

2167 msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)" 

2168 warning(msg, self.__class__.__name__, pkt.__class__.__name__) 

2169 self.linktype = DLT_EN10MB 

2170 self._write_header(pkt) 

2171 

2172 def write_packet(self, 

2173 packet, # type: Union[bytes, Packet] 

2174 sec=None, # type: Optional[float] 

2175 usec=None, # type: Optional[int] 

2176 caplen=None, # type: Optional[int] 

2177 wirelen=None, # type: Optional[int] 

2178 ): 

2179 # type: (...) -> None 

2180 """ 

2181 Writes a single packet to the pcap file. 

2182 

2183 :param packet: Packet, or bytes for a single packet 

2184 :type packet: scapy.packet.Packet or bytes 

2185 :param sec: time the packet was captured, in seconds since epoch. If 

2186 not supplied, defaults to now. 

2187 :type sec: float 

2188 :param usec: If ``nano=True``, then number of nanoseconds after the 

2189 second that the packet was captured. If ``nano=False``, 

2190 then the number of microseconds after the second the 

2191 packet was captured. If ``sec`` is not specified, 

2192 this value is ignored. 

2193 :type usec: int or long 

2194 :param caplen: The length of the packet in the capture file. If not 

2195 specified, uses ``len(raw(packet))``. 

2196 :type caplen: int 

2197 :param wirelen: The length of the packet on the wire. If not 

2198 specified, tries ``packet.wirelen``, otherwise uses 

2199 ``caplen``. 

2200 :type wirelen: int 

2201 :return: None 

2202 :rtype: None 

2203 """ 

2204 f_sec, usec = self._get_time(packet, sec, usec) 

2205 

2206 rawpkt = bytes_encode(packet) 

2207 caplen = len(rawpkt) if caplen is None else caplen 

2208 

2209 if wirelen is None: 

2210 if hasattr(packet, "wirelen"): 

2211 wirelen = packet.wirelen 

2212 if wirelen is None: 

2213 wirelen = caplen 

2214 

2215 comments = getattr(packet, "comments", None) 

2216 ifname = getattr(packet, "sniffed_on", None) 

2217 direction = getattr(packet, "direction", None) 

2218 if not isinstance(packet, bytes): 

2219 linktype: int = conf.l2types.layer2num[ 

2220 packet.__class__ 

2221 ] 

2222 else: 

2223 linktype = self.linktype 

2224 if ifname is not None: 

2225 ifname = str(ifname).encode('utf-8') 

2226 self._write_packet( 

2227 rawpkt, 

2228 sec=f_sec, usec=usec, 

2229 caplen=caplen, wirelen=wirelen, 

2230 ifname=ifname, 

2231 direction=direction, 

2232 linktype=linktype, 

2233 comments=comments, 

2234 ) 

2235 

2236 

2237class GenericRawPcapWriter(GenericPcapWriter): 

2238 header_present = False 

2239 nano = False 

2240 sync = False 

2241 f = None # type: Union[IO[bytes], gzip.GzipFile] 

2242 

2243 def fileno(self): 

2244 # type: () -> int 

2245 return -1 if WINDOWS else self.f.fileno() 

2246 

2247 def flush(self): 

2248 # type: () -> Optional[Any] 

2249 return self.f.flush() 

2250 

2251 def close(self): 

2252 # type: () -> Optional[Any] 

2253 if not self.header_present: 

2254 self.write_header(None) 

2255 return self.f.close() 

2256 

2257 def __enter__(self): 

2258 # type: () -> GenericRawPcapWriter 

2259 return self 

2260 

2261 def __exit__(self, exc_type, exc_value, tracback): 

2262 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 

2263 self.flush() 

2264 self.close() 

2265 

2266 def write(self, pkt): 

2267 # type: (Union[_PacketIterable, bytes]) -> None 

2268 """ 

2269 Writes a Packet, a SndRcvList object, or bytes to a pcap file. 

2270 

2271 :param pkt: Packet(s) to write (one record for each Packet), or raw 

2272 bytes to write (as one record). 

2273 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes 

2274 """ 

2275 if isinstance(pkt, bytes): 

2276 if not self.header_present: 

2277 self.write_header(pkt) 

2278 self.write_packet(pkt) 

2279 else: 

2280 # Import here to avoid circular dependency 

2281 from scapy.supersocket import IterSocket 

2282 for p in IterSocket(pkt).iter: 

2283 if not self.header_present: 

2284 self.write_header(p) 

2285 

2286 if not isinstance(p, bytes) and \ 

2287 self.linktype != conf.l2types.get(type(p), None): 

2288 warning("Inconsistent linktypes detected!" 

2289 " The resulting file might contain" 

2290 " invalid packets." 

2291 ) 

2292 

2293 self.write_packet(p) 

2294 

2295 

2296class RawPcapWriter(GenericRawPcapWriter): 

2297 """A stream PCAP writer with more control than wrpcap()""" 

2298 

2299 def __init__(self, 

2300 filename, # type: Union[IO[bytes], str] 

2301 linktype=None, # type: Optional[int] 

2302 gz=False, # type: bool 

2303 endianness="", # type: str 

2304 append=False, # type: bool 

2305 sync=False, # type: bool 

2306 nano=False, # type: bool 

2307 snaplen=MTU, # type: int 

2308 bufsz=4096, # type: int 

2309 ): 

2310 # type: (...) -> None 

2311 """ 

2312 :param filename: the name of the file to write packets to, or an open, 

2313 writable file-like object. 

2314 :param linktype: force linktype to a given value. If None, linktype is 

2315 taken from the first writer packet 

2316 :param gz: compress the capture on the fly 

2317 :param endianness: force an endianness (little:"<", big:">"). 

2318 Default is native 

2319 :param append: append packets to the capture file instead of 

2320 truncating it 

2321 :param sync: do not bufferize writes to the capture file 

2322 :param nano: use nanosecond-precision (requires libpcap >= 1.5.0) 

2323 

2324 """ 

2325 

2326 if linktype: 

2327 self.linktype = linktype 

2328 self.snaplen = snaplen 

2329 self.append = append 

2330 self.gz = gz 

2331 self.endian = endianness 

2332 self.sync = sync 

2333 self.nano = nano 

2334 if sync: 

2335 bufsz = 0 

2336 

2337 if isinstance(filename, str): 

2338 self.filename = filename 

2339 if gz: 

2340 self.f = cast(_ByteStream, gzip.open( 

2341 filename, append and "ab" or "wb", 9 

2342 )) 

2343 else: 

2344 self.f = open(filename, append and "ab" or "wb", bufsz) 

2345 else: 

2346 self.f = filename 

2347 self.filename = getattr(filename, "name", "No name") 

2348 

2349 def _write_header(self, pkt): 

2350 # type: (Optional[Union[Packet, bytes]]) -> None 

2351 self.header_present = True 

2352 

2353 if self.append: 

2354 # Even if prone to race conditions, this seems to be 

2355 # safest way to tell whether the header is already present 

2356 # because we have to handle compressed streams that 

2357 # are not as flexible as basic files 

2358 if self.gz: 

2359 g = gzip.open(self.filename, "rb") # type: _ByteStream 

2360 else: 

2361 g = open(self.filename, "rb") 

2362 try: 

2363 if g.read(16): 

2364 return 

2365 finally: 

2366 g.close() 

2367 

2368 if not hasattr(self, 'linktype'): 

2369 raise ValueError( 

2370 "linktype could not be guessed. " 

2371 "Please pass a linktype while creating the writer" 

2372 ) 

2373 

2374 self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, # noqa: E501 

2375 2, 4, 0, 0, self.snaplen, self.linktype)) 

2376 self.f.flush() 

2377 

2378 def _write_packet(self, 

2379 packet, # type: Union[bytes, Packet] 

2380 linktype, # type: int 

2381 sec=None, # type: Optional[float] 

2382 usec=None, # type: Optional[int] 

2383 caplen=None, # type: Optional[int] 

2384 wirelen=None, # type: Optional[int] 

2385 ifname=None, # type: Optional[bytes] 

2386 direction=None, # type: Optional[int] 

2387 comments=None, # type: Optional[List[bytes]] 

2388 ): 

2389 # type: (...) -> None 

2390 """ 

2391 Writes a single packet to the pcap file. 

2392 

2393 :param packet: bytes for a single packet 

2394 :type packet: bytes 

2395 :param linktype: linktype value associated with the packet 

2396 :type linktype: int 

2397 :param sec: time the packet was captured, in seconds since epoch. If 

2398 not supplied, defaults to now. 

2399 :type sec: float 

2400 :param usec: not used with pcapng 

2401 packet was captured 

2402 :type usec: int or long 

2403 :param caplen: The length of the packet in the capture file. If not 

2404 specified, uses ``len(packet)``. 

2405 :type caplen: int 

2406 :param wirelen: The length of the packet on the wire. If not 

2407 specified, uses ``caplen``. 

2408 :type wirelen: int 

2409 :return: None 

2410 :rtype: None 

2411 """ 

2412 if caplen is None: 

2413 caplen = len(packet) 

2414 if wirelen is None: 

2415 wirelen = caplen 

2416 if sec is None or usec is None: 

2417 t = time.time() 

2418 it = int(t) 

2419 if sec is None: 

2420 sec = it 

2421 usec = int(round((t - it) * 

2422 (1000000000 if self.nano else 1000000))) 

2423 elif usec is None: 

2424 usec = 0 

2425 

2426 self.f.write(struct.pack(self.endian + "IIII", 

2427 int(sec), usec, caplen, wirelen)) 

2428 self.f.write(bytes(packet)) 

2429 if self.sync: 

2430 self.f.flush() 

2431 

2432 

2433class RawPcapNgWriter(GenericRawPcapWriter): 

2434 """A stream pcapng writer with more control than wrpcapng()""" 

2435 

2436 def __init__(self, 

2437 filename, # type: str 

2438 ): 

2439 # type: (...) -> None 

2440 

2441 self.header_present = False 

2442 self.tsresol = 1000000 

2443 # A dict to keep if_name to IDB id mapping. 

2444 # unknown if_name(None) id=0 

2445 self.interfaces2id: Dict[Optional[bytes], int] = {None: 0} 

2446 

2447 # tcpdump only support little-endian in PCAPng files 

2448 self.endian = "<" 

2449 self.endian_magic = b"\x4d\x3c\x2b\x1a" 

2450 

2451 self.filename = filename 

2452 self.f = open(filename, "wb", 4096) 

2453 

2454 def _get_time(self, 

2455 packet, # type: Union[bytes, Packet] 

2456 sec, # type: Optional[float] 

2457 usec # type: Optional[int] 

2458 ): 

2459 # type: (...) -> Tuple[float, int] 

2460 if hasattr(packet, "time"): 

2461 if sec is None: 

2462 sec = float(packet.time) 

2463 

2464 if usec is None: 

2465 usec = 0 

2466 

2467 return sec, usec # type: ignore 

2468 

2469 def _add_padding(self, raw_data): 

2470 # type: (bytes) -> bytes 

2471 raw_data += ((-len(raw_data)) % 4) * b"\x00" 

2472 return raw_data 

2473 

2474 def build_block(self, block_type, block_body, options=None): 

2475 # type: (bytes, bytes, Optional[bytes]) -> bytes 

2476 

2477 # Pad Block Body to 32 bits 

2478 block_body = self._add_padding(block_body) 

2479 

2480 if options: 

2481 block_body += options 

2482 

2483 # An empty block is 12 bytes long 

2484 block_total_length = 12 + len(block_body) 

2485 

2486 # Block Type 

2487 block = block_type 

2488 # Block Total Length$ 

2489 block += struct.pack(self.endian + "I", block_total_length) 

2490 # Block Body 

2491 block += block_body 

2492 # Block Total Length$ 

2493 block += struct.pack(self.endian + "I", block_total_length) 

2494 

2495 return block 

2496 

2497 def _write_header(self, pkt): 

2498 # type: (Optional[Union[Packet, bytes]]) -> None 

2499 if not self.header_present: 

2500 self.header_present = True 

2501 self._write_block_shb() 

2502 self._write_block_idb(linktype=self.linktype) 

2503 

2504 def _write_block_shb(self): 

2505 # type: () -> None 

2506 

2507 # Block Type 

2508 block_type = b"\x0A\x0D\x0D\x0A" 

2509 # Byte-Order Magic 

2510 block_shb = self.endian_magic 

2511 # Major Version 

2512 block_shb += struct.pack(self.endian + "H", 1) 

2513 # Minor Version 

2514 block_shb += struct.pack(self.endian + "H", 0) 

2515 # Section Length 

2516 block_shb += struct.pack(self.endian + "q", -1) 

2517 

2518 self.f.write(self.build_block(block_type, block_shb)) 

2519 

2520 def _write_block_idb(self, 

2521 linktype, # type: int 

2522 ifname=None # type: Optional[bytes] 

2523 ): 

2524 # type: (...) -> None 

2525 

2526 # Block Type 

2527 block_type = struct.pack(self.endian + "I", 1) 

2528 # LinkType 

2529 block_idb = struct.pack(self.endian + "H", linktype) 

2530 # Reserved 

2531 block_idb += struct.pack(self.endian + "H", 0) 

2532 # SnapLen 

2533 block_idb += struct.pack(self.endian + "I", 262144) 

2534 

2535 # if_name option 

2536 opts = None 

2537 if ifname is not None: 

2538 opts = struct.pack(self.endian + "HH", 2, len(ifname)) 

2539 # Pad Option Value to 32 bits 

2540 opts += self._add_padding(ifname) 

2541 opts += struct.pack(self.endian + "HH", 0, 0) 

2542 

2543 self.f.write(self.build_block(block_type, block_idb, options=opts)) 

2544 

2545 def _write_block_spb(self, raw_pkt): 

2546 # type: (bytes) -> None 

2547 

2548 # Block Type 

2549 block_type = struct.pack(self.endian + "I", 3) 

2550 # Original Packet Length 

2551 block_spb = struct.pack(self.endian + "I", len(raw_pkt)) 

2552 # Packet Data 

2553 block_spb += raw_pkt 

2554 

2555 self.f.write(self.build_block(block_type, block_spb)) 

2556 

2557 def _write_block_epb(self, 

2558 raw_pkt, # type: bytes 

2559 ifid, # type: int 

2560 timestamp=None, # type: Optional[Union[EDecimal, float]] # noqa: E501 

2561 caplen=None, # type: Optional[int] 

2562 orglen=None, # type: Optional[int] 

2563 comments=None, # type: Optional[List[bytes]] 

2564 flags=None, # type: Optional[int] 

2565 ): 

2566 # type: (...) -> None 

2567 

2568 if timestamp: 

2569 tmp_ts = int(timestamp * self.tsresol) 

2570 ts_high = tmp_ts >> 32 

2571 ts_low = tmp_ts & 0xFFFFFFFF 

2572 else: 

2573 ts_high = ts_low = 0 

2574 

2575 if not caplen: 

2576 caplen = len(raw_pkt) 

2577 

2578 if not orglen: 

2579 orglen = len(raw_pkt) 

2580 

2581 # Block Type 

2582 block_type = struct.pack(self.endian + "I", 6) 

2583 # Interface ID 

2584 block_epb = struct.pack(self.endian + "I", ifid) 

2585 # Timestamp (High) 

2586 block_epb += struct.pack(self.endian + "I", ts_high) 

2587 # Timestamp (Low) 

2588 block_epb += struct.pack(self.endian + "I", ts_low) 

2589 # Captured Packet Length 

2590 block_epb += struct.pack(self.endian + "I", caplen) 

2591 # Original Packet Length 

2592 block_epb += struct.pack(self.endian + "I", orglen) 

2593 # Packet Data 

2594 block_epb += raw_pkt 

2595 

2596 # Options 

2597 opts = b'' 

2598 if comments and len(comments): 

2599 for c in comments: 

2600 comment = bytes_encode(c) 

2601 opts += struct.pack(self.endian + "HH", 1, len(comment)) 

2602 # Pad Option Value to 32 bits 

2603 opts += self._add_padding(comment) 

2604 if type(flags) == int: 

2605 opts += struct.pack(self.endian + "HH", 2, 4) 

2606 opts += struct.pack(self.endian + "I", flags) 

2607 if opts: 

2608 opts += struct.pack(self.endian + "HH", 0, 0) 

2609 

2610 self.f.write(self.build_block(block_type, block_epb, 

2611 options=opts)) 

2612 

2613 def _write_packet(self, # type: ignore 

2614 packet, # type: bytes 

2615 linktype, # type: int 

2616 sec=None, # type: Optional[float] 

2617 usec=None, # type: Optional[int] 

2618 caplen=None, # type: Optional[int] 

2619 wirelen=None, # type: Optional[int] 

2620 ifname=None, # type: Optional[bytes] 

2621 direction=None, # type: Optional[int] 

2622 comments=None, # type: Optional[List[bytes]] 

2623 ): 

2624 # type: (...) -> None 

2625 """ 

2626 Writes a single packet to the pcap file. 

2627 

2628 :param packet: bytes for a single packet 

2629 :type packet: bytes 

2630 :param linktype: linktype value associated with the packet 

2631 :type linktype: int 

2632 :param sec: time the packet was captured, in seconds since epoch. If 

2633 not supplied, defaults to now. 

2634 :type sec: float 

2635 :param caplen: The length of the packet in the capture file. If not 

2636 specified, uses ``len(packet)``. 

2637 :type caplen: int 

2638 :param wirelen: The length of the packet on the wire. If not 

2639 specified, uses ``caplen``. 

2640 :type wirelen: int 

2641 :param comment: UTF-8 string containing human-readable comment text 

2642 that is associated to the current block. Line separators 

2643 SHOULD be a carriage-return + linefeed ('\r\n') or 

2644 just linefeed ('\n'); either form may appear and 

2645 be considered a line separator. The string is not 

2646 zero-terminated. 

2647 :type bytes 

2648 :param ifname: UTF-8 string containing the 

2649 name of the device used to capture data. 

2650 The string is not zero-terminated. 

2651 :type bytes 

2652 :param direction: 0 = information not available, 

2653 1 = inbound, 

2654 2 = outbound 

2655 :type int 

2656 :return: None 

2657 :rtype: None 

2658 """ 

2659 if caplen is None: 

2660 caplen = len(packet) 

2661 if wirelen is None: 

2662 wirelen = caplen 

2663 

2664 ifid = self.interfaces2id.get(ifname, None) 

2665 if ifid is None: 

2666 ifid = max(self.interfaces2id.values()) + 1 

2667 self.interfaces2id[ifname] = ifid 

2668 self._write_block_idb(linktype=linktype, ifname=ifname) 

2669 

2670 # EPB flags (32 bits). 

2671 # currently only direction is implemented (least 2 significant bits) 

2672 if type(direction) == int: 

2673 flags = direction & 0x3 

2674 else: 

2675 flags = None 

2676 

2677 self._write_block_epb(packet, timestamp=sec, caplen=caplen, 

2678 orglen=wirelen, comments=comments, ifid=ifid, flags=flags) 

2679 if self.sync: 

2680 self.f.flush() 

2681 

2682 

2683class PcapWriter(RawPcapWriter): 

2684 """A stream PCAP writer with more control than wrpcap()""" 

2685 pass 

2686 

2687 

2688class PcapNgWriter(RawPcapNgWriter): 

2689 """A stream pcapng writer with more control than wrpcapng()""" 

2690 

2691 def _get_time(self, 

2692 packet, # type: Union[bytes, Packet] 

2693 sec, # type: Optional[float] 

2694 usec # type: Optional[int] 

2695 ): 

2696 # type: (...) -> Tuple[float, int] 

2697 if hasattr(packet, "time"): 

2698 if sec is None: 

2699 sec = float(packet.time) 

2700 

2701 if usec is None: 

2702 usec = 0 

2703 

2704 return sec, usec # type: ignore 

2705 

2706 

2707@conf.commands.register 

2708def rderf(filename, count=-1): 

2709 # type: (Union[IO[bytes], str], int) -> PacketList 

2710 """Read a ERF file and return a packet list 

2711 

2712 :param count: read only <count> packets 

2713 """ 

2714 with ERFEthernetReader(filename) as fdesc: 

2715 return fdesc.read_all(count=count) 

2716 

2717 

2718class ERFEthernetReader_metaclass(PcapReader_metaclass): 

2719 def __call__(cls, filename): 

2720 # type: (Union[IO[bytes], str]) -> Any 

2721 i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__) # type: ignore 

2722 filename, fdesc = cls.open(filename) 

2723 try: 

2724 i.__init__(filename, fdesc) 

2725 return i 

2726 except (Scapy_Exception, EOFError): 

2727 pass 

2728 

2729 if "alternative" in cls.__dict__: 

2730 cls = cls.__dict__["alternative"] 

2731 i = cls.__new__( 

2732 cls, 

2733 cls.__name__, 

2734 cls.__bases__, 

2735 cls.__dict__ # type: ignore 

2736 ) 

2737 try: 

2738 i.__init__(filename, fdesc) 

2739 return i 

2740 except (Scapy_Exception, EOFError): 

2741 pass 

2742 

2743 raise Scapy_Exception("Not a supported capture file") 

2744 

2745 @staticmethod 

2746 def open(fname # type: ignore 

2747 ): 

2748 # type: (...) -> Tuple[str, _ByteStream] 

2749 """Open (if necessary) filename""" 

2750 if isinstance(fname, str): 

2751 filename = fname 

2752 try: 

2753 with gzip.open(filename, "rb") as tmp: 

2754 tmp.read(1) 

2755 fdesc = gzip.open(filename, "rb") # type: _ByteStream 

2756 except IOError: 

2757 fdesc = open(filename, "rb") 

2758 

2759 else: 

2760 fdesc = fname 

2761 filename = getattr(fdesc, "name", "No name") 

2762 return filename, fdesc 

2763 

2764 

2765class ERFEthernetReader(PcapReader, 

2766 metaclass=ERFEthernetReader_metaclass): 

2767 

2768 def __init__(self, filename, fdesc=None): # type: ignore 

2769 # type: (Union[IO[bytes], str], IO[bytes]) -> None 

2770 self.filename = filename # type: ignore 

2771 self.f = fdesc 

2772 self.power = Decimal(10) ** Decimal(-9) 

2773 

2774 # time is in 64-bits Endace's format which can be see here: 

2775 # https://www.endace.com/erf-extensible-record-format-types.pdf 

2776 def _convert_erf_timestamp(self, t): 

2777 # type: (int) -> EDecimal 

2778 sec = t >> 32 

2779 frac_sec = t & 0xffffffff 

2780 frac_sec *= 10**9 

2781 frac_sec += (frac_sec & 0x80000000) << 1 

2782 frac_sec >>= 32 

2783 return EDecimal(sec + self.power * frac_sec) 

2784 

2785 # The details of ERF Packet format can be see here: 

2786 # https://www.endace.com/erf-extensible-record-format-types.pdf 

2787 def read_packet(self, size=MTU, **kwargs): 

2788 # type: (int, **Any) -> Packet 

2789 

2790 # General ERF Header have exactly 16 bytes 

2791 hdr = self.f.read(16) 

2792 if len(hdr) < 16: 

2793 raise EOFError 

2794 

2795 # The timestamp is in little-endian byte-order. 

2796 time = struct.unpack('<Q', hdr[:8])[0] 

2797 # The rest is in big-endian byte-order. 

2798 # Ignoring flags and lctr (loss counter) since they are ERF specific 

2799 # header fields which Packet object does not support. 

2800 type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:]) 

2801 # Check if the type != 0x02, type Ethernet 

2802 if type & 0x02 == 0: 

2803 raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)") 

2804 

2805 # If there are extended headers, ignore it because Packet object does 

2806 # not support it. Extended headers size is 8 bytes before the payload. 

2807 if type & 0x80: 

2808 _ = self.f.read(8) 

2809 s = self.f.read(rlen - 24) 

2810 else: 

2811 s = self.f.read(rlen - 16) 

2812 

2813 # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both 

2814 # of the fields are disregarded by Endace. 

2815 pb = s[2:size] 

2816 from scapy.layers.l2 import Ether 

2817 try: 

2818 p = Ether(pb, **kwargs) # type: Packet 

2819 except KeyboardInterrupt: 

2820 raise 

2821 except Exception: 

2822 if conf.debug_dissector: 

2823 from scapy.sendrecv import debug 

2824 debug.crashed_on = (Ether, s) 

2825 raise 

2826 if conf.raw_layer is None: 

2827 # conf.raw_layer is set on import 

2828 import scapy.packet # noqa: F401 

2829 p = conf.raw_layer(s) 

2830 

2831 p.time = self._convert_erf_timestamp(time) 

2832 p.wirelen = wlen 

2833 

2834 return p 

2835 

2836 

2837@conf.commands.register 

2838def wrerf(filename, # type: Union[IO[bytes], str] 

2839 pkt, # type: _PacketIterable 

2840 *args, # type: Any 

2841 **kargs # type: Any 

2842 ): 

2843 # type: (...) -> None 

2844 """Write a list of packets to a ERF file 

2845 

2846 :param filename: the name of the file to write packets to, or an open, 

2847 writable file-like object. The file descriptor will be 

2848 closed at the end of the call, so do not use an object you 

2849 do not want to close (e.g., running wrerf(sys.stdout, []) 

2850 in interactive mode will crash Scapy). 

2851 :param gz: set to 1 to save a gzipped capture 

2852 :param append: append packets to the capture file instead of 

2853 truncating it 

2854 :param sync: do not bufferize writes to the capture file 

2855 """ 

2856 with ERFEthernetWriter(filename, *args, **kargs) as fdesc: 

2857 fdesc.write(pkt) 

2858 

2859 

2860class ERFEthernetWriter(PcapWriter): 

2861 """A stream ERF Ethernet writer with more control than wrerf()""" 

2862 

2863 def __init__(self, 

2864 filename, # type: Union[IO[bytes], str] 

2865 gz=False, # type: bool 

2866 append=False, # type: bool 

2867 sync=False, # type: bool 

2868 ): 

2869 # type: (...) -> None 

2870 """ 

2871 :param filename: the name of the file to write packets to, or an open, 

2872 writable file-like object. 

2873 :param gz: compress the capture on the fly 

2874 :param append: append packets to the capture file instead of 

2875 truncating it 

2876 :param sync: do not bufferize writes to the capture file 

2877 """ 

2878 super(ERFEthernetWriter, self).__init__(filename, 

2879 gz=gz, 

2880 append=append, 

2881 sync=sync) 

2882 

2883 def write(self, pkt): # type: ignore 

2884 # type: (_PacketIterable) -> None 

2885 """ 

2886 Writes a Packet, a SndRcvList object, or bytes to a ERF file. 

2887 

2888 :param pkt: Packet(s) to write (one record for each Packet) 

2889 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet 

2890 """ 

2891 # Import here to avoid circular dependency 

2892 from scapy.supersocket import IterSocket 

2893 for p in IterSocket(pkt).iter: 

2894 self.write_packet(p) 

2895 

2896 def write_packet(self, pkt): # type: ignore 

2897 # type: (Packet) -> None 

2898 

2899 if hasattr(pkt, "time"): 

2900 sec = int(pkt.time) 

2901 usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9) 

2902 t = (sec << 32) + usec 

2903 else: 

2904 t = int(time.time()) << 32 

2905 

2906 # There are 16 bytes of headers + 2 bytes of padding before the packets 

2907 # payload. 

2908 rlen = len(pkt) + 18 

2909 

2910 if hasattr(pkt, "wirelen"): 

2911 wirelen = pkt.wirelen 

2912 if wirelen is None: 

2913 wirelen = rlen 

2914 

2915 self.f.write(struct.pack("<Q", t)) 

2916 self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0)) 

2917 self.f.write(bytes(pkt)) 

2918 self.f.flush() 

2919 

2920 def close(self): 

2921 # type: () -> Optional[Any] 

2922 return self.f.close() 

2923 

2924 

2925@conf.commands.register 

2926def import_hexcap(input_string=None): 

2927 # type: (Optional[str]) -> bytes 

2928 """Imports a tcpdump like hexadecimal view 

2929 

2930 e.g: exported via hexdump() or tcpdump or wireshark's "export as hex" 

2931 

2932 :param input_string: String containing the hexdump input to parse. If None, 

2933 read from standard input. 

2934 """ 

2935 re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") # noqa: E501 

2936 p = "" 

2937 try: 

2938 if input_string: 

2939 input_function = StringIO(input_string).readline 

2940 else: 

2941 input_function = input 

2942 while True: 

2943 line = input_function().strip() 

2944 if not line: 

2945 break 

2946 try: 

2947 p += re_extract_hexcap.match(line).groups()[2] # type: ignore 

2948 except Exception: 

2949 warning("Parsing error during hexcap") 

2950 continue 

2951 except EOFError: 

2952 pass 

2953 

2954 p = p.replace(" ", "") 

2955 return hex_bytes(p) 

2956 

2957 

2958@conf.commands.register 

2959def wireshark(pktlist, wait=False, **kwargs): 

2960 # type: (List[Packet], bool, **Any) -> Optional[Any] 

2961 """ 

2962 Runs Wireshark on a list of packets. 

2963 

2964 See :func:`tcpdump` for more parameter description. 

2965 

2966 Note: this defaults to wait=False, to run Wireshark in the background. 

2967 """ 

2968 return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs) 

2969 

2970 

2971@conf.commands.register 

2972def tdecode( 

2973 pktlist, # type: Union[IO[bytes], None, str, _PacketIterable] 

2974 args=None, # type: Optional[List[str]] 

2975 **kwargs # type: Any 

2976): 

2977 # type: (...) -> Any 

2978 """ 

2979 Run tshark on a list of packets. 

2980 

2981 :param args: If not specified, defaults to ``tshark -V``. 

2982 

2983 See :func:`tcpdump` for more parameters. 

2984 """ 

2985 if args is None: 

2986 args = ["-V"] 

2987 return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs) 

2988 

2989 

2990def _guess_linktype_name(value): 

2991 # type: (int) -> str 

2992 """Guess the DLT name from its value.""" 

2993 from scapy.libs.winpcapy import pcap_datalink_val_to_name 

2994 return cast(bytes, pcap_datalink_val_to_name(value)).decode() 

2995 

2996 

2997def _guess_linktype_value(name): 

2998 # type: (str) -> int 

2999 """Guess the value of a DLT name.""" 

3000 from scapy.libs.winpcapy import pcap_datalink_name_to_val 

3001 val = cast(int, pcap_datalink_name_to_val(name.encode())) 

3002 if val == -1: 

3003 warning("Unknown linktype: %s. Using EN10MB", name) 

3004 return DLT_EN10MB 

3005 return val 

3006 

3007 

3008@conf.commands.register 

3009def tcpdump( 

3010 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable] 

3011 dump=False, # type: bool 

3012 getfd=False, # type: bool 

3013 args=None, # type: Optional[List[str]] 

3014 flt=None, # type: Optional[str] 

3015 prog=None, # type: Optional[Any] 

3016 getproc=False, # type: bool 

3017 quiet=False, # type: bool 

3018 use_tempfile=None, # type: Optional[Any] 

3019 read_stdin_opts=None, # type: Optional[Any] 

3020 linktype=None, # type: Optional[Any] 

3021 wait=True, # type: bool 

3022 _suppress=False # type: bool 

3023): 

3024 # type: (...) -> Any 

3025 """Run tcpdump or tshark on a list of packets. 

3026 

3027 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a 

3028 temporary file to store the packets. This works around a bug in Apple's 

3029 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/ 

3030 

3031 Otherwise, the packets are passed in stdin. 

3032 

3033 This function can be explicitly enabled or disabled with the 

3034 ``use_tempfile`` parameter. 

3035 

3036 When using ``wireshark``, it will be called with ``-ki -`` to start 

3037 immediately capturing packets from stdin. 

3038 

3039 Otherwise, the command will be run with ``-r -`` (which is correct for 

3040 ``tcpdump`` and ``tshark``). 

3041 

3042 This can be overridden with ``read_stdin_opts``. This has no effect when 

3043 ``use_tempfile=True``, or otherwise reading packets from a regular file. 

3044 

3045 :param pktlist: a Packet instance, a PacketList instance or a list of 

3046 Packet instances. Can also be a filename (as a string), an open 

3047 file-like object that must be a file format readable by 

3048 tshark (Pcap, PcapNg, etc.) or None (to sniff) 

3049 :param flt: a filter to use with tcpdump 

3050 :param dump: when set to True, returns a string instead of displaying it. 

3051 :param getfd: when set to True, returns a file-like object to read data 

3052 from tcpdump or tshark from. 

3053 :param getproc: when set to True, the subprocess.Popen object is returned 

3054 :param args: arguments (as a list) to pass to tshark (example for tshark: 

3055 args=["-T", "json"]). 

3056 :param prog: program to use (defaults to tcpdump, will work with tshark) 

3057 :param quiet: when set to True, the process stderr is discarded 

3058 :param use_tempfile: When set to True, always use a temporary file to store 

3059 packets. 

3060 When set to False, pipe packets through stdin. 

3061 When set to None (default), only use a temporary file with 

3062 ``tcpdump`` on OSX. 

3063 :param read_stdin_opts: When set, a list of arguments needed to capture 

3064 from stdin. Otherwise, attempts to guess. 

3065 :param linktype: A custom DLT value or name, to overwrite the default 

3066 values. 

3067 :param wait: If True (default), waits for the process to terminate before 

3068 returning to Scapy. If False, the process will be detached to the 

3069 background. If dump, getproc or getfd is True, these have the same 

3070 effect as ``wait=False``. 

3071 

3072 Examples:: 

3073 

3074 >>> tcpdump([IP()/TCP(), IP()/UDP()]) 

3075 reading from file -, link-type RAW (Raw IP) 

3076 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501 

3077 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain] 

3078 

3079 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark) 

3080 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501 

3081 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0 

3082 

3083 To get a JSON representation of a tshark-parsed PacketList(), one can:: 

3084 

3085 >>> import json, pprint 

3086 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", 

3087 ... dst="45.33.32.156"), 

3088 ... prog=conf.prog.tshark, 

3089 ... args=["-T", "json"], 

3090 ... getfd=True)) 

3091 >>> pprint.pprint(json_data) 

3092 [{u'_index': u'packets-2016-12-23', 

3093 u'_score': None, 

3094 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20', 

3095 u'frame.encap_type': u'7', 

3096 [...] 

3097 }, 

3098 u'ip': {u'ip.addr': u'45.33.32.156', 

3099 u'ip.checksum': u'0x0000a20d', 

3100 [...] 

3101 u'ip.ttl': u'64', 

3102 u'ip.version': u'4'}, 

3103 u'raw': u'Raw packet data'}}, 

3104 u'_type': u'pcap_file'}] 

3105 >>> json_data[0]['_source']['layers']['ip']['ip.ttl'] 

3106 u'64' 

3107 """ 

3108 getfd = getfd or getproc 

3109 if prog is None: 

3110 if not conf.prog.tcpdump: 

3111 raise Scapy_Exception( 

3112 "tcpdump is not available" 

3113 ) 

3114 prog = [conf.prog.tcpdump] 

3115 elif isinstance(prog, str): 

3116 prog = [prog] 

3117 else: 

3118 raise ValueError("prog must be a string") 

3119 

3120 if linktype is not None: 

3121 if isinstance(linktype, int): 

3122 # Guess name from value 

3123 try: 

3124 linktype_name = _guess_linktype_name(linktype) 

3125 except StopIteration: 

3126 linktype = -1 

3127 else: 

3128 # Guess value from name 

3129 if linktype.startswith("DLT_"): 

3130 linktype = linktype[4:] 

3131 linktype_name = linktype 

3132 try: 

3133 linktype = _guess_linktype_value(linktype) 

3134 except KeyError: 

3135 linktype = -1 

3136 if linktype == -1: 

3137 raise ValueError( 

3138 "Unknown linktype. Try passing its datalink name instead" 

3139 ) 

3140 prog += ["-y", linktype_name] 

3141 

3142 # Build Popen arguments 

3143 if args is None: 

3144 args = [] 

3145 else: 

3146 # Make a copy of args 

3147 args = list(args) 

3148 

3149 if flt is not None: 

3150 # Check the validity of the filter 

3151 if linktype is None and isinstance(pktlist, str): 

3152 # linktype is unknown but required. Read it from file 

3153 with PcapReader(pktlist) as rd: 

3154 if isinstance(rd, PcapNgReader): 

3155 # Get the linktype from the first packet 

3156 try: 

3157 _, metadata = rd._read_packet() 

3158 linktype = metadata.linktype 

3159 if OPENBSD and linktype == 228: 

3160 linktype = DLT_RAW 

3161 except EOFError: 

3162 raise ValueError( 

3163 "Cannot get linktype from a PcapNg packet." 

3164 ) 

3165 else: 

3166 linktype = rd.linktype 

3167 from scapy.arch.common import compile_filter 

3168 compile_filter(flt, linktype=linktype) 

3169 args.append(flt) 

3170 

3171 stdout = subprocess.PIPE if dump or getfd else None 

3172 stderr = open(os.devnull) if quiet else None 

3173 proc = None 

3174 

3175 if use_tempfile is None: 

3176 # Apple's tcpdump cannot read from stdin, see: 

3177 # http://apple.stackexchange.com/questions/152682/ 

3178 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump 

3179 

3180 if read_stdin_opts is None: 

3181 if prog[0] == conf.prog.wireshark: 

3182 # Start capturing immediately (-k) from stdin (-i -) 

3183 read_stdin_opts = ["-ki", "-"] 

3184 elif prog[0] == conf.prog.tcpdump and not OPENBSD: 

3185 # Capture in packet-buffered mode (-U) from stdin (-r -) 

3186 read_stdin_opts = ["-U", "-r", "-"] 

3187 else: 

3188 read_stdin_opts = ["-r", "-"] 

3189 else: 

3190 # Make a copy of read_stdin_opts 

3191 read_stdin_opts = list(read_stdin_opts) 

3192 

3193 if pktlist is None: 

3194 # sniff 

3195 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3196 proc = subprocess.Popen( 

3197 prog + args, 

3198 stdout=stdout, 

3199 stderr=stderr, 

3200 ) 

3201 elif isinstance(pktlist, str): 

3202 # file 

3203 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3204 proc = subprocess.Popen( 

3205 prog + ["-r", pktlist] + args, 

3206 stdout=stdout, 

3207 stderr=stderr, 

3208 ) 

3209 elif use_tempfile: 

3210 tmpfile = get_temp_file( # type: ignore 

3211 autoext=".pcap", 

3212 fd=True 

3213 ) # type: IO[bytes] 

3214 try: 

3215 tmpfile.writelines( 

3216 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3217 ) 

3218 except AttributeError: 

3219 pktlist = cast("_PacketIterable", pktlist) 

3220 wrpcap(tmpfile, pktlist, linktype=linktype) 

3221 else: 

3222 tmpfile.close() 

3223 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3224 proc = subprocess.Popen( 

3225 prog + ["-r", tmpfile.name] + args, 

3226 stdout=stdout, 

3227 stderr=stderr, 

3228 ) 

3229 else: 

3230 try: 

3231 pktlist.fileno() # type: ignore 

3232 # pass the packet stream 

3233 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3234 proc = subprocess.Popen( 

3235 prog + read_stdin_opts + args, 

3236 stdin=pktlist, # type: ignore 

3237 stdout=stdout, 

3238 stderr=stderr, 

3239 ) 

3240 except (AttributeError, ValueError): 

3241 # write the packet stream to stdin 

3242 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3243 proc = subprocess.Popen( 

3244 prog + read_stdin_opts + args, 

3245 stdin=subprocess.PIPE, 

3246 stdout=stdout, 

3247 stderr=stderr, 

3248 ) 

3249 if proc is None: 

3250 # An error has occurred 

3251 return 

3252 try: 

3253 proc.stdin.writelines( # type: ignore 

3254 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3255 ) 

3256 except AttributeError: 

3257 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore 

3258 except UnboundLocalError: 

3259 # The error was handled by ContextManagerSubprocess 

3260 pass 

3261 else: 

3262 proc.stdin.close() # type: ignore 

3263 if proc is None: 

3264 # An error has occurred 

3265 return 

3266 if dump: 

3267 data = b"".join( 

3268 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore 

3269 ) 

3270 proc.terminate() 

3271 return data 

3272 if getproc: 

3273 return proc 

3274 if getfd: 

3275 return proc.stdout 

3276 if wait: 

3277 proc.wait() 

3278 

3279 

3280@conf.commands.register 

3281def hexedit(pktlist): 

3282 # type: (_PacketIterable) -> PacketList 

3283 """Run hexedit on a list of packets, then return the edited packets.""" 

3284 f = get_temp_file() 

3285 wrpcap(f, pktlist) 

3286 with ContextManagerSubprocess(conf.prog.hexedit): 

3287 subprocess.call([conf.prog.hexedit, f]) 

3288 rpktlist = rdpcap(f) 

3289 os.unlink(f) 

3290 return rpktlist 

3291 

3292 

3293def get_terminal_width(): 

3294 # type: () -> Optional[int] 

3295 """Get terminal width (number of characters) if in a window. 

3296 

3297 Notice: this will try several methods in order to 

3298 support as many terminals and OS as possible. 

3299 """ 

3300 sizex = shutil.get_terminal_size(fallback=(0, 0))[0] 

3301 if sizex != 0: 

3302 return sizex 

3303 # Backups 

3304 if WINDOWS: 

3305 from ctypes import windll, create_string_buffer 

3306 # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/ 

3307 h = windll.kernel32.GetStdHandle(-12) 

3308 csbi = create_string_buffer(22) 

3309 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) 

3310 if res: 

3311 (bufx, bufy, curx, cury, wattr, 

3312 left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) # noqa: E501 

3313 sizex = right - left + 1 

3314 # sizey = bottom - top + 1 

3315 return sizex 

3316 return sizex 

3317 # We have various methods 

3318 # COLUMNS is set on some terminals 

3319 try: 

3320 sizex = int(os.environ['COLUMNS']) 

3321 except Exception: 

3322 pass 

3323 if sizex: 

3324 return sizex 

3325 # We can query TIOCGWINSZ 

3326 try: 

3327 import fcntl 

3328 import termios 

3329 s = struct.pack('HHHH', 0, 0, 0, 0) 

3330 x = fcntl.ioctl(1, termios.TIOCGWINSZ, s) 

3331 sizex = struct.unpack('HHHH', x)[1] 

3332 except (IOError, ModuleNotFoundError): 

3333 # If everything failed, return default terminal size 

3334 sizex = 79 

3335 return sizex 

3336 

3337 

3338def pretty_list(rtlst, # type: List[Tuple[Union[str, List[str]], ...]] 

3339 header, # type: List[Tuple[str, ...]] 

3340 sortBy=0, # type: Optional[int] 

3341 borders=False, # type: bool 

3342 ): 

3343 # type: (...) -> str 

3344 """ 

3345 Pretty list to fit the terminal, and add header. 

3346 

3347 :param rtlst: a list of tuples. each tuple contains a value which can 

3348 be either a string or a list of string. 

3349 :param sortBy: the column id (starting with 0) which will be used for 

3350 ordering 

3351 :param borders: whether to put borders on the table or not 

3352 """ 

3353 if borders: 

3354 _space = "|" 

3355 else: 

3356 _space = " " 

3357 cols = len(header[0]) 

3358 # Windows has a fat terminal border 

3359 _spacelen = len(_space) * (cols - 1) + int(WINDOWS) 

3360 _croped = False 

3361 if sortBy is not None: 

3362 # Sort correctly 

3363 rtlst.sort(key=lambda x: x[sortBy]) 

3364 # Resolve multi-values 

3365 for i, line in enumerate(rtlst): 

3366 ids = [] # type: List[int] 

3367 values = [] # type: List[Union[str, List[str]]] 

3368 for j, val in enumerate(line): 

3369 if isinstance(val, list): 

3370 ids.append(j) 

3371 values.append(val or " ") 

3372 if values: 

3373 del rtlst[i] 

3374 k = 0 

3375 for ex_vals in zip_longest(*values, fillvalue=" "): 

3376 if k: 

3377 extra_line = [" "] * cols 

3378 else: 

3379 extra_line = list(line) # type: ignore 

3380 for j, h in enumerate(ids): 

3381 extra_line[h] = ex_vals[j] 

3382 rtlst.insert(i + k, tuple(extra_line)) 

3383 k += 1 

3384 rtslst = cast(List[Tuple[str, ...]], rtlst) 

3385 # Append tag 

3386 rtslst = header + rtslst 

3387 # Detect column's width 

3388 colwidth = [max(len(y) for y in x) for x in zip(*rtslst)] 

3389 # Make text fit in box (if required) 

3390 width = get_terminal_width() 

3391 if conf.auto_crop_tables and width: 

3392 width = width - _spacelen 

3393 while sum(colwidth) > width: 

3394 _croped = True 

3395 # Needs to be cropped 

3396 # Get the longest row 

3397 i = colwidth.index(max(colwidth)) 

3398 # Get all elements of this row 

3399 row = [len(x[i]) for x in rtslst] 

3400 # Get biggest element of this row: biggest of the array 

3401 j = row.index(max(row)) 

3402 # Re-build column tuple with the edited element 

3403 t = list(rtslst[j]) 

3404 t[i] = t[i][:-2] + "_" 

3405 rtslst[j] = tuple(t) 

3406 # Update max size 

3407 row[j] = len(t[i]) 

3408 colwidth[i] = max(row) 

3409 if _croped: 

3410 log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)") # noqa: E501 

3411 # Generate padding scheme 

3412 fmt = _space.join(["%%-%ds" % x for x in colwidth]) 

3413 # Append separation line if needed 

3414 if borders: 

3415 rtslst.insert(1, tuple("-" * x for x in colwidth)) 

3416 # Compile 

3417 return "\n".join(fmt % x for x in rtslst) 

3418 

3419 

3420def human_size(x, fmt=".1f"): 

3421 # type: (int, str) -> str 

3422 """ 

3423 Convert a size in octets to a human string representation 

3424 """ 

3425 units = ['K', 'M', 'G', 'T', 'P', 'E'] 

3426 if not x: 

3427 return "0B" 

3428 i = int(math.log(x, 2**10)) 

3429 if i and i < len(units): 

3430 return format(x / 2**(10 * i), fmt) + units[i - 1] 

3431 return str(x) + "B" 

3432 

3433 

3434def __make_table( 

3435 yfmtfunc, # type: Callable[[int], str] 

3436 fmtfunc, # type: Callable[[int], str] 

3437 endline, # type: str 

3438 data, # type: List[Tuple[Packet, Packet]] 

3439 fxyz, # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]] 

3440 sortx=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 

3441 sorty=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 

3442 seplinefunc=None, # type: Optional[Callable[[int, List[int]], str]] 

3443 dump=False # type: bool 

3444): 

3445 # type: (...) -> Optional[str] 

3446 """Core function of the make_table suite, which generates the table""" 

3447 vx = {} # type: Dict[str, int] 

3448 vy = {} # type: Dict[str, Optional[int]] 

3449 vz = {} # type: Dict[Tuple[str, str], str] 

3450 vxf = {} # type: Dict[str, str] 

3451 

3452 tmp_len = 0 

3453 for e in data: 

3454 xx, yy, zz = [str(s) for s in fxyz(*e)] 

3455 tmp_len = max(len(yy), tmp_len) 

3456 vx[xx] = max(vx.get(xx, 0), len(xx), len(zz)) 

3457 vy[yy] = None 

3458 vz[(xx, yy)] = zz 

3459 

3460 vxk = list(vx) 

3461 vyk = list(vy) 

3462 if sortx: 

3463 vxk.sort(key=sortx) 

3464 else: 

3465 try: 

3466 vxk.sort(key=int) 

3467 except Exception: 

3468 try: 

3469 vxk.sort(key=atol) 

3470 except Exception: 

3471 vxk.sort() 

3472 if sorty: 

3473 vyk.sort(key=sorty) 

3474 else: 

3475 try: 

3476 vyk.sort(key=int) 

3477 except Exception: 

3478 try: 

3479 vyk.sort(key=atol) 

3480 except Exception: 

3481 vyk.sort() 

3482 

3483 s = "" 

3484 if seplinefunc: 

3485 sepline = seplinefunc(tmp_len, [vx[x] for x in vxk]) 

3486 s += sepline + "\n" 

3487 

3488 fmt = yfmtfunc(tmp_len) 

3489 s += fmt % "" 

3490 s += ' ' 

3491 for x in vxk: 

3492 vxf[x] = fmtfunc(vx[x]) 

3493 s += vxf[x] % x 

3494 s += ' ' 

3495 s += endline + "\n" 

3496 if seplinefunc: 

3497 s += sepline + "\n" 

3498 for y in vyk: 

3499 s += fmt % y 

3500 s += ' ' 

3501 for x in vxk: 

3502 s += vxf[x] % vz.get((x, y), "-") 

3503 s += ' ' 

3504 s += endline + "\n" 

3505 if seplinefunc: 

3506 s += sepline + "\n" 

3507 

3508 if dump: 

3509 return s 

3510 else: 

3511 print(s, end="") 

3512 return None 

3513 

3514 

3515def make_table(*args, **kargs): 

3516 # type: (*Any, **Any) -> Optional[Any] 

3517 return __make_table( 

3518 lambda l: "%%-%is" % l, 

3519 lambda l: "%%-%is" % l, 

3520 "", 

3521 *args, 

3522 **kargs 

3523 ) 

3524 

3525 

3526def make_lined_table(*args, **kargs): 

3527 # type: (*Any, **Any) -> Optional[str] 

3528 return __make_table( # type: ignore 

3529 lambda l: "%%-%is |" % l, 

3530 lambda l: "%%-%is |" % l, 

3531 "", 

3532 *args, 

3533 seplinefunc=lambda a, x: "+".join( 

3534 '-' * (y + 2) for y in [a - 1] + x + [-2] 

3535 ), 

3536 **kargs 

3537 ) 

3538 

3539 

3540def make_tex_table(*args, **kargs): 

3541 # type: (*Any, **Any) -> Optional[str] 

3542 return __make_table( # type: ignore 

3543 lambda l: "%s", 

3544 lambda l: "& %s", 

3545 "\\\\", 

3546 *args, 

3547 seplinefunc=lambda a, x: "\\hline", 

3548 **kargs 

3549 ) 

3550 

3551#################### 

3552# WHOIS CLIENT # 

3553#################### 

3554 

3555 

3556def whois(ip_address): 

3557 # type: (str) -> bytes 

3558 """Whois client for Python""" 

3559 whois_ip = str(ip_address) 

3560 try: 

3561 query = socket.gethostbyname(whois_ip) 

3562 except Exception: 

3563 query = whois_ip 

3564 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

3565 s.connect(("whois.ripe.net", 43)) 

3566 s.send(query.encode("utf8") + b"\r\n") 

3567 answer = b"" 

3568 while True: 

3569 d = s.recv(4096) 

3570 answer += d 

3571 if not d: 

3572 break 

3573 s.close() 

3574 ignore_tag = b"remarks:" 

3575 # ignore all lines starting with the ignore_tag 

3576 lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))] # noqa: E501 

3577 # remove empty lines at the bottom 

3578 for i in range(1, len(lines)): 

3579 if not lines[-i].strip(): 

3580 del lines[-i] 

3581 else: 

3582 break 

3583 return b"\n".join(lines[3:]) 

3584 

3585#################### 

3586# CLI utils # 

3587#################### 

3588 

3589 

3590class _CLIUtilMetaclass(type): 

3591 class TYPE(enum.Enum): 

3592 COMMAND = 0 

3593 OUTPUT = 1 

3594 COMPLETE = 2 

3595 

3596 def __new__(cls, # type: Type[_CLIUtilMetaclass] 

3597 name, # type: str 

3598 bases, # type: Tuple[type, ...] 

3599 dct # type: Dict[str, Any] 

3600 ): 

3601 # type: (...) -> Type[CLIUtil] 

3602 dct["commands"] = { 

3603 x.__name__: x 

3604 for x in dct.values() 

3605 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMMAND 

3606 } 

3607 dct["commands_output"] = { 

3608 x.cliutil_ref.__name__: x 

3609 for x in dct.values() 

3610 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.OUTPUT 

3611 } 

3612 dct["commands_complete"] = { 

3613 x.cliutil_ref.__name__: x 

3614 for x in dct.values() 

3615 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMPLETE 

3616 } 

3617 newcls = cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct)) 

3618 return newcls 

3619 

3620 

3621class CLIUtil(metaclass=_CLIUtilMetaclass): 

3622 """ 

3623 Provides a Util class to easily create simple CLI tools in Scapy, 

3624 that can still be used as an API. 

3625 

3626 Doc: 

3627 - override the ps1() function 

3628 - register commands with the @CLIUtil.addcomment decorator 

3629 - call the loop() function when ready 

3630 """ 

3631 

3632 def _depcheck(self) -> None: 

3633 """ 

3634 Check that all dependencies are installed 

3635 """ 

3636 try: 

3637 import prompt_toolkit # noqa: F401 

3638 except ImportError: 

3639 # okay we lie but prompt_toolkit is a dependency... 

3640 raise ImportError("You need to have IPython installed to use the CLI") 

3641 

3642 # Okay let's do nice code 

3643 commands: Dict[str, Callable[..., Any]] = {} 

3644 # print output of command 

3645 commands_output: Dict[str, Callable[..., str]] = {} 

3646 # provides completion to command 

3647 commands_complete: Dict[str, Callable[..., List[str]]] = {} 

3648 

3649 def __init__(self, cli: bool = True, debug: bool = False) -> None: 

3650 """ 

3651 DEV: overwrite 

3652 """ 

3653 if cli: 

3654 self._depcheck() 

3655 self.loop(debug=debug) 

3656 

3657 @staticmethod 

3658 def _inspectkwargs(func: DecoratorCallable) -> None: 

3659 """ 

3660 Internal function to parse arguments from the kwargs of the functions 

3661 """ 

3662 func._flagnames = [ # type: ignore 

3663 x.name for x in 

3664 inspect.signature(func).parameters.values() 

3665 if x.kind == inspect.Parameter.KEYWORD_ONLY 

3666 ] 

3667 func._flags = [ # type: ignore 

3668 ("-%s" % x) if len(x) == 1 else ("--%s" % x) 

3669 for x in func._flagnames # type: ignore 

3670 ] 

3671 

3672 @staticmethod 

3673 def _parsekwargs( 

3674 func: DecoratorCallable, 

3675 args: List[str] 

3676 ) -> Tuple[List[str], Dict[str, Literal[True]]]: 

3677 """ 

3678 Internal function to parse CLI arguments of a function. 

3679 """ 

3680 kwargs: Dict[str, Literal[True]] = {} 

3681 if func._flags: # type: ignore 

3682 i = 0 

3683 for arg in args: 

3684 if arg in func._flags: # type: ignore 

3685 i += 1 

3686 kwargs[func._flagnames[func._flags.index(arg)]] = True # type: ignore # noqa: E501 

3687 continue 

3688 break 

3689 args = args[i:] 

3690 return args, kwargs 

3691 

3692 @classmethod 

3693 def _parseallargs( 

3694 cls, 

3695 func: DecoratorCallable, 

3696 cmd: str, args: List[str] 

3697 ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]: 

3698 """ 

3699 Internal function to parse CLI arguments of both the function 

3700 and its output function. 

3701 """ 

3702 args, kwargs = cls._parsekwargs(func, args) 

3703 outkwargs: Dict[str, Literal[True]] = {} 

3704 if cmd in cls.commands_output: 

3705 args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args) 

3706 return args, kwargs, outkwargs 

3707 

3708 @classmethod 

3709 def addcommand( 

3710 cls, 

3711 spaces: bool = False, 

3712 globsupport: bool = False, 

3713 ) -> Callable[[DecoratorCallable], DecoratorCallable]: 

3714 """ 

3715 Decorator to register a command 

3716 """ 

3717 def func(cmd: DecoratorCallable) -> DecoratorCallable: 

3718 cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND # type: ignore 

3719 cmd._spaces = spaces # type: ignore 

3720 cmd._globsupport = globsupport # type: ignore 

3721 cls._inspectkwargs(cmd) 

3722 if cmd._globsupport and not cmd._spaces: # type: ignore 

3723 raise ValueError("Cannot use globsupport without spaces.") 

3724 return cmd 

3725 return func 

3726 

3727 @classmethod 

3728 def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501 

3729 """ 

3730 Decorator to register a command output processor 

3731 """ 

3732 def func(processor: DecoratorCallable) -> DecoratorCallable: 

3733 processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT # type: ignore 

3734 processor.cliutil_ref = cmd # type: ignore 

3735 cls._inspectkwargs(processor) 

3736 return processor 

3737 return func 

3738 

3739 @classmethod 

3740 def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501 

3741 """ 

3742 Decorator to register a command completor 

3743 """ 

3744 def func(processor: DecoratorCallable) -> DecoratorCallable: 

3745 processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE # type: ignore 

3746 processor.cliutil_ref = cmd # type: ignore 

3747 return processor 

3748 return func 

3749 

3750 def ps1(self) -> str: 

3751 """ 

3752 Return the PS1 of the shell 

3753 """ 

3754 return "> " 

3755 

3756 def close(self) -> None: 

3757 """ 

3758 Function called on exiting 

3759 """ 

3760 print("Exited") 

3761 

3762 def help(self, cmd: Optional[str] = None) -> None: 

3763 """ 

3764 Return the help related to this CLI util 

3765 """ 

3766 def _args(func: Any) -> str: 

3767 flags = func._flags.copy() 

3768 if func.__name__ in self.commands_output: 

3769 flags += self.commands_output[func.__name__]._flags # type: ignore 

3770 return " %s%s" % ( 

3771 ( 

3772 "%s " % " ".join("[%s]" % x for x in flags) 

3773 if flags else "" 

3774 ), 

3775 " ".join( 

3776 "<%s%s>" % ( 

3777 x.name, 

3778 "?" if 

3779 (x.default is None or x.default != inspect.Parameter.empty) 

3780 else "" 

3781 ) 

3782 for x in list(inspect.signature(func).parameters.values())[1:] 

3783 if x.name not in func._flagnames and x.name[0] != "_" 

3784 ) 

3785 ) 

3786 

3787 if cmd: 

3788 if cmd not in self.commands: 

3789 print("Unknown command '%s'" % cmd) 

3790 return 

3791 # help for one command 

3792 func = self.commands[cmd] 

3793 print("%s%s: %s" % ( 

3794 cmd, 

3795 _args(func), 

3796 func.__doc__ and func.__doc__.strip() 

3797 )) 

3798 else: 

3799 header = "│ %s - Help │" % self.__class__.__name__ 

3800 print("┌" + "─" * (len(header) - 2) + "┐") 

3801 print(header) 

3802 print("└" + "─" * (len(header) - 2) + "┘") 

3803 print( 

3804 pretty_list( 

3805 [ 

3806 ( 

3807 cmd, 

3808 _args(func), 

3809 func.__doc__ and func.__doc__.strip().split("\n")[0] or "" 

3810 ) 

3811 for cmd, func in self.commands.items() 

3812 ], 

3813 [("Command", "Arguments", "Description")] 

3814 ) 

3815 ) 

3816 

3817 def _completer(self) -> 'prompt_toolkit.completion.Completer': 

3818 """ 

3819 Returns a prompt_toolkit custom completer 

3820 """ 

3821 from prompt_toolkit.completion import Completer, Completion 

3822 

3823 class CLICompleter(Completer): 

3824 def get_completions(cmpl, document, complete_event): # type: ignore 

3825 if not complete_event.completion_requested: 

3826 # Only activate when the user does <TAB> 

3827 return 

3828 parts = document.text.split(" ") 

3829 cmd = parts[0].lower() 

3830 if cmd not in self.commands: 

3831 # We are trying to complete the command 

3832 for possible_cmd in (x for x in self.commands if x.startswith(cmd)): 

3833 yield Completion(possible_cmd, start_position=-len(cmd)) 

3834 else: 

3835 # We are trying to complete the command content 

3836 if len(parts) == 1: 

3837 return 

3838 args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:]) 

3839 arg = " ".join(args) 

3840 if cmd in self.commands_complete: 

3841 for possible_arg in self.commands_complete[cmd](self, arg): 

3842 yield Completion(possible_arg, start_position=-len(arg)) 

3843 return 

3844 return CLICompleter() 

3845 

3846 def loop(self, debug: int = 0) -> None: 

3847 """ 

3848 Main command handling loop 

3849 """ 

3850 from prompt_toolkit import PromptSession 

3851 session = PromptSession(completer=self._completer()) 

3852 

3853 while True: 

3854 try: 

3855 cmd = session.prompt(self.ps1()).strip() 

3856 except KeyboardInterrupt: 

3857 continue 

3858 except EOFError: 

3859 self.close() 

3860 break 

3861 args = cmd.split(" ")[1:] 

3862 cmd = cmd.split(" ")[0].strip().lower() 

3863 if not cmd: 

3864 continue 

3865 if cmd in ["help", "h", "?"]: 

3866 self.help(" ".join(args)) 

3867 continue 

3868 if cmd in "exit": 

3869 break 

3870 if cmd not in self.commands: 

3871 print("Unknown command. Type help or ?") 

3872 else: 

3873 # check the number of arguments 

3874 func = self.commands[cmd] 

3875 args, kwargs, outkwargs = self._parseallargs(func, cmd, args) 

3876 if func._spaces: # type: ignore 

3877 args = [" ".join(args)] 

3878 # if globsupport is set, we might need to do several calls 

3879 if func._globsupport and "*" in args[0]: # type: ignore 

3880 if args[0].count("*") > 1: 

3881 print("More than 1 glob star (*) is currently unsupported.") 

3882 continue 

3883 before, after = args[0].split("*", 1) 

3884 reg = re.compile(re.escape(before) + r".*" + after) 

3885 calls = [ 

3886 [x] for x in 

3887 self.commands_complete[cmd](self, before) 

3888 if reg.match(x) 

3889 ] 

3890 else: 

3891 calls = [args] 

3892 else: 

3893 calls = [args] 

3894 # now iterate if required, call the function and print its output 

3895 res = None 

3896 for args in calls: 

3897 try: 

3898 res = func(self, *args, **kwargs) 

3899 except TypeError: 

3900 print("Bad number of arguments !") 

3901 self.help(cmd=cmd) 

3902 continue 

3903 except Exception as ex: 

3904 print("Command failed with error: %s" % ex) 

3905 if debug: 

3906 traceback.print_exception(ex) 

3907 try: 

3908 if res and cmd in self.commands_output: 

3909 self.commands_output[cmd](self, res, **outkwargs) 

3910 except Exception as ex: 

3911 print("Output processor failed with error: %s" % ex) 

3912 

3913 

3914def AutoArgparse( 

3915 func: DecoratorCallable, 

3916 _parseonly: bool = False, 

3917) -> Optional[Tuple[List[str], List[str]]]: 

3918 """ 

3919 Generate an Argparse call from a function, then call this function. 

3920 

3921 Notes: 

3922 

3923 - for the arguments to have a description, the sphinx docstring format 

3924 must be used. See 

3925 https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html 

3926 - the arguments must be typed in Python (we ignore Sphinx-specific types) 

3927 untyped arguments are ignored. 

3928 - only types that would be supported by argparse are supported. The others 

3929 are omitted. 

3930 """ 

3931 argsdoc = {} 

3932 if func.__doc__: 

3933 # Sphinx doc format parser 

3934 m = re.match( 

3935 r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)", 

3936 func.__doc__.strip(), 

3937 ) 

3938 if not m: 

3939 desc = func.__doc__.strip() 

3940 else: 

3941 desc = m.group(1) 

3942 sphinxargs = re.findall( 

3943 r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)", 

3944 m.group(2), 

3945 ) 

3946 for argtype, argparam, argdesc in sphinxargs: 

3947 argparam = argparam.strip() 

3948 argdesc = argdesc.strip() 

3949 if argtype == "param": 

3950 if not argparam: 

3951 raise ValueError(":param: without a name !") 

3952 argsdoc[argparam] = argdesc 

3953 else: 

3954 desc = "" 

3955 

3956 # Process the parameters 

3957 positional = [] 

3958 noargument = [] 

3959 hexarguments = [] 

3960 parameters = {} 

3961 for param in inspect.signature(func).parameters.values(): 

3962 if not param.annotation: 

3963 continue 

3964 noarg = False 

3965 parname = param.name.replace("_", "-") 

3966 paramkwargs: Dict[str, Any] = {} 

3967 if param.annotation is bool: 

3968 if param.default is True: 

3969 parname = "no-" + parname 

3970 paramkwargs["action"] = "store_false" 

3971 else: 

3972 paramkwargs["action"] = "store_true" 

3973 noarg = True 

3974 elif param.annotation is bytes: 

3975 paramkwargs["type"] = str 

3976 hexarguments.append(parname) 

3977 elif param.annotation in [str, int, float]: 

3978 paramkwargs["type"] = param.annotation 

3979 else: 

3980 continue 

3981 if param.default != inspect.Parameter.empty: 

3982 if param.kind == inspect.Parameter.POSITIONAL_ONLY: 

3983 positional.append(param.name) 

3984 paramkwargs["nargs"] = '?' 

3985 else: 

3986 parname = "--" + parname 

3987 paramkwargs["default"] = param.default 

3988 else: 

3989 positional.append(param.name) 

3990 if param.kind == inspect.Parameter.VAR_POSITIONAL: 

3991 paramkwargs["action"] = "append" 

3992 if param.name in argsdoc: 

3993 paramkwargs["help"] = argsdoc[param.name] 

3994 if param.annotation is bytes: 

3995 paramkwargs["help"] = "(hex) " + paramkwargs["help"] 

3996 elif param.annotation is bool: 

3997 paramkwargs["help"] = "(flag) " + paramkwargs["help"] 

3998 else: 

3999 paramkwargs["help"] = ( 

4000 "(%s) " % param.annotation.__name__ + paramkwargs["help"] 

4001 ) 

4002 # Add to the parameter list 

4003 parameters[parname] = paramkwargs 

4004 if noarg: 

4005 noargument.append(parname) 

4006 

4007 if _parseonly: 

4008 # An internal mode used to generate bash autocompletion, do it then exit. 

4009 return ( 

4010 [x for x in parameters if x not in positional] + ["--help"], 

4011 [x for x in noargument if x not in positional] + ["--help"], 

4012 ) 

4013 

4014 # Now build the argparse.ArgumentParser 

4015 parser = argparse.ArgumentParser( 

4016 prog=func.__name__, 

4017 description=desc, 

4018 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 

4019 ) 

4020 

4021 # Add parameters to parser 

4022 for parname, paramkwargs in parameters.items(): 

4023 parser.add_argument(parname, **paramkwargs) 

4024 

4025 # Now parse the sys.argv parameters 

4026 params = vars(parser.parse_args()) 

4027 

4028 # Convert hex parameters if provided 

4029 for p in hexarguments: 

4030 if params[p] is not None: 

4031 try: 

4032 params[p] = bytes.fromhex(params[p]) 

4033 except ValueError: 

4034 print( 

4035 conf.color_theme.fail( 

4036 "ERROR: the value of parameter %s " 

4037 "'%s' is not valid hexadecimal !" % (p, params[p]) 

4038 ) 

4039 ) 

4040 return None 

4041 

4042 # Act as in interactive mode 

4043 conf.logLevel = 20 

4044 from scapy.themes import DefaultTheme 

4045 conf.color_theme = DefaultTheme() 

4046 # And call the function 

4047 try: 

4048 func( 

4049 *[params.pop(x) for x in positional], 

4050 **{ 

4051 (k[3:] if k.startswith("no_") else k): v 

4052 for k, v in params.items() 

4053 } 

4054 ) 

4055 except AssertionError as ex: 

4056 print(conf.color_theme.fail("ERROR: " + str(ex))) 

4057 parser.print_help() 

4058 return None 

4059 

4060 

4061####################### 

4062# PERIODIC SENDER # 

4063####################### 

4064 

4065 

4066class PeriodicSenderThread(threading.Thread): 

4067 def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True): 

4068 # type: (Any, _PacketIterable, float, bool) -> None 

4069 """ Thread to send packets periodically 

4070 

4071 Args: 

4072 sock: socket where packet is sent periodically 

4073 pkt: packet or list of packets to send 

4074 interval: interval between two packets 

4075 """ 

4076 if not isinstance(pkt, list): 

4077 self._pkts = [cast("Packet", pkt)] # type: _PacketIterable 

4078 else: 

4079 self._pkts = pkt 

4080 self._socket = sock 

4081 self._stopped = threading.Event() 

4082 self._enabled = threading.Event() 

4083 self._enabled.set() 

4084 self._interval = interval 

4085 self._ignore_exceptions = ignore_exceptions 

4086 threading.Thread.__init__(self) 

4087 

4088 def enable(self): 

4089 # type: () -> None 

4090 self._enabled.set() 

4091 

4092 def disable(self): 

4093 # type: () -> None 

4094 self._enabled.clear() 

4095 

4096 def run(self): 

4097 # type: () -> None 

4098 while not self._stopped.is_set() and not self._socket.closed: 

4099 for p in self._pkts: 

4100 try: 

4101 if self._enabled.is_set(): 

4102 self._socket.send(p) 

4103 except (OSError, TimeoutError) as e: 

4104 if self._ignore_exceptions: 

4105 return 

4106 else: 

4107 raise e 

4108 self._stopped.wait(timeout=self._interval) 

4109 if self._stopped.is_set() or self._socket.closed: 

4110 break 

4111 

4112 def stop(self): 

4113 # type: () -> None 

4114 self._stopped.set() 

4115 self.join(self._interval * 2) 

4116 

4117 

4118class SingleConversationSocket(object): 

4119 def __init__(self, o): 

4120 # type: (Any) -> None 

4121 self._inner = o 

4122 self._tx_mutex = threading.RLock() 

4123 

4124 @property 

4125 def __dict__(self): # type: ignore 

4126 return self._inner.__dict__ 

4127 

4128 def __getattr__(self, name): 

4129 # type: (str) -> Any 

4130 return getattr(self._inner, name) 

4131 

4132 def sr1(self, *args, **kargs): 

4133 # type: (*Any, **Any) -> Any 

4134 with self._tx_mutex: 

4135 return self._inner.sr1(*args, **kargs) 

4136 

4137 def sr(self, *args, **kargs): 

4138 # type: (*Any, **Any) -> Any 

4139 with self._tx_mutex: 

4140 return self._inner.sr(*args, **kargs) 

4141 

4142 def send(self, x): 

4143 # type: (Packet) -> Any 

4144 with self._tx_mutex: 

4145 try: 

4146 return self._inner.send(x) 

4147 except (ConnectionError, OSError) as e: 

4148 self._inner.close() 

4149 raise e