Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

2019 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7General utility functions. 

8""" 

9 

10 

11from decimal import Decimal 

12from io import StringIO 

13from itertools import zip_longest 

14from uuid import UUID 

15 

16import argparse 

17import array 

18import collections 

19import decimal 

20import difflib 

21import enum 

22import gzip 

23import inspect 

24import locale 

25import math 

26import os 

27import random 

28import re 

29import shutil 

30import socket 

31import struct 

32import subprocess 

33import sys 

34import tempfile 

35import threading 

36import time 

37import traceback 

38import warnings 

39 

40from scapy.config import conf 

41from scapy.consts import DARWIN, OPENBSD, WINDOWS 

42from scapy.data import MTU, DLT_EN10MB, DLT_RAW 

43from scapy.compat import ( 

44 orb, 

45 plain_str, 

46 chb, 

47 hex_bytes, 

48 bytes_encode, 

49) 

50from scapy.error import ( 

51 log_interactive, 

52 log_runtime, 

53 Scapy_Exception, 

54 warning, 

55) 

56from scapy.pton_ntop import inet_pton 

57 

58# Typing imports 

59from typing import ( 

60 cast, 

61 Any, 

62 AnyStr, 

63 Callable, 

64 Dict, 

65 IO, 

66 Iterator, 

67 List, 

68 Optional, 

69 TYPE_CHECKING, 

70 Tuple, 

71 Type, 

72 Union, 

73 overload, 

74) 

75from scapy.compat import ( 

76 DecoratorCallable, 

77 Literal, 

78) 

79 

80if TYPE_CHECKING: 

81 from scapy.packet import Packet 

82 from scapy.plist import _PacketIterable, PacketList 

83 from scapy.supersocket import SuperSocket 

84 import prompt_toolkit 

85 

86_ByteStream = Union[IO[bytes], gzip.GzipFile] 

87 

88########### 

89# Tools # 

90########### 

91 

92 

93def issubtype(x, # type: Any 

94 t, # type: Union[type, str] 

95 ): 

96 # type: (...) -> bool 

97 """issubtype(C, B) -> bool 

98 

99 Return whether C is a class and if it is a subclass of class B. 

100 When using a tuple as the second argument issubtype(X, (A, B, ...)), 

101 is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.). 

102 """ 

103 if isinstance(t, str): 

104 return t in (z.__name__ for z in x.__bases__) 

105 if isinstance(x, type) and issubclass(x, t): 

106 return True 

107 return False 

108 

109 

110_Decimal = Union[Decimal, int] 

111 

112 

113class EDecimal(Decimal): 

114 """Extended Decimal 

115 

116 This implements arithmetic and comparison with float for 

117 backward compatibility 

118 """ 

119 

120 def __add__(self, other, context=None): 

121 # type: (_Decimal, Any) -> EDecimal 

122 return EDecimal(Decimal.__add__(self, Decimal(other))) 

123 

124 def __radd__(self, other): 

125 # type: (_Decimal) -> EDecimal 

126 return EDecimal(Decimal.__add__(self, Decimal(other))) 

127 

128 def __sub__(self, other): 

129 # type: (_Decimal) -> EDecimal 

130 return EDecimal(Decimal.__sub__(self, Decimal(other))) 

131 

132 def __rsub__(self, other): 

133 # type: (_Decimal) -> EDecimal 

134 return EDecimal(Decimal.__rsub__(self, Decimal(other))) 

135 

136 def __mul__(self, other): 

137 # type: (_Decimal) -> EDecimal 

138 return EDecimal(Decimal.__mul__(self, Decimal(other))) 

139 

140 def __rmul__(self, other): 

141 # type: (_Decimal) -> EDecimal 

142 return EDecimal(Decimal.__mul__(self, Decimal(other))) 

143 

144 def __truediv__(self, other): 

145 # type: (_Decimal) -> EDecimal 

146 return EDecimal(Decimal.__truediv__(self, Decimal(other))) 

147 

148 def __floordiv__(self, other): 

149 # type: (_Decimal) -> EDecimal 

150 return EDecimal(Decimal.__floordiv__(self, Decimal(other))) 

151 

152 def __divmod__(self, other): 

153 # type: (_Decimal) -> Tuple[EDecimal, EDecimal] 

154 r = Decimal.__divmod__(self, Decimal(other)) 

155 return EDecimal(r[0]), EDecimal(r[1]) 

156 

157 def __mod__(self, other): 

158 # type: (_Decimal) -> EDecimal 

159 return EDecimal(Decimal.__mod__(self, Decimal(other))) 

160 

161 def __rmod__(self, other): 

162 # type: (_Decimal) -> EDecimal 

163 return EDecimal(Decimal.__rmod__(self, Decimal(other))) 

164 

165 def __pow__(self, other, modulo=None): 

166 # type: (_Decimal, Optional[_Decimal]) -> EDecimal 

167 return EDecimal(Decimal.__pow__(self, Decimal(other), modulo)) 

168 

169 def __eq__(self, other): 

170 # type: (Any) -> bool 

171 if isinstance(other, Decimal): 

172 return super(EDecimal, self).__eq__(other) 

173 else: 

174 return bool(float(self) == other) 

175 

176 def normalize(self, precision): # type: ignore 

177 # type: (int) -> EDecimal 

178 with decimal.localcontext() as ctx: 

179 ctx.prec = precision 

180 return EDecimal(super(EDecimal, self).normalize(ctx)) 

181 

182 

183@overload 

184def get_temp_file(keep, autoext, fd): 

185 # type: (bool, str, Literal[True]) -> IO[bytes] 

186 pass 

187 

188 

189@overload 

190def get_temp_file(keep=False, autoext="", fd=False): 

191 # type: (bool, str, Literal[False]) -> str 

192 pass 

193 

194 

195def get_temp_file(keep=False, autoext="", fd=False): 

196 # type: (bool, str, bool) -> Union[IO[bytes], str] 

197 """Creates a temporary file. 

198 

199 :param keep: If False, automatically delete the file when Scapy exits. 

200 :param autoext: Suffix to add to the generated file name. 

201 :param fd: If True, this returns a file-like object with the temporary 

202 file opened. If False (default), this returns a file path. 

203 """ 

204 f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext, 

205 delete=False) 

206 if not keep: 

207 conf.temp_files.append(f.name) 

208 

209 if fd: 

210 return f 

211 else: 

212 # Close the file so something else can take it. 

213 f.close() 

214 return f.name 

215 

216 

217def get_temp_dir(keep=False): 

218 # type: (bool) -> str 

219 """Creates a temporary file, and returns its name. 

220 

221 :param keep: If False (default), the directory will be recursively 

222 deleted when Scapy exits. 

223 :return: A full path to a temporary directory. 

224 """ 

225 

226 dname = tempfile.mkdtemp(prefix="scapy") 

227 

228 if not keep: 

229 conf.temp_files.append(dname) 

230 

231 return dname 

232 

233 

234def _create_fifo() -> Tuple[str, Any]: 

235 """Creates a temporary fifo. 

236 

237 You must then use open_fifo() on the server_fd once 

238 the client is connected to use it. 

239 

240 :returns: (client_file, server_fd) 

241 """ 

242 if WINDOWS: 

243 from scapy.arch.windows.structures import _get_win_fifo 

244 return _get_win_fifo() 

245 else: 

246 f = get_temp_file() 

247 os.unlink(f) 

248 os.mkfifo(f) 

249 return f, f 

250 

251 

252def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]: 

253 """Open the server_fd (see create_fifo) 

254 """ 

255 if WINDOWS: 

256 from scapy.arch.windows.structures import _win_fifo_open 

257 return _win_fifo_open(fd) 

258 else: 

259 return open(fd, mode) 

260 

261 

262def sane(x, color=False): 

263 # type: (AnyStr, bool) -> str 

264 r = "" 

265 for i in x: 

266 j = orb(i) 

267 if (j < 32) or (j >= 127): 

268 if color: 

269 r += conf.color_theme.not_printable(".") 

270 else: 

271 r += "." 

272 else: 

273 r += chr(j) 

274 return r 

275 

276 

277@conf.commands.register 

278def restart(): 

279 # type: () -> None 

280 """Restarts scapy""" 

281 if not conf.interactive or not os.path.isfile(sys.argv[0]): 

282 raise OSError("Scapy was not started from console") 

283 if WINDOWS: 

284 res_code = 1 

285 try: 

286 res_code = subprocess.call([sys.executable] + sys.argv) 

287 finally: 

288 os._exit(res_code) 

289 os.execv(sys.executable, [sys.executable] + sys.argv) 

290 

291 

292def lhex(x): 

293 # type: (Any) -> str 

294 from scapy.volatile import VolatileValue 

295 if isinstance(x, VolatileValue): 

296 return repr(x) 

297 if isinstance(x, int): 

298 return hex(x) 

299 if isinstance(x, tuple): 

300 return "(%s)" % ", ".join(lhex(v) for v in x) 

301 if isinstance(x, list): 

302 return "[%s]" % ", ".join(lhex(v) for v in x) 

303 return str(x) 

304 

305 

306@conf.commands.register 

307def hexdump(p, dump=False): 

308 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 

309 """Build a tcpdump like hexadecimal view 

310 

311 :param p: a Packet 

312 :param dump: define if the result must be printed or returned in a variable 

313 :return: a String only when dump=True 

314 """ 

315 s = "" 

316 x = bytes_encode(p) 

317 x_len = len(x) 

318 i = 0 

319 while i < x_len: 

320 s += "%04x " % i 

321 for j in range(16): 

322 if i + j < x_len: 

323 s += "%02X " % orb(x[i + j]) 

324 else: 

325 s += " " 

326 s += " %s\n" % sane(x[i:i + 16], color=True) 

327 i += 16 

328 # remove trailing \n 

329 s = s[:-1] if s.endswith("\n") else s 

330 if dump: 

331 return s 

332 else: 

333 print(s) 

334 return None 

335 

336 

337@conf.commands.register 

338def linehexdump(p, onlyasc=0, onlyhex=0, dump=False): 

339 # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str] 

340 """Build an equivalent view of hexdump() on a single line 

341 

342 Note that setting both onlyasc and onlyhex to 1 results in a empty output 

343 

344 :param p: a Packet 

345 :param onlyasc: 1 to display only the ascii view 

346 :param onlyhex: 1 to display only the hexadecimal view 

347 :param dump: print the view if False 

348 :return: a String only when dump=True 

349 """ 

350 s = "" 

351 s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump) 

352 if dump: 

353 return s 

354 else: 

355 print(s) 

356 return None 

357 

358 

359@conf.commands.register 

360def chexdump(p, dump=False): 

361 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 

362 """Build a per byte hexadecimal representation 

363 

364 Example: 

365 >>> chexdump(IP()) 

366 0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01 # noqa: E501 

367 

368 :param p: a Packet 

369 :param dump: print the view if False 

370 :return: a String only if dump=True 

371 """ 

372 x = bytes_encode(p) 

373 s = ", ".join("%#04x" % orb(x) for x in x) 

374 if dump: 

375 return s 

376 else: 

377 print(s) 

378 return None 

379 

380 

381@conf.commands.register 

382def hexstr(p, onlyasc=0, onlyhex=0, color=False): 

383 # type: (Union[Packet, AnyStr], int, int, bool) -> str 

384 """Build a fancy tcpdump like hex from bytes.""" 

385 x = bytes_encode(p) 

386 s = [] 

387 if not onlyasc: 

388 s.append(" ".join("%02X" % orb(b) for b in x)) 

389 if not onlyhex: 

390 s.append(sane(x, color=color)) 

391 return " ".join(s) 

392 

393 

394def repr_hex(s): 

395 # type: (bytes) -> str 

396 """ Convert provided bitstring to a simple string of hex digits """ 

397 return "".join("%02x" % orb(x) for x in s) 

398 

399 

400@conf.commands.register 

401def hexdiff( 

402 a: Union['Packet', AnyStr], 

403 b: Union['Packet', AnyStr], 

404 algo: Optional[str] = None, 

405 autojunk: bool = False, 

406) -> None: 

407 """ 

408 Show differences between 2 binary strings, Packets... 

409 

410 Available algorithms: 

411 - wagnerfischer: Use the Wagner and Fischer algorithm to compute the 

412 Levenstein distance between the strings then backtrack. 

413 - difflib: Use the difflib.SequenceMatcher implementation. This based on a 

414 modified version of the Ratcliff and Obershelp algorithm. 

415 This is much faster, but far less accurate. 

416 https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher 

417 

418 :param a: 

419 :param b: The binary strings, packets... to compare 

420 :param algo: Force the algo to be 'wagnerfischer' or 'difflib'. 

421 By default, this is chosen depending on the complexity, optimistically 

422 preferring wagnerfischer unless really necessary. 

423 :param autojunk: (difflib only) See difflib documentation. 

424 """ 

425 xb = bytes_encode(a) 

426 yb = bytes_encode(b) 

427 

428 if algo is None: 

429 # Choose the best algorithm 

430 complexity = len(xb) * len(yb) 

431 if complexity < 1e7: 

432 # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable. 

433 # Anything much larger than this shouldn't be attempted by default. 

434 algo = "wagnerfischer" 

435 if complexity > 1e6: 

436 log_interactive.info( 

437 "Complexity is a bit high. hexdiff will take a few seconds." 

438 ) 

439 else: 

440 algo = "difflib" 

441 

442 backtrackx = [] 

443 backtracky = [] 

444 

445 if algo == "wagnerfischer": 

446 xb = xb[::-1] 

447 yb = yb[::-1] 

448 

449 # costs for the 3 operations 

450 INSERT = 1 

451 DELETE = 1 

452 SUBST = 1 

453 

454 # Typically, d[i,j] will hold the distance between 

455 # the first i characters of xb and the first j characters of yb. 

456 # We change the Wagner Fischer to also store pointers to all 

457 # the intermediate steps taken while calculating the Levenstein distance. 

458 d = {(-1, -1): (0, (-1, -1))} 

459 for j in range(len(yb)): 

460 d[-1, j] = (j + 1) * INSERT, (-1, j - 1) 

461 for i in range(len(xb)): 

462 d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1) 

463 

464 # Compute the Levenstein distance between the two strings, but 

465 # store all the steps to be able to backtrack at the end. 

466 for j in range(len(yb)): 

467 for i in range(len(xb)): 

468 d[i, j] = min( 

469 (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)), 

470 (d[i - 1, j][0] + DELETE, (i - 1, j)), 

471 (d[i, j - 1][0] + INSERT, (i, j - 1)), 

472 ) 

473 

474 # Iterate through the steps backwards to create the diff 

475 i = len(xb) - 1 

476 j = len(yb) - 1 

477 while not (i == j == -1): 

478 i2, j2 = d[i, j][1] 

479 backtrackx.append(xb[i2 + 1:i + 1]) 

480 backtracky.append(yb[j2 + 1:j + 1]) 

481 i, j = i2, j2 

482 elif algo == "difflib": 

483 sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk) 

484 xarr = [xb[i:i + 1] for i in range(len(xb))] 

485 yarr = [yb[i:i + 1] for i in range(len(yb))] 

486 # Iterate through opcodes to build the backtrack 

487 for opcode in sm.get_opcodes(): 

488 typ, x0, x1, y0, y1 = opcode 

489 if typ == 'delete': 

490 backtrackx += xarr[x0:x1] 

491 backtracky += [b''] * (x1 - x0) 

492 elif typ == 'insert': 

493 backtrackx += [b''] * (y1 - y0) 

494 backtracky += yarr[y0:y1] 

495 elif typ in ['equal', 'replace']: 

496 backtrackx += xarr[x0:x1] 

497 backtracky += yarr[y0:y1] 

498 # Some lines may have been considered as junk. Check the sizes 

499 if autojunk: 

500 lbx = len(backtrackx) 

501 lby = len(backtracky) 

502 backtrackx += [b''] * (max(lbx, lby) - lbx) 

503 backtracky += [b''] * (max(lbx, lby) - lby) 

504 else: 

505 raise ValueError("Unknown algorithm '%s'" % algo) 

506 

507 # Print the diff 

508 

509 x = y = i = 0 

510 colorize: Dict[int, Callable[[str], str]] = { 

511 0: lambda x: x, 

512 -1: conf.color_theme.left, 

513 1: conf.color_theme.right 

514 } 

515 

516 dox = 1 

517 doy = 0 

518 btx_len = len(backtrackx) 

519 while i < btx_len: 

520 linex = backtrackx[i:i + 16] 

521 liney = backtracky[i:i + 16] 

522 xx = sum(len(k) for k in linex) 

523 yy = sum(len(k) for k in liney) 

524 if dox and not xx: 

525 dox = 0 

526 doy = 1 

527 if dox and linex == liney: 

528 doy = 1 

529 

530 if dox: 

531 xd = y 

532 j = 0 

533 while not linex[j]: 

534 j += 1 

535 xd -= 1 

536 print(colorize[doy - dox]("%04x" % xd), end=' ') 

537 x += xx 

538 line = linex 

539 else: 

540 print(" ", end=' ') 

541 if doy: 

542 yd = y 

543 j = 0 

544 while not liney[j]: 

545 j += 1 

546 yd -= 1 

547 print(colorize[doy - dox]("%04x" % yd), end=' ') 

548 y += yy 

549 line = liney 

550 else: 

551 print(" ", end=' ') 

552 

553 print(" ", end=' ') 

554 

555 cl = "" 

556 for j in range(16): 

557 if i + j < min(len(backtrackx), len(backtracky)): 

558 if line[j]: 

559 col = colorize[(linex[j] != liney[j]) * (doy - dox)] 

560 print(col("%02X" % orb(line[j])), end=' ') 

561 if linex[j] == liney[j]: 

562 cl += sane(line[j], color=True) 

563 else: 

564 cl += col(sane(line[j])) 

565 else: 

566 print(" ", end=' ') 

567 cl += " " 

568 else: 

569 print(" ", end=' ') 

570 if j == 7: 

571 print("", end=' ') 

572 

573 print(" ", cl) 

574 

575 if doy or not yy: 

576 doy = 0 

577 dox = 1 

578 i += 16 

579 else: 

580 if yy: 

581 dox = 0 

582 doy = 1 

583 else: 

584 i += 16 

585 

586 

587if struct.pack("H", 1) == b"\x00\x01": # big endian 

588 checksum_endian_transform = lambda chk: chk # type: Callable[[int], int] 

589else: 

590 checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8 

591 

592 

593def checksum(pkt): 

594 # type: (bytes) -> int 

595 if len(pkt) % 2 == 1: 

596 pkt += b"\0" 

597 s = sum(array.array("H", pkt)) 

598 s = (s >> 16) + (s & 0xffff) 

599 s += s >> 16 

600 s = ~s 

601 return checksum_endian_transform(s) & 0xffff 

602 

603 

604def _fletcher16(charbuf): 

605 # type: (bytes) -> Tuple[int, int] 

606 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 

607 c0 = c1 = 0 

608 for char in charbuf: 

609 c0 += char 

610 c1 += c0 

611 

612 c0 %= 255 

613 c1 %= 255 

614 return (c0, c1) 

615 

616 

617@conf.commands.register 

618def fletcher16_checksum(binbuf): 

619 # type: (bytes) -> int 

620 """Calculates Fletcher-16 checksum of the given buffer. 

621 

622 Note: 

623 If the buffer contains the two checkbytes derived from the Fletcher-16 checksum # noqa: E501 

624 the result of this function has to be 0. Otherwise the buffer has been corrupted. # noqa: E501 

625 """ 

626 (c0, c1) = _fletcher16(binbuf) 

627 return (c1 << 8) | c0 

628 

629 

630@conf.commands.register 

631def fletcher16_checkbytes(binbuf, offset): 

632 # type: (bytes, int) -> bytes 

633 """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string. 

634 

635 Including the bytes into the buffer (at the position marked by offset) the # noqa: E501 

636 global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify # noqa: E501 

637 the integrity of the buffer on the receiver side. 

638 

639 For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B. # noqa: E501 

640 """ 

641 

642 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 

643 if len(binbuf) < offset: 

644 raise Exception("Packet too short for checkbytes %d" % len(binbuf)) 

645 

646 binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:] 

647 (c0, c1) = _fletcher16(binbuf) 

648 

649 x = ((len(binbuf) - offset - 1) * c0 - c1) % 255 

650 

651 if (x <= 0): 

652 x += 255 

653 

654 y = 510 - c0 - x 

655 

656 if (y > 255): 

657 y -= 255 

658 return chb(x) + chb(y) 

659 

660 

661def mac2str(mac): 

662 # type: (str) -> bytes 

663 return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':')) 

664 

665 

666def valid_mac(mac): 

667 # type: (str) -> bool 

668 try: 

669 return len(mac2str(mac)) == 6 

670 except ValueError: 

671 pass 

672 return False 

673 

674 

675def str2mac(s): 

676 # type: (bytes) -> str 

677 if isinstance(s, str): 

678 return ("%02x:" * len(s))[:-1] % tuple(map(ord, s)) 

679 return ("%02x:" * len(s))[:-1] % tuple(s) 

680 

681 

682def randstring(length): 

683 # type: (int) -> bytes 

684 """ 

685 Returns a random string of length (length >= 0) 

686 """ 

687 return b"".join(struct.pack('B', random.randint(0, 255)) 

688 for _ in range(length)) 

689 

690 

691def zerofree_randstring(length): 

692 # type: (int) -> bytes 

693 """ 

694 Returns a random string of length (length >= 0) without zero in it. 

695 """ 

696 return b"".join(struct.pack('B', random.randint(1, 255)) 

697 for _ in range(length)) 

698 

699 

700def stror(s1, s2): 

701 # type: (bytes, bytes) -> bytes 

702 """ 

703 Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2 

704 must be of same length. 

705 """ 

706 return b"".join(map(lambda x, y: struct.pack("!B", x | y), s1, s2)) 

707 

708 

709def strxor(s1, s2): 

710 # type: (bytes, bytes) -> bytes 

711 """ 

712 Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2 

713 must be of same length. 

714 """ 

715 return b"".join(map(lambda x, y: struct.pack("!B", x ^ y), s1, s2)) 

716 

717 

718def strand(s1, s2): 

719 # type: (bytes, bytes) -> bytes 

720 """ 

721 Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2 

722 must be of same length. 

723 """ 

724 return b"".join(map(lambda x, y: struct.pack("!B", x & y), s1, s2)) 

725 

726 

727def strrot(s1, count, right=True): 

728 # type: (bytes, int, bool) -> bytes 

729 """ 

730 Rotate the binary by 'count' bytes 

731 """ 

732 off = count % len(s1) 

733 if right: 

734 return s1[-off:] + s1[:-off] 

735 else: 

736 return s1[off:] + s1[:off] 

737 

738 

739# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 # noqa: E501 

740try: 

741 socket.inet_aton("255.255.255.255") 

742except socket.error: 

743 def inet_aton(ip_string): 

744 # type: (str) -> bytes 

745 if ip_string == "255.255.255.255": 

746 return b"\xff" * 4 

747 else: 

748 return socket.inet_aton(ip_string) 

749else: 

750 inet_aton = socket.inet_aton # type: ignore 

751 

752inet_ntoa = socket.inet_ntoa 

753 

754 

755def atol(x): 

756 # type: (str) -> int 

757 try: 

758 ip = inet_aton(x) 

759 except socket.error: 

760 raise ValueError("Bad IP format: %s" % x) 

761 return cast(int, struct.unpack("!I", ip)[0]) 

762 

763 

764def valid_ip(addr): 

765 # type: (str) -> bool 

766 try: 

767 addr = plain_str(addr) 

768 except UnicodeDecodeError: 

769 return False 

770 try: 

771 atol(addr) 

772 except (OSError, ValueError, socket.error): 

773 return False 

774 return True 

775 

776 

777def valid_net(addr): 

778 # type: (str) -> bool 

779 try: 

780 addr = plain_str(addr) 

781 except UnicodeDecodeError: 

782 return False 

783 if '/' in addr: 

784 ip, mask = addr.split('/', 1) 

785 return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32 

786 return valid_ip(addr) 

787 

788 

789def valid_ip6(addr): 

790 # type: (str) -> bool 

791 try: 

792 addr = plain_str(addr) 

793 except UnicodeDecodeError: 

794 return False 

795 try: 

796 inet_pton(socket.AF_INET6, addr) 

797 except socket.error: 

798 return False 

799 return True 

800 

801 

802def valid_net6(addr): 

803 # type: (str) -> bool 

804 try: 

805 addr = plain_str(addr) 

806 except UnicodeDecodeError: 

807 return False 

808 if '/' in addr: 

809 ip, mask = addr.split('/', 1) 

810 return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128 

811 return valid_ip6(addr) 

812 

813 

814def ltoa(x): 

815 # type: (int) -> str 

816 return inet_ntoa(struct.pack("!I", x & 0xffffffff)) 

817 

818 

819def itom(x): 

820 # type: (int) -> int 

821 return (0xffffffff00000000 >> x) & 0xffffffff 

822 

823 

824def in4_cidr2mask(m): 

825 # type: (int) -> bytes 

826 """ 

827 Return the mask (bitstring) associated with provided length 

828 value. For instance if function is called on 20, return value is 

829 b'\xff\xff\xf0\x00'. 

830 """ 

831 if m > 32 or m < 0: 

832 raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m) # noqa: E501 

833 

834 return strxor( 

835 b"\xff" * 4, 

836 struct.pack(">I", 2**(32 - m) - 1) 

837 ) 

838 

839 

840def in4_isincluded(addr, prefix, mask): 

841 # type: (str, str, int) -> bool 

842 """ 

843 Returns True when 'addr' belongs to prefix/mask. False otherwise. 

844 """ 

845 temp = inet_pton(socket.AF_INET, addr) 

846 pref = in4_cidr2mask(mask) 

847 zero = inet_pton(socket.AF_INET, prefix) 

848 return zero == strand(temp, pref) 

849 

850 

851def in4_ismaddr(str): 

852 # type: (str) -> bool 

853 """ 

854 Returns True if provided address in printable format belongs to 

855 allocated Multicast address space (224.0.0.0/4). 

856 """ 

857 return in4_isincluded(str, "224.0.0.0", 4) 

858 

859 

860def in4_ismlladdr(str): 

861 # type: (str) -> bool 

862 """ 

863 Returns True if address belongs to link-local multicast address 

864 space (224.0.0.0/24) 

865 """ 

866 return in4_isincluded(str, "224.0.0.0", 24) 

867 

868 

869def in4_ismgladdr(str): 

870 # type: (str) -> bool 

871 """ 

872 Returns True if address belongs to global multicast address 

873 space (224.0.1.0-238.255.255.255). 

874 """ 

875 return ( 

876 in4_isincluded(str, "224.0.0.0", 4) and 

877 not in4_isincluded(str, "224.0.0.0", 24) and 

878 not in4_isincluded(str, "239.0.0.0", 8) 

879 ) 

880 

881 

882def in4_ismlsaddr(str): 

883 # type: (str) -> bool 

884 """ 

885 Returns True if address belongs to limited scope multicast address 

886 space (239.0.0.0/8). 

887 """ 

888 return in4_isincluded(str, "239.0.0.0", 8) 

889 

890 

891def in4_isaddrllallnodes(str): 

892 # type: (str) -> bool 

893 """ 

894 Returns True if address is the link-local all-nodes multicast 

895 address (224.0.0.1). 

896 """ 

897 return (inet_pton(socket.AF_INET, "224.0.0.1") == 

898 inet_pton(socket.AF_INET, str)) 

899 

900 

901def in4_getnsmac(a): 

902 # type: (bytes) -> str 

903 """ 

904 Return the multicast mac address associated with provided 

905 IPv4 address. Passed address must be in network format. 

906 """ 

907 

908 return "01:00:5e:%.2x:%.2x:%.2x" % (a[1] & 0x7f, a[2], a[3]) 

909 

910 

911def decode_locale_str(x): 

912 # type: (bytes) -> str 

913 """ 

914 Decode bytes into a string using the system locale. 

915 Useful on Windows where it can be unusual (e.g. cp1252) 

916 """ 

917 return x.decode(encoding=locale.getlocale()[1] or "utf-8", errors="replace") 

918 

919 

920class ContextManagerSubprocess(object): 

921 """ 

922 Context manager that eases checking for unknown command, without 

923 crashing. 

924 

925 Example: 

926 >>> with ContextManagerSubprocess("tcpdump"): 

927 >>> subprocess.Popen(["tcpdump", "--version"]) 

928 ERROR: Could not execute tcpdump, is it installed? 

929 

930 """ 

931 

932 def __init__(self, prog, suppress=True): 

933 # type: (str, bool) -> None 

934 self.prog = prog 

935 self.suppress = suppress 

936 

937 def __enter__(self): 

938 # type: () -> None 

939 pass 

940 

941 def __exit__(self, 

942 exc_type, # type: Optional[type] 

943 exc_value, # type: Optional[Exception] 

944 traceback, # type: Optional[Any] 

945 ): 

946 # type: (...) -> Optional[bool] 

947 if exc_value is None or exc_type is None: 

948 return None 

949 # Errored 

950 if isinstance(exc_value, EnvironmentError): 

951 msg = "Could not execute %s, is it installed?" % self.prog 

952 else: 

953 msg = "%s: execution failed (%s)" % ( 

954 self.prog, 

955 exc_type.__class__.__name__ 

956 ) 

957 if not self.suppress: 

958 raise exc_type(msg) 

959 log_runtime.error(msg, exc_info=True) 

960 return True # Suppress the exception 

961 

962 

963class ContextManagerCaptureOutput(object): 

964 """ 

965 Context manager that intercept the console's output. 

966 

967 Example: 

968 >>> with ContextManagerCaptureOutput() as cmco: 

969 ... print("hey") 

970 ... assert cmco.get_output() == "hey" 

971 """ 

972 

973 def __init__(self): 

974 # type: () -> None 

975 self.result_export_object = "" 

976 

977 def __enter__(self): 

978 # type: () -> ContextManagerCaptureOutput 

979 from unittest import mock 

980 

981 def write(s, decorator=self): 

982 # type: (str, ContextManagerCaptureOutput) -> None 

983 decorator.result_export_object += s 

984 mock_stdout = mock.Mock() 

985 mock_stdout.write = write 

986 self.bck_stdout = sys.stdout 

987 sys.stdout = mock_stdout 

988 return self 

989 

990 def __exit__(self, *exc): 

991 # type: (*Any) -> Literal[False] 

992 sys.stdout = self.bck_stdout 

993 return False 

994 

995 def get_output(self, eval_bytes=False): 

996 # type: (bool) -> str 

997 if self.result_export_object.startswith("b'") and eval_bytes: 

998 return plain_str(eval(self.result_export_object)) 

999 return self.result_export_object 

1000 

1001 

1002def do_graph( 

1003 graph, # type: str 

1004 prog=None, # type: Optional[str] 

1005 format=None, # type: Optional[str] 

1006 target=None, # type: Optional[Union[IO[bytes], str]] 

1007 type=None, # type: Optional[str] 

1008 string=None, # type: Optional[bool] 

1009 options=None # type: Optional[List[str]] 

1010): 

1011 # type: (...) -> Optional[str] 

1012 """Processes graph description using an external software. 

1013 This method is used to convert a graphviz format to an image. 

1014 

1015 :param graph: GraphViz graph description 

1016 :param prog: which graphviz program to use 

1017 :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" 

1018 option 

1019 :param string: if not None, simply return the graph string 

1020 :param target: filename or redirect. Defaults pipe to Imagemagick's 

1021 display program 

1022 :param options: options to be passed to prog 

1023 """ 

1024 

1025 if format is None: 

1026 format = "svg" 

1027 if string: 

1028 return graph 

1029 if type is not None: 

1030 warnings.warn( 

1031 "type is deprecated, and was renamed format", 

1032 DeprecationWarning 

1033 ) 

1034 format = type 

1035 if prog is None: 

1036 prog = conf.prog.dot 

1037 start_viewer = False 

1038 if target is None: 

1039 if WINDOWS: 

1040 target = get_temp_file(autoext="." + format) 

1041 start_viewer = True 

1042 else: 

1043 with ContextManagerSubprocess(conf.prog.display): 

1044 target = subprocess.Popen([conf.prog.display], 

1045 stdin=subprocess.PIPE).stdin 

1046 if format is not None: 

1047 format = "-T%s" % format 

1048 if isinstance(target, str): 

1049 if target.startswith('|'): 

1050 target = subprocess.Popen(target[1:].lstrip(), shell=True, 

1051 stdin=subprocess.PIPE).stdin 

1052 elif target.startswith('>'): 

1053 target = open(target[1:].lstrip(), "wb") 

1054 else: 

1055 target = open(os.path.abspath(target), "wb") 

1056 target = cast(IO[bytes], target) 

1057 proc = subprocess.Popen( 

1058 "\"%s\" %s %s" % (prog, options or "", format or ""), 

1059 shell=True, stdin=subprocess.PIPE, stdout=target, 

1060 stderr=subprocess.PIPE 

1061 ) 

1062 _, stderr = proc.communicate(bytes_encode(graph)) 

1063 if proc.returncode != 0: 

1064 raise OSError( 

1065 "GraphViz call failed (is it installed?):\n" + 

1066 plain_str(stderr) 

1067 ) 

1068 try: 

1069 target.close() 

1070 except Exception: 

1071 pass 

1072 if start_viewer: 

1073 # Workaround for file not found error: We wait until tempfile is written. # noqa: E501 

1074 waiting_start = time.time() 

1075 while not os.path.exists(target.name): 

1076 time.sleep(0.1) 

1077 if time.time() - waiting_start > 3: 

1078 warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile) # noqa: E501 

1079 break 

1080 else: 

1081 if WINDOWS and conf.prog.display == conf.prog._default: 

1082 os.startfile(target.name) 

1083 else: 

1084 with ContextManagerSubprocess(conf.prog.display): 

1085 subprocess.Popen([conf.prog.display, target.name]) 

1086 return None 

1087 

1088 

1089_TEX_TR = { 

1090 "{": "{\\tt\\char123}", 

1091 "}": "{\\tt\\char125}", 

1092 "\\": "{\\tt\\char92}", 

1093 "^": "\\^{}", 

1094 "$": "\\$", 

1095 "#": "\\#", 

1096 "_": "\\_", 

1097 "&": "\\&", 

1098 "%": "\\%", 

1099 "|": "{\\tt\\char124}", 

1100 "~": "{\\tt\\char126}", 

1101 "<": "{\\tt\\char60}", 

1102 ">": "{\\tt\\char62}", 

1103} 

1104 

1105 

1106def tex_escape(x): 

1107 # type: (str) -> str 

1108 s = "" 

1109 for c in x: 

1110 s += _TEX_TR.get(c, c) 

1111 return s 

1112 

1113 

1114def colgen(*lstcol, # type: Any 

1115 **kargs # type: Any 

1116 ): 

1117 # type: (...) -> Iterator[Any] 

1118 """Returns a generator that mixes provided quantities forever 

1119 trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" # noqa: E501 

1120 if len(lstcol) < 2: 

1121 lstcol *= 2 

1122 trans = kargs.get("trans", lambda x, y, z: (x, y, z)) 

1123 while True: 

1124 for i in range(len(lstcol)): 

1125 for j in range(len(lstcol)): 

1126 for k in range(len(lstcol)): 

1127 if i != j or j != k or k != i: 

1128 yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)]) # noqa: E501 

1129 

1130 

1131def incremental_label(label="tag%05i", start=0): 

1132 # type: (str, int) -> Iterator[str] 

1133 while True: 

1134 yield label % start 

1135 start += 1 

1136 

1137 

1138def binrepr(val): 

1139 # type: (int) -> str 

1140 return bin(val)[2:] 

1141 

1142 

1143def long_converter(s): 

1144 # type: (str) -> int 

1145 return int(s.replace('\n', '').replace(' ', ''), 16) 

1146 

1147######################### 

1148# Enum management # 

1149######################### 

1150 

1151 

1152class EnumElement: 

1153 def __init__(self, key, value): 

1154 # type: (str, int) -> None 

1155 self._key = key 

1156 self._value = value 

1157 

1158 def __repr__(self): 

1159 # type: () -> str 

1160 return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) # noqa: E501 

1161 

1162 def __getattr__(self, attr): 

1163 # type: (str) -> Any 

1164 return getattr(self._value, attr) 

1165 

1166 def __str__(self): 

1167 # type: () -> str 

1168 return self._key 

1169 

1170 def __bytes__(self): 

1171 # type: () -> bytes 

1172 return bytes_encode(self.__str__()) 

1173 

1174 def __hash__(self): 

1175 # type: () -> int 

1176 return self._value 

1177 

1178 def __int__(self): 

1179 # type: () -> int 

1180 return int(self._value) 

1181 

1182 def __eq__(self, other): 

1183 # type: (Any) -> bool 

1184 return self._value == int(other) 

1185 

1186 def __neq__(self, other): 

1187 # type: (Any) -> bool 

1188 return not self.__eq__(other) 

1189 

1190 

1191class Enum_metaclass(type): 

1192 element_class = EnumElement 

1193 

1194 def __new__(cls, name, bases, dct): 

1195 # type: (Any, str, Any, Dict[str, Any]) -> Any 

1196 rdict = {} 

1197 for k, v in dct.items(): 

1198 if isinstance(v, int): 

1199 v = cls.element_class(k, v) 

1200 dct[k] = v 

1201 rdict[v] = k 

1202 dct["__rdict__"] = rdict 

1203 return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) 

1204 

1205 def __getitem__(self, attr): 

1206 # type: (int) -> Any 

1207 return self.__rdict__[attr] # type: ignore 

1208 

1209 def __contains__(self, val): 

1210 # type: (int) -> bool 

1211 return val in self.__rdict__ # type: ignore 

1212 

1213 def get(self, attr, val=None): 

1214 # type: (str, Optional[Any]) -> Any 

1215 return self.__rdict__.get(attr, val) # type: ignore 

1216 

1217 def __repr__(self): 

1218 # type: () -> str 

1219 return "<%s>" % self.__dict__.get("name", self.__name__) 

1220 

1221 

1222################## 

1223# Corrupt data # 

1224################## 

1225 

1226@conf.commands.register 

1227def corrupt_bytes(data, p=0.01, n=None): 

1228 # type: (str, float, Optional[int]) -> bytes 

1229 """ 

1230 Corrupt a given percentage (at least one byte) or number of bytes 

1231 from a string 

1232 """ 

1233 s = array.array("B", bytes_encode(data)) 

1234 s_len = len(s) 

1235 if n is None: 

1236 n = max(1, int(s_len * p)) 

1237 for i in random.sample(range(s_len), n): 

1238 s[i] = (s[i] + random.randint(1, 255)) % 256 

1239 return s.tobytes() 

1240 

1241 

1242@conf.commands.register 

1243def corrupt_bits(data, p=0.01, n=None): 

1244 # type: (str, float, Optional[int]) -> bytes 

1245 """ 

1246 Flip a given percentage (at least one bit) or number of bits 

1247 from a string 

1248 """ 

1249 s = array.array("B", bytes_encode(data)) 

1250 s_len = len(s) * 8 

1251 if n is None: 

1252 n = max(1, int(s_len * p)) 

1253 for i in random.sample(range(s_len), n): 

1254 s[i // 8] ^= 1 << (i % 8) 

1255 return s.tobytes() 

1256 

1257 

1258############################# 

1259# pcap capture file stuff # 

1260############################# 

1261 

1262@conf.commands.register 

1263def wrpcap(filename, # type: Union[IO[bytes], str] 

1264 pkt, # type: _PacketIterable 

1265 *args, # type: Any 

1266 **kargs # type: Any 

1267 ): 

1268 # type: (...) -> None 

1269 """Write a list of packets to a pcap file 

1270 

1271 :param filename: the name of the file to write packets to, or an open, 

1272 writable file-like object. The file descriptor will be 

1273 closed at the end of the call, so do not use an object you 

1274 do not want to close (e.g., running wrpcap(sys.stdout, []) 

1275 in interactive mode will crash Scapy). 

1276 :param gz: set to 1 to save a gzipped capture 

1277 :param linktype: force linktype value 

1278 :param endianness: "<" or ">", force endianness 

1279 :param sync: do not bufferize writes to the capture file 

1280 """ 

1281 with PcapWriter(filename, *args, **kargs) as fdesc: 

1282 fdesc.write(pkt) 

1283 

1284 

1285@conf.commands.register 

1286def wrpcapng(filename, # type: str 

1287 pkt, # type: _PacketIterable 

1288 ): 

1289 # type: (...) -> None 

1290 """Write a list of packets to a pcapng file 

1291 

1292 :param filename: the name of the file to write packets to, or an open, 

1293 writable file-like object. The file descriptor will be 

1294 closed at the end of the call, so do not use an object you 

1295 do not want to close (e.g., running wrpcapng(sys.stdout, []) 

1296 in interactive mode will crash Scapy). 

1297 :param pkt: packets to write 

1298 """ 

1299 with PcapNgWriter(filename) as fdesc: 

1300 fdesc.write(pkt) 

1301 

1302 

1303@conf.commands.register 

1304def rdpcap(filename, count=-1): 

1305 # type: (Union[IO[bytes], str], int) -> PacketList 

1306 """Read a pcap or pcapng file and return a packet list 

1307 

1308 :param count: read only <count> packets 

1309 """ 

1310 # Rant: Our complicated use of metaclasses and especially the 

1311 # __call__ function is, of course, not supported by MyPy. 

1312 # One day we should simplify this mess and use a much simpler 

1313 # layout that will actually be supported and properly dissected. 

1314 with PcapReader(filename) as fdesc: # type: ignore 

1315 return fdesc.read_all(count=count) 

1316 

1317 

1318# NOTE: Type hinting 

1319# Mypy doesn't understand the following metaclass, and thinks each 

1320# constructor (PcapReader...) needs 3 arguments each. To avoid this, 

1321# we add a fake (=None) to the last 2 arguments then force the value 

1322# to not be None in the signature and pack the whole thing in an ignore. 

1323# This allows to not have # type: ignore every time we call those 

1324# constructors. 

1325 

1326class PcapReader_metaclass(type): 

1327 """Metaclass for (Raw)Pcap(Ng)Readers""" 

1328 

1329 def __new__(cls, name, bases, dct): 

1330 # type: (Any, str, Any, Dict[str, Any]) -> Any 

1331 """The `alternative` class attribute is declared in the PcapNg 

1332 variant, and set here to the Pcap variant. 

1333 

1334 """ 

1335 newcls = super(PcapReader_metaclass, cls).__new__( 

1336 cls, name, bases, dct 

1337 ) 

1338 if 'alternative' in dct: 

1339 dct['alternative'].alternative = newcls 

1340 return newcls 

1341 

1342 def __call__(cls, filename): 

1343 # type: (Union[IO[bytes], str]) -> Any 

1344 """Creates a cls instance, use the `alternative` if that 

1345 fails. 

1346 

1347 """ 

1348 i = cls.__new__( 

1349 cls, 

1350 cls.__name__, 

1351 cls.__bases__, 

1352 cls.__dict__ # type: ignore 

1353 ) 

1354 filename, fdesc, magic = cls.open(filename) 

1355 if not magic: 

1356 raise Scapy_Exception( 

1357 "No data could be read!" 

1358 ) 

1359 try: 

1360 i.__init__(filename, fdesc, magic) 

1361 return i 

1362 except (Scapy_Exception, EOFError): 

1363 pass 

1364 

1365 if "alternative" in cls.__dict__: 

1366 cls = cls.__dict__["alternative"] 

1367 i = cls.__new__( 

1368 cls, 

1369 cls.__name__, 

1370 cls.__bases__, 

1371 cls.__dict__ # type: ignore 

1372 ) 

1373 try: 

1374 i.__init__(filename, fdesc, magic) 

1375 return i 

1376 except (Scapy_Exception, EOFError): 

1377 pass 

1378 

1379 raise Scapy_Exception("Not a supported capture file") 

1380 

1381 @staticmethod 

1382 def open(fname # type: Union[IO[bytes], str] 

1383 ): 

1384 # type: (...) -> Tuple[str, _ByteStream, bytes] 

1385 """Open (if necessary) filename, and read the magic.""" 

1386 if isinstance(fname, str): 

1387 filename = fname 

1388 fdesc = open(filename, "rb") # type: _ByteStream 

1389 magic = fdesc.read(2) 

1390 if magic == b"\x1f\x8b": 

1391 # GZIP header detected. 

1392 fdesc.seek(0) 

1393 fdesc = gzip.GzipFile(fileobj=fdesc) 

1394 magic = fdesc.read(2) 

1395 magic += fdesc.read(2) 

1396 else: 

1397 fdesc = fname 

1398 filename = getattr(fdesc, "name", "No name") 

1399 magic = fdesc.read(4) 

1400 return filename, fdesc, magic 

1401 

1402 

1403class RawPcapReader(metaclass=PcapReader_metaclass): 

1404 """A stateful pcap reader. Each packet is returned as a string""" 

1405 

1406 # TODO: use Generics to properly type the various readers. 

1407 # As of right now, RawPcapReader is typed as if it returned packets 

1408 # because all of its child do. Fix that 

1409 

1410 nonblocking_socket = True 

1411 PacketMetadata = collections.namedtuple("PacketMetadata", 

1412 ["sec", "usec", "wirelen", "caplen"]) # noqa: E501 

1413 

1414 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

1415 # type: (str, _ByteStream, bytes) -> None 

1416 self.filename = filename 

1417 self.f = fdesc 

1418 if magic == b"\xa1\xb2\xc3\xd4": # big endian 

1419 self.endian = ">" 

1420 self.nano = False 

1421 elif magic == b"\xd4\xc3\xb2\xa1": # little endian 

1422 self.endian = "<" 

1423 self.nano = False 

1424 elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision 

1425 self.endian = ">" 

1426 self.nano = True 

1427 elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision # noqa: E501 

1428 self.endian = "<" 

1429 self.nano = True 

1430 else: 

1431 raise Scapy_Exception( 

1432 "Not a pcap capture file (bad magic: %r)" % magic 

1433 ) 

1434 hdr = self.f.read(20) 

1435 if len(hdr) < 20: 

1436 raise Scapy_Exception("Invalid pcap file (too short)") 

1437 vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack( 

1438 self.endian + "HHIIII", hdr 

1439 ) 

1440 self.linktype = linktype 

1441 self.snaplen = snaplen 

1442 

1443 def __enter__(self): 

1444 # type: () -> RawPcapReader 

1445 return self 

1446 

1447 def __iter__(self): 

1448 # type: () -> RawPcapReader 

1449 return self 

1450 

1451 def __next__(self): 

1452 # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata] 

1453 """ 

1454 implement the iterator protocol on a set of packets in a pcap file 

1455 """ 

1456 try: 

1457 return self._read_packet() 

1458 except EOFError: 

1459 raise StopIteration 

1460 

1461 def _read_packet(self, size=MTU): 

1462 # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata] 

1463 """return a single packet read from the file as a tuple containing 

1464 (pkt_data, pkt_metadata) 

1465 

1466 raise EOFError when no more packets are available 

1467 """ 

1468 hdr = self.f.read(16) 

1469 if len(hdr) < 16: 

1470 raise EOFError 

1471 sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr) 

1472 

1473 try: 

1474 data = self.f.read(caplen)[:size] 

1475 except OverflowError as e: 

1476 warning(f"Pcap: {e}") 

1477 raise EOFError 

1478 

1479 return (data, 

1480 RawPcapReader.PacketMetadata(sec=sec, usec=usec, 

1481 wirelen=wirelen, caplen=caplen)) 

1482 

1483 def read_packet(self, size=MTU): 

1484 # type: (int) -> Packet 

1485 raise Exception( 

1486 "Cannot call read_packet() in RawPcapReader. Use " 

1487 "_read_packet()" 

1488 ) 

1489 

1490 def dispatch(self, 

1491 callback # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any] # noqa: E501 

1492 ): 

1493 # type: (...) -> None 

1494 """call the specified callback routine for each packet read 

1495 

1496 This is just a convenience function for the main loop 

1497 that allows for easy launching of packet processing in a 

1498 thread. 

1499 """ 

1500 for p in self: 

1501 callback(p) 

1502 

1503 def _read_all(self, count=-1): 

1504 # type: (int) -> List[Packet] 

1505 """return a list of all packets in the pcap file 

1506 """ 

1507 res = [] # type: List[Packet] 

1508 while count != 0: 

1509 count -= 1 

1510 try: 

1511 p = self.read_packet() # type: Packet 

1512 except EOFError: 

1513 break 

1514 res.append(p) 

1515 return res 

1516 

1517 def recv(self, size=MTU): 

1518 # type: (int) -> bytes 

1519 """ Emulate a socket 

1520 """ 

1521 return self._read_packet(size=size)[0] 

1522 

1523 def fileno(self): 

1524 # type: () -> int 

1525 return -1 if WINDOWS else self.f.fileno() 

1526 

1527 def close(self): 

1528 # type: () -> None 

1529 if isinstance(self.f, gzip.GzipFile): 

1530 self.f.fileobj.close() # type: ignore 

1531 self.f.close() 

1532 

1533 def __exit__(self, exc_type, exc_value, tracback): 

1534 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 

1535 self.close() 

1536 

1537 # emulate SuperSocket 

1538 @staticmethod 

1539 def select(sockets, # type: List[SuperSocket] 

1540 remain=None, # type: Optional[float] 

1541 ): 

1542 # type: (...) -> List[SuperSocket] 

1543 return sockets 

1544 

1545 

1546class PcapReader(RawPcapReader): 

1547 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

1548 # type: (str, IO[bytes], bytes) -> None 

1549 RawPcapReader.__init__(self, filename, fdesc, magic) 

1550 try: 

1551 self.LLcls = conf.l2types.num2layer[ 

1552 self.linktype 

1553 ] # type: Type[Packet] 

1554 except KeyError: 

1555 warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype)) # noqa: E501 

1556 if conf.raw_layer is None: 

1557 # conf.raw_layer is set on import 

1558 import scapy.packet # noqa: F401 

1559 self.LLcls = conf.raw_layer 

1560 

1561 def __enter__(self): 

1562 # type: () -> PcapReader 

1563 return self 

1564 

1565 def read_packet(self, size=MTU, **kwargs): 

1566 # type: (int, **Any) -> Packet 

1567 rp = super(PcapReader, self)._read_packet(size=size) 

1568 if rp is None: 

1569 raise EOFError 

1570 s, pkt_info = rp 

1571 

1572 try: 

1573 p = self.LLcls(s, **kwargs) # type: Packet 

1574 except KeyboardInterrupt: 

1575 raise 

1576 except Exception: 

1577 if conf.debug_dissector: 

1578 from scapy.sendrecv import debug 

1579 debug.crashed_on = (self.LLcls, s) 

1580 raise 

1581 if conf.raw_layer is None: 

1582 # conf.raw_layer is set on import 

1583 import scapy.packet # noqa: F401 

1584 p = conf.raw_layer(s) 

1585 power = Decimal(10) ** Decimal(-9 if self.nano else -6) 

1586 p.time = EDecimal(pkt_info.sec + power * pkt_info.usec) 

1587 p.wirelen = pkt_info.wirelen 

1588 return p 

1589 

1590 def recv(self, size=MTU, **kwargs): # type: ignore 

1591 # type: (int, **Any) -> Packet 

1592 return self.read_packet(size=size, **kwargs) 

1593 

1594 def __iter__(self): 

1595 # type: () -> PcapReader 

1596 return self 

1597 

1598 def __next__(self): # type: ignore 

1599 # type: () -> Packet 

1600 try: 

1601 return self.read_packet() 

1602 except EOFError: 

1603 raise StopIteration 

1604 

1605 def read_all(self, count=-1): 

1606 # type: (int) -> PacketList 

1607 res = self._read_all(count) 

1608 from scapy import plist 

1609 return plist.PacketList(res, name=os.path.basename(self.filename)) 

1610 

1611 

1612class RawPcapNgReader(RawPcapReader): 

1613 """A stateful pcapng reader. Each packet is returned as 

1614 bytes. 

1615 

1616 """ 

1617 

1618 alternative = RawPcapReader # type: Type[Any] 

1619 

1620 PacketMetadata = collections.namedtuple("PacketMetadataNg", # type: ignore 

1621 ["linktype", "tsresol", 

1622 "tshigh", "tslow", "wirelen", 

1623 "comments", "ifname", "direction", 

1624 "process_information"]) 

1625 

1626 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

1627 # type: (str, IO[bytes], bytes) -> None 

1628 self.filename = filename 

1629 self.f = fdesc 

1630 # A list of (linktype, snaplen, tsresol); will be populated by IDBs. 

1631 self.interfaces = [] # type: List[Tuple[int, int, Dict[str, Any]]] 

1632 self.default_options = { 

1633 "tsresol": 1000000 

1634 } 

1635 self.blocktypes: Dict[ 

1636 int, 

1637 Callable[ 

1638 [bytes, int], 

1639 Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] 

1640 ]] = { 

1641 1: self._read_block_idb, 

1642 2: self._read_block_pkt, 

1643 3: self._read_block_spb, 

1644 6: self._read_block_epb, 

1645 10: self._read_block_dsb, 

1646 0x80000001: self._read_block_pib, 

1647 } 

1648 self.endian = "!" # Will be overwritten by first SHB 

1649 self.process_information = [] # type: List[Dict[str, Any]] 

1650 

1651 if magic != b"\x0a\x0d\x0d\x0a": # PcapNg: 

1652 raise Scapy_Exception( 

1653 "Not a pcapng capture file (bad magic: %r)" % magic 

1654 ) 

1655 

1656 try: 

1657 self._read_block_shb() 

1658 except EOFError: 

1659 raise Scapy_Exception( 

1660 "The first SHB of the pcapng file is malformed !" 

1661 ) 

1662 

1663 def _read_block(self, size=MTU): 

1664 # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] # noqa: E501 

1665 try: 

1666 blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0] 

1667 except struct.error: 

1668 raise EOFError 

1669 if blocktype == 0x0A0D0D0A: 

1670 # This function updates the endianness based on the block content. 

1671 self._read_block_shb() 

1672 return None 

1673 try: 

1674 blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0] 

1675 except struct.error: 

1676 warning("PcapNg: Error reading blocklen before block body") 

1677 raise EOFError 

1678 if blocklen < 12: 

1679 warning("PcapNg: Invalid block length !") 

1680 raise EOFError 

1681 

1682 _block_body_length = blocklen - 12 

1683 block = self.f.read(_block_body_length) 

1684 if len(block) != _block_body_length: 

1685 raise Scapy_Exception("PcapNg: Invalid Block body length " 

1686 "(too short)") 

1687 self._read_block_tail(blocklen) 

1688 if blocktype in self.blocktypes: 

1689 return self.blocktypes[blocktype](block, size) 

1690 return None 

1691 

1692 def _read_block_tail(self, blocklen): 

1693 # type: (int) -> None 

1694 if blocklen % 4: 

1695 pad = self.f.read(-blocklen % 4) 

1696 warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. " 

1697 "Ignored padding %r" % (blocklen, pad)) 

1698 try: 

1699 if blocklen != struct.unpack(self.endian + 'I', 

1700 self.f.read(4))[0]: 

1701 raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)") 

1702 except struct.error: 

1703 warning("PcapNg: Could not read blocklen after block body") 

1704 raise EOFError 

1705 

1706 def _read_block_shb(self): 

1707 # type: () -> None 

1708 """Section Header Block""" 

1709 _blocklen = self.f.read(4) 

1710 endian = self.f.read(4) 

1711 if endian == b"\x1a\x2b\x3c\x4d": 

1712 self.endian = ">" 

1713 elif endian == b"\x4d\x3c\x2b\x1a": 

1714 self.endian = "<" 

1715 else: 

1716 warning("PcapNg: Bad magic in Section Header Block" 

1717 " (not a pcapng file?)") 

1718 raise EOFError 

1719 

1720 try: 

1721 blocklen = struct.unpack(self.endian + "I", _blocklen)[0] 

1722 except struct.error: 

1723 warning("PcapNg: Could not read blocklen") 

1724 raise EOFError 

1725 if blocklen < 28: 

1726 warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!") # noqa: E501 

1727 raise EOFError 

1728 

1729 # Major version must be 1 

1730 _major = self.f.read(2) 

1731 try: 

1732 major = struct.unpack(self.endian + "H", _major)[0] 

1733 except struct.error: 

1734 warning("PcapNg: Could not read major value") 

1735 raise EOFError 

1736 if major != 1: 

1737 warning(f"PcapNg: SHB Major version {major} unsupported !") 

1738 raise EOFError 

1739 

1740 # Skip minor version & section length 

1741 skipped = self.f.read(10) 

1742 if len(skipped) != 10: 

1743 warning("PcapNg: Could not read minor value & section length") 

1744 raise EOFError 

1745 

1746 _options_len = blocklen - 28 

1747 options = self.f.read(_options_len) 

1748 if len(options) != _options_len: 

1749 raise Scapy_Exception("PcapNg: Invalid Section Header Block " 

1750 " options (too short)") 

1751 self._read_block_tail(blocklen) 

1752 self._read_options(options) 

1753 

1754 def _read_packet(self, size=MTU): # type: ignore 

1755 # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1756 """Read blocks until it reaches either EOF or a packet, and 

1757 returns None or (packet, (linktype, sec, usec, wirelen)), 

1758 where packet is a string. 

1759 

1760 """ 

1761 while True: 

1762 res = self._read_block(size=size) 

1763 if res is not None: 

1764 return res 

1765 

1766 def _read_options(self, options): 

1767 # type: (bytes) -> Dict[int, Union[bytes, List[bytes]]] 

1768 opts = dict() # type: Dict[int, Union[bytes, List[bytes]]] 

1769 while len(options) >= 4: 

1770 try: 

1771 code, length = struct.unpack(self.endian + "HH", options[:4]) 

1772 except struct.error: 

1773 warning("PcapNg: options header is too small " 

1774 "%d !" % len(options)) 

1775 raise EOFError 

1776 if code != 0 and 4 + length <= len(options): 

1777 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#name-options-format 

1778 if code in [1, 2988, 2989, 19372, 19373]: 

1779 if code not in opts: 

1780 opts[code] = [] 

1781 opts[code].append(options[4:4 + length]) # type: ignore 

1782 else: 

1783 opts[code] = options[4:4 + length] 

1784 if code == 0: 

1785 if length != 0: 

1786 warning("PcapNg: invalid option " 

1787 "length %d for end-of-option" % length) 

1788 break 

1789 if length % 4: 

1790 length += (4 - (length % 4)) 

1791 options = options[4 + length:] 

1792 return opts 

1793 

1794 def _read_block_idb(self, block, _): 

1795 # type: (bytes, int) -> None 

1796 """Interface Description Block""" 

1797 # 2 bytes LinkType + 2 bytes Reserved 

1798 # 4 bytes Snaplen 

1799 options_raw = self._read_options(block[8:]) 

1800 options = self.default_options.copy() # type: Dict[str, Any] 

1801 for c, v in options_raw.items(): 

1802 if isinstance(v, list): 

1803 # Spec allows multiple occurrences (see 

1804 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#section-4.2-8.6) 

1805 # but does not define which to use. We take the first for 

1806 # backward compatibility. 

1807 v = v[0] 

1808 if c == 9: 

1809 length = len(v) 

1810 if length == 1: 

1811 tsresol = orb(v) 

1812 options["tsresol"] = (2 if tsresol & 128 else 10) ** ( 

1813 tsresol & 127 

1814 ) 

1815 else: 

1816 warning("PcapNg: invalid options " 

1817 "length %d for IDB tsresol" % length) 

1818 elif c == 2: 

1819 options["name"] = v 

1820 elif c == 1: 

1821 options["comment"] = v 

1822 try: 

1823 interface: Tuple[int, int, Dict[str, Any]] = struct.unpack( 

1824 self.endian + "HxxI", 

1825 block[:8] 

1826 ) + (options,) 

1827 except struct.error: 

1828 warning("PcapNg: IDB is too small %d/8 !" % len(block)) 

1829 raise EOFError 

1830 self.interfaces.append(interface) 

1831 

1832 def _check_interface_id(self, intid): 

1833 # type: (int) -> None 

1834 """Check the interface id value and raise EOFError if invalid.""" 

1835 tmp_len = len(self.interfaces) 

1836 if intid >= tmp_len: 

1837 warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len)) 

1838 raise EOFError 

1839 

1840 def _read_block_epb(self, block, size): 

1841 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1842 """Enhanced Packet Block""" 

1843 try: 

1844 intid, tshigh, tslow, caplen, wirelen = struct.unpack( 

1845 self.endian + "5I", 

1846 block[:20], 

1847 ) 

1848 except struct.error: 

1849 warning("PcapNg: EPB is too small %d/20 !" % len(block)) 

1850 raise EOFError 

1851 

1852 # Compute the options offset taking padding into account 

1853 if caplen % 4: 

1854 opt_offset = 20 + caplen + (-caplen) % 4 

1855 else: 

1856 opt_offset = 20 + caplen 

1857 

1858 # Parse options 

1859 options = self._read_options(block[opt_offset:]) 

1860 

1861 process_information = {} 

1862 for code, value in options.items(): 

1863 # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX 

1864 if code in [0x8001, 0x8003]: 

1865 try: 

1866 proc_index = struct.unpack( 

1867 self.endian + "I", value)[0] # type: ignore 

1868 except struct.error: 

1869 warning("PcapNg: EPB invalid proc index " 

1870 "(expected 4 bytes, got %d) !" % len(value)) 

1871 raise EOFError 

1872 if proc_index < len(self.process_information): 

1873 key = "proc" if code == 0x8001 else "eproc" 

1874 process_information[key] = self.process_information[proc_index] 

1875 else: 

1876 warning("PcapNg: EPB invalid process information index " 

1877 "(%d/%d) !" % (proc_index, len(self.process_information))) 

1878 

1879 comments = options.get(1, None) 

1880 epb_flags_raw = options.get(2, None) 

1881 if epb_flags_raw and isinstance(epb_flags_raw, bytes): 

1882 try: 

1883 epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw) 

1884 except struct.error: 

1885 warning("PcapNg: EPB invalid flags size" 

1886 "(expected 4 bytes, got %d) !" % len(epb_flags_raw)) 

1887 raise EOFError 

1888 direction = epb_flags & 3 

1889 

1890 else: 

1891 direction = None 

1892 

1893 self._check_interface_id(intid) 

1894 ifname = self.interfaces[intid][2].get('name', None) 

1895 

1896 return (block[20:20 + caplen][:size], 

1897 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 

1898 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 

1899 tshigh=tshigh, 

1900 tslow=tslow, 

1901 wirelen=wirelen, 

1902 ifname=ifname, 

1903 direction=direction, 

1904 process_information=process_information, 

1905 comments=comments)) 

1906 

1907 def _read_block_spb(self, block, size): 

1908 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1909 """Simple Packet Block""" 

1910 # "it MUST be assumed that all the Simple Packet Blocks have 

1911 # been captured on the interface previously specified in the 

1912 # first Interface Description Block." 

1913 intid = 0 

1914 self._check_interface_id(intid) 

1915 

1916 try: 

1917 wirelen, = struct.unpack(self.endian + "I", block[:4]) 

1918 except struct.error: 

1919 warning("PcapNg: SPB is too small %d/4 !" % len(block)) 

1920 raise EOFError 

1921 

1922 caplen = min(wirelen, self.interfaces[intid][1]) 

1923 return (block[4:4 + caplen][:size], 

1924 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 

1925 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 

1926 tshigh=None, 

1927 tslow=None, 

1928 wirelen=wirelen, 

1929 ifname=None, 

1930 direction=None, 

1931 process_information={}, 

1932 comments=None)) 

1933 

1934 def _read_block_pkt(self, block, size): 

1935 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 

1936 """(Obsolete) Packet Block""" 

1937 try: 

1938 intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack( 

1939 self.endian + "HH4I", 

1940 block[:20], 

1941 ) 

1942 except struct.error: 

1943 warning("PcapNg: PKT is too small %d/20 !" % len(block)) 

1944 raise EOFError 

1945 

1946 self._check_interface_id(intid) 

1947 return (block[20:20 + caplen][:size], 

1948 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 

1949 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 

1950 tshigh=tshigh, 

1951 tslow=tslow, 

1952 wirelen=wirelen, 

1953 ifname=None, 

1954 direction=None, 

1955 process_information={}, 

1956 comments=None)) 

1957 

1958 def _read_block_dsb(self, block, size): 

1959 # type: (bytes, int) -> None 

1960 """Decryption Secrets Block""" 

1961 

1962 # Parse the secrets type and length fields 

1963 try: 

1964 secrets_type, secrets_length = struct.unpack( 

1965 self.endian + "II", 

1966 block[:8], 

1967 ) 

1968 block = block[8:] 

1969 except struct.error: 

1970 warning("PcapNg: DSB is too small %d!", len(block)) 

1971 raise EOFError 

1972 

1973 # Compute the secrets length including the padding 

1974 padded_secrets_length = secrets_length + (-secrets_length) % 4 

1975 if len(block) < padded_secrets_length: 

1976 warning("PcapNg: invalid DSB secrets length!") 

1977 raise EOFError 

1978 

1979 # Extract secrets data and options 

1980 secrets_data = block[:padded_secrets_length][:secrets_length] 

1981 if block[padded_secrets_length:]: 

1982 warning("PcapNg: DSB options are not supported!") 

1983 

1984 # TLS Key Log 

1985 if secrets_type == 0x544c534b: 

1986 if getattr(conf, "tls_sessions", False) is False: 

1987 warning("PcapNg: TLS Key Log available, but " 

1988 "the TLS layer is not loaded! Scapy won't be able " 

1989 "to decrypt the packets.") 

1990 else: 

1991 from scapy.layers.tls.session import load_nss_keys 

1992 

1993 # Write Key Log to a file and parse it 

1994 filename = get_temp_file() 

1995 with open(filename, "wb") as fd: 

1996 fd.write(secrets_data) 

1997 fd.close() 

1998 

1999 keys = load_nss_keys(filename) 

2000 if not keys: 

2001 warning("PcapNg: invalid TLS Key Log in DSB!") 

2002 else: 

2003 # Note: these attributes are only available when the TLS 

2004 # layer is loaded. 

2005 conf.tls_nss_keys = keys 

2006 conf.tls_session_enable = True 

2007 else: 

2008 warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type) 

2009 

2010 def _read_block_pib(self, block, _): 

2011 # type: (bytes, int) -> None 

2012 """Apple Process Information Block""" 

2013 

2014 # Get the Process ID 

2015 try: 

2016 dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0] 

2017 process_information = {"id": dpeb_pid} 

2018 block = block[4:] 

2019 except struct.error: 

2020 warning("PcapNg: DPEB is too small (%d). Cannot get PID!", 

2021 len(block)) 

2022 raise EOFError 

2023 

2024 # Get Options 

2025 options = self._read_options(block) 

2026 for code, value in options.items(): 

2027 if code == 2: 

2028 process_information["name"] = value.decode( # type: ignore 

2029 "ascii", "backslashreplace") 

2030 elif code == 4: 

2031 if len(value) == 16: 

2032 process_information["uuid"] = str(UUID(bytes=value)) # type: ignore 

2033 else: 

2034 warning("PcapNg: DPEB UUID length is invalid (%d)!", 

2035 len(value)) 

2036 

2037 # Store process information 

2038 self.process_information.append(process_information) 

2039 

2040 

2041class PcapNgReader(RawPcapNgReader, PcapReader): 

2042 

2043 alternative = PcapReader 

2044 

2045 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 

2046 # type: (str, IO[bytes], bytes) -> None 

2047 RawPcapNgReader.__init__(self, filename, fdesc, magic) 

2048 

2049 def __enter__(self): 

2050 # type: () -> PcapNgReader 

2051 return self 

2052 

2053 def read_packet(self, size=MTU, **kwargs): 

2054 # type: (int, **Any) -> Packet 

2055 rp = super(PcapNgReader, self)._read_packet(size=size) 

2056 if rp is None: 

2057 raise EOFError 

2058 s, (linktype, tsresol, tshigh, tslow, wirelen, comments, ifname, direction, process_information) = rp # noqa: E501 

2059 try: 

2060 cls = conf.l2types.num2layer[linktype] # type: Type[Packet] 

2061 p = cls(s, **kwargs) # type: Packet 

2062 except KeyboardInterrupt: 

2063 raise 

2064 except Exception: 

2065 if conf.debug_dissector: 

2066 raise 

2067 if conf.raw_layer is None: 

2068 # conf.raw_layer is set on import 

2069 import scapy.packet # noqa: F401 

2070 p = conf.raw_layer(s) 

2071 if tshigh is not None: 

2072 p.time = EDecimal((tshigh << 32) + tslow) / tsresol 

2073 p.wirelen = wirelen 

2074 p.comments = comments 

2075 p.direction = direction 

2076 p.process_information = process_information.copy() 

2077 if ifname is not None: 

2078 p.sniffed_on = ifname.decode('utf-8', 'backslashreplace') 

2079 return p 

2080 

2081 def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet': # type: ignore 

2082 return self.read_packet(size=size, **kwargs) 

2083 

2084 

2085class GenericPcapWriter(object): 

2086 nano = False 

2087 linktype: int 

2088 

2089 def _write_header(self, pkt): 

2090 # type: (Optional[Union[Packet, bytes]]) -> None 

2091 raise NotImplementedError 

2092 

2093 def _write_packet(self, 

2094 packet, # type: Union[bytes, Packet] 

2095 linktype, # type: int 

2096 sec=None, # type: Optional[float] 

2097 usec=None, # type: Optional[int] 

2098 caplen=None, # type: Optional[int] 

2099 wirelen=None, # type: Optional[int] 

2100 ifname=None, # type: Optional[bytes] 

2101 direction=None, # type: Optional[int] 

2102 comments=None, # type: Optional[List[bytes]] 

2103 ): 

2104 # type: (...) -> None 

2105 raise NotImplementedError 

2106 

2107 def _get_time(self, 

2108 packet, # type: Union[bytes, Packet] 

2109 sec, # type: Optional[float] 

2110 usec # type: Optional[int] 

2111 ): 

2112 # type: (...) -> Tuple[float, int] 

2113 if hasattr(packet, "time"): 

2114 if sec is None: 

2115 packet_time = packet.time 

2116 tmp = int(packet_time) 

2117 usec = int(round((packet_time - tmp) * 

2118 (1000000000 if self.nano else 1000000))) 

2119 sec = float(packet_time) 

2120 if sec is not None and usec is None: 

2121 usec = 0 

2122 return sec, usec # type: ignore 

2123 

2124 def write_header(self, pkt): 

2125 # type: (Optional[Union[Packet, bytes]]) -> None 

2126 if not hasattr(self, 'linktype'): 

2127 try: 

2128 if pkt is None or isinstance(pkt, bytes): 

2129 # Can't guess LL 

2130 raise KeyError 

2131 self.linktype = conf.l2types.layer2num[ 

2132 pkt.__class__ 

2133 ] 

2134 except KeyError: 

2135 msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)" 

2136 warning(msg, self.__class__.__name__, pkt.__class__.__name__) 

2137 self.linktype = DLT_EN10MB 

2138 self._write_header(pkt) 

2139 

2140 def write_packet(self, 

2141 packet, # type: Union[bytes, Packet] 

2142 sec=None, # type: Optional[float] 

2143 usec=None, # type: Optional[int] 

2144 caplen=None, # type: Optional[int] 

2145 wirelen=None, # type: Optional[int] 

2146 ): 

2147 # type: (...) -> None 

2148 """ 

2149 Writes a single packet to the pcap file. 

2150 

2151 :param packet: Packet, or bytes for a single packet 

2152 :type packet: scapy.packet.Packet or bytes 

2153 :param sec: time the packet was captured, in seconds since epoch. If 

2154 not supplied, defaults to now. 

2155 :type sec: float 

2156 :param usec: If ``nano=True``, then number of nanoseconds after the 

2157 second that the packet was captured. If ``nano=False``, 

2158 then the number of microseconds after the second the 

2159 packet was captured. If ``sec`` is not specified, 

2160 this value is ignored. 

2161 :type usec: int or long 

2162 :param caplen: The length of the packet in the capture file. If not 

2163 specified, uses ``len(raw(packet))``. 

2164 :type caplen: int 

2165 :param wirelen: The length of the packet on the wire. If not 

2166 specified, tries ``packet.wirelen``, otherwise uses 

2167 ``caplen``. 

2168 :type wirelen: int 

2169 :return: None 

2170 :rtype: None 

2171 """ 

2172 f_sec, usec = self._get_time(packet, sec, usec) 

2173 

2174 rawpkt = bytes_encode(packet) 

2175 caplen = len(rawpkt) if caplen is None else caplen 

2176 

2177 if wirelen is None: 

2178 if hasattr(packet, "wirelen"): 

2179 wirelen = packet.wirelen 

2180 if wirelen is None: 

2181 wirelen = caplen 

2182 

2183 comments = getattr(packet, "comments", None) 

2184 ifname = getattr(packet, "sniffed_on", None) 

2185 direction = getattr(packet, "direction", None) 

2186 if not isinstance(packet, bytes): 

2187 linktype: int = conf.l2types.layer2num[ 

2188 packet.__class__ 

2189 ] 

2190 else: 

2191 linktype = self.linktype 

2192 if ifname is not None: 

2193 ifname = str(ifname).encode('utf-8') 

2194 self._write_packet( 

2195 rawpkt, 

2196 sec=f_sec, usec=usec, 

2197 caplen=caplen, wirelen=wirelen, 

2198 ifname=ifname, 

2199 direction=direction, 

2200 linktype=linktype, 

2201 comments=comments, 

2202 ) 

2203 

2204 

2205class GenericRawPcapWriter(GenericPcapWriter): 

2206 header_present = False 

2207 nano = False 

2208 sync = False 

2209 f = None # type: Union[IO[bytes], gzip.GzipFile] 

2210 

2211 def fileno(self): 

2212 # type: () -> int 

2213 return -1 if WINDOWS else self.f.fileno() 

2214 

2215 def flush(self): 

2216 # type: () -> Optional[Any] 

2217 return self.f.flush() 

2218 

2219 def close(self): 

2220 # type: () -> Optional[Any] 

2221 if not self.header_present: 

2222 self.write_header(None) 

2223 return self.f.close() 

2224 

2225 def __enter__(self): 

2226 # type: () -> GenericRawPcapWriter 

2227 return self 

2228 

2229 def __exit__(self, exc_type, exc_value, tracback): 

2230 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 

2231 self.flush() 

2232 self.close() 

2233 

2234 def write(self, pkt): 

2235 # type: (Union[_PacketIterable, bytes]) -> None 

2236 """ 

2237 Writes a Packet, a SndRcvList object, or bytes to a pcap file. 

2238 

2239 :param pkt: Packet(s) to write (one record for each Packet), or raw 

2240 bytes to write (as one record). 

2241 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes 

2242 """ 

2243 if isinstance(pkt, bytes): 

2244 if not self.header_present: 

2245 self.write_header(pkt) 

2246 self.write_packet(pkt) 

2247 else: 

2248 # Import here to avoid circular dependency 

2249 from scapy.supersocket import IterSocket 

2250 for p in IterSocket(pkt).iter: 

2251 if not self.header_present: 

2252 self.write_header(p) 

2253 

2254 if not isinstance(p, bytes) and \ 

2255 self.linktype != conf.l2types.get(type(p), None): 

2256 warning("Inconsistent linktypes detected!" 

2257 " The resulting file might contain" 

2258 " invalid packets." 

2259 ) 

2260 

2261 self.write_packet(p) 

2262 

2263 

2264class RawPcapWriter(GenericRawPcapWriter): 

2265 """A stream PCAP writer with more control than wrpcap()""" 

2266 

2267 def __init__(self, 

2268 filename, # type: Union[IO[bytes], str] 

2269 linktype=None, # type: Optional[int] 

2270 gz=False, # type: bool 

2271 endianness="", # type: str 

2272 append=False, # type: bool 

2273 sync=False, # type: bool 

2274 nano=False, # type: bool 

2275 snaplen=MTU, # type: int 

2276 bufsz=4096, # type: int 

2277 ): 

2278 # type: (...) -> None 

2279 """ 

2280 :param filename: the name of the file to write packets to, or an open, 

2281 writable file-like object. 

2282 :param linktype: force linktype to a given value. If None, linktype is 

2283 taken from the first writer packet 

2284 :param gz: compress the capture on the fly 

2285 :param endianness: force an endianness (little:"<", big:">"). 

2286 Default is native 

2287 :param append: append packets to the capture file instead of 

2288 truncating it 

2289 :param sync: do not bufferize writes to the capture file 

2290 :param nano: use nanosecond-precision (requires libpcap >= 1.5.0) 

2291 

2292 """ 

2293 

2294 if linktype: 

2295 self.linktype = linktype 

2296 self.snaplen = snaplen 

2297 self.append = append 

2298 self.gz = gz 

2299 self.endian = endianness 

2300 self.sync = sync 

2301 self.nano = nano 

2302 if sync: 

2303 bufsz = 0 

2304 

2305 if isinstance(filename, str): 

2306 self.filename = filename 

2307 if gz: 

2308 self.f = cast(_ByteStream, gzip.open( 

2309 filename, append and "ab" or "wb", 9 

2310 )) 

2311 else: 

2312 self.f = open(filename, append and "ab" or "wb", bufsz) 

2313 else: 

2314 self.f = filename 

2315 self.filename = getattr(filename, "name", "No name") 

2316 

2317 def _write_header(self, pkt): 

2318 # type: (Optional[Union[Packet, bytes]]) -> None 

2319 self.header_present = True 

2320 

2321 if self.append: 

2322 # Even if prone to race conditions, this seems to be 

2323 # safest way to tell whether the header is already present 

2324 # because we have to handle compressed streams that 

2325 # are not as flexible as basic files 

2326 if self.gz: 

2327 g = gzip.open(self.filename, "rb") # type: _ByteStream 

2328 else: 

2329 g = open(self.filename, "rb") 

2330 try: 

2331 if g.read(16): 

2332 return 

2333 finally: 

2334 g.close() 

2335 

2336 if not hasattr(self, 'linktype'): 

2337 raise ValueError( 

2338 "linktype could not be guessed. " 

2339 "Please pass a linktype while creating the writer" 

2340 ) 

2341 

2342 self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, # noqa: E501 

2343 2, 4, 0, 0, self.snaplen, self.linktype)) 

2344 self.f.flush() 

2345 

2346 def _write_packet(self, 

2347 packet, # type: Union[bytes, Packet] 

2348 linktype, # type: int 

2349 sec=None, # type: Optional[float] 

2350 usec=None, # type: Optional[int] 

2351 caplen=None, # type: Optional[int] 

2352 wirelen=None, # type: Optional[int] 

2353 ifname=None, # type: Optional[bytes] 

2354 direction=None, # type: Optional[int] 

2355 comments=None, # type: Optional[List[bytes]] 

2356 ): 

2357 # type: (...) -> None 

2358 """ 

2359 Writes a single packet to the pcap file. 

2360 

2361 :param packet: bytes for a single packet 

2362 :type packet: bytes 

2363 :param linktype: linktype value associated with the packet 

2364 :type linktype: int 

2365 :param sec: time the packet was captured, in seconds since epoch. If 

2366 not supplied, defaults to now. 

2367 :type sec: float 

2368 :param usec: not used with pcapng 

2369 packet was captured 

2370 :type usec: int or long 

2371 :param caplen: The length of the packet in the capture file. If not 

2372 specified, uses ``len(packet)``. 

2373 :type caplen: int 

2374 :param wirelen: The length of the packet on the wire. If not 

2375 specified, uses ``caplen``. 

2376 :type wirelen: int 

2377 :return: None 

2378 :rtype: None 

2379 """ 

2380 if caplen is None: 

2381 caplen = len(packet) 

2382 if wirelen is None: 

2383 wirelen = caplen 

2384 if sec is None or usec is None: 

2385 t = time.time() 

2386 it = int(t) 

2387 if sec is None: 

2388 sec = it 

2389 usec = int(round((t - it) * 

2390 (1000000000 if self.nano else 1000000))) 

2391 elif usec is None: 

2392 usec = 0 

2393 

2394 self.f.write(struct.pack(self.endian + "IIII", 

2395 int(sec), usec, caplen, wirelen)) 

2396 self.f.write(bytes(packet)) 

2397 if self.sync: 

2398 self.f.flush() 

2399 

2400 

2401class RawPcapNgWriter(GenericRawPcapWriter): 

2402 """A stream pcapng writer with more control than wrpcapng()""" 

2403 

2404 def __init__(self, 

2405 filename, # type: str 

2406 ): 

2407 # type: (...) -> None 

2408 

2409 self.header_present = False 

2410 self.tsresol = 1000000 

2411 # A dict to keep if_name to IDB id mapping. 

2412 # unknown if_name(None) id=0 

2413 self.interfaces2id: Dict[Optional[bytes], int] = {None: 0} 

2414 

2415 # tcpdump only support little-endian in PCAPng files 

2416 self.endian = "<" 

2417 self.endian_magic = b"\x4d\x3c\x2b\x1a" 

2418 

2419 self.filename = filename 

2420 self.f = open(filename, "wb", 4096) 

2421 

2422 def _get_time(self, 

2423 packet, # type: Union[bytes, Packet] 

2424 sec, # type: Optional[float] 

2425 usec # type: Optional[int] 

2426 ): 

2427 # type: (...) -> Tuple[float, int] 

2428 if hasattr(packet, "time"): 

2429 if sec is None: 

2430 sec = float(packet.time) 

2431 

2432 if usec is None: 

2433 usec = 0 

2434 

2435 return sec, usec # type: ignore 

2436 

2437 def _add_padding(self, raw_data): 

2438 # type: (bytes) -> bytes 

2439 raw_data += ((-len(raw_data)) % 4) * b"\x00" 

2440 return raw_data 

2441 

2442 def build_block(self, block_type, block_body, options=None): 

2443 # type: (bytes, bytes, Optional[bytes]) -> bytes 

2444 

2445 # Pad Block Body to 32 bits 

2446 block_body = self._add_padding(block_body) 

2447 

2448 if options: 

2449 block_body += options 

2450 

2451 # An empty block is 12 bytes long 

2452 block_total_length = 12 + len(block_body) 

2453 

2454 # Block Type 

2455 block = block_type 

2456 # Block Total Length$ 

2457 block += struct.pack(self.endian + "I", block_total_length) 

2458 # Block Body 

2459 block += block_body 

2460 # Block Total Length$ 

2461 block += struct.pack(self.endian + "I", block_total_length) 

2462 

2463 return block 

2464 

2465 def _write_header(self, pkt): 

2466 # type: (Optional[Union[Packet, bytes]]) -> None 

2467 if not self.header_present: 

2468 self.header_present = True 

2469 self._write_block_shb() 

2470 self._write_block_idb(linktype=self.linktype) 

2471 

2472 def _write_block_shb(self): 

2473 # type: () -> None 

2474 

2475 # Block Type 

2476 block_type = b"\x0A\x0D\x0D\x0A" 

2477 # Byte-Order Magic 

2478 block_shb = self.endian_magic 

2479 # Major Version 

2480 block_shb += struct.pack(self.endian + "H", 1) 

2481 # Minor Version 

2482 block_shb += struct.pack(self.endian + "H", 0) 

2483 # Section Length 

2484 block_shb += struct.pack(self.endian + "q", -1) 

2485 

2486 self.f.write(self.build_block(block_type, block_shb)) 

2487 

2488 def _write_block_idb(self, 

2489 linktype, # type: int 

2490 ifname=None # type: Optional[bytes] 

2491 ): 

2492 # type: (...) -> None 

2493 

2494 # Block Type 

2495 block_type = struct.pack(self.endian + "I", 1) 

2496 # LinkType 

2497 block_idb = struct.pack(self.endian + "H", linktype) 

2498 # Reserved 

2499 block_idb += struct.pack(self.endian + "H", 0) 

2500 # SnapLen 

2501 block_idb += struct.pack(self.endian + "I", 262144) 

2502 

2503 # if_name option 

2504 opts = None 

2505 if ifname is not None: 

2506 opts = struct.pack(self.endian + "HH", 2, len(ifname)) 

2507 # Pad Option Value to 32 bits 

2508 opts += self._add_padding(ifname) 

2509 opts += struct.pack(self.endian + "HH", 0, 0) 

2510 

2511 self.f.write(self.build_block(block_type, block_idb, options=opts)) 

2512 

2513 def _write_block_spb(self, raw_pkt): 

2514 # type: (bytes) -> None 

2515 

2516 # Block Type 

2517 block_type = struct.pack(self.endian + "I", 3) 

2518 # Original Packet Length 

2519 block_spb = struct.pack(self.endian + "I", len(raw_pkt)) 

2520 # Packet Data 

2521 block_spb += raw_pkt 

2522 

2523 self.f.write(self.build_block(block_type, block_spb)) 

2524 

2525 def _write_block_epb(self, 

2526 raw_pkt, # type: bytes 

2527 ifid, # type: int 

2528 timestamp=None, # type: Optional[Union[EDecimal, float]] # noqa: E501 

2529 caplen=None, # type: Optional[int] 

2530 orglen=None, # type: Optional[int] 

2531 comments=None, # type: Optional[List[bytes]] 

2532 flags=None, # type: Optional[int] 

2533 ): 

2534 # type: (...) -> None 

2535 

2536 if timestamp: 

2537 tmp_ts = int(timestamp * self.tsresol) 

2538 ts_high = tmp_ts >> 32 

2539 ts_low = tmp_ts & 0xFFFFFFFF 

2540 else: 

2541 ts_high = ts_low = 0 

2542 

2543 if not caplen: 

2544 caplen = len(raw_pkt) 

2545 

2546 if not orglen: 

2547 orglen = len(raw_pkt) 

2548 

2549 # Block Type 

2550 block_type = struct.pack(self.endian + "I", 6) 

2551 # Interface ID 

2552 block_epb = struct.pack(self.endian + "I", ifid) 

2553 # Timestamp (High) 

2554 block_epb += struct.pack(self.endian + "I", ts_high) 

2555 # Timestamp (Low) 

2556 block_epb += struct.pack(self.endian + "I", ts_low) 

2557 # Captured Packet Length 

2558 block_epb += struct.pack(self.endian + "I", caplen) 

2559 # Original Packet Length 

2560 block_epb += struct.pack(self.endian + "I", orglen) 

2561 # Packet Data 

2562 block_epb += raw_pkt 

2563 

2564 # Options 

2565 opts = b'' 

2566 if comments and len(comments): 

2567 for c in comments: 

2568 comment = bytes_encode(c) 

2569 opts += struct.pack(self.endian + "HH", 1, len(comment)) 

2570 # Pad Option Value to 32 bits 

2571 opts += self._add_padding(comment) 

2572 if type(flags) == int: 

2573 opts += struct.pack(self.endian + "HH", 2, 4) 

2574 opts += struct.pack(self.endian + "I", flags) 

2575 if opts: 

2576 opts += struct.pack(self.endian + "HH", 0, 0) 

2577 

2578 self.f.write(self.build_block(block_type, block_epb, 

2579 options=opts)) 

2580 

2581 def _write_packet(self, # type: ignore 

2582 packet, # type: bytes 

2583 linktype, # type: int 

2584 sec=None, # type: Optional[float] 

2585 usec=None, # type: Optional[int] 

2586 caplen=None, # type: Optional[int] 

2587 wirelen=None, # type: Optional[int] 

2588 ifname=None, # type: Optional[bytes] 

2589 direction=None, # type: Optional[int] 

2590 comments=None, # type: Optional[List[bytes]] 

2591 ): 

2592 # type: (...) -> None 

2593 """ 

2594 Writes a single packet to the pcap file. 

2595 

2596 :param packet: bytes for a single packet 

2597 :type packet: bytes 

2598 :param linktype: linktype value associated with the packet 

2599 :type linktype: int 

2600 :param sec: time the packet was captured, in seconds since epoch. If 

2601 not supplied, defaults to now. 

2602 :type sec: float 

2603 :param caplen: The length of the packet in the capture file. If not 

2604 specified, uses ``len(packet)``. 

2605 :type caplen: int 

2606 :param wirelen: The length of the packet on the wire. If not 

2607 specified, uses ``caplen``. 

2608 :type wirelen: int 

2609 :param comment: UTF-8 string containing human-readable comment text 

2610 that is associated to the current block. Line separators 

2611 SHOULD be a carriage-return + linefeed ('\r\n') or 

2612 just linefeed ('\n'); either form may appear and 

2613 be considered a line separator. The string is not 

2614 zero-terminated. 

2615 :type bytes 

2616 :param ifname: UTF-8 string containing the 

2617 name of the device used to capture data. 

2618 The string is not zero-terminated. 

2619 :type bytes 

2620 :param direction: 0 = information not available, 

2621 1 = inbound, 

2622 2 = outbound 

2623 :type int 

2624 :return: None 

2625 :rtype: None 

2626 """ 

2627 if caplen is None: 

2628 caplen = len(packet) 

2629 if wirelen is None: 

2630 wirelen = caplen 

2631 

2632 ifid = self.interfaces2id.get(ifname, None) 

2633 if ifid is None: 

2634 ifid = max(self.interfaces2id.values()) + 1 

2635 self.interfaces2id[ifname] = ifid 

2636 self._write_block_idb(linktype=linktype, ifname=ifname) 

2637 

2638 # EPB flags (32 bits). 

2639 # currently only direction is implemented (least 2 significant bits) 

2640 if type(direction) == int: 

2641 flags = direction & 0x3 

2642 else: 

2643 flags = None 

2644 

2645 self._write_block_epb(packet, timestamp=sec, caplen=caplen, 

2646 orglen=wirelen, comments=comments, ifid=ifid, flags=flags) 

2647 if self.sync: 

2648 self.f.flush() 

2649 

2650 

2651class PcapWriter(RawPcapWriter): 

2652 """A stream PCAP writer with more control than wrpcap()""" 

2653 pass 

2654 

2655 

2656class PcapNgWriter(RawPcapNgWriter): 

2657 """A stream pcapng writer with more control than wrpcapng()""" 

2658 

2659 def _get_time(self, 

2660 packet, # type: Union[bytes, Packet] 

2661 sec, # type: Optional[float] 

2662 usec # type: Optional[int] 

2663 ): 

2664 # type: (...) -> Tuple[float, int] 

2665 if hasattr(packet, "time"): 

2666 if sec is None: 

2667 sec = float(packet.time) 

2668 

2669 if usec is None: 

2670 usec = 0 

2671 

2672 return sec, usec # type: ignore 

2673 

2674 

2675@conf.commands.register 

2676def rderf(filename, count=-1): 

2677 # type: (Union[IO[bytes], str], int) -> PacketList 

2678 """Read a ERF file and return a packet list 

2679 

2680 :param count: read only <count> packets 

2681 """ 

2682 with ERFEthernetReader(filename) as fdesc: 

2683 return fdesc.read_all(count=count) 

2684 

2685 

2686class ERFEthernetReader_metaclass(PcapReader_metaclass): 

2687 def __call__(cls, filename): 

2688 # type: (Union[IO[bytes], str]) -> Any 

2689 i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__) # type: ignore 

2690 filename, fdesc = cls.open(filename) 

2691 try: 

2692 i.__init__(filename, fdesc) 

2693 return i 

2694 except (Scapy_Exception, EOFError): 

2695 pass 

2696 

2697 if "alternative" in cls.__dict__: 

2698 cls = cls.__dict__["alternative"] 

2699 i = cls.__new__( 

2700 cls, 

2701 cls.__name__, 

2702 cls.__bases__, 

2703 cls.__dict__ # type: ignore 

2704 ) 

2705 try: 

2706 i.__init__(filename, fdesc) 

2707 return i 

2708 except (Scapy_Exception, EOFError): 

2709 pass 

2710 

2711 raise Scapy_Exception("Not a supported capture file") 

2712 

2713 @staticmethod 

2714 def open(fname # type: ignore 

2715 ): 

2716 # type: (...) -> Tuple[str, _ByteStream] 

2717 """Open (if necessary) filename""" 

2718 if isinstance(fname, str): 

2719 filename = fname 

2720 try: 

2721 with gzip.open(filename, "rb") as tmp: 

2722 tmp.read(1) 

2723 fdesc = gzip.open(filename, "rb") # type: _ByteStream 

2724 except IOError: 

2725 fdesc = open(filename, "rb") 

2726 

2727 else: 

2728 fdesc = fname 

2729 filename = getattr(fdesc, "name", "No name") 

2730 return filename, fdesc 

2731 

2732 

2733class ERFEthernetReader(PcapReader, 

2734 metaclass=ERFEthernetReader_metaclass): 

2735 

2736 def __init__(self, filename, fdesc=None): # type: ignore 

2737 # type: (Union[IO[bytes], str], IO[bytes]) -> None 

2738 self.filename = filename # type: ignore 

2739 self.f = fdesc 

2740 self.power = Decimal(10) ** Decimal(-9) 

2741 

2742 # time is in 64-bits Endace's format which can be see here: 

2743 # https://www.endace.com/erf-extensible-record-format-types.pdf 

2744 def _convert_erf_timestamp(self, t): 

2745 # type: (int) -> EDecimal 

2746 sec = t >> 32 

2747 frac_sec = t & 0xffffffff 

2748 frac_sec *= 10**9 

2749 frac_sec += (frac_sec & 0x80000000) << 1 

2750 frac_sec >>= 32 

2751 return EDecimal(sec + self.power * frac_sec) 

2752 

2753 # The details of ERF Packet format can be see here: 

2754 # https://www.endace.com/erf-extensible-record-format-types.pdf 

2755 def read_packet(self, size=MTU, **kwargs): 

2756 # type: (int, **Any) -> Packet 

2757 

2758 # General ERF Header have exactly 16 bytes 

2759 hdr = self.f.read(16) 

2760 if len(hdr) < 16: 

2761 raise EOFError 

2762 

2763 # The timestamp is in little-endian byte-order. 

2764 time = struct.unpack('<Q', hdr[:8])[0] 

2765 # The rest is in big-endian byte-order. 

2766 # Ignoring flags and lctr (loss counter) since they are ERF specific 

2767 # header fields which Packet object does not support. 

2768 type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:]) 

2769 # Check if the type != 0x02, type Ethernet 

2770 if type & 0x02 == 0: 

2771 raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)") 

2772 

2773 # If there are extended headers, ignore it because Packet object does 

2774 # not support it. Extended headers size is 8 bytes before the payload. 

2775 if type & 0x80: 

2776 _ = self.f.read(8) 

2777 s = self.f.read(rlen - 24) 

2778 else: 

2779 s = self.f.read(rlen - 16) 

2780 

2781 # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both 

2782 # of the fields are disregarded by Endace. 

2783 pb = s[2:size] 

2784 from scapy.layers.l2 import Ether 

2785 try: 

2786 p = Ether(pb, **kwargs) # type: Packet 

2787 except KeyboardInterrupt: 

2788 raise 

2789 except Exception: 

2790 if conf.debug_dissector: 

2791 from scapy.sendrecv import debug 

2792 debug.crashed_on = (Ether, s) 

2793 raise 

2794 if conf.raw_layer is None: 

2795 # conf.raw_layer is set on import 

2796 import scapy.packet # noqa: F401 

2797 p = conf.raw_layer(s) 

2798 

2799 p.time = self._convert_erf_timestamp(time) 

2800 p.wirelen = wlen 

2801 

2802 return p 

2803 

2804 

2805@conf.commands.register 

2806def wrerf(filename, # type: Union[IO[bytes], str] 

2807 pkt, # type: _PacketIterable 

2808 *args, # type: Any 

2809 **kargs # type: Any 

2810 ): 

2811 # type: (...) -> None 

2812 """Write a list of packets to a ERF file 

2813 

2814 :param filename: the name of the file to write packets to, or an open, 

2815 writable file-like object. The file descriptor will be 

2816 closed at the end of the call, so do not use an object you 

2817 do not want to close (e.g., running wrerf(sys.stdout, []) 

2818 in interactive mode will crash Scapy). 

2819 :param gz: set to 1 to save a gzipped capture 

2820 :param append: append packets to the capture file instead of 

2821 truncating it 

2822 :param sync: do not bufferize writes to the capture file 

2823 """ 

2824 with ERFEthernetWriter(filename, *args, **kargs) as fdesc: 

2825 fdesc.write(pkt) 

2826 

2827 

2828class ERFEthernetWriter(PcapWriter): 

2829 """A stream ERF Ethernet writer with more control than wrerf()""" 

2830 

2831 def __init__(self, 

2832 filename, # type: Union[IO[bytes], str] 

2833 gz=False, # type: bool 

2834 append=False, # type: bool 

2835 sync=False, # type: bool 

2836 ): 

2837 # type: (...) -> None 

2838 """ 

2839 :param filename: the name of the file to write packets to, or an open, 

2840 writable file-like object. 

2841 :param gz: compress the capture on the fly 

2842 :param append: append packets to the capture file instead of 

2843 truncating it 

2844 :param sync: do not bufferize writes to the capture file 

2845 """ 

2846 super(ERFEthernetWriter, self).__init__(filename, 

2847 gz=gz, 

2848 append=append, 

2849 sync=sync) 

2850 

2851 def write(self, pkt): # type: ignore 

2852 # type: (_PacketIterable) -> None 

2853 """ 

2854 Writes a Packet, a SndRcvList object, or bytes to a ERF file. 

2855 

2856 :param pkt: Packet(s) to write (one record for each Packet) 

2857 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet 

2858 """ 

2859 # Import here to avoid circular dependency 

2860 from scapy.supersocket import IterSocket 

2861 for p in IterSocket(pkt).iter: 

2862 self.write_packet(p) 

2863 

2864 def write_packet(self, pkt): # type: ignore 

2865 # type: (Packet) -> None 

2866 

2867 if hasattr(pkt, "time"): 

2868 sec = int(pkt.time) 

2869 usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9) 

2870 t = (sec << 32) + usec 

2871 else: 

2872 t = int(time.time()) << 32 

2873 

2874 # There are 16 bytes of headers + 2 bytes of padding before the packets 

2875 # payload. 

2876 rlen = len(pkt) + 18 

2877 

2878 if hasattr(pkt, "wirelen"): 

2879 wirelen = pkt.wirelen 

2880 if wirelen is None: 

2881 wirelen = rlen 

2882 

2883 self.f.write(struct.pack("<Q", t)) 

2884 self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0)) 

2885 self.f.write(bytes(pkt)) 

2886 self.f.flush() 

2887 

2888 def close(self): 

2889 # type: () -> Optional[Any] 

2890 return self.f.close() 

2891 

2892 

2893@conf.commands.register 

2894def import_hexcap(input_string=None): 

2895 # type: (Optional[str]) -> bytes 

2896 """Imports a tcpdump like hexadecimal view 

2897 

2898 e.g: exported via hexdump() or tcpdump or wireshark's "export as hex" 

2899 

2900 :param input_string: String containing the hexdump input to parse. If None, 

2901 read from standard input. 

2902 """ 

2903 re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") # noqa: E501 

2904 p = "" 

2905 try: 

2906 if input_string: 

2907 input_function = StringIO(input_string).readline 

2908 else: 

2909 input_function = input 

2910 while True: 

2911 line = input_function().strip() 

2912 if not line: 

2913 break 

2914 try: 

2915 p += re_extract_hexcap.match(line).groups()[2] # type: ignore 

2916 except Exception: 

2917 warning("Parsing error during hexcap") 

2918 continue 

2919 except EOFError: 

2920 pass 

2921 

2922 p = p.replace(" ", "") 

2923 return hex_bytes(p) 

2924 

2925 

2926@conf.commands.register 

2927def wireshark(pktlist, wait=False, **kwargs): 

2928 # type: (List[Packet], bool, **Any) -> Optional[Any] 

2929 """ 

2930 Runs Wireshark on a list of packets. 

2931 

2932 See :func:`tcpdump` for more parameter description. 

2933 

2934 Note: this defaults to wait=False, to run Wireshark in the background. 

2935 """ 

2936 return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs) 

2937 

2938 

2939@conf.commands.register 

2940def tdecode( 

2941 pktlist, # type: Union[IO[bytes], None, str, _PacketIterable] 

2942 args=None, # type: Optional[List[str]] 

2943 **kwargs # type: Any 

2944): 

2945 # type: (...) -> Any 

2946 """ 

2947 Run tshark on a list of packets. 

2948 

2949 :param args: If not specified, defaults to ``tshark -V``. 

2950 

2951 See :func:`tcpdump` for more parameters. 

2952 """ 

2953 if args is None: 

2954 args = ["-V"] 

2955 return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs) 

2956 

2957 

2958def _guess_linktype_name(value): 

2959 # type: (int) -> str 

2960 """Guess the DLT name from its value.""" 

2961 from scapy.libs.winpcapy import pcap_datalink_val_to_name 

2962 return cast(bytes, pcap_datalink_val_to_name(value)).decode() 

2963 

2964 

2965def _guess_linktype_value(name): 

2966 # type: (str) -> int 

2967 """Guess the value of a DLT name.""" 

2968 from scapy.libs.winpcapy import pcap_datalink_name_to_val 

2969 val = cast(int, pcap_datalink_name_to_val(name.encode())) 

2970 if val == -1: 

2971 warning("Unknown linktype: %s. Using EN10MB", name) 

2972 return DLT_EN10MB 

2973 return val 

2974 

2975 

2976@conf.commands.register 

2977def tcpdump( 

2978 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable] 

2979 dump=False, # type: bool 

2980 getfd=False, # type: bool 

2981 args=None, # type: Optional[List[str]] 

2982 flt=None, # type: Optional[str] 

2983 prog=None, # type: Optional[Any] 

2984 getproc=False, # type: bool 

2985 quiet=False, # type: bool 

2986 use_tempfile=None, # type: Optional[Any] 

2987 read_stdin_opts=None, # type: Optional[Any] 

2988 linktype=None, # type: Optional[Any] 

2989 wait=True, # type: bool 

2990 _suppress=False # type: bool 

2991): 

2992 # type: (...) -> Any 

2993 """Run tcpdump or tshark on a list of packets. 

2994 

2995 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a 

2996 temporary file to store the packets. This works around a bug in Apple's 

2997 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/ 

2998 

2999 Otherwise, the packets are passed in stdin. 

3000 

3001 This function can be explicitly enabled or disabled with the 

3002 ``use_tempfile`` parameter. 

3003 

3004 When using ``wireshark``, it will be called with ``-ki -`` to start 

3005 immediately capturing packets from stdin. 

3006 

3007 Otherwise, the command will be run with ``-r -`` (which is correct for 

3008 ``tcpdump`` and ``tshark``). 

3009 

3010 This can be overridden with ``read_stdin_opts``. This has no effect when 

3011 ``use_tempfile=True``, or otherwise reading packets from a regular file. 

3012 

3013 :param pktlist: a Packet instance, a PacketList instance or a list of 

3014 Packet instances. Can also be a filename (as a string), an open 

3015 file-like object that must be a file format readable by 

3016 tshark (Pcap, PcapNg, etc.) or None (to sniff) 

3017 :param flt: a filter to use with tcpdump 

3018 :param dump: when set to True, returns a string instead of displaying it. 

3019 :param getfd: when set to True, returns a file-like object to read data 

3020 from tcpdump or tshark from. 

3021 :param getproc: when set to True, the subprocess.Popen object is returned 

3022 :param args: arguments (as a list) to pass to tshark (example for tshark: 

3023 args=["-T", "json"]). 

3024 :param prog: program to use (defaults to tcpdump, will work with tshark) 

3025 :param quiet: when set to True, the process stderr is discarded 

3026 :param use_tempfile: When set to True, always use a temporary file to store 

3027 packets. 

3028 When set to False, pipe packets through stdin. 

3029 When set to None (default), only use a temporary file with 

3030 ``tcpdump`` on OSX. 

3031 :param read_stdin_opts: When set, a list of arguments needed to capture 

3032 from stdin. Otherwise, attempts to guess. 

3033 :param linktype: A custom DLT value or name, to overwrite the default 

3034 values. 

3035 :param wait: If True (default), waits for the process to terminate before 

3036 returning to Scapy. If False, the process will be detached to the 

3037 background. If dump, getproc or getfd is True, these have the same 

3038 effect as ``wait=False``. 

3039 

3040 Examples:: 

3041 

3042 >>> tcpdump([IP()/TCP(), IP()/UDP()]) 

3043 reading from file -, link-type RAW (Raw IP) 

3044 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501 

3045 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain] 

3046 

3047 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark) 

3048 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501 

3049 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0 

3050 

3051 To get a JSON representation of a tshark-parsed PacketList(), one can:: 

3052 

3053 >>> import json, pprint 

3054 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", 

3055 ... dst="45.33.32.156"), 

3056 ... prog=conf.prog.tshark, 

3057 ... args=["-T", "json"], 

3058 ... getfd=True)) 

3059 >>> pprint.pprint(json_data) 

3060 [{u'_index': u'packets-2016-12-23', 

3061 u'_score': None, 

3062 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20', 

3063 u'frame.encap_type': u'7', 

3064 [...] 

3065 }, 

3066 u'ip': {u'ip.addr': u'45.33.32.156', 

3067 u'ip.checksum': u'0x0000a20d', 

3068 [...] 

3069 u'ip.ttl': u'64', 

3070 u'ip.version': u'4'}, 

3071 u'raw': u'Raw packet data'}}, 

3072 u'_type': u'pcap_file'}] 

3073 >>> json_data[0]['_source']['layers']['ip']['ip.ttl'] 

3074 u'64' 

3075 """ 

3076 getfd = getfd or getproc 

3077 if prog is None: 

3078 if not conf.prog.tcpdump: 

3079 raise Scapy_Exception( 

3080 "tcpdump is not available" 

3081 ) 

3082 prog = [conf.prog.tcpdump] 

3083 elif isinstance(prog, str): 

3084 prog = [prog] 

3085 else: 

3086 raise ValueError("prog must be a string") 

3087 

3088 if linktype is not None: 

3089 if isinstance(linktype, int): 

3090 # Guess name from value 

3091 try: 

3092 linktype_name = _guess_linktype_name(linktype) 

3093 except StopIteration: 

3094 linktype = -1 

3095 else: 

3096 # Guess value from name 

3097 if linktype.startswith("DLT_"): 

3098 linktype = linktype[4:] 

3099 linktype_name = linktype 

3100 try: 

3101 linktype = _guess_linktype_value(linktype) 

3102 except KeyError: 

3103 linktype = -1 

3104 if linktype == -1: 

3105 raise ValueError( 

3106 "Unknown linktype. Try passing its datalink name instead" 

3107 ) 

3108 prog += ["-y", linktype_name] 

3109 

3110 # Build Popen arguments 

3111 if args is None: 

3112 args = [] 

3113 else: 

3114 # Make a copy of args 

3115 args = list(args) 

3116 

3117 if flt is not None: 

3118 # Check the validity of the filter 

3119 if linktype is None and isinstance(pktlist, str): 

3120 # linktype is unknown but required. Read it from file 

3121 with PcapReader(pktlist) as rd: 

3122 if isinstance(rd, PcapNgReader): 

3123 # Get the linktype from the first packet 

3124 try: 

3125 _, metadata = rd._read_packet() 

3126 linktype = metadata.linktype 

3127 if OPENBSD and linktype == 228: 

3128 linktype = DLT_RAW 

3129 except EOFError: 

3130 raise ValueError( 

3131 "Cannot get linktype from a PcapNg packet." 

3132 ) 

3133 else: 

3134 linktype = rd.linktype 

3135 from scapy.arch.common import compile_filter 

3136 compile_filter(flt, linktype=linktype) 

3137 args.append(flt) 

3138 

3139 stdout = subprocess.PIPE if dump or getfd else None 

3140 stderr = open(os.devnull) if quiet else None 

3141 proc = None 

3142 

3143 if use_tempfile is None: 

3144 # Apple's tcpdump cannot read from stdin, see: 

3145 # http://apple.stackexchange.com/questions/152682/ 

3146 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump 

3147 

3148 if read_stdin_opts is None: 

3149 if prog[0] == conf.prog.wireshark: 

3150 # Start capturing immediately (-k) from stdin (-i -) 

3151 read_stdin_opts = ["-ki", "-"] 

3152 elif prog[0] == conf.prog.tcpdump and not OPENBSD: 

3153 # Capture in packet-buffered mode (-U) from stdin (-r -) 

3154 read_stdin_opts = ["-U", "-r", "-"] 

3155 else: 

3156 read_stdin_opts = ["-r", "-"] 

3157 else: 

3158 # Make a copy of read_stdin_opts 

3159 read_stdin_opts = list(read_stdin_opts) 

3160 

3161 if pktlist is None: 

3162 # sniff 

3163 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3164 proc = subprocess.Popen( 

3165 prog + args, 

3166 stdout=stdout, 

3167 stderr=stderr, 

3168 ) 

3169 elif isinstance(pktlist, str): 

3170 # file 

3171 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3172 proc = subprocess.Popen( 

3173 prog + ["-r", pktlist] + args, 

3174 stdout=stdout, 

3175 stderr=stderr, 

3176 ) 

3177 elif use_tempfile: 

3178 tmpfile = get_temp_file( # type: ignore 

3179 autoext=".pcap", 

3180 fd=True 

3181 ) # type: IO[bytes] 

3182 try: 

3183 tmpfile.writelines( 

3184 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3185 ) 

3186 except AttributeError: 

3187 pktlist = cast("_PacketIterable", pktlist) 

3188 wrpcap(tmpfile, pktlist, linktype=linktype) 

3189 else: 

3190 tmpfile.close() 

3191 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3192 proc = subprocess.Popen( 

3193 prog + ["-r", tmpfile.name] + args, 

3194 stdout=stdout, 

3195 stderr=stderr, 

3196 ) 

3197 else: 

3198 try: 

3199 pktlist.fileno() # type: ignore 

3200 # pass the packet stream 

3201 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3202 proc = subprocess.Popen( 

3203 prog + read_stdin_opts + args, 

3204 stdin=pktlist, # type: ignore 

3205 stdout=stdout, 

3206 stderr=stderr, 

3207 ) 

3208 except (AttributeError, ValueError): 

3209 # write the packet stream to stdin 

3210 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3211 proc = subprocess.Popen( 

3212 prog + read_stdin_opts + args, 

3213 stdin=subprocess.PIPE, 

3214 stdout=stdout, 

3215 stderr=stderr, 

3216 ) 

3217 if proc is None: 

3218 # An error has occurred 

3219 return 

3220 try: 

3221 proc.stdin.writelines( # type: ignore 

3222 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3223 ) 

3224 except AttributeError: 

3225 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore 

3226 except UnboundLocalError: 

3227 # The error was handled by ContextManagerSubprocess 

3228 pass 

3229 else: 

3230 proc.stdin.close() # type: ignore 

3231 if proc is None: 

3232 # An error has occurred 

3233 return 

3234 if dump: 

3235 data = b"".join( 

3236 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore 

3237 ) 

3238 proc.terminate() 

3239 return data 

3240 if getproc: 

3241 return proc 

3242 if getfd: 

3243 return proc.stdout 

3244 if wait: 

3245 proc.wait() 

3246 

3247 

3248@conf.commands.register 

3249def hexedit(pktlist): 

3250 # type: (_PacketIterable) -> PacketList 

3251 """Run hexedit on a list of packets, then return the edited packets.""" 

3252 f = get_temp_file() 

3253 wrpcap(f, pktlist) 

3254 with ContextManagerSubprocess(conf.prog.hexedit): 

3255 subprocess.call([conf.prog.hexedit, f]) 

3256 rpktlist = rdpcap(f) 

3257 os.unlink(f) 

3258 return rpktlist 

3259 

3260 

3261def get_terminal_width(): 

3262 # type: () -> Optional[int] 

3263 """Get terminal width (number of characters) if in a window. 

3264 

3265 Notice: this will try several methods in order to 

3266 support as many terminals and OS as possible. 

3267 """ 

3268 sizex = shutil.get_terminal_size(fallback=(0, 0))[0] 

3269 if sizex != 0: 

3270 return sizex 

3271 # Backups 

3272 if WINDOWS: 

3273 from ctypes import windll, create_string_buffer 

3274 # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/ 

3275 h = windll.kernel32.GetStdHandle(-12) 

3276 csbi = create_string_buffer(22) 

3277 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) 

3278 if res: 

3279 (bufx, bufy, curx, cury, wattr, 

3280 left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) # noqa: E501 

3281 sizex = right - left + 1 

3282 # sizey = bottom - top + 1 

3283 return sizex 

3284 return sizex 

3285 # We have various methods 

3286 # COLUMNS is set on some terminals 

3287 try: 

3288 sizex = int(os.environ['COLUMNS']) 

3289 except Exception: 

3290 pass 

3291 if sizex: 

3292 return sizex 

3293 # We can query TIOCGWINSZ 

3294 try: 

3295 import fcntl 

3296 import termios 

3297 s = struct.pack('HHHH', 0, 0, 0, 0) 

3298 x = fcntl.ioctl(1, termios.TIOCGWINSZ, s) 

3299 sizex = struct.unpack('HHHH', x)[1] 

3300 except (IOError, ModuleNotFoundError): 

3301 # If everything failed, return default terminal size 

3302 sizex = 79 

3303 return sizex 

3304 

3305 

3306def pretty_list(rtlst, # type: List[Tuple[Union[str, List[str]], ...]] 

3307 header, # type: List[Tuple[str, ...]] 

3308 sortBy=0, # type: Optional[int] 

3309 borders=False, # type: bool 

3310 ): 

3311 # type: (...) -> str 

3312 """ 

3313 Pretty list to fit the terminal, and add header. 

3314 

3315 :param rtlst: a list of tuples. each tuple contains a value which can 

3316 be either a string or a list of string. 

3317 :param sortBy: the column id (starting with 0) which will be used for 

3318 ordering 

3319 :param borders: whether to put borders on the table or not 

3320 """ 

3321 if borders: 

3322 _space = "|" 

3323 else: 

3324 _space = " " 

3325 cols = len(header[0]) 

3326 # Windows has a fat terminal border 

3327 _spacelen = len(_space) * (cols - 1) + int(WINDOWS) 

3328 _croped = False 

3329 if sortBy is not None: 

3330 # Sort correctly 

3331 rtlst.sort(key=lambda x: x[sortBy]) 

3332 # Resolve multi-values 

3333 for i, line in enumerate(rtlst): 

3334 ids = [] # type: List[int] 

3335 values = [] # type: List[Union[str, List[str]]] 

3336 for j, val in enumerate(line): 

3337 if isinstance(val, list): 

3338 ids.append(j) 

3339 values.append(val or " ") 

3340 if values: 

3341 del rtlst[i] 

3342 k = 0 

3343 for ex_vals in zip_longest(*values, fillvalue=" "): 

3344 if k: 

3345 extra_line = [" "] * cols 

3346 else: 

3347 extra_line = list(line) # type: ignore 

3348 for j, h in enumerate(ids): 

3349 extra_line[h] = ex_vals[j] 

3350 rtlst.insert(i + k, tuple(extra_line)) 

3351 k += 1 

3352 rtslst = cast(List[Tuple[str, ...]], rtlst) 

3353 # Append tag 

3354 rtslst = header + rtslst 

3355 # Detect column's width 

3356 colwidth = [max(len(y) for y in x) for x in zip(*rtslst)] 

3357 # Make text fit in box (if required) 

3358 width = get_terminal_width() 

3359 if conf.auto_crop_tables and width: 

3360 width = width - _spacelen 

3361 while sum(colwidth) > width: 

3362 _croped = True 

3363 # Needs to be cropped 

3364 # Get the longest row 

3365 i = colwidth.index(max(colwidth)) 

3366 # Get all elements of this row 

3367 row = [len(x[i]) for x in rtslst] 

3368 # Get biggest element of this row: biggest of the array 

3369 j = row.index(max(row)) 

3370 # Re-build column tuple with the edited element 

3371 t = list(rtslst[j]) 

3372 t[i] = t[i][:-2] + "_" 

3373 rtslst[j] = tuple(t) 

3374 # Update max size 

3375 row[j] = len(t[i]) 

3376 colwidth[i] = max(row) 

3377 if _croped: 

3378 log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)") # noqa: E501 

3379 # Generate padding scheme 

3380 fmt = _space.join(["%%-%ds" % x for x in colwidth]) 

3381 # Append separation line if needed 

3382 if borders: 

3383 rtslst.insert(1, tuple("-" * x for x in colwidth)) 

3384 # Compile 

3385 return "\n".join(fmt % x for x in rtslst) 

3386 

3387 

3388def human_size(x, fmt=".1f"): 

3389 # type: (int, str) -> str 

3390 """ 

3391 Convert a size in octets to a human string representation 

3392 """ 

3393 units = ['K', 'M', 'G', 'T', 'P', 'E'] 

3394 if not x: 

3395 return "0B" 

3396 i = int(math.log(x, 2**10)) 

3397 if i and i < len(units): 

3398 return format(x / 2**(10 * i), fmt) + units[i - 1] 

3399 return str(x) + "B" 

3400 

3401 

3402def __make_table( 

3403 yfmtfunc, # type: Callable[[int], str] 

3404 fmtfunc, # type: Callable[[int], str] 

3405 endline, # type: str 

3406 data, # type: List[Tuple[Packet, Packet]] 

3407 fxyz, # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]] 

3408 sortx=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 

3409 sorty=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 

3410 seplinefunc=None, # type: Optional[Callable[[int, List[int]], str]] 

3411 dump=False # type: bool 

3412): 

3413 # type: (...) -> Optional[str] 

3414 """Core function of the make_table suite, which generates the table""" 

3415 vx = {} # type: Dict[str, int] 

3416 vy = {} # type: Dict[str, Optional[int]] 

3417 vz = {} # type: Dict[Tuple[str, str], str] 

3418 vxf = {} # type: Dict[str, str] 

3419 

3420 tmp_len = 0 

3421 for e in data: 

3422 xx, yy, zz = [str(s) for s in fxyz(*e)] 

3423 tmp_len = max(len(yy), tmp_len) 

3424 vx[xx] = max(vx.get(xx, 0), len(xx), len(zz)) 

3425 vy[yy] = None 

3426 vz[(xx, yy)] = zz 

3427 

3428 vxk = list(vx) 

3429 vyk = list(vy) 

3430 if sortx: 

3431 vxk.sort(key=sortx) 

3432 else: 

3433 try: 

3434 vxk.sort(key=int) 

3435 except Exception: 

3436 try: 

3437 vxk.sort(key=atol) 

3438 except Exception: 

3439 vxk.sort() 

3440 if sorty: 

3441 vyk.sort(key=sorty) 

3442 else: 

3443 try: 

3444 vyk.sort(key=int) 

3445 except Exception: 

3446 try: 

3447 vyk.sort(key=atol) 

3448 except Exception: 

3449 vyk.sort() 

3450 

3451 s = "" 

3452 if seplinefunc: 

3453 sepline = seplinefunc(tmp_len, [vx[x] for x in vxk]) 

3454 s += sepline + "\n" 

3455 

3456 fmt = yfmtfunc(tmp_len) 

3457 s += fmt % "" 

3458 s += ' ' 

3459 for x in vxk: 

3460 vxf[x] = fmtfunc(vx[x]) 

3461 s += vxf[x] % x 

3462 s += ' ' 

3463 s += endline + "\n" 

3464 if seplinefunc: 

3465 s += sepline + "\n" 

3466 for y in vyk: 

3467 s += fmt % y 

3468 s += ' ' 

3469 for x in vxk: 

3470 s += vxf[x] % vz.get((x, y), "-") 

3471 s += ' ' 

3472 s += endline + "\n" 

3473 if seplinefunc: 

3474 s += sepline + "\n" 

3475 

3476 if dump: 

3477 return s 

3478 else: 

3479 print(s, end="") 

3480 return None 

3481 

3482 

3483def make_table(*args, **kargs): 

3484 # type: (*Any, **Any) -> Optional[Any] 

3485 return __make_table( 

3486 lambda l: "%%-%is" % l, 

3487 lambda l: "%%-%is" % l, 

3488 "", 

3489 *args, 

3490 **kargs 

3491 ) 

3492 

3493 

3494def make_lined_table(*args, **kargs): 

3495 # type: (*Any, **Any) -> Optional[str] 

3496 return __make_table( # type: ignore 

3497 lambda l: "%%-%is |" % l, 

3498 lambda l: "%%-%is |" % l, 

3499 "", 

3500 *args, 

3501 seplinefunc=lambda a, x: "+".join( 

3502 '-' * (y + 2) for y in [a - 1] + x + [-2] 

3503 ), 

3504 **kargs 

3505 ) 

3506 

3507 

3508def make_tex_table(*args, **kargs): 

3509 # type: (*Any, **Any) -> Optional[str] 

3510 return __make_table( # type: ignore 

3511 lambda l: "%s", 

3512 lambda l: "& %s", 

3513 "\\\\", 

3514 *args, 

3515 seplinefunc=lambda a, x: "\\hline", 

3516 **kargs 

3517 ) 

3518 

3519#################### 

3520# WHOIS CLIENT # 

3521#################### 

3522 

3523 

3524def whois(ip_address): 

3525 # type: (str) -> bytes 

3526 """Whois client for Python""" 

3527 whois_ip = str(ip_address) 

3528 try: 

3529 query = socket.gethostbyname(whois_ip) 

3530 except Exception: 

3531 query = whois_ip 

3532 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

3533 s.connect(("whois.ripe.net", 43)) 

3534 s.send(query.encode("utf8") + b"\r\n") 

3535 answer = b"" 

3536 while True: 

3537 d = s.recv(4096) 

3538 answer += d 

3539 if not d: 

3540 break 

3541 s.close() 

3542 ignore_tag = b"remarks:" 

3543 # ignore all lines starting with the ignore_tag 

3544 lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))] # noqa: E501 

3545 # remove empty lines at the bottom 

3546 for i in range(1, len(lines)): 

3547 if not lines[-i].strip(): 

3548 del lines[-i] 

3549 else: 

3550 break 

3551 return b"\n".join(lines[3:]) 

3552 

3553#################### 

3554# CLI utils # 

3555#################### 

3556 

3557 

3558class _CLIUtilMetaclass(type): 

3559 class TYPE(enum.Enum): 

3560 COMMAND = 0 

3561 OUTPUT = 1 

3562 COMPLETE = 2 

3563 

3564 def __new__(cls, # type: Type[_CLIUtilMetaclass] 

3565 name, # type: str 

3566 bases, # type: Tuple[type, ...] 

3567 dct # type: Dict[str, Any] 

3568 ): 

3569 # type: (...) -> Type[CLIUtil] 

3570 dct["commands"] = { 

3571 x.__name__: x 

3572 for x in dct.values() 

3573 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMMAND 

3574 } 

3575 dct["commands_output"] = { 

3576 x.cliutil_ref.__name__: x 

3577 for x in dct.values() 

3578 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.OUTPUT 

3579 } 

3580 dct["commands_complete"] = { 

3581 x.cliutil_ref.__name__: x 

3582 for x in dct.values() 

3583 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMPLETE 

3584 } 

3585 newcls = cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct)) 

3586 return newcls 

3587 

3588 

3589class CLIUtil(metaclass=_CLIUtilMetaclass): 

3590 """ 

3591 Provides a Util class to easily create simple CLI tools in Scapy, 

3592 that can still be used as an API. 

3593 

3594 Doc: 

3595 - override the ps1() function 

3596 - register commands with the @CLIUtil.addcomment decorator 

3597 - call the loop() function when ready 

3598 """ 

3599 

3600 def _depcheck(self) -> None: 

3601 """ 

3602 Check that all dependencies are installed 

3603 """ 

3604 try: 

3605 import prompt_toolkit # noqa: F401 

3606 except ImportError: 

3607 # okay we lie but prompt_toolkit is a dependency... 

3608 raise ImportError("You need to have IPython installed to use the CLI") 

3609 

3610 # Okay let's do nice code 

3611 commands: Dict[str, Callable[..., Any]] = {} 

3612 # print output of command 

3613 commands_output: Dict[str, Callable[..., str]] = {} 

3614 # provides completion to command 

3615 commands_complete: Dict[str, Callable[..., List[str]]] = {} 

3616 

3617 def __init__(self, cli: bool = True, debug: bool = False) -> None: 

3618 """ 

3619 DEV: overwrite 

3620 """ 

3621 if cli: 

3622 self._depcheck() 

3623 self.loop(debug=debug) 

3624 

3625 @staticmethod 

3626 def _inspectkwargs(func: DecoratorCallable) -> None: 

3627 """ 

3628 Internal function to parse arguments from the kwargs of the functions 

3629 """ 

3630 func._flagnames = [ # type: ignore 

3631 x.name for x in 

3632 inspect.signature(func).parameters.values() 

3633 if x.kind == inspect.Parameter.KEYWORD_ONLY 

3634 ] 

3635 func._flags = [ # type: ignore 

3636 ("-%s" % x) if len(x) == 1 else ("--%s" % x) 

3637 for x in func._flagnames # type: ignore 

3638 ] 

3639 

3640 @staticmethod 

3641 def _parsekwargs( 

3642 func: DecoratorCallable, 

3643 args: List[str] 

3644 ) -> Tuple[List[str], Dict[str, Literal[True]]]: 

3645 """ 

3646 Internal function to parse CLI arguments of a function. 

3647 """ 

3648 kwargs: Dict[str, Literal[True]] = {} 

3649 if func._flags: # type: ignore 

3650 i = 0 

3651 for arg in args: 

3652 if arg in func._flags: # type: ignore 

3653 i += 1 

3654 kwargs[func._flagnames[func._flags.index(arg)]] = True # type: ignore # noqa: E501 

3655 continue 

3656 break 

3657 args = args[i:] 

3658 return args, kwargs 

3659 

3660 @classmethod 

3661 def _parseallargs( 

3662 cls, 

3663 func: DecoratorCallable, 

3664 cmd: str, args: List[str] 

3665 ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]: 

3666 """ 

3667 Internal function to parse CLI arguments of both the function 

3668 and its output function. 

3669 """ 

3670 args, kwargs = cls._parsekwargs(func, args) 

3671 outkwargs: Dict[str, Literal[True]] = {} 

3672 if cmd in cls.commands_output: 

3673 args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args) 

3674 return args, kwargs, outkwargs 

3675 

3676 @classmethod 

3677 def addcommand( 

3678 cls, 

3679 spaces: bool = False, 

3680 globsupport: bool = False, 

3681 ) -> Callable[[DecoratorCallable], DecoratorCallable]: 

3682 """ 

3683 Decorator to register a command 

3684 """ 

3685 def func(cmd: DecoratorCallable) -> DecoratorCallable: 

3686 cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND # type: ignore 

3687 cmd._spaces = spaces # type: ignore 

3688 cmd._globsupport = globsupport # type: ignore 

3689 cls._inspectkwargs(cmd) 

3690 if cmd._globsupport and not cmd._spaces: # type: ignore 

3691 raise ValueError("Cannot use globsupport without spaces.") 

3692 return cmd 

3693 return func 

3694 

3695 @classmethod 

3696 def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501 

3697 """ 

3698 Decorator to register a command output processor 

3699 """ 

3700 def func(processor: DecoratorCallable) -> DecoratorCallable: 

3701 processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT # type: ignore 

3702 processor.cliutil_ref = cmd # type: ignore 

3703 cls._inspectkwargs(processor) 

3704 return processor 

3705 return func 

3706 

3707 @classmethod 

3708 def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501 

3709 """ 

3710 Decorator to register a command completor 

3711 """ 

3712 def func(processor: DecoratorCallable) -> DecoratorCallable: 

3713 processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE # type: ignore 

3714 processor.cliutil_ref = cmd # type: ignore 

3715 return processor 

3716 return func 

3717 

3718 def ps1(self) -> str: 

3719 """ 

3720 Return the PS1 of the shell 

3721 """ 

3722 return "> " 

3723 

3724 def close(self) -> None: 

3725 """ 

3726 Function called on exiting 

3727 """ 

3728 print("Exited") 

3729 

3730 def help(self, cmd: Optional[str] = None) -> None: 

3731 """ 

3732 Return the help related to this CLI util 

3733 """ 

3734 def _args(func: Any) -> str: 

3735 flags = func._flags.copy() 

3736 if func.__name__ in self.commands_output: 

3737 flags += self.commands_output[func.__name__]._flags # type: ignore 

3738 return " %s%s" % ( 

3739 ( 

3740 "%s " % " ".join("[%s]" % x for x in flags) 

3741 if flags else "" 

3742 ), 

3743 " ".join( 

3744 "<%s%s>" % ( 

3745 x.name, 

3746 "?" if 

3747 (x.default is None or x.default != inspect.Parameter.empty) 

3748 else "" 

3749 ) 

3750 for x in list(inspect.signature(func).parameters.values())[1:] 

3751 if x.name not in func._flagnames and x.name[0] != "_" 

3752 ) 

3753 ) 

3754 

3755 if cmd: 

3756 if cmd not in self.commands: 

3757 print("Unknown command '%s'" % cmd) 

3758 return 

3759 # help for one command 

3760 func = self.commands[cmd] 

3761 print("%s%s: %s" % ( 

3762 cmd, 

3763 _args(func), 

3764 func.__doc__ and func.__doc__.strip() 

3765 )) 

3766 else: 

3767 header = "│ %s - Help │" % self.__class__.__name__ 

3768 print("┌" + "─" * (len(header) - 2) + "┐") 

3769 print(header) 

3770 print("└" + "─" * (len(header) - 2) + "┘") 

3771 print( 

3772 pretty_list( 

3773 [ 

3774 ( 

3775 cmd, 

3776 _args(func), 

3777 func.__doc__ and func.__doc__.strip().split("\n")[0] or "" 

3778 ) 

3779 for cmd, func in self.commands.items() 

3780 ], 

3781 [("Command", "Arguments", "Description")] 

3782 ) 

3783 ) 

3784 

3785 def _completer(self) -> 'prompt_toolkit.completion.Completer': 

3786 """ 

3787 Returns a prompt_toolkit custom completer 

3788 """ 

3789 from prompt_toolkit.completion import Completer, Completion 

3790 

3791 class CLICompleter(Completer): 

3792 def get_completions(cmpl, document, complete_event): # type: ignore 

3793 if not complete_event.completion_requested: 

3794 # Only activate when the user does <TAB> 

3795 return 

3796 parts = document.text.split(" ") 

3797 cmd = parts[0].lower() 

3798 if cmd not in self.commands: 

3799 # We are trying to complete the command 

3800 for possible_cmd in (x for x in self.commands if x.startswith(cmd)): 

3801 yield Completion(possible_cmd, start_position=-len(cmd)) 

3802 else: 

3803 # We are trying to complete the command content 

3804 if len(parts) == 1: 

3805 return 

3806 args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:]) 

3807 arg = " ".join(args) 

3808 if cmd in self.commands_complete: 

3809 for possible_arg in self.commands_complete[cmd](self, arg): 

3810 yield Completion(possible_arg, start_position=-len(arg)) 

3811 return 

3812 return CLICompleter() 

3813 

3814 def loop(self, debug: int = 0) -> None: 

3815 """ 

3816 Main command handling loop 

3817 """ 

3818 from prompt_toolkit import PromptSession 

3819 session = PromptSession(completer=self._completer()) 

3820 

3821 while True: 

3822 try: 

3823 cmd = session.prompt(self.ps1()).strip() 

3824 except KeyboardInterrupt: 

3825 continue 

3826 except EOFError: 

3827 self.close() 

3828 break 

3829 args = cmd.split(" ")[1:] 

3830 cmd = cmd.split(" ")[0].strip().lower() 

3831 if not cmd: 

3832 continue 

3833 if cmd in ["help", "h", "?"]: 

3834 self.help(" ".join(args)) 

3835 continue 

3836 if cmd in "exit": 

3837 break 

3838 if cmd not in self.commands: 

3839 print("Unknown command. Type help or ?") 

3840 else: 

3841 # check the number of arguments 

3842 func = self.commands[cmd] 

3843 args, kwargs, outkwargs = self._parseallargs(func, cmd, args) 

3844 if func._spaces: # type: ignore 

3845 args = [" ".join(args)] 

3846 # if globsupport is set, we might need to do several calls 

3847 if func._globsupport and "*" in args[0]: # type: ignore 

3848 if args[0].count("*") > 1: 

3849 print("More than 1 glob star (*) is currently unsupported.") 

3850 continue 

3851 before, after = args[0].split("*", 1) 

3852 reg = re.compile(re.escape(before) + r".*" + after) 

3853 calls = [ 

3854 [x] for x in 

3855 self.commands_complete[cmd](self, before) 

3856 if reg.match(x) 

3857 ] 

3858 else: 

3859 calls = [args] 

3860 else: 

3861 calls = [args] 

3862 # now iterate if required, call the function and print its output 

3863 res = None 

3864 for args in calls: 

3865 try: 

3866 res = func(self, *args, **kwargs) 

3867 except TypeError: 

3868 print("Bad number of arguments !") 

3869 self.help(cmd=cmd) 

3870 continue 

3871 except Exception as ex: 

3872 print("Command failed with error: %s" % ex) 

3873 if debug: 

3874 traceback.print_exception(ex) 

3875 try: 

3876 if res and cmd in self.commands_output: 

3877 self.commands_output[cmd](self, res, **outkwargs) 

3878 except Exception as ex: 

3879 print("Output processor failed with error: %s" % ex) 

3880 

3881 

3882def AutoArgparse( 

3883 func: DecoratorCallable, 

3884 _parseonly: bool = False, 

3885) -> Optional[Tuple[List[str], List[str]]]: 

3886 """ 

3887 Generate an Argparse call from a function, then call this function. 

3888 

3889 Notes: 

3890 

3891 - for the arguments to have a description, the sphinx docstring format 

3892 must be used. See 

3893 https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html 

3894 - the arguments must be typed in Python (we ignore Sphinx-specific types) 

3895 untyped arguments are ignored. 

3896 - only types that would be supported by argparse are supported. The others 

3897 are omitted. 

3898 """ 

3899 argsdoc = {} 

3900 if func.__doc__: 

3901 # Sphinx doc format parser 

3902 m = re.match( 

3903 r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)", 

3904 func.__doc__.strip(), 

3905 ) 

3906 if not m: 

3907 desc = func.__doc__.strip() 

3908 else: 

3909 desc = m.group(1) 

3910 sphinxargs = re.findall( 

3911 r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)", 

3912 m.group(2), 

3913 ) 

3914 for argtype, argparam, argdesc in sphinxargs: 

3915 argparam = argparam.strip() 

3916 argdesc = argdesc.strip() 

3917 if argtype == "param": 

3918 if not argparam: 

3919 raise ValueError(":param: without a name !") 

3920 argsdoc[argparam] = argdesc 

3921 else: 

3922 desc = "" 

3923 

3924 # Process the parameters 

3925 positional = [] 

3926 noargument = [] 

3927 hexarguments = [] 

3928 parameters = {} 

3929 for param in inspect.signature(func).parameters.values(): 

3930 if not param.annotation: 

3931 continue 

3932 noarg = False 

3933 parname = param.name.replace("_", "-") 

3934 paramkwargs: Dict[str, Any] = {} 

3935 if param.annotation is bool: 

3936 if param.default is True: 

3937 parname = "no-" + parname 

3938 paramkwargs["action"] = "store_false" 

3939 else: 

3940 paramkwargs["action"] = "store_true" 

3941 noarg = True 

3942 elif param.annotation is bytes: 

3943 paramkwargs["type"] = str 

3944 hexarguments.append(parname) 

3945 elif param.annotation in [str, int, float]: 

3946 paramkwargs["type"] = param.annotation 

3947 else: 

3948 continue 

3949 if param.default != inspect.Parameter.empty: 

3950 if param.kind == inspect.Parameter.POSITIONAL_ONLY: 

3951 positional.append(param.name) 

3952 paramkwargs["nargs"] = '?' 

3953 else: 

3954 parname = "--" + parname 

3955 paramkwargs["default"] = param.default 

3956 else: 

3957 positional.append(param.name) 

3958 if param.kind == inspect.Parameter.VAR_POSITIONAL: 

3959 paramkwargs["action"] = "append" 

3960 if param.name in argsdoc: 

3961 paramkwargs["help"] = argsdoc[param.name] 

3962 if param.annotation is bytes: 

3963 paramkwargs["help"] = "(hex) " + paramkwargs["help"] 

3964 elif param.annotation is bool: 

3965 paramkwargs["help"] = "(flag) " + paramkwargs["help"] 

3966 else: 

3967 paramkwargs["help"] = ( 

3968 "(%s) " % param.annotation.__name__ + paramkwargs["help"] 

3969 ) 

3970 # Add to the parameter list 

3971 parameters[parname] = paramkwargs 

3972 if noarg: 

3973 noargument.append(parname) 

3974 

3975 if _parseonly: 

3976 # An internal mode used to generate bash autocompletion, do it then exit. 

3977 return ( 

3978 [x for x in parameters if x not in positional] + ["--help"], 

3979 [x for x in noargument if x not in positional] + ["--help"], 

3980 ) 

3981 

3982 # Now build the argparse.ArgumentParser 

3983 parser = argparse.ArgumentParser( 

3984 prog=func.__name__, 

3985 description=desc, 

3986 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 

3987 ) 

3988 

3989 # Add parameters to parser 

3990 for parname, paramkwargs in parameters.items(): 

3991 parser.add_argument(parname, **paramkwargs) 

3992 

3993 # Now parse the sys.argv parameters 

3994 params = vars(parser.parse_args()) 

3995 

3996 # Convert hex parameters if provided 

3997 for p in hexarguments: 

3998 if params[p] is not None: 

3999 try: 

4000 params[p] = bytes.fromhex(params[p]) 

4001 except ValueError: 

4002 print( 

4003 conf.color_theme.fail( 

4004 "ERROR: the value of parameter %s " 

4005 "'%s' is not valid hexadecimal !" % (p, params[p]) 

4006 ) 

4007 ) 

4008 return None 

4009 

4010 # Act as in interactive mode 

4011 conf.logLevel = 20 

4012 from scapy.themes import DefaultTheme 

4013 conf.color_theme = DefaultTheme() 

4014 # And call the function 

4015 try: 

4016 func( 

4017 *[params.pop(x) for x in positional], 

4018 **{ 

4019 (k[3:] if k.startswith("no_") else k): v 

4020 for k, v in params.items() 

4021 } 

4022 ) 

4023 except AssertionError as ex: 

4024 print(conf.color_theme.fail("ERROR: " + str(ex))) 

4025 parser.print_help() 

4026 return None 

4027 

4028 

4029####################### 

4030# PERIODIC SENDER # 

4031####################### 

4032 

4033 

4034class PeriodicSenderThread(threading.Thread): 

4035 def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True): 

4036 # type: (Any, _PacketIterable, float, bool) -> None 

4037 """ Thread to send packets periodically 

4038 

4039 Args: 

4040 sock: socket where packet is sent periodically 

4041 pkt: packet or list of packets to send 

4042 interval: interval between two packets 

4043 """ 

4044 if not isinstance(pkt, list): 

4045 self._pkts = [cast("Packet", pkt)] # type: _PacketIterable 

4046 else: 

4047 self._pkts = pkt 

4048 self._socket = sock 

4049 self._stopped = threading.Event() 

4050 self._enabled = threading.Event() 

4051 self._enabled.set() 

4052 self._interval = interval 

4053 self._ignore_exceptions = ignore_exceptions 

4054 threading.Thread.__init__(self) 

4055 

4056 def enable(self): 

4057 # type: () -> None 

4058 self._enabled.set() 

4059 

4060 def disable(self): 

4061 # type: () -> None 

4062 self._enabled.clear() 

4063 

4064 def run(self): 

4065 # type: () -> None 

4066 while not self._stopped.is_set() and not self._socket.closed: 

4067 for p in self._pkts: 

4068 try: 

4069 if self._enabled.is_set(): 

4070 self._socket.send(p) 

4071 except (OSError, TimeoutError) as e: 

4072 if self._ignore_exceptions: 

4073 return 

4074 else: 

4075 raise e 

4076 self._stopped.wait(timeout=self._interval) 

4077 if self._stopped.is_set() or self._socket.closed: 

4078 break 

4079 

4080 def stop(self): 

4081 # type: () -> None 

4082 self._stopped.set() 

4083 self.join(self._interval * 2) 

4084 

4085 

4086class SingleConversationSocket(object): 

4087 def __init__(self, o): 

4088 # type: (Any) -> None 

4089 self._inner = o 

4090 self._tx_mutex = threading.RLock() 

4091 

4092 @property 

4093 def __dict__(self): # type: ignore 

4094 return self._inner.__dict__ 

4095 

4096 def __getattr__(self, name): 

4097 # type: (str) -> Any 

4098 return getattr(self._inner, name) 

4099 

4100 def sr1(self, *args, **kargs): 

4101 # type: (*Any, **Any) -> Any 

4102 with self._tx_mutex: 

4103 return self._inner.sr1(*args, **kargs) 

4104 

4105 def sr(self, *args, **kargs): 

4106 # type: (*Any, **Any) -> Any 

4107 with self._tx_mutex: 

4108 return self._inner.sr(*args, **kargs) 

4109 

4110 def send(self, x): 

4111 # type: (Packet) -> Any 

4112 with self._tx_mutex: 

4113 try: 

4114 return self._inner.send(x) 

4115 except (ConnectionError, OSError) as e: 

4116 self._inner.close() 

4117 raise e