Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/utils.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1917 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7General utility functions. 

8""" 

9 

10 

11from decimal import Decimal 

12from io import StringIO 

13from itertools import zip_longest 

14 

15import array 

16import argparse 

17import collections 

18import decimal 

19import difflib 

20import gzip 

21import inspect 

22import locale 

23import math 

24import os 

25import pickle 

26import random 

27import re 

28import shutil 

29import socket 

30import struct 

31import subprocess 

32import sys 

33import tempfile 

34import threading 

35import time 

36import traceback 

37import warnings 

38 

39from scapy.config import conf 

40from scapy.consts import DARWIN, OPENBSD, WINDOWS 

41from scapy.data import MTU, DLT_EN10MB, DLT_RAW 

42from scapy.compat import ( 

43 orb, 

44 plain_str, 

45 chb, 

46 bytes_base64, 

47 base64_bytes, 

48 hex_bytes, 

49 bytes_encode, 

50) 

51from scapy.error import ( 

52 log_interactive, 

53 log_runtime, 

54 Scapy_Exception, 

55 warning, 

56) 

57from scapy.pton_ntop import inet_pton 

58 

59# Typing imports 

60from typing import ( 

61 cast, 

62 Any, 

63 AnyStr, 

64 Callable, 

65 Dict, 

66 IO, 

67 Iterator, 

68 List, 

69 Optional, 

70 TYPE_CHECKING, 

71 Tuple, 

72 Type, 

73 Union, 

74 overload, 

75) 

76from scapy.compat import ( 

77 DecoratorCallable, 

78 Literal, 

79) 

80 

81if TYPE_CHECKING: 

82 from scapy.packet import Packet 

83 from scapy.plist import _PacketIterable, PacketList 

84 from scapy.supersocket import SuperSocket 

85 import prompt_toolkit 

86 

87_ByteStream = Union[IO[bytes], gzip.GzipFile] 

88 

89########### 

90# Tools # 

91########### 

92 

93 

def issubtype(x,  # type: Any
              t,  # type: Union[type, str]
              ):
    # type: (...) -> bool
    """issubtype(C, B) -> bool

    Tell whether ``x`` is a class that derives from ``t``.

    :param x: the candidate object; a non-class value never matches a type
    :param t: a class (or a tuple of classes, as ``issubclass`` accepts),
        or a class name that is searched among the direct bases of ``x``
    :return: True when ``x`` is a subclass of ``t``, False otherwise
    """
    if isinstance(t, str):
        # Name lookup only inspects the direct bases, not the whole MRO.
        base_names = [base.__name__ for base in x.__bases__]
        return t in base_names
    return isinstance(x, type) and issubclass(x, t)

109 

110 

_Decimal = Union[Decimal, int]


class EDecimal(Decimal):
    """Extended Decimal

    This implements arithmetic and comparison with float for
    backward compatibility.

    Every arithmetic operator converts the other operand with
    ``Decimal(other)`` and wraps the result in an EDecimal again.

    Note: ``__eq__`` is overridden without ``__hash__``, so instances are
    not hashable (standard Python data-model behaviour).
    """

    def __add__(self, other, context=None):
        # type: (_Decimal, Any) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __radd__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __sub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__sub__(self, Decimal(other)))

    def __rsub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rsub__(self, Decimal(other)))

    def __mul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __rmul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __truediv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__truediv__(self, Decimal(other)))

    def __floordiv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__floordiv__(self, Decimal(other)))

    # The former Python 2 only ``__div__``/``__rdiv__`` branch was never
    # defined on Python 3 and has been dropped.
    def __divmod__(self, other):
        # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
        r = Decimal.__divmod__(self, Decimal(other))
        return EDecimal(r[0]), EDecimal(r[1])

    def __mod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mod__(self, Decimal(other)))

    def __rmod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rmod__(self, Decimal(other)))

    def __pow__(self, other, modulo=None):
        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
        return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))

    def __eq__(self, other):
        # type: (Any) -> bool
        """Exact Decimal comparison first, then a float comparison."""
        res = super(EDecimal, self).__eq__(other)
        if res is NotImplemented:
            # Decimal cannot compare with this type. NotImplemented must not
            # be used in a boolean context, so treat it as "not equal" and
            # let the float comparison below decide.
            res = False
        return res or float(self) == other

    def normalize(self, precision):  # type: ignore
        # type: (int) -> EDecimal
        """Normalize the value in a local context limited to ``precision``
        significant digits.
        """
        with decimal.localcontext() as ctx:
            ctx.prec = precision
            return EDecimal(super(EDecimal, self).normalize(ctx))

188 

189 

@overload
def get_temp_file(keep, autoext, fd):
    # type: (bool, str, Literal[True]) -> IO[bytes]
    pass


@overload
def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, Literal[False]) -> str
    pass


def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, bool) -> Union[IO[bytes], str]
    """Create a temporary file whose name starts with "scapy".

    :param keep: If False, the file name is recorded in ``conf.temp_files``
        so that it can be deleted when Scapy exits.
    :param autoext: Suffix to add to the generated file name.
    :param fd: If True, return the still-open file object. If False
        (default), close the file and return its path.
    """
    handle = tempfile.NamedTemporaryFile(
        prefix="scapy", suffix=autoext, delete=False
    )
    if not keep:
        conf.temp_files.append(handle.name)

    if not fd:
        # Close the file so something else can take it.
        handle.close()
        return handle.name
    return handle

222 

223 

def get_temp_dir(keep=False):
    # type: (bool) -> str
    """Creates a temporary directory, and returns its name.

    :param keep: If False (default), the directory is recorded in
        ``conf.temp_files`` so that it can be recursively deleted when
        Scapy exits.
    :return: A full path to a temporary directory.
    """

    dname = tempfile.mkdtemp(prefix="scapy")

    if not keep:
        conf.temp_files.append(dname)

    return dname

239 

240 

def _create_fifo() -> Tuple[str, Any]:
    """Create a temporary fifo.

    The returned server_fd must be passed to _open_fifo() once the client
    is connected.

    :returns: (client_file, server_fd)
    """
    if not WINDOWS:
        # Reserve a unique temporary path, then replace the regular file
        # with a fifo of the same name.
        path = get_temp_file()
        os.unlink(path)
        os.mkfifo(path)
        return path, path
    from scapy.arch.windows.structures import _get_win_fifo
    return _get_win_fifo()

257 

258 

def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
    """Open the server_fd returned by _create_fifo().

    :param fd: the server side handle (a path outside Windows)
    :param mode: mode passed to open(); not used on Windows
    """
    if not WINDOWS:
        return open(fd, mode)
    from scapy.arch.windows.structures import _win_fifo_open
    return _win_fifo_open(fd)

267 

268 

def sane(x, color=False):
    # type: (AnyStr, bool) -> str
    """Return ``x`` with every byte outside 32..126 replaced by a dot.

    :param x: the string or bytes to sanitize
    :param color: if True, replacement dots are passed through
        ``conf.color_theme.not_printable``
    """
    pieces = []
    for item in x:
        code = orb(item)
        if 32 <= code < 127:
            pieces.append(chr(code))
        elif color:
            pieces.append(conf.color_theme.not_printable("."))
        else:
            pieces.append(".")
    return "".join(pieces)

282 

283 

@conf.commands.register
def restart():
    # type: () -> None
    """Restarts scapy

    :raises OSError: if Scapy is not interactive or ``sys.argv[0]`` is not
        an existing file
    """
    started_from_console = conf.interactive and os.path.isfile(sys.argv[0])
    if not started_from_console:
        raise OSError("Scapy was not started from console")
    command = [sys.executable] + sys.argv
    if WINDOWS:
        # Run the new instance as a child, then exit with its return code
        # (1 if the call itself failed).
        res_code = 1
        try:
            res_code = subprocess.call(command)
        finally:
            os._exit(res_code)
    os.execv(sys.executable, command)

297 

298 

def lhex(x):
    # type: (Any) -> str
    """Render ``x`` with integers in hexadecimal.

    VolatileValue instances use repr(), ints use hex(), tuples and lists
    are rendered recursively, anything else uses str().
    """
    from scapy.volatile import VolatileValue
    if isinstance(x, VolatileValue):
        return repr(x)
    if isinstance(x, int):
        return hex(x)
    for seq_type, template in ((tuple, "(%s)"), (list, "[%s]")):
        if isinstance(x, seq_type):
            return template % ", ".join(lhex(item) for item in x)
    return str(x)

311 

312 

@conf.commands.register
def hexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a tcpdump like hexadecimal view

    :param p: a Packet
    :param dump: define if the result must be printed or returned in a variable
    :return: a String only when dump=True
    """
    x = bytes_encode(p)
    rows = []
    for i in range(0, len(x), 16):
        chunk = x[i:i + 16]
        cells = ["%02X " % orb(b) for b in chunk]
        # A byte takes three columns ("XX "); pad a short last row with the
        # same width so that the ASCII column stays aligned.
        cells.extend(["   "] * (16 - len(chunk)))
        rows.append(
            "%04x " % i + "".join(cells) + " %s" % sane(chunk, color=True)
        )
    # Rows are joined without a trailing newline
    s = "\n".join(rows)
    if dump:
        return s
    else:
        print(s)
        return None

342 

343 

@conf.commands.register
def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
    """Build an equivalent view of hexdump() on a single line

    Note that setting both onlyasc and onlyhex to 1 results in a empty output

    :param p: a Packet
    :param onlyasc: 1 to display only the ascii view
    :param onlyhex: 1 to display only the hexadecimal view
    :param dump: print the view if False
    :return: a String only when dump=True
    """
    # Colors are only requested when the view is printed.
    view = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
    if not dump:
        print(view)
        return None
    return view

364 

365 

@conf.commands.register
def chexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a per byte hexadecimal representation

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501

    :param p: a Packet
    :param dump: print the view if False
    :return: a String only if dump=True
    """
    raw = bytes_encode(p)
    rendered = ", ".join(["%#04x" % orb(byte) for byte in raw])
    if not dump:
        print(rendered)
        return None
    return rendered

386 

387 

@conf.commands.register
def hexstr(p, onlyasc=0, onlyhex=0, color=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> str
    """Build a fancy tcpdump like hex from bytes.

    :param p: a Packet, string or bytes
    :param onlyasc: if true, leave out the hexadecimal part
    :param onlyhex: if true, leave out the ASCII part
    :param color: forwarded to sane() for the ASCII part
    """
    raw = bytes_encode(p)
    sections = []
    if not onlyasc:
        hex_bytes_view = ["%02X" % orb(byte) for byte in raw]
        sections.append(" ".join(hex_bytes_view))
    if not onlyhex:
        sections.append(sane(raw, color=color))
    return " ".join(sections)

399 

400 

def repr_hex(s):
    # type: (bytes) -> str
    """ Convert provided bitstring to a simple string of hex digits """
    digits = ["%02x" % orb(item) for item in s]
    return "".join(digits)

405 

406 

@conf.commands.register
def hexdiff(
    a: Union['Packet', AnyStr],
    b: Union['Packet', AnyStr],
    algo: Optional[str] = None,
    autojunk: bool = False,
) -> None:
    """
    Show differences between 2 binary strings, Packets...

    The diff is printed on standard output; nothing is returned.

    Available algorithms:
        - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
          Levenstein distance between the strings then backtrack.
        - difflib: Use the difflib.SequenceMatcher implementation. This based on a
          modified version of the Ratcliff and Obershelp algorithm.
          This is much faster, but far less accurate.
          https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher

    :param a:
    :param b: The binary strings, packets... to compare
    :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
        By default, this is chosen depending on the complexity, optimistically
        preferring wagnerfischer unless really necessary.
    :param autojunk: (difflib only) See difflib documentation.
    :raises ValueError: if ``algo`` is neither None, 'wagnerfischer' nor
        'difflib'
    """
    xb = bytes_encode(a)
    yb = bytes_encode(b)

    if algo is None:
        # Choose the best algorithm
        complexity = len(xb) * len(yb)
        if complexity < 1e7:
            # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
            # Anything much larger than this shouldn't be attempted by default.
            algo = "wagnerfischer"
            if complexity > 1e6:
                log_interactive.info(
                    "Complexity is a bit high. hexdiff will take a few seconds."
                )
        else:
            algo = "difflib"

    # Both lists receive one (possibly empty) bytes slice per diff step;
    # entries at the same index are displayed in the same column.
    backtrackx = []
    backtracky = []

    if algo == "wagnerfischer":
        # Both inputs are reversed here; the backtracking below walks from
        # the end of the reversed strings back to (-1, -1).
        xb = xb[::-1]
        yb = yb[::-1]

        # costs for the 3 operations
        INSERT = 1
        DELETE = 1
        SUBST = 1

        # Typically, d[i,j] will hold the distance between
        # the first i characters of xb and the first j characters of yb.
        # We change the Wagner Fischer to also store pointers to all
        # the intermediate steps taken while calculating the Levenstein distance.
        d = {(-1, -1): (0, (-1, -1))}
        for j in range(len(yb)):
            d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
        for i in range(len(xb)):
            # NOTE(review): this border uses INSERT plus an extra 1 rather
            # than DELETE - confirm that the asymmetry is intended.
            d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)

        # Compute the Levenstein distance between the two strings, but
        # store all the steps to be able to backtrack at the end.
        for j in range(len(yb)):
            for i in range(len(xb)):
                # min() compares (cost, predecessor) tuples, so ties on the
                # cost are broken by the predecessor coordinates.
                d[i, j] = min(
                    (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
                    (d[i - 1, j][0] + DELETE, (i - 1, j)),
                    (d[i, j - 1][0] + INSERT, (i, j - 1)),
                )

        # Iterate through the steps backwards to create the diff
        i = len(xb) - 1
        j = len(yb) - 1
        while not (i == j == -1):
            i2, j2 = d[i, j][1]
            # A step that does not advance on one side yields b'' there.
            backtrackx.append(xb[i2 + 1:i + 1])
            backtracky.append(yb[j2 + 1:j + 1])
            i, j = i2, j2
    elif algo == "difflib":
        sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
        # One single-byte bytes object per position
        xarr = [xb[i:i + 1] for i in range(len(xb))]
        yarr = [yb[i:i + 1] for i in range(len(yb))]
        # Iterate through opcodes to build the backtrack
        for opcode in sm.get_opcodes():
            typ, x0, x1, y0, y1 = opcode
            if typ == 'delete':
                backtrackx += xarr[x0:x1]
                backtracky += [b''] * (x1 - x0)
            elif typ == 'insert':
                backtrackx += [b''] * (y1 - y0)
                backtracky += yarr[y0:y1]
            elif typ in ['equal', 'replace']:
                backtrackx += xarr[x0:x1]
                backtracky += yarr[y0:y1]
        # Some lines may have been considered as junk. Check the sizes
        if autojunk:
            # Pad the shorter list with empty entries
            lbx = len(backtrackx)
            lby = len(backtracky)
            backtrackx += [b''] * (max(lbx, lby) - lbx)
            backtracky += [b''] * (max(lbx, lby) - lby)
    else:
        raise ValueError("Unknown algorithm '%s'" % algo)

    # Print the diff

    # x and y accumulate the number of bytes already consumed on each side;
    # i is the index of the first backtrack entry of the current row.
    x = y = i = 0
    # Keyed by (doy - dox): -1 -> "left" color, 1 -> "right" color, 0 -> none
    colorize: Dict[int, Callable[[str], str]] = {
        0: lambda x: x,
        -1: conf.color_theme.left,
        1: conf.color_theme.right
    }

    # dox / doy select which side(s) the row being printed belongs to.
    dox = 1
    doy = 0
    btx_len = len(backtrackx)
    while i < btx_len:
        linex = backtrackx[i:i + 16]
        liney = backtracky[i:i + 16]
        xx = sum(len(k) for k in linex)
        yy = sum(len(k) for k in liney)
        if dox and not xx:
            # Nothing on the x side for this row: print the y side only
            dox = 0
            doy = 1
        if dox and linex == liney:
            # Identical rows are printed once, for both sides
            doy = 1

        if dox:
            # NOTE(review): the x offset is seeded from y, not x - confirm
            # that this is intended.
            xd = y
            j = 0
            while not linex[j]:
                j += 1
                xd -= 1
            print(colorize[doy - dox]("%04x" % xd), end=' ')
            x += xx
            line = linex
        else:
            # NOTE(review): placeholder looks narrower than the "%04x" field
            # it replaces; whitespace may have been lost - confirm.
            print(" ", end=' ')
        if doy:
            yd = y
            j = 0
            while not liney[j]:
                j += 1
                yd -= 1
            print(colorize[doy - dox]("%04x" % yd), end=' ')
            y += yy
            line = liney
        else:
            print(" ", end=' ')

        print(" ", end=' ')

        # cl collects the ASCII column of the row
        cl = ""
        for j in range(16):
            if i + j < min(len(backtrackx), len(backtracky)):
                if line[j]:
                    # Colored only when the two sides differ at this position
                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
                    print(col("%02X" % orb(line[j])), end=' ')
                    if linex[j] == liney[j]:
                        cl += sane(line[j], color=True)
                    else:
                        cl += col(sane(line[j]))
                else:
                    print(" ", end=' ')
                    cl += " "
            else:
                print(" ", end=' ')
            if j == 7:
                # Extra separator between the two groups of eight bytes
                print("", end=' ')

        print(" ", cl)

        # Either move on to the next 16 entries, or print the same entries
        # again for the y side.
        if doy or not yy:
            doy = 0
            dox = 1
            i += 16
        else:
            if yy:
                dox = 0
                doy = 1
            else:
                i += 16

592 

593 

if struct.pack("H", 1) != b"\x00\x01":  # little endian
    def checksum_endian_transform(chk):
        # type: (int) -> int
        """Swap the two low-order bytes of ``chk``.

        Bits above the low 16 are not masked; callers mask the result.
        """
        return (chk << 8) | ((chk >> 8) & 0xff)
else:  # big endian
    def checksum_endian_transform(chk):
        # type: (int) -> int
        """Return ``chk`` unchanged (native order is already big endian)."""
        return chk

598 

599 

def checksum(pkt):
    # type: (bytes) -> int
    """Compute a 16-bit one's complement checksum over ``pkt``.

    Odd-length input is padded with one zero byte. The words are summed in
    native byte order, the carries are folded back twice, and the
    complemented sum goes through ``checksum_endian_transform``.
    """
    if len(pkt) % 2:
        pkt += b"\0"
    total = sum(array.array("H", pkt))
    total = (total & 0xffff) + (total >> 16)
    total += total >> 16
    return checksum_endian_transform(~total) & 0xffff

609 

610 

def _fletcher16(charbuf):
    # type: (bytes) -> Tuple[int, int]
    """Return the two Fletcher-16 running sums (c0, c1), each modulo 255."""
    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    running = 0
    running_of_running = 0
    for byte in charbuf:
        running += byte
        running_of_running += running

    # The reduction is done once at the end; Python ints do not overflow.
    return (running % 255, running_of_running % 255)

622 

623 

@conf.commands.register
def fletcher16_checksum(binbuf):
    # type: (bytes) -> int
    """Calculates Fletcher-16 checksum of the given buffer.

    Note:
    If the buffer contains the two checkbytes derived from the Fletcher-16 checksum  # noqa: E501
    the result of this function has to be 0. Otherwise the buffer has been corrupted.  # noqa: E501
    """
    low, high = _fletcher16(binbuf)
    return low | (high << 8)

635 

636 

@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    # type: (bytes, int) -> bytes
    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.

    Including the bytes into the buffer (at the position marked by offset) the  # noqa: E501
    global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify  # noqa: E501
    the integrity of the buffer on the receiver side.

    For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501

    :raises Exception: if the buffer is shorter than ``offset``
    """

    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    if offset > len(binbuf):
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    # The sums are computed with the two checkbyte positions zeroed.
    zeroed = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    c0, c1 = _fletcher16(zeroed)

    x = ((len(zeroed) - offset - 1) * c0 - c1) % 255
    if x <= 0:
        x += 255

    y = 510 - c0 - x
    if y > 255:
        y -= 255
    return chb(x) + chb(y)

666 

667 

def mac2str(mac):
    # type: (str) -> bytes
    """Convert a colon separated hexadecimal string into raw bytes."""
    octets = plain_str(mac).split(':')
    return b"".join([chb(int(octet, 16)) for octet in octets])

671 

672 

def valid_mac(mac):
    # type: (str) -> bool
    """Tell whether ``mac`` converts, through mac2str(), to 6 bytes."""
    try:
        raw = mac2str(mac)
    except ValueError:
        return False
    return len(raw) == 6

680 

681 

def str2mac(s):
    # type: (bytes) -> str
    """Render each item of ``s`` as two hex digits, joined by colons.

    A str input is converted character by character with ord().
    """
    values = map(ord, s) if isinstance(s, str) else s
    return ":".join(["%02x" % value for value in values])

687 

688 

def randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0)
    """
    # One random.randint() draw per byte, in order
    return bytes(random.randint(0, 255) for _ in range(length))

696 

697 

def zerofree_randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0) without zero in it.
    """
    # One random.randint() draw per byte, in order; 0 is never drawn
    return bytes(random.randint(1, 255) for _ in range(length))

705 

706 

def strxor(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    # Pairing stops at the shorter input
    return b"".join(chb(orb(left) ^ orb(right)) for left, right in zip(s1, s2))

714 

715 

def strand(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    # Pairing stops at the shorter input
    return b"".join(chb(orb(left) & orb(right)) for left, right in zip(s1, s2))

723 

724 

def strrot(s1, count, right=True):
    # type: (bytes, int, bool) -> bytes
    """
    Rotate the binary by 'count' bytes

    :param s1: the bytes to rotate
    :param count: number of positions, reduced modulo ``len(s1)``
    :param right: rotate to the right if True (default), else to the left
    :return: the rotated bytes; an empty input is returned unchanged
    """
    if not s1:
        # Nothing to rotate; also avoids a modulo by zero
        return s1
    off = count % len(s1)
    if right:
        # A right rotation by ``off`` is a left rotation by ``len - off``
        off = len(s1) - off
    return s1[off:] + s1[:off]

735 

736 

# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
# Probe the platform once: if the broadcast address is rejected, wrap
# socket.inet_aton so that this one address is special-cased.
try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    def inet_aton(ip_string):
        # type: (str) -> bytes
        """socket.inet_aton() that also accepts "255.255.255.255"."""
        if ip_string != "255.255.255.255":
            return socket.inet_aton(ip_string)
        return b"\xff" * 4
else:
    inet_aton = socket.inet_aton  # type: ignore

inet_ntoa = socket.inet_ntoa

751 

752 

def atol(x):
    # type: (str) -> int
    """Convert a dotted IPv4 address into its 32-bit integer value.

    :raises ValueError: if inet_aton() rejects the address
    """
    try:
        packed = inet_aton(x)
    except socket.error:
        raise ValueError("Bad IP format: %s" % x)
    (value,) = struct.unpack("!I", packed)
    return cast(int, value)

760 

761 

def valid_ip(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is an IPv4 address that atol() accepts."""
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        atol(text)
    except (OSError, ValueError, socket.error):
        return False
    else:
        return True

773 

774 

def valid_net(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is an IPv4 address or an ``ip/prefix`` network.

    :param addr: e.g. "192.168.0.0/24" or "192.168.0.1"
    :return: True if the address part passes valid_ip() and the optional
        prefix length is a decimal number between 0 and 32
    """
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' in addr:
        ip, mask = addr.split('/', 1)
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as superscript digits, which int() rejects with
        # a ValueError.
        return valid_ip(ip) and mask.isdecimal() and 0 <= int(mask) <= 32
    return valid_ip(addr)

785 

786 

def valid_ip6(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is an IPv6 address that inet_pton() accepts."""
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        inet_pton(socket.AF_INET6, text)
    except socket.error:
        return False
    else:
        return True

798 

799 

def valid_net6(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is an IPv6 address or an ``ip/prefix`` network.

    :param addr: e.g. "2001:db8::/32" or "::1"
    :return: True if the address part passes valid_ip6() and the optional
        prefix length is a decimal number between 0 and 128
    """
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' in addr:
        ip, mask = addr.split('/', 1)
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as superscript digits, which int() rejects with
        # a ValueError.
        return valid_ip6(ip) and mask.isdecimal() and 0 <= int(mask) <= 128
    return valid_ip6(addr)

810 

811 

def ltoa(x):
    # type: (int) -> str
    """Convert the low 32 bits of ``x`` into a dotted IPv4 address."""
    packed = struct.pack("!I", x & 0xffffffff)
    return inet_ntoa(packed)

815 

816 

def itom(x):
    # type: (int) -> int
    """Convert a prefix length into a 32-bit netmask value."""
    # 32 one bits followed by 32 zero bits, shifted into a 32-bit window
    window = 0xffffffff
    shifted = 0xffffffff00000000 >> x
    return shifted & window

820 

821 

def decode_locale_str(x):
    # type: (bytes) -> str
    """
    Decode bytes into a string using the system locale.
    Useful on Windows where it can be unusual (e.g. cp1252)

    Falls back to utf-8 when the locale reports no encoding; undecodable
    bytes are replaced.
    """
    encoding = locale.getlocale()[1] or "utf-8"
    return x.decode(encoding=encoding, errors="replace")

829 

830 

class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command, without
    crashing.

    Example:
    >>> with ContextManagerSubprocess("tcpdump"):
    >>>     subprocess.Popen(["tcpdump", "--version"])
    ERROR: Could not execute tcpdump, is it installed?

    :param prog: name of the program, used in the messages
    :param suppress: if True (default), errors are logged and swallowed;
        if False, a new exception of the same type is raised with the
        explanatory message
    """

    def __init__(self, prog, suppress=True):
        # type: (str, bool) -> None
        self.prog = prog
        self.suppress = suppress

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self,
                 exc_type,  # type: Optional[type]
                 exc_value,  # type: Optional[Exception]
                 traceback,  # type: Optional[Any]
                 ):
        # type: (...) -> Optional[bool]
        if exc_value is None or exc_type is None:
            return None
        # Errored
        if isinstance(exc_value, EnvironmentError):
            msg = "Could not execute %s, is it installed?" % self.prog
        else:
            # exc_type is already the exception class: use its own name
            # (its __class__ would be the metaclass, i.e. "type").
            msg = "%s: execution failed (%s)" % (
                self.prog,
                exc_type.__name__
            )
        if not self.suppress:
            raise exc_type(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception

872 

873 

class ContextManagerCaptureOutput(object):
    """
    Context manager that intercept the console's output.

    While active, ``sys.stdout`` is replaced by a mock object whose
    ``write`` appends to ``result_export_object``.

    Example:
    >>> with ContextManagerCaptureOutput() as cmco:
    ...     print("hey")
    ...     assert cmco.get_output() == "hey\\n"
    """

    def __init__(self):
        # type: () -> None
        self.result_export_object = ""
        # Fail early if no mock implementation can be imported
        self._load_mock()

    @staticmethod
    def _load_mock():
        # type: () -> Any
        """Return the third-party ``mock`` module, or ``unittest.mock``."""
        try:
            import mock
        except Exception:
            try:
                # The standard library ships an equivalent implementation
                from unittest import mock  # type: ignore
            except Exception:
                raise ImportError("The mock module needs to be installed !")
        return mock

    def __enter__(self):
        # type: () -> ContextManagerCaptureOutput
        mock = self._load_mock()

        def write(s, decorator=self):
            # type: (str, ContextManagerCaptureOutput) -> None
            decorator.result_export_object += s
        mock_stdout = mock.Mock()
        mock_stdout.write = write
        self.bck_stdout = sys.stdout
        sys.stdout = mock_stdout
        return self

    def __exit__(self, *exc):
        # type: (*Any) -> Literal[False]
        # Restore the real stdout; exceptions are never suppressed
        sys.stdout = self.bck_stdout
        return False

    def get_output(self, eval_bytes=False):
        # type: (bool) -> str
        """Return the captured text.

        :param eval_bytes: if True and the capture starts with ``b'``, the
            capture is evaluated as a bytes literal and decoded
        """
        if self.result_export_object.startswith("b'") and eval_bytes:
            # WARNING: eval() runs the captured text as Python code; only
            # use eval_bytes on output produced by trusted code.
            return plain_str(eval(self.result_export_object))
        return self.result_export_object

915 

916 

def do_graph(
    graph,  # type: str
    prog=None,  # type: Optional[str]
    format=None,  # type: Optional[str]
    target=None,  # type: Optional[Union[IO[bytes], str]]
    type=None,  # type: Optional[str]
    string=None,  # type: Optional[bool]
    options=None  # type: Optional[List[str]]
):
    # type: (...) -> Optional[str]
    """Processes graph description using an external software.
    This method is used to convert a graphviz format to an image.

    :param graph: GraphViz graph description
    :param prog: which graphviz program to use
    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
        option
    :param string: if not None, simply return the graph string
    :param target: filename or redirect. Defaults pipe to Imagemagick's
        display program
    :param type: deprecated alias of ``format``
    :param options: options to be passed to prog (a list of strings, or a
        ready-made string)
    :return: the graph string if ``string`` is true, else None
    :raises OSError: if the GraphViz program exits with a non-zero status
    """

    if format is None:
        format = "svg"
    if string:
        return graph
    if type is not None:
        warnings.warn(
            "type is deprecated, and was renamed format",
            DeprecationWarning
        )
        format = type
    if prog is None:
        prog = conf.prog.dot
    start_viewer = False
    if target is None:
        if WINDOWS:
            target = get_temp_file(autoext="." + format)
            start_viewer = True
        else:
            with ContextManagerSubprocess(conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    if format is not None:
        format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    target = cast(IO[bytes], target)
    if isinstance(options, (list, tuple)):
        # A list must be flattened: "%s" would insert its Python repr
        # into the command line.
        options = " ".join(options)
    try:
        proc = subprocess.Popen(
            "\"%s\" %s %s" % (prog, options or "", format or ""),
            shell=True, stdin=subprocess.PIPE, stdout=target,
            stderr=subprocess.PIPE
        )
        _, stderr = proc.communicate(bytes_encode(graph))
        if proc.returncode != 0:
            raise OSError(
                "GraphViz call failed (is it installed?):\n" +
                plain_str(stderr)
            )
    finally:
        # Close the target whether GraphViz succeeded or failed. This is
        # best effort: target may be None if the viewer could not start.
        try:
            target.close()
        except Exception:
            pass
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", target.name)  # noqa: E501
                break
        else:
            # Only reached when the file showed up (no break)
            if WINDOWS and conf.prog.display == conf.prog._default:
                os.startfile(target.name)
            else:
                with ContextManagerSubprocess(conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
    return None

1002 

1003 

# Replacement text for every character that needs escaping in TeX output
_TEX_TR = {
    "{": "{\\tt\\char123}",
    "}": "{\\tt\\char125}",
    "\\": "{\\tt\\char92}",
    "^": "\\^{}",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "&": "\\&",
    "%": "\\%",
    "|": "{\\tt\\char124}",
    "~": "{\\tt\\char126}",
    "<": "{\\tt\\char60}",
    ">": "{\\tt\\char62}",
}


def tex_escape(x):
    # type: (str) -> str
    """Return ``x`` with every character listed in _TEX_TR replaced."""
    return "".join([_TEX_TR.get(char, char) for char in x])

1027 

1028 

def colgen(*lstcol,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> Iterator[Any]
    """Returns a generator that mixes provided quantities forever

    :param lstcol: the quantities to mix; a single quantity is duplicated
    :param trans: a function to convert the three arguments into a color.
        lambda x,y,z:(x,y,z) by default
    :raises ValueError: on the first iteration, if no quantity was given
    """
    if not lstcol:
        # Without quantities the loops below would spin forever without
        # ever yielding a value.
        raise ValueError("colgen() needs at least one quantity to mix")
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
    size = len(lstcol)
    while True:
        for i in range(size):
            for j in range(size):
                for k in range(size):
                    # Skip the triples where all three indexes are equal
                    if not (i == j == k):
                        yield trans(
                            lstcol[(i + j) % size],
                            lstcol[(j + k) % size],
                            lstcol[(k + i) % size],
                        )

1044 

1045 

def incremental_label(label="tag%05i", start=0):
    # type: (str, int) -> Iterator[str]
    """Yield ``label`` formatted with start, start + 1, ... forever."""
    current = start
    while True:
        yield label % current
        current += 1

1051 

1052 

def binrepr(val):
    # type: (int) -> str
    """Return the binary digits of ``val`` without the ``0b`` prefix.

    Negative values keep their sign, e.g. ``binrepr(-5) == "-101"``.
    """
    digits = bin(val)
    if digits.startswith("-"):
        # bin(-5) is "-0b101": drop the prefix but keep the sign
        return "-" + digits[3:]
    return digits[2:]

1056 

1057 

def long_converter(s):
    # type: (str) -> int
    """Parse a hexadecimal string, ignoring newlines and spaces."""
    # Map "\n" and " " to None so that translate() removes them
    compact = s.translate({ord('\n'): None, ord(' '): None})
    return int(compact, 16)

1061 

1062######################### 

1063# Enum management # 

1064######################### 

1065 

1066 

class EnumElement:
    """A named integer value.

    Compares, hashes and converts like its integer value, prints as its
    key, and forwards unknown attributes to the value.
    """

    def __init__(self, key, value):
        # type: (str, int) -> None
        self._key = key
        self._value = value

    def __repr__(self):
        # type: () -> str
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501

    def __getattr__(self, attr):
        # type: (str) -> Any
        if attr in ("_key", "_value"):
            # Only reached when the instance is not initialized (e.g. while
            # being copied or unpickled); looking up self._value here would
            # recurse forever.
            raise AttributeError(attr)
        return getattr(self._value, attr)

    def __str__(self):
        # type: () -> str
        return self._key

    def __bytes__(self):
        # type: () -> bytes
        return bytes_encode(self.__str__())

    def __hash__(self):
        # type: () -> int
        return self._value

    def __int__(self):
        # type: () -> int
        return int(self._value)

    def __eq__(self, other):
        # type: (Any) -> bool
        try:
            return self._value == int(other)
        except (TypeError, ValueError):
            # Not convertible to int: let Python fall back to its default
            # handling, which makes ``==`` evaluate to False.
            return NotImplemented

    def __ne__(self, other):
        # type: (Any) -> bool
        res = self.__eq__(other)
        if res is NotImplemented:
            return res
        return not res

    def __neq__(self, other):
        # type: (Any) -> bool
        # Kept for backward compatibility; ``__ne__`` is the real hook.
        return self != other

1104 

1105 

class Enum_metaclass(type):
    """Metaclass that wraps the integer attributes of a class into
    ``element_class`` instances and keeps a reverse mapping, from
    element to attribute name, in ``__rdict__``.
    """
    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        reverse = {}
        for key in list(dct):
            value = dct[key]
            if isinstance(value, int):
                element = cls.element_class(key, value)
                dct[key] = element
                reverse[element] = key
        dct["__rdict__"] = reverse
        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        # type: (int) -> Any
        """Return the attribute name registered for the value ``attr``."""
        return self.__rdict__[attr]  # type: ignore

    def __contains__(self, val):
        # type: (int) -> bool
        return val in self.__rdict__  # type: ignore

    def get(self, attr, val=None):
        # type: (str, Optional[Any]) -> Any
        """Like ``__getitem__`` but return ``val`` when nothing matches."""
        return self.__rdict__.get(attr, val)  # type: ignore

    def __repr__(self):
        # type: () -> str
        return "<%s>" % self.__dict__.get("name", self.__name__)

1135 

1136 

1137################### 

1138# Object saving # 

1139################### 

1140 

1141 

def export_object(obj):
    # type: (Any) -> None
    """Print ``obj`` pickled (protocol 2), zlib-compressed (level 9) and
    base64-encoded.
    """
    import zlib
    pickled = pickle.dumps(obj, 2)
    compressed = zlib.compress(pickled, 9)
    print(bytes_base64(compressed))

1146 

1147 

def import_object(obj=None):
    # type: (Optional[str]) -> Any
    """Decode an object from base64-encoded, zlib-compressed pickle text.

    :param obj: the encoded text; read from standard input when ``None``
    :return: the unpickled object
    """
    import zlib
    encoded = sys.stdin.read() if obj is None else obj
    compressed = base64_bytes(encoded.strip())
    # SECURITY: unpickling can execute arbitrary code; only feed this
    # function data coming from a trusted source.
    return pickle.loads(zlib.decompress(compressed))

1154 

1155 

def save_object(fname, obj):
    # type: (str, Any) -> None
    """Pickle a Python object into a gzip-compressed file.

    :param fname: path of the file to create or overwrite
    :param obj: object to pickle
    """
    # The context manager closes the file even if pickling fails.
    with gzip.open(fname, "wb") as fd:
        pickle.dump(obj, fd)

1163 

1164 

def load_object(fname):
    # type: (str) -> Any
    """Unpickle a Python object from a gzip-compressed file.

    :param fname: path of a file such as one written by ``save_object``
    :return: the unpickled object
    """
    # SECURITY: unpickling can execute arbitrary code; only load files
    # coming from a trusted source.
    with gzip.open(fname, "rb") as fd:
        return pickle.load(fd)

1169 

1170 

@conf.commands.register
def corrupt_bytes(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Corrupt a given percentage (at least one byte) or number of bytes
    from a string

    :param data: the data to corrupt
    :param p: fraction of the bytes to corrupt, used when ``n`` is None
    :param n: exact number of bytes to corrupt
    :return: a corrupted copy of ``data`` as bytes
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s)
    if n is None:
        # At least one byte, but never more than there are: sampling more
        # positions than exist (e.g. empty input) would raise ValueError.
        n = min(s_len, max(1, int(s_len * p)))
    for i in random.sample(range(s_len), n):
        # Adding 1..255 modulo 256 guarantees the byte actually changes.
        s[i] = (s[i] + random.randint(1, 255)) % 256
    return s.tobytes()

1185 

1186 

@conf.commands.register
def corrupt_bits(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Flip a given percentage (at least one bit) or number of bits
    from a string

    :param data: the data to corrupt
    :param p: fraction of the bits to flip, used when ``n`` is None
    :param n: exact number of bits to flip
    :return: a corrupted copy of ``data`` as bytes
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s) * 8  # length in bits
    if n is None:
        # At least one bit, but never more than there are: sampling more
        # positions than exist (e.g. empty input) would raise ValueError.
        n = min(s_len, max(1, int(s_len * p)))
    for i in random.sample(range(s_len), n):
        s[i // 8] ^= 1 << (i % 8)
    return s.tobytes()

1201 

1202 

1203############################# 

1204# pcap capture file stuff # 

1205############################# 

1206 

@conf.commands.register
def wrpcap(filename,  # type: Union[IO[bytes], str]
           pkt,  # type: _PacketIterable
           *args,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> None
    """Write a list of packets to a pcap file

    Extra positional and keyword arguments are handed to ``PcapWriter``.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcap(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packets to write
    :param gz: set to 1 to save a gzipped capture
    :param linktype: force linktype value
    :param endianness: "<" or ">", force endianness
    :param sync: do not bufferize writes to the capture file
    """
    with PcapWriter(filename, *args, **kargs) as writer:
        writer.write(pkt)

1228 

1229 

@conf.commands.register
def wrpcapng(filename,  # type: str
             pkt,  # type: _PacketIterable
             ):
    # type: (...) -> None
    """Write a list of packets to a pcapng file

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcapng(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packets to write
    """
    with PcapNgWriter(filename) as writer:
        writer.write(pkt)

1246 

1247 

@conf.commands.register
def rdpcap(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read a pcap or pcapng file and return a packet list

    :param filename: name of the capture file, or an open file-like object
    :param count: read only <count> packets
    """
    # Rant: Our complicated use of metaclasses and especially the
    # __call__ function is, of course, not supported by MyPy.
    # One day we should simplify this mess and use a much simpler
    # layout that will actually be supported and properly dissected.
    with PcapReader(filename) as reader:  # type: ignore
        packets = reader.read_all(count=count)
        return packets

1261 

1262 

1263# NOTE: Type hinting 

1264# Mypy doesn't understand the following metaclass, and thinks each 

1265# constructor (PcapReader...) needs 3 arguments each. To avoid this, 

1266# we add a fake (=None) to the last 2 arguments then force the value 

1267# to not be None in the signature and pack the whole thing in an ignore. 

1268# This allows to not have # type: ignore every time we call those 

1269# constructors. 

1270 

class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers

    Calling a reader class opens the capture, reads its magic and tries the
    class itself, then its ``alternative`` class, until one accepts the file.
    """

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(
            cls, name, bases, dct
        )
        if 'alternative' in dct:
            # Make the link symmetric: the Pcap variant points back to us.
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Creates a cls instance, use the `alternative` if that
        fails.

        :param filename: path of the capture, or an open binary file object
        :raises Scapy_Exception: if nothing could be read or no reader
            accepts the file
        """
        # Only close on failure what was opened here; a file object passed
        # in by the caller stays under the caller's control.
        opened_here = isinstance(filename, str)
        filename, fdesc, magic = cls.open(filename)
        try:
            if not magic:
                raise Scapy_Exception(
                    "No data could be read!"
                )
            candidates = [cls]
            if "alternative" in cls.__dict__:
                candidates.append(cls.__dict__["alternative"])
            for candidate in candidates:
                i = candidate.__new__(
                    candidate,
                    candidate.__name__,
                    candidate.__bases__,
                    candidate.__dict__  # type: ignore
                )
                try:
                    i.__init__(filename, fdesc, magic)
                    return i
                except (Scapy_Exception, EOFError):
                    # Not this format: try the next candidate.
                    pass
            raise Scapy_Exception("Not a supported capture file")
        except Exception:
            if opened_here:
                fdesc.close()
            raise

    @staticmethod
    def open(fname  # type: Union[IO[bytes], str]
             ):
        # type: (...) -> Tuple[str, _ByteStream, bytes]
        """Open (if necessary) filename, and read the magic.

        A path is first tried as a gzip file, then as a plain file.

        :return: ``(filename, file object, first 4 bytes)``
        """
        if isinstance(fname, str):
            filename = fname
            gz_fdesc = None
            try:
                gz_fdesc = gzip.open(filename, "rb")
                magic = gz_fdesc.read(4)
                fdesc = gz_fdesc  # type: _ByteStream
            except IOError:
                # Not gzipped (or unreadable as such): release the gzip
                # handle before reopening the file as plain binary.
                if gz_fdesc is not None:
                    gz_fdesc.close()
                fdesc = open(filename, "rb")
                magic = fdesc.read(4)
        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic

1344 

1345 

class RawPcapReader(metaclass=PcapReader_metaclass):
    """A stateful pcap reader. Each packet is returned as a string

    ``__init__`` receives the file object and the already-read 4-byte
    magic; it then parses the remaining 20 bytes of the global header.
    """

    # TODO: use Generics to properly type the various readers.
    # As of right now, RawPcapReader is typed as if it returned packets
    # because all of its child do. Fix that

    nonblocking_socket = True
    PacketMetadata = collections.namedtuple(
        "PacketMetadata",
        ["sec", "usec", "wirelen", "caplen"],
    )

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, _ByteStream, bytes) -> None
        self.filename = filename
        self.f = fdesc
        # (magic, struct byte-order prefix, nanosecond-precision timestamps)
        known_magics = (
            (b"\xa1\xb2\xc3\xd4", ">", False),  # big endian
            (b"\xd4\xc3\xb2\xa1", "<", False),  # little endian
            (b"\xa1\xb2\x3c\x4d", ">", True),   # big endian, nanoseconds
            (b"\x4d\x3c\xb2\xa1", "<", True),   # little endian, nanoseconds
        )
        for candidate, byte_order, is_nano in known_magics:
            if magic == candidate:
                self.endian = byte_order
                self.nano = is_nano
                break
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        header = self.f.read(20)
        if len(header) < 20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        # version major, version minor, tz, sig, snaplen, linktype
        fields = struct.unpack(self.endian + "HHIIII", header)
        self.linktype = fields[5]
        self.snaplen = fields[4]

    def __enter__(self):
        # type: () -> RawPcapReader
        return self

    def __iter__(self):
        # type: () -> RawPcapReader
        return self

    def __next__(self):
        # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """
        implement the iterator protocol on a set of packets in a pcap file
        """
        try:
            record = self._read_packet()
        except EOFError:
            raise StopIteration
        return record

    def _read_packet(self, size=MTU):
        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """return a single packet read from the file as a tuple containing
        (pkt_data, pkt_metadata)

        raise EOFError when no more packets are available
        """
        record_header = self.f.read(16)
        if len(record_header) < 16:
            raise EOFError
        sec, usec, caplen, wirelen = struct.unpack(
            self.endian + "IIII", record_header
        )
        # The whole record is consumed; only the returned data is truncated.
        data = self.f.read(caplen)[:size]
        metadata = RawPcapReader.PacketMetadata(
            sec=sec, usec=usec, wirelen=wirelen, caplen=caplen
        )
        return (data, metadata)

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        raise Exception(
            "Cannot call read_packet() in RawPcapReader. Use "
            "_read_packet()"
        )

    def dispatch(self,
                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
                 ):
        # type: (...) -> None
        """call the specified callback routine for each packet read

        This is just a convenience function for the main loop
        that allows for easy launching of packet processing in a
        thread.
        """
        for record in self:
            callback(record)

    def _read_all(self, count=-1):
        # type: (int) -> List[Packet]
        """return a list of all packets in the pcap file

        :param count: maximum number of packets to read; a negative value
            reads until the end of the file
        """
        packets = []  # type: List[Packet]
        remaining = count
        while remaining != 0:
            remaining -= 1
            try:
                packets.append(self.read_packet())
            except EOFError:
                break
        return packets

    def recv(self, size=MTU):
        # type: (int) -> bytes
        """ Emulate a socket
        """
        data, _metadata = self._read_packet(size=size)
        return data

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def close(self):
        # type: () -> Optional[Any]
        return self.f.close()

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.close()

    # emulate SuperSocket
    @staticmethod
    def select(sockets,  # type: List[SuperSocket]
               remain=None,  # type: Optional[float]
               ):
        # type: (...) -> List[SuperSocket]
        return sockets

1478 

1479 

class PcapReader(RawPcapReader):
    """Pcap reader returning dissected packets instead of raw bytes.

    The class used to dissect is looked up from the file's link type;
    unknown link types fall back to ``conf.raw_layer``.
    """

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapReader.__init__(self, filename, fdesc, magic)
        try:
            ll_class = conf.l2types.num2layer[
                self.linktype
            ]  # type: Type[Packet]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype))  # noqa: E501
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            ll_class = conf.raw_layer
        self.LLcls = ll_class

    def __enter__(self):
        # type: () -> PcapReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read and dissect the next packet.

        :param size: maximum number of bytes of packet data kept
        :param kwargs: passed to the packet class constructor
        :raises EOFError: when no more packets are available
        """
        record = super(PcapReader, self)._read_packet(size=size)
        if record is None:
            raise EOFError
        raw_data, pkt_info = record

        # KeyboardInterrupt is not an Exception subclass, so it propagates.
        try:
            pkt = self.LLcls(raw_data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (self.LLcls, raw_data)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(raw_data)
        # The sub-second field counts nanoseconds or microseconds.
        if self.nano:
            exponent = -9
        else:
            exponent = -6
        unit = Decimal(10) ** Decimal(exponent)
        pkt.time = EDecimal(pkt_info.sec + unit * pkt_info.usec)
        pkt.wirelen = pkt_info.wirelen
        return pkt

    def recv(self, size=MTU, **kwargs):  # type: ignore
        # type: (int, **Any) -> Packet
        return self.read_packet(size=size, **kwargs)

    def __next__(self):  # type: ignore
        # type: () -> Packet
        try:
            pkt = self.read_packet()
        except EOFError:
            raise StopIteration
        return pkt

    def read_all(self, count=-1):
        # type: (int) -> PacketList
        """Read up to ``count`` packets (all when negative) into a
        PacketList named after the capture file.
        """
        packets = self._read_all(count)
        from scapy import plist
        list_name = os.path.basename(self.filename)
        return plist.PacketList(packets, name=list_name)

1540 

1541 

class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as
    bytes.

    The file is consumed block by block. Blocks carrying a packet
    (EPB, SPB, obsolete PKT) produce ``(data, PacketMetadata)`` tuples;
    the other handled blocks (SHB, IDB, DSB) only update the reader state.
    """

    alternative = RawPcapReader  # type: Type[Any]

    PacketMetadata = collections.namedtuple("PacketMetadataNg",  # type: ignore
                                            ["linktype", "tsresol",
                                             "tshigh", "tslow", "wirelen",
                                             "comment", "ifname", "direction"])

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, options); will be populated by IDBs.
        self.interfaces = []  # type: List[Tuple[int, int, Dict[str, Any]]]
        self.default_options = {
            "tsresol": 1000000
        }
        # Block type -> handler(block body, size)
        self.blocktypes: Dict[
            int,
            Callable[
                [bytes, int],
                Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
            ]] = {
                1: self._read_block_idb,
                2: self._read_block_pkt,
                3: self._read_block_spb,
                6: self._read_block_epb,
                10: self._read_block_dsb,
        }
        self.endian = "!"  # Will be overwritten by first SHB

        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )

        try:
            self._read_block_shb()
        except EOFError:
            raise Scapy_Exception(
                "The first SHB of the pcapng file is malformed !"
            )

    def _read_block(self, size=MTU):
        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
        """Read one block and dispatch it to its handler.

        :param size: maximum number of bytes of packet data kept
        :return: ``(data, metadata)`` for packet blocks, None otherwise
        :raises EOFError: at end of file or on a malformed block
        """
        try:
            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocktype == 0x0A0D0D0A:
            # This function updates the endianness based on the block content.
            self._read_block_shb()
            return None
        try:
            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            warning("PcapNg: Error reading blocklen before block body")
            raise EOFError
        if blocklen < 12:
            warning("PcapNg: Invalid block length !")
            raise EOFError

        # blocklen counts the type (4), both length fields (4 + 4) and body
        _block_body_length = blocklen - 12
        block = self.f.read(_block_body_length)
        if len(block) != _block_body_length:
            raise Scapy_Exception("PcapNg: Invalid Block body length "
                                  "(too short)")
        self._read_block_tail(blocklen)
        if blocktype in self.blocktypes:
            return self.blocktypes[blocktype](block, size)
        return None

    def _read_block_tail(self, blocklen):
        # type: (int) -> None
        """Read the trailing block length and check it equals ``blocklen``.

        :raises EOFError: if the trailer is missing or does not match
        """
        if blocklen % 4:
            pad = self.f.read(-blocklen % 4)
            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                    "Ignored padding %r" % (blocklen, pad))
        try:
            if blocklen != struct.unpack(self.endian + 'I',
                                         self.f.read(4))[0]:
                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
        except struct.error:
            warning("PcapNg: Could not read blocklen after block body")
            raise EOFError

    def _read_block_shb(self):
        # type: () -> None
        """Section Header Block

        Expects the 4-byte block type to have been consumed already. Sets
        ``self.endian`` from the byte-order magic.

        :raises EOFError: if the block is malformed or unsupported
        """
        _blocklen = self.f.read(4)
        endian = self.f.read(4)
        if endian == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif endian == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            warning("PcapNg: Bad magic in Section Header Block"
                    " (not a pcapng file?)")
            raise EOFError

        # The length can only be decoded once the byte order is known.
        try:
            blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
        except struct.error:
            warning("PcapNg: Could not read blocklen")
            raise EOFError
        if blocklen < 28:
            warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!")  # noqa: E501
            raise EOFError

        # Major version must be 1
        _major = self.f.read(2)
        try:
            major = struct.unpack(self.endian + "H", _major)[0]
        except struct.error:
            warning("PcapNg: Could not read major value")
            raise EOFError
        if major != 1:
            warning(f"PcapNg: SHB Major version {major} unsupported !")
            raise EOFError

        # Skip minor version & section length
        skipped = self.f.read(10)
        if len(skipped) != 10:
            warning("PcapNg: Could not read minor value & section length")
            raise EOFError

        _options_len = blocklen - 28
        options = self.f.read(_options_len)
        if len(options) != _options_len:
            raise Scapy_Exception("PcapNg: Invalid Section Header Block "
                                  " options (too short)")
        self._read_block_tail(blocklen)
        self._read_options(options)

    def _read_packet(self, size=MTU):  # type: ignore
        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Read blocks until one of them yields a packet.

        :param size: maximum number of bytes of packet data kept
        :return: ``(data, metadata)`` of the next packet
        :raises EOFError: when no more packets are available
        """
        while True:
            # Forward ``size`` so the caller's limit is honoured.
            res = self._read_block(size=size)
            if res is not None:
                return res

    def _read_options(self, options):
        # type: (bytes) -> Dict[int, bytes]
        """Parse a block's option list.

        :param options: the raw option bytes
        :return: a mapping of option code to (unpadded) option value
        """
        opts = dict()
        while len(options) >= 4:
            try:
                code, length = struct.unpack(self.endian + "HH", options[:4])
            except struct.error:
                warning("PcapNg: options header is too small "
                        "%d !" % len(options))
                raise EOFError
            # ``<=`` so that an option whose value exactly fills the rest of
            # the buffer (no trailing end-of-options) is not dropped.
            if code != 0 and 4 + length <= len(options):
                opts[code] = options[4:4 + length]
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option "
                            "length %d for end-of-option" % length)
                break
            # Option values are padded to a multiple of 4 bytes
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        return opts

    def _read_block_idb(self, block, _):
        # type: (bytes, int) -> None
        """Interface Description Block

        Appends ``(linktype, snaplen, options)`` to ``self.interfaces``.
        """
        # 2 bytes LinkType + 2 bytes Reserved
        # 4 bytes Snaplen
        options_raw = self._read_options(block[8:])
        options = self.default_options.copy()  # type: Dict[str, Any]
        for c, v in options_raw.items():
            if c == 9:
                length = len(v)
                if length == 1:
                    tsresol = orb(v)
                    # High bit set: power of 2, otherwise power of 10
                    options["tsresol"] = (2 if tsresol & 128 else 10) ** (
                        tsresol & 127
                    )
                else:
                    warning("PcapNg: invalid options "
                            "length %d for IDB tsresol" % length)
            elif c == 2:
                options["name"] = v
            elif c == 1:
                options["comment"] = v
        try:
            interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
                self.endian + "HxxI",
                block[:8]
            ) + (options,)
        except struct.error:
            warning("PcapNg: IDB is too small %d/8 !" % len(block))
            raise EOFError
        self.interfaces.append(interface)

    def _check_interface_id(self, intid):
        # type: (int) -> None
        """Check the interface id value and raise EOFError if invalid."""
        tmp_len = len(self.interfaces)
        if intid >= tmp_len:
            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
            raise EOFError

    def _read_block_epb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Enhanced Packet Block"""
        try:
            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "5I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: EPB is too small %d/20 !" % len(block))
            raise EOFError

        # Compute the options offset taking padding into account
        # ((-caplen) % 4 is 0 when caplen is already a multiple of 4)
        opt_offset = 20 + caplen + (-caplen) % 4

        # Parse options
        options = self._read_options(block[opt_offset:])
        comment = options.get(1, None)
        epb_flags_raw = options.get(2, None)
        if epb_flags_raw:
            try:
                epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
            except struct.error:
                warning("PcapNg: EPB invalid flags size"
                        "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
                raise EOFError
            # The two lowest bits of the flags are kept as the direction
            direction = epb_flags & 3

        else:
            direction = None

        self._check_interface_id(intid)
        ifname = self.interfaces[intid][2].get('name', None)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=comment,
                                               ifname=ifname,
                                               direction=direction))

    def _read_block_spb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Simple Packet Block"""
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        self._check_interface_id(intid)

        try:
            wirelen, = struct.unpack(self.endian + "I", block[:4])
        except struct.error:
            warning("PcapNg: SPB is too small %d/4 !" % len(block))
            raise EOFError

        # A snaplen of 0 means "no limit" and must not truncate the packet
        # to zero bytes.
        snaplen = self.interfaces[intid][1]
        caplen = min(wirelen, snaplen) if snaplen else wirelen
        return (block[4:4 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=None,
                                               tslow=None,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None))

    def _read_block_pkt(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """(Obsolete) Packet Block"""
        try:
            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "HH4I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: PKT is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None))

    def _read_block_dsb(self, block, size):
        # type: (bytes, int) -> None
        """Decryption Secrets Block

        A TLS Key Log is loaded into ``conf.tls_nss_keys`` when the TLS
        layer is available; other secret types only trigger a warning.
        """

        # Parse the secrets type and length fields
        try:
            secrets_type, secrets_length = struct.unpack(
                self.endian + "II",
                block[:8],
            )
            block = block[8:]
        except struct.error:
            warning("PcapNg: DSB is too small %d!", len(block))
            raise EOFError

        # Compute the secrets length including the padding
        padded_secrets_length = secrets_length + (-secrets_length) % 4
        if len(block) < padded_secrets_length:
            warning("PcapNg: invalid DSB secrets length!")
            raise EOFError

        # Extract secrets data and options
        secrets_data = block[:padded_secrets_length][:secrets_length]
        if block[padded_secrets_length:]:
            warning("PcapNg: DSB options are not supported!")

        # TLS Key Log
        if secrets_type == 0x544c534b:
            if getattr(conf, "tls_sessions", False) is False:
                warning("PcapNg: TLS Key Log available, but "
                        "the TLS layer is not loaded! Scapy won't be able "
                        "to decrypt the packets.")
            else:
                from scapy.layers.tls.session import load_nss_keys

                # Write Key Log to a file and parse it
                filename = get_temp_file()
                with open(filename, "wb") as fd:
                    fd.write(secrets_data)

                keys = load_nss_keys(filename)
                if not keys:
                    warning("PcapNg: invalid TLS Key Log in DSB!")
                else:
                    # Note: these attributes are only available when the TLS
                    # layer is loaded.
                    conf.tls_nss_keys = keys
                    conf.tls_session_enable = True
        else:
            warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)

1901 

1902 

class PcapNgReader(RawPcapNgReader, PcapReader):
    """Pcapng reader returning dissected packets.

    The packet class is chosen per packet, from the link type carried in
    the packet metadata.
    """

    alternative = PcapReader

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def __enter__(self):
        # type: () -> PcapNgReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read and dissect the next packet.

        :param size: maximum number of bytes of packet data kept
        :param kwargs: passed to the packet class constructor
        :raises EOFError: when no more packets are available
        """
        record = super(PcapNgReader, self)._read_packet(size=size)
        if record is None:
            raise EOFError
        raw_data, metadata = record
        (linktype, tsresol, tshigh, tslow,
         wirelen, comment, ifname, direction) = metadata
        # KeyboardInterrupt is not an Exception subclass, so it propagates.
        try:
            pkt_class = conf.l2types.num2layer[linktype]  # type: Type[Packet]
            pkt = pkt_class(raw_data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(raw_data)
        if tshigh is not None:
            # Rebuild the 64-bit timestamp from its two 32-bit halves
            ticks = (tshigh << 32) + tslow
            pkt.time = EDecimal(ticks) / tsresol
        pkt.wirelen = wirelen
        pkt.comment = comment
        pkt.direction = direction
        if ifname is not None:
            pkt.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
        return pkt

    def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
        return self.read_packet(size=size, **kwargs)

1944 

1945 

class GenericPcapWriter(object):
    """Format-independent part of the capture file writers.

    Subclasses provide ``_write_header`` and ``_write_packet``.
    """

    nano = False
    linktype: int

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        raise NotImplementedError

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        raise NotImplementedError

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Work out the timestamp to record for ``packet``.

        When ``sec`` is None and the packet has a ``time`` attribute, both
        values are derived from it (``usec`` is its fractional part, in
        nanoseconds if ``self.nano`` else microseconds). A given ``sec``
        with no ``usec`` gets ``usec = 0``.

        :return: ``(sec, usec)``; both stay None if nothing was available
        """
        if hasattr(packet, "time"):
            if sec is None:
                packet_time = packet.time
                tmp = int(packet_time)
                usec = int(round((packet_time - tmp) *
                                 (1000000000 if self.nano else 1000000)))
                sec = float(packet_time)
        if sec is not None and usec is None:
            usec = 0
        return sec, usec  # type: ignore

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Pick the link type if not already set, then write the header.

        :param pkt: first packet to be written, used to guess the link type
        """
        if not hasattr(self, 'linktype'):
            try:
                if pkt is None or isinstance(pkt, bytes):
                    # Can't guess LL
                    raise KeyError
                self.linktype = conf.l2types.layer2num[
                    pkt.__class__
                ]
            except KeyError:
                msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
                warning(msg, self.__class__.__name__, pkt.__class__.__name__)
                self.linktype = DLT_EN10MB
        self._write_header(pkt)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[float]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: Packet, or bytes for a single packet
        :type packet: scapy.packet.Packet or bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured. If ``sec`` is not specified,
                     this value is ignored.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(raw(packet))``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, tries ``packet.wirelen``, otherwise uses
                        ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        f_sec, usec = self._get_time(packet, sec, usec)

        rawpkt = bytes_encode(packet)
        caplen = len(rawpkt) if caplen is None else caplen

        if wirelen is None:
            if hasattr(packet, "wirelen"):
                wirelen = packet.wirelen
        if wirelen is None:
            wirelen = caplen

        comment = getattr(packet, "comment", None)
        ifname = getattr(packet, "sniffed_on", None)
        direction = getattr(packet, "direction", None)
        if not isinstance(packet, bytes):
            try:
                linktype: int = conf.l2types.layer2num[
                    packet.__class__
                ]
            except KeyError:
                # write_header() already warns and falls back for classes
                # with no link-type mapping; use the writer's link type
                # here instead of failing on the same packet.
                linktype = self.linktype
        else:
            linktype = self.linktype
        if ifname is not None:
            ifname = str(ifname).encode('utf-8')
        self._write_packet(
            rawpkt,
            sec=f_sec, usec=usec,
            caplen=caplen, wirelen=wirelen,
            comment=comment,
            ifname=ifname,
            direction=direction,
            linktype=linktype
        )

2064 

2065 

class GenericRawPcapWriter(GenericPcapWriter):
    """File-backed writer: adds file handling, context-manager support and
    the high-level ``write`` on top of ``GenericPcapWriter``.
    """

    header_present = False
    nano = False
    sync = False
    f = None  # type: Union[IO[bytes], gzip.GzipFile]

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def flush(self):
        # type: () -> Optional[Any]
        return self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        # Write the header if it has not been written yet.
        if not self.header_present:
            self.write_header(None)
        return self.f.close()

    def __enter__(self):
        # type: () -> GenericRawPcapWriter
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.flush()
        self.close()

    def write(self, pkt):
        # type: (Union[_PacketIterable, bytes]) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a pcap file.

        :param pkt: Packet(s) to write (one record for each Packet), or raw
                    bytes to write (as one record).
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
        """
        if isinstance(pkt, bytes):
            # Raw bytes are written as a single record.
            if not self.header_present:
                self.write_header(pkt)
            self.write_packet(pkt)
            return

        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for item in IterSocket(pkt).iter:
            if not self.header_present:
                self.write_header(item)

            mismatch = (
                not isinstance(item, bytes) and
                self.linktype != conf.l2types.get(type(item), None)
            )
            if mismatch:
                warning("Inconsistent linktypes detected!"
                        " The resulting file might contain"
                        " invalid packets."
                        )

            self.write_packet(item)

2123 

2124 

class RawPcapWriter(GenericRawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 linktype=None,  # type: Optional[int]
                 gz=False,  # type: bool
                 endianness="",  # type: str
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 nano=False,  # type: bool
                 snaplen=MTU,  # type: int
                 bufsz=4096,  # type: int
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param linktype: force linktype to a given value. If None, linktype is
            taken from the first written packet
        :param gz: compress the capture on the fly
        :param endianness: force an endianness (little:"<", big:">").
            Default is native
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
        :param snaplen: snapshot length stored in the file header
        :param bufsz: buffer size used when opening an uncompressed file by
            name (forced to 0 when ``sync`` is set)
        """
        # Test against None rather than truthiness: 0 is a legitimate
        # link-layer type number (DLT_NULL in libpcap) and must be honoured
        # when explicitly requested.
        if linktype is not None:
            self.linktype = linktype
        self.snaplen = snaplen
        self.append = append
        self.gz = gz
        self.endian = endianness
        self.sync = sync
        self.nano = nano
        if sync:
            bufsz = 0

        if isinstance(filename, str):
            self.filename = filename
            mode = "ab" if append else "wb"
            if gz:
                self.f = cast(_ByteStream, gzip.open(filename, mode, 9))
            else:
                self.f = open(filename, mode, bufsz)
        else:
            # Caller supplied an already open stream
            self.f = filename
            self.filename = getattr(filename, "name", "No name")

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the pcap global header, unless appending to a non-empty file.

        :param pkt: unused here; kept for interface compatibility
        :raises ValueError: if no linktype has been set on the writer
        """
        self.header_present = True

        if self.append:
            # Even if prone to race conditions, this seems to be
            # safest way to tell whether the header is already present
            # because we have to handle compressed streams that
            # are not as flexible as basic files
            opener = gzip.open if self.gz else open
            with opener(self.filename, "rb") as g:
                if g.read(16):
                    # The file already has content: do not write a header
                    return

        if not hasattr(self, 'linktype'):
            raise ValueError(
                "linktype could not be guessed. "
                "Please pass a linktype while creating the writer"
            )

        # The magic number selects nanosecond or microsecond timestamps
        magic = 0xa1b23c4d if self.nano else 0xa1b2c3d4
        self.f.write(struct.pack(self.endian + "IHHIIII", magic,
                                 2, 4, 0, 0, self.snaplen, self.linktype))
        self.f.flush()

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        ``linktype``, ``comment``, ``ifname`` and ``direction`` are accepted
        for interface compatibility but are not stored in a pcap record.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: sub-second part of the capture time (nanoseconds if
                     ``nano`` is set, microseconds otherwise). Recomputed
                     from the current time when ``sec`` is not supplied;
                     defaults to 0 when only ``sec`` is supplied.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen
        if sec is None:
            # No timestamp supplied: use the current time for both parts
            now = time.time()
            whole = int(now)
            sec = whole
            scale = 1000000000 if self.nano else 1000000
            usec = int(round((now - whole) * scale))
        elif usec is None:
            usec = 0

        self.f.write(struct.pack(self.endian + "IIII",
                                 int(sec), usec, caplen, wirelen))
        self.f.write(bytes(packet))
        if self.sync:
            self.f.flush()

2260 

2261 

class RawPcapNgWriter(GenericRawPcapWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def __init__(self,
                 filename,  # type: str
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to
        """
        self.header_present = False
        # Timestamps are written in units of 1/tsresol seconds
        self.tsresol = 1000000
        # Maps if_name to the id of its Interface Description Block.
        # The unknown interface (None) is id 0.
        self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}

        # tcpdump only support little-endian in PCAPng files
        self.endian = "<"
        self.endian_magic = b"\x4d\x3c\x2b\x1a"

        self.filename = filename
        self.f = open(filename, "wb", 4096)

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return ``(sec, usec)``, taking ``sec`` from ``packet.time`` when it
        is not supplied and defaulting ``usec`` to 0."""
        if sec is None and hasattr(packet, "time"):
            sec = float(packet.time)
        return sec, (0 if usec is None else usec)  # type: ignore

    def _add_padding(self, raw_data):
        # type: (bytes) -> bytes
        """Return ``raw_data`` zero-padded to a multiple of 4 bytes."""
        missing = -len(raw_data) % 4
        return raw_data + b"\x00" * missing

    def build_block(self, block_type, block_body, options=None):
        # type: (bytes, bytes, Optional[bytes]) -> bytes
        """Assemble a complete block.

        :param block_type: already packed block type
        :param block_body: block body, padded here to 32 bits
        :param options: already encoded options, appended after the body
        :return: type + total length + body (+ options) + total length
        """
        body = self._add_padding(block_body)
        if options:
            body += options

        # An empty block is 12 bytes long; the total length is written both
        # before and after the body
        total_length = struct.pack(self.endian + "I", 12 + len(body))
        return b"".join((block_type, total_length, body, total_length))

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the section header and the default interface block once."""
        if self.header_present:
            return
        self.header_present = True
        self._write_block_shb()
        self._write_block_idb(linktype=self.linktype)

    def _write_block_shb(self):
        # type: () -> None
        """Write a Section Header Block."""
        fmt = self.endian
        body = b"".join((
            self.endian_magic,          # Byte-Order Magic
            struct.pack(fmt + "H", 1),  # Major Version
            struct.pack(fmt + "H", 0),  # Minor Version
            struct.pack(fmt + "q", -1),  # Section Length
        ))
        self.f.write(self.build_block(b"\x0A\x0D\x0D\x0A", body))

    def _write_block_idb(self,
                         linktype,  # type: int
                         ifname=None  # type: Optional[bytes]
                         ):
        # type: (...) -> None
        """Write an Interface Description Block.

        :param linktype: linktype of the interface
        :param ifname: interface name, stored as an ``if_name`` option
        """
        fmt = self.endian
        block_type = struct.pack(fmt + "I", 1)
        body = b"".join((
            struct.pack(fmt + "H", linktype),  # LinkType
            struct.pack(fmt + "H", 0),         # Reserved
            struct.pack(fmt + "I", 262144),    # SnapLen
        ))

        opts = None
        if ifname is not None:
            # if_name option, value padded to 32 bits, then end-of-options
            opts = b"".join((
                struct.pack(fmt + "HH", 2, len(ifname)),
                self._add_padding(ifname),
                struct.pack(fmt + "HH", 0, 0),
            ))

        self.f.write(self.build_block(block_type, body, options=opts))

    def _write_block_spb(self, raw_pkt):
        # type: (bytes) -> None
        """Write a Simple Packet Block holding ``raw_pkt``."""
        fmt = self.endian
        block_type = struct.pack(fmt + "I", 3)
        # Original Packet Length followed by the Packet Data
        body = struct.pack(fmt + "I", len(raw_pkt)) + raw_pkt
        self.f.write(self.build_block(block_type, body))

    def _write_block_epb(self,
                         raw_pkt,  # type: bytes
                         ifid,  # type: int
                         timestamp=None,  # type: Optional[Union[EDecimal, float]]  # noqa: E501
                         caplen=None,  # type: Optional[int]
                         orglen=None,  # type: Optional[int]
                         comment=None,  # type: Optional[bytes]
                         flags=None,  # type: Optional[int]
                         ):
        # type: (...) -> None
        """Write an Enhanced Packet Block.

        :param raw_pkt: packet data
        :param ifid: id of the interface the packet belongs to
        :param timestamp: capture time in seconds; written as 0 when falsy
        :param caplen: captured length; ``len(raw_pkt)`` when falsy
        :param orglen: original length; ``len(raw_pkt)`` when falsy
        :param comment: optional comment option
        :param flags: optional flags word, only written when it is an int
        """
        fmt = self.endian

        ticks = int(timestamp * self.tsresol) if timestamp else 0
        ts_high = ticks >> 32
        ts_low = ticks & 0xFFFFFFFF

        caplen = caplen or len(raw_pkt)
        orglen = orglen or len(raw_pkt)

        block_type = struct.pack(fmt + "I", 6)
        body = b"".join((
            struct.pack(fmt + "I", ifid),     # Interface ID
            struct.pack(fmt + "I", ts_high),  # Timestamp (High)
            struct.pack(fmt + "I", ts_low),   # Timestamp (Low)
            struct.pack(fmt + "I", caplen),   # Captured Packet Length
            struct.pack(fmt + "I", orglen),   # Original Packet Length
            raw_pkt,                          # Packet Data
        ))

        # Options
        opt_parts = []
        if comment is not None:
            comment = bytes_encode(comment)
            opt_parts.append(struct.pack(fmt + "HH", 1, len(comment)))
            # Pad Option Value to 32 bits
            opt_parts.append(self._add_padding(comment))
        if type(flags) is int:
            opt_parts.append(struct.pack(fmt + "HH", 2, 4))
            opt_parts.append(struct.pack(fmt + "I", flags))
        if opt_parts:
            # End-of-options marker, only needed when options are present
            opt_parts.append(struct.pack(fmt + "HH", 0, 0))

        self.f.write(self.build_block(block_type, body,
                                      options=b"".join(opt_parts)))

    def _write_packet(self,  # type: ignore
                      packet,  # type: bytes
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcapng file.

        An Interface Description Block is written first whenever ``ifname``
        has not been seen before.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch.
        :type sec: float
        :param usec: accepted for interface compatibility, not used here
        :type usec: int
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: UTF-8 string containing human-readable comment text
                        that is associated to the current block. Line
                        separators SHOULD be CR+LF or a bare LF; either form
                        may appear and be considered a line separator. The
                        string is not zero-terminated.
        :type comment: bytes
        :param ifname: UTF-8 string containing the name of the device used to
                       capture data. The string is not zero-terminated.
        :type ifname: bytes
        :param direction: 0 = information not available,
                          1 = inbound,
                          2 = outbound
        :type direction: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen

        try:
            ifid = self.interfaces2id[ifname]
        except KeyError:
            # First packet seen on this interface: register it and describe it
            ifid = max(self.interfaces2id.values()) + 1
            self.interfaces2id[ifname] = ifid
            self._write_block_idb(linktype=linktype, ifname=ifname)

        # EPB flags (32 bits).
        # currently only direction is implemented (least 2 significant bits)
        flags = direction & 0x3 if type(direction) is int else None

        self._write_block_epb(packet, timestamp=sec, caplen=caplen,
                              orglen=wirelen, comment=comment, ifid=ifid,
                              flags=flags)
        if self.sync:
            self.f.flush()

2509 

2510 

class PcapWriter(RawPcapWriter):
    """Stream writer for PCAP files; adds nothing to RawPcapWriter."""

2514 

2515 

class PcapNgWriter(RawPcapNgWriter):
    """Stream writer for pcapng files, with finer control than wrpcapng()."""

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return ``(sec, usec)``: ``sec`` falls back to ``packet.time`` when
        not given, and ``usec`` falls back to 0."""
        if sec is None and hasattr(packet, "time"):
            sec = float(packet.time)
        return sec, (0 if usec is None else usec)  # type: ignore

2533 

2534 

@conf.commands.register
def rderf(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read packets from an ERF file into a packet list.

    :param filename: path of the ERF file, or an open file-like object
    :param count: read only <count> packets
    """
    with ERFEthernetReader(filename) as reader:
        packets = reader.read_all(count=count)
    return packets

2544 

2545 

class ERFEthernetReader_metaclass(PcapReader_metaclass):
    """Metaclass that opens the capture file before building the reader."""

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Build a reader for ``filename``.

        The class itself is tried first, then its ``alternative`` attribute
        if it defines one.

        :param filename: path of the capture, or an open file-like object
        :raises Scapy_Exception: if no reader accepts the file
        """
        # Remember whether the descriptor is ours, so that it can be released
        # on failure without closing a stream owned by the caller.
        opened_here = isinstance(filename, str)

        i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)  # type: ignore
        filename, fdesc = cls.open(filename)
        try:
            i.__init__(filename, fdesc)
            return i
        except (Scapy_Exception, EOFError):
            pass

        if "alternative" in cls.__dict__:
            cls = cls.__dict__["alternative"]
            i = cls.__new__(
                cls,
                cls.__name__,
                cls.__bases__,
                cls.__dict__  # type: ignore
            )
            try:
                i.__init__(filename, fdesc)
                return i
            except (Scapy_Exception, EOFError):
                pass

        # Nobody took ownership of the descriptor: do not leak it
        if opened_here:
            fdesc.close()
        raise Scapy_Exception("Not a supported capture file")

    @staticmethod
    def open(fname  # type: ignore
             ):
        # type: (...) -> Tuple[str, _ByteStream]
        """Open (if necessary) filename

        A path is first probed as gzip by reading one byte; if that raises
        IOError the file is opened as a plain binary file instead. Anything
        that is not a string is used as the stream itself.

        :return: ``(filename, file descriptor)``
        """
        if isinstance(fname, str):
            filename = fname
            try:
                with gzip.open(filename, "rb") as tmp:
                    tmp.read(1)
                fdesc = gzip.open(filename, "rb")  # type: _ByteStream
            except IOError:
                fdesc = open(filename, "rb")

        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
        return filename, fdesc

2591 

2592 

class ERFEthernetReader(PcapReader,
                        metaclass=ERFEthernetReader_metaclass):
    """Reader for ERF capture files containing Ethernet records."""

    def __init__(self, filename, fdesc=None):  # type: ignore
        # type: (Union[IO[bytes], str], IO[bytes]) -> None
        """
        :param filename: name of the capture file
        :param fdesc: open binary stream to read records from
        """
        self.filename = filename  # type: ignore
        self.f = fdesc
        # Scale factor turning a nanosecond count into seconds
        self.power = Decimal(10) ** Decimal(-9)

    # time is in 64-bits Endace's format which can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def _convert_erf_timestamp(self, t):
        # type: (int) -> EDecimal
        """Convert a 64-bit ERF timestamp to seconds.

        The upper 32 bits are taken as whole seconds and the lower 32 bits as
        a binary fraction of a second, converted to nanoseconds and rounded
        to the nearest one.
        """
        sec = t >> 32
        frac_sec = t & 0xffffffff
        frac_sec *= 10**9
        # Round to nearest: carry into bit 32 when bit 31 is set
        frac_sec += (frac_sec & 0x80000000) << 1
        frac_sec >>= 32
        return EDecimal(sec + self.power * frac_sec)

    # The details of ERF Packet format can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read and dissect the next record.

        :param size: upper slice bound applied to the record payload
        :param kwargs: passed through to the ``Ether`` constructor
        :raises EOFError: if fewer than 16 header bytes are left
        :raises Scapy_Exception: if the record is not accepted as Ethernet,
            or if its record length is smaller than its header
        """
        # General ERF Header have exactly 16 bytes
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError

        # The timestamp is in little-endian byte-order.
        timestamp = struct.unpack('<Q', hdr[:8])[0]
        # The rest is in big-endian byte-order.
        # Ignoring flags and lctr (loss counter) since they are ERF specific
        # header fields which Packet object does not support.
        erf_type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
        # NOTE(review): this is a bit test, so any type value with bit 0x02
        # set is accepted, not only 0x02 itself - confirm this is intended.
        if erf_type & 0x02 == 0:
            raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")

        # If there are extended headers, ignore it because Packet object does
        # not support it. Extended headers size is 8 bytes before the payload.
        has_ext_header = bool(erf_type & 0x80)
        payload_len = rlen - (24 if has_ext_header else 16)
        if payload_len < 0:
            # A negative size would make read() consume the rest of the
            # stream and return it as a single bogus packet.
            raise Scapy_Exception("Invalid ERF record length")

        if has_ext_header:
            self.f.read(8)
        s = self.f.read(payload_len)

        # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
        # of the fields are disregarded by Endace.
        pb = s[2:size]
        from scapy.layers.l2 import Ether
        try:
            p = Ether(pb, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (Ether, s)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            # Dissection failed: fall back to the raw layer
            p = conf.raw_layer(s)

        p.time = self._convert_erf_timestamp(timestamp)
        p.wirelen = wlen

        return p

2663 

2664 

@conf.commands.register
def wrerf(filename,  # type: Union[IO[bytes], str]
          pkt,  # type: _PacketIterable
          *args,  # type: Any
          **kargs  # type: Any
          ):
    # type: (...) -> None
    """Write packets to an ERF file.

    Extra positional and keyword arguments are handed to ERFEthernetWriter.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrerf(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: the packets to write
    :param gz: set to 1 to save a gzipped capture
    :param append: append packets to the capture file instead of
        truncating it
    :param sync: do not bufferize writes to the capture file
    """
    with ERFEthernetWriter(filename, *args, **kargs) as writer:
        writer.write(pkt)

2686 

2687 

class ERFEthernetWriter(PcapWriter):
    """A stream ERF Ethernet writer with more control than wrerf()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 gz=False,  # type: bool
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param gz: compress the capture on the fly
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        """
        super(ERFEthernetWriter, self).__init__(filename,
                                                gz=gz,
                                                append=append,
                                                sync=sync)

    def write(self, pkt):  # type: ignore
        # type: (_PacketIterable) -> None
        """
        Writes a Packet or a SndRcvList object to a ERF file.

        :param pkt: Packet(s) to write (one record for each Packet)
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
        """
        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for p in IterSocket(pkt).iter:
            self.write_packet(p)

    def write_packet(self, pkt):  # type: ignore
        # type: (Packet) -> None
        """Write one packet as an ERF record and flush the stream.

        The timestamp comes from ``pkt.time`` when available, otherwise from
        the current time (whole seconds only).

        :param pkt: the packet to write
        """
        if hasattr(pkt, "time"):
            sec = int(pkt.time)
            frac_ns = int(round((pkt.time - sec) * 10**9))
            # Integer division: the shifted value does not fit in a float
            # mantissa, so true division would lose low-order bits.
            frac = (frac_ns << 32) // 10**9
            t = (sec << 32) + frac
        else:
            t = int(time.time()) << 32

        # There are 16 bytes of headers + 2 bytes of padding before the packets
        # payload.
        rlen = len(pkt) + 18

        # getattr keeps wirelen defined even for objects without that
        # attribute, which previously raised on the unbound local.
        wirelen = getattr(pkt, "wirelen", None)
        if wirelen is None:
            wirelen = rlen

        self.f.write(struct.pack("<Q", t))
        self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
        self.f.write(bytes(pkt))
        self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the output stream without writing any pcap header."""
        return self.f.close()

2751 

2752 

@conf.commands.register
def import_hexcap(input_string=None):
    # type: (Optional[str]) -> bytes
    """Parse a tcpdump-style hexadecimal dump into bytes.

    Accepts what hexdump(), tcpdump or wireshark's "export as hex" produce.
    Reading stops at the first empty line or at end of input.

    :param input_string: String containing the hexdump input to parse. If None,
        read from standard input.
    :return: the bytes described by the hexadecimal columns
    """
    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501

    # Lines come either from the given string or from interactive input
    if input_string:
        next_line = StringIO(input_string).readline
    else:
        next_line = input

    hex_chunks = []
    try:
        while True:
            line = next_line().strip()
            if not line:
                break
            try:
                # Third group: the hexadecimal byte columns of the line
                hex_chunks.append(re_extract_hexcap.match(line).groups()[2])  # type: ignore
            except Exception:
                warning("Parsing error during hexcap")
    except EOFError:
        pass

    return hex_bytes("".join(hex_chunks).replace(" ", ""))

2784 

2785 

@conf.commands.register
def wireshark(pktlist, wait=False, **kwargs):
    # type: (List[Packet], bool, **Any) -> Optional[Any]
    """
    Open a list of packets in Wireshark.

    This forwards to :func:`tcpdump` with Wireshark as the program; see that
    function for the remaining parameters.

    Note: ``wait`` defaults to False here, so Wireshark runs in the
    background.
    """
    viewer = conf.prog.wireshark
    return tcpdump(pktlist, prog=viewer, wait=wait, **kwargs)

2797 

2798 

@conf.commands.register
def tdecode(
    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
    args=None,  # type: Optional[List[str]]
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """
    Decode a list of packets with tshark.

    :param args: arguments passed to tshark. If not specified, defaults to
        ``["-V"]``.

    See :func:`tcpdump` for more parameters.
    """
    tshark_args = ["-V"] if args is None else args
    return tcpdump(pktlist, prog=conf.prog.tshark, args=tshark_args, **kwargs)

2816 

2817 

def _guess_linktype_name(value):
    # type: (int) -> str
    """Return the DLT name matching a DLT value."""
    from scapy.libs.winpcapy import pcap_datalink_val_to_name
    raw_name = cast(bytes, pcap_datalink_val_to_name(value))
    return raw_name.decode()

2823 

2824 

def _guess_linktype_value(name):
    # type: (str) -> int
    """Return the DLT value matching a DLT name.

    Unknown names produce a warning and fall back to DLT_EN10MB.
    """
    from scapy.libs.winpcapy import pcap_datalink_name_to_val
    val = cast(int, pcap_datalink_name_to_val(name.encode()))
    if val != -1:
        return val
    warning("Unknown linktype: %s. Using EN10MB", name)
    return DLT_EN10MB

2834 

2835 

2836@conf.commands.register 

2837def tcpdump( 

2838 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable] 

2839 dump=False, # type: bool 

2840 getfd=False, # type: bool 

2841 args=None, # type: Optional[List[str]] 

2842 flt=None, # type: Optional[str] 

2843 prog=None, # type: Optional[Any] 

2844 getproc=False, # type: bool 

2845 quiet=False, # type: bool 

2846 use_tempfile=None, # type: Optional[Any] 

2847 read_stdin_opts=None, # type: Optional[Any] 

2848 linktype=None, # type: Optional[Any] 

2849 wait=True, # type: bool 

2850 _suppress=False # type: bool 

2851): 

2852 # type: (...) -> Any 

2853 """Run tcpdump or tshark on a list of packets. 

2854 

2855 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a 

2856 temporary file to store the packets. This works around a bug in Apple's 

2857 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/ 

2858 

2859 Otherwise, the packets are passed in stdin. 

2860 

2861 This function can be explicitly enabled or disabled with the 

2862 ``use_tempfile`` parameter. 

2863 

2864 When using ``wireshark``, it will be called with ``-ki -`` to start 

2865 immediately capturing packets from stdin. 

2866 

2867 Otherwise, the command will be run with ``-r -`` (which is correct for 

2868 ``tcpdump`` and ``tshark``). 

2869 

2870 This can be overridden with ``read_stdin_opts``. This has no effect when 

2871 ``use_tempfile=True``, or otherwise reading packets from a regular file. 

2872 

2873 :param pktlist: a Packet instance, a PacketList instance or a list of 

2874 Packet instances. Can also be a filename (as a string), an open 

2875 file-like object that must be a file format readable by 

2876 tshark (Pcap, PcapNg, etc.) or None (to sniff) 

2877 :param flt: a filter to use with tcpdump 

2878 :param dump: when set to True, returns a string instead of displaying it. 

2879 :param getfd: when set to True, returns a file-like object to read data 

2880 from tcpdump or tshark from. 

2881 :param getproc: when set to True, the subprocess.Popen object is returned 

2882 :param args: arguments (as a list) to pass to tshark (example for tshark: 

2883 args=["-T", "json"]). 

2884 :param prog: program to use (defaults to tcpdump, will work with tshark) 

2885 :param quiet: when set to True, the process stderr is discarded 

2886 :param use_tempfile: When set to True, always use a temporary file to store 

2887 packets. 

2888 When set to False, pipe packets through stdin. 

2889 When set to None (default), only use a temporary file with 

2890 ``tcpdump`` on OSX. 

2891 :param read_stdin_opts: When set, a list of arguments needed to capture 

2892 from stdin. Otherwise, attempts to guess. 

2893 :param linktype: A custom DLT value or name, to overwrite the default 

2894 values. 

2895 :param wait: If True (default), waits for the process to terminate before 

2896 returning to Scapy. If False, the process will be detached to the 

2897 background. If dump, getproc or getfd is True, these have the same 

2898 effect as ``wait=False``. 

2899 

2900 Examples:: 

2901 

2902 >>> tcpdump([IP()/TCP(), IP()/UDP()]) 

2903 reading from file -, link-type RAW (Raw IP) 

2904 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501 

2905 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain] 

2906 

2907 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark) 

2908 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501 

2909 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0 

2910 

2911 To get a JSON representation of a tshark-parsed PacketList(), one can:: 

2912 

2913 >>> import json, pprint 

2914 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", 

2915 ... dst="45.33.32.156"), 

2916 ... prog=conf.prog.tshark, 

2917 ... args=["-T", "json"], 

2918 ... getfd=True)) 

2919 >>> pprint.pprint(json_data) 

2920 [{u'_index': u'packets-2016-12-23', 

2921 u'_score': None, 

2922 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20', 

2923 u'frame.encap_type': u'7', 

2924 [...] 

2925 }, 

2926 u'ip': {u'ip.addr': u'45.33.32.156', 

2927 u'ip.checksum': u'0x0000a20d', 

2928 [...] 

2929 u'ip.ttl': u'64', 

2930 u'ip.version': u'4'}, 

2931 u'raw': u'Raw packet data'}}, 

2932 u'_type': u'pcap_file'}] 

2933 >>> json_data[0]['_source']['layers']['ip']['ip.ttl'] 

2934 u'64' 

2935 """ 

2936 getfd = getfd or getproc 

2937 if prog is None: 

2938 if not conf.prog.tcpdump: 

2939 raise Scapy_Exception( 

2940 "tcpdump is not available" 

2941 ) 

2942 prog = [conf.prog.tcpdump] 

2943 elif isinstance(prog, str): 

2944 prog = [prog] 

2945 else: 

2946 raise ValueError("prog must be a string") 

2947 

2948 if linktype is not None: 

2949 if isinstance(linktype, int): 

2950 # Guess name from value 

2951 try: 

2952 linktype_name = _guess_linktype_name(linktype) 

2953 except StopIteration: 

2954 linktype = -1 

2955 else: 

2956 # Guess value from name 

2957 if linktype.startswith("DLT_"): 

2958 linktype = linktype[4:] 

2959 linktype_name = linktype 

2960 try: 

2961 linktype = _guess_linktype_value(linktype) 

2962 except KeyError: 

2963 linktype = -1 

2964 if linktype == -1: 

2965 raise ValueError( 

2966 "Unknown linktype. Try passing its datalink name instead" 

2967 ) 

2968 prog += ["-y", linktype_name] 

2969 

2970 # Build Popen arguments 

2971 if args is None: 

2972 args = [] 

2973 else: 

2974 # Make a copy of args 

2975 args = list(args) 

2976 

2977 if flt is not None: 

2978 # Check the validity of the filter 

2979 if linktype is None and isinstance(pktlist, str): 

2980 # linktype is unknown but required. Read it from file 

2981 with PcapReader(pktlist) as rd: 

2982 if isinstance(rd, PcapNgReader): 

2983 # Get the linktype from the first packet 

2984 try: 

2985 _, metadata = rd._read_packet() 

2986 linktype = metadata.linktype 

2987 if OPENBSD and linktype == 228: 

2988 linktype = DLT_RAW 

2989 except EOFError: 

2990 raise ValueError( 

2991 "Cannot get linktype from a PcapNg packet." 

2992 ) 

2993 else: 

2994 linktype = rd.linktype 

2995 from scapy.arch.common import compile_filter 

2996 compile_filter(flt, linktype=linktype) 

2997 args.append(flt) 

2998 

2999 stdout = subprocess.PIPE if dump or getfd else None 

3000 stderr = open(os.devnull) if quiet else None 

3001 proc = None 

3002 

3003 if use_tempfile is None: 

3004 # Apple's tcpdump cannot read from stdin, see: 

3005 # http://apple.stackexchange.com/questions/152682/ 

3006 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump 

3007 

3008 if read_stdin_opts is None: 

3009 if prog[0] == conf.prog.wireshark: 

3010 # Start capturing immediately (-k) from stdin (-i -) 

3011 read_stdin_opts = ["-ki", "-"] 

3012 elif prog[0] == conf.prog.tcpdump and not OPENBSD: 

3013 # Capture in packet-buffered mode (-U) from stdin (-r -) 

3014 read_stdin_opts = ["-U", "-r", "-"] 

3015 else: 

3016 read_stdin_opts = ["-r", "-"] 

3017 else: 

3018 # Make a copy of read_stdin_opts 

3019 read_stdin_opts = list(read_stdin_opts) 

3020 

3021 if pktlist is None: 

3022 # sniff 

3023 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3024 proc = subprocess.Popen( 

3025 prog + args, 

3026 stdout=stdout, 

3027 stderr=stderr, 

3028 ) 

3029 elif isinstance(pktlist, str): 

3030 # file 

3031 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3032 proc = subprocess.Popen( 

3033 prog + ["-r", pktlist] + args, 

3034 stdout=stdout, 

3035 stderr=stderr, 

3036 ) 

3037 elif use_tempfile: 

3038 tmpfile = get_temp_file( # type: ignore 

3039 autoext=".pcap", 

3040 fd=True 

3041 ) # type: IO[bytes] 

3042 try: 

3043 tmpfile.writelines( 

3044 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3045 ) 

3046 except AttributeError: 

3047 pktlist = cast("_PacketIterable", pktlist) 

3048 wrpcap(tmpfile, pktlist, linktype=linktype) 

3049 else: 

3050 tmpfile.close() 

3051 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3052 proc = subprocess.Popen( 

3053 prog + ["-r", tmpfile.name] + args, 

3054 stdout=stdout, 

3055 stderr=stderr, 

3056 ) 

3057 else: 

3058 try: 

3059 pktlist.fileno() # type: ignore 

3060 # pass the packet stream 

3061 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3062 proc = subprocess.Popen( 

3063 prog + read_stdin_opts + args, 

3064 stdin=pktlist, # type: ignore 

3065 stdout=stdout, 

3066 stderr=stderr, 

3067 ) 

3068 except (AttributeError, ValueError): 

3069 # write the packet stream to stdin 

3070 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3071 proc = subprocess.Popen( 

3072 prog + read_stdin_opts + args, 

3073 stdin=subprocess.PIPE, 

3074 stdout=stdout, 

3075 stderr=stderr, 

3076 ) 

3077 if proc is None: 

3078 # An error has occurred 

3079 return 

3080 try: 

3081 proc.stdin.writelines( # type: ignore 

3082 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3083 ) 

3084 except AttributeError: 

3085 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore 

3086 except UnboundLocalError: 

3087 # The error was handled by ContextManagerSubprocess 

3088 pass 

3089 else: 

3090 proc.stdin.close() # type: ignore 

3091 if proc is None: 

3092 # An error has occurred 

3093 return 

3094 if dump: 

3095 data = b"".join( 

3096 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore 

3097 ) 

3098 proc.terminate() 

3099 return data 

3100 if getproc: 

3101 return proc 

3102 if getfd: 

3103 return proc.stdout 

3104 if wait: 

3105 proc.wait() 

3106 

3107 

@conf.commands.register
def hexedit(pktlist):
    # type: (_PacketIterable) -> PacketList
    """
    Open a list of packets in hexedit and return the edited result.

    The packets are written to a temporary file with wrpcap(), the program
    configured as ``conf.prog.hexedit`` is run on that file, and the file is
    read back with rdpcap() before being deleted.

    :param pktlist: the packets to edit
    :returns: the packets read back from the file once the editor exited
    """
    pcap_path = get_temp_file()
    wrpcap(pcap_path, pktlist)
    editor = conf.prog.hexedit
    with ContextManagerSubprocess(editor):
        subprocess.call([editor, pcap_path])
    edited = rdpcap(pcap_path)
    # The temporary capture is no longer needed once it has been re-read
    os.unlink(pcap_path)
    return edited

3119 

3120 

def get_terminal_width():
    # type: () -> Optional[int]
    """
    Return the terminal width, expressed as a number of characters.

    Detection methods are tried one after the other, so that as many
    terminals and operating systems as possible are supported:

    1. ``shutil.get_terminal_size()``
    2. on Windows, the console screen buffer information
    3. elsewhere, the ``COLUMNS`` environment variable, then the
       ``TIOCGWINSZ`` ioctl on file descriptor 1

    :returns: the detected width. When the ioctl method fails, 79 is
        returned. On Windows, the value may be 0 if nothing worked.
    """
    columns = shutil.get_terminal_size(fallback=(0, 0))[0]
    if columns != 0:
        return columns
    # Backups
    if WINDOWS:
        from ctypes import windll, create_string_buffer
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        handle = windll.kernel32.GetStdHandle(-12)
        info = create_string_buffer(22)
        if windll.kernel32.GetConsoleScreenBufferInfo(handle, info):
            # Unpacked layout: bufx, bufy, curx, cury, wattr,
            # left, top, right, bottom, maxx, maxy
            fields = struct.unpack("hhhhHhhhhhh", info.raw)
            left, right = fields[5], fields[7]
            return right - left + 1
        # Nothing worked: hand back what shutil reported
        return columns
    # COLUMNS is set on some terminals
    try:
        columns = int(os.environ['COLUMNS'])
    except Exception:
        pass
    if columns:
        return columns
    # Last resort: query TIOCGWINSZ
    try:
        import fcntl
        import termios
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(1, termios.TIOCGWINSZ, request)
        columns = struct.unpack('HHHH', reply)[1]
    except (IOError, ModuleNotFoundError):
        # If everything failed, return default terminal size
        columns = 79
    return columns

3164 

3165 

def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
                header,  # type: List[Tuple[str, ...]]
                sortBy=0,  # type: Optional[int]
                borders=False,  # type: bool
                ):
    # type: (...) -> str
    """
    Pretty list to fit the terminal, and add header.

    Note that ``rtlst`` is modified in place: it is sorted, and rows that
    hold list values are replaced by one row per list element.

    :param rtlst: a list of tuples. each tuple contains a value which can
        be either a string or a list of string.
    :param header: a list of tuples that are put on top of the table. The
        length of its first tuple defines the number of columns.
    :param sortBy: the column id (starting with 0) which will be used for
        ordering. None disables sorting.
    :param borders: whether to put borders on the table or not
    :returns: the table, as a single string with one row per line
    """
    if borders:
        _space = "|"
    else:
        _space = " "
    cols = len(header[0])
    # Windows has a fat terminal border
    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
    _croped = False
    if sortBy is not None:
        # Sort correctly
        rtlst.sort(key=lambda x: x[sortBy])
    # Resolve multi-values
    for i, line in enumerate(rtlst):
        ids = []  # type: List[int]
        values = []  # type: List[Union[str, List[str]]]
        for j, val in enumerate(line):
            if isinstance(val, list):
                ids.append(j)
                values.append(val or " ")
        if values:
            # Replace the row by one row per element of its list values.
            # The rows inserted here only hold strings, so they are left
            # untouched when the enumeration reaches them.
            del rtlst[i]
            k = 0
            for ex_vals in zip_longest(*values, fillvalue=" "):
                if k:
                    extra_line = [" "] * cols
                else:
                    extra_line = list(line)  # type: ignore
                for j, h in enumerate(ids):
                    extra_line[h] = ex_vals[j]
                rtlst.insert(i + k, tuple(extra_line))
                k += 1
    rtslst = cast(List[Tuple[str, ...]], rtlst)
    # Append tag
    rtslst = header + rtslst
    # Detect column's width
    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
    # Make text fit in box (if required)
    width = get_terminal_width()
    if conf.auto_crop_tables and width:
        width = width - _spacelen
        while sum(colwidth) > width:
            # Get the widest column
            i = colwidth.index(max(colwidth))
            if colwidth[i] <= 1:
                # A one-character cell cannot be shortened any further
                # (cropping it yields "_" again): give up instead of
                # looping forever on a terminal that is too narrow.
                break
            # Needs to be cropped
            _croped = True
            # Get all elements of this column
            row = [len(x[i]) for x in rtslst]
            # Get biggest element of this column: biggest of the array
            j = row.index(max(row))
            # Re-build the row tuple with the edited element
            t = list(rtslst[j])
            t[i] = t[i][:-2] + "_"
            rtslst[j] = tuple(t)
            # Update max size
            row[j] = len(t[i])
            colwidth[i] = max(row)
    if _croped:
        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
    # Generate padding scheme
    fmt = _space.join(["%%-%ds" % x for x in colwidth])
    # Append separation line if needed
    if borders:
        rtslst.insert(1, tuple("-" * x for x in colwidth))
    # Compile
    return "\n".join(fmt % x for x in rtslst)

3246 

3247 

def human_size(x, fmt=".1f"):
    # type: (int, str) -> str
    """
    Convert a size in octets to a human string representation

    The value is scaled by powers of 1024 and suffixed with the matching
    unit letter (K, M, G, T, P or E). Values below 1024, or too large for
    the biggest unit, are returned as a plain number followed by "B".

    :param x: the size, in octets
    :param fmt: the format specification applied to the scaled value
    :returns: the formatted size, e.g. "1.5K" or "512B"
    """
    units = ['K', 'M', 'G', 'T', 'P', 'E']
    if not x:
        return "0B"
    if isinstance(x, int):
        # Exact exponent for integers: no floating point rounding at the
        # powers of 1024, and negative sizes do not raise.
        i = (abs(x).bit_length() - 1) // 10
    else:
        i = int(math.log(x, 2**10))
    # units[i - 1] is the unit of exponent i, so i == len(units) is valid.
    # Negative exponents (values below 1) must not index from the end.
    if 0 < i <= len(units):
        return format(x / 2**(10 * i), fmt) + units[i - 1]
    return str(x) + "B"

3260 

3261 

def __make_table(
    yfmtfunc,  # type: Callable[[int], str]
    fmtfunc,  # type: Callable[[int], str]
    endline,  # type: str
    data,  # type: List[Tuple[Packet, Packet]]
    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
    dump=False  # type: bool
):
    # type: (...) -> Optional[str]
    """
    Core function of the make_table suite, which generates the table

    :param yfmtfunc: given the width of the widest row label, returns the
        %-format used for the first column
    :param fmtfunc: given the width of a column, returns the %-format used
        for its cells
    :param endline: text appended at the end of every row
    :param data: the entries. Each one is unpacked and passed to fxyz
    :param fxyz: returns the (column label, row label, cell value) of an
        entry. The three values are converted with str()
    :param sortx: optional sort key for the column labels
    :param sorty: optional sort key for the row labels
    :param seplinefunc: optional function building the separation line
        from the row label width and the list of column widths
    :param dump: return the table instead of printing it
    :returns: the table if dump is set, None otherwise
    """
    col_width = {}  # type: Dict[str, int]
    row_labels = {}  # type: Dict[str, Optional[int]]
    cells = {}  # type: Dict[Tuple[str, str], str]
    col_fmt = {}  # type: Dict[str, str]

    # Collect labels, cells and the width required by each column
    label_width = 0
    for entry in data:
        xx, yy, zz = [str(item) for item in fxyz(*entry)]
        label_width = max(len(yy), label_width)
        col_width[xx] = max(col_width.get(xx, 0), len(xx), len(zz))
        row_labels[yy] = None
        cells[(xx, yy)] = zz

    def _order(labels, keyfunc):
        # type: (List[str], Optional[Callable[[str], Any]]) -> None
        """Sort labels in place: custom key, else int, atol, then text."""
        if keyfunc:
            labels.sort(key=keyfunc)
            return
        try:
            labels.sort(key=int)
        except Exception:
            try:
                labels.sort(key=atol)
            except Exception:
                labels.sort()

    columns = list(col_width)
    rows = list(row_labels)
    _order(columns, sortx)
    _order(rows, sorty)

    chunks = []  # type: List[str]
    if seplinefunc:
        sepline = seplinefunc(label_width, [col_width[x] for x in columns])
        chunks.append(sepline + "\n")

    # Header row: empty label cell, then one cell per column label
    row_fmt = yfmtfunc(label_width)
    chunks.append(row_fmt % "")
    chunks.append(' ')
    for x in columns:
        col_fmt[x] = fmtfunc(col_width[x])
        chunks.append(col_fmt[x] % x)
        chunks.append(' ')
    chunks.append(endline + "\n")
    if seplinefunc:
        chunks.append(sepline + "\n")

    # One row per row label, "-" standing for the missing cells
    for y in rows:
        chunks.append(row_fmt % y)
        chunks.append(' ')
        for x in columns:
            chunks.append(col_fmt[x] % cells.get((x, y), "-"))
            chunks.append(' ')
        chunks.append(endline + "\n")
    if seplinefunc:
        chunks.append(sepline + "\n")

    table = "".join(chunks)
    if not dump:
        print(table, end="")
        return None
    return table

3341 

3342 

def make_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[Any]
    """
    Build a table whose cells are left-aligned and padded to the width of
    their column.

    All the arguments are forwarded to __make_table() after the formatting
    functions and the (empty) end of line.
    """
    def left_aligned(width):
        # type: (int) -> str
        return "%%-%is" % width

    return __make_table(left_aligned, left_aligned, "", *args, **kargs)

3352 

3353 

def make_lined_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """
    Build a table whose cells are followed by a "|" and whose header and
    footer are delimited by lines made of "-" and "+".

    All the arguments are forwarded to __make_table() after the formatting
    functions and the (empty) end of line.
    """
    def cell_format(width):
        # type: (int) -> str
        return "%%-%is |" % width

    def separator(label_width, column_widths):
        # type: (int, List[int]) -> str
        widths = [label_width - 1] + column_widths + [-2]
        return "+".join('-' * (w + 2) for w in widths)

    return __make_table(  # type: ignore
        cell_format,
        cell_format,
        "",
        *args,
        seplinefunc=separator,
        **kargs
    )

3366 

3367 

def make_tex_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """
    Build a table using "&" before each cell, a double backslash at the end
    of each row and "\\hline" as separation line.

    All the arguments are forwarded to __make_table() after the formatting
    functions and the end of line.
    """
    def label_format(_width):
        # type: (int) -> str
        return "%s"

    def cell_format(_width):
        # type: (int) -> str
        return "& %s"

    def separator(_label_width, _column_widths):
        # type: (int, List[int]) -> str
        return "\\hline"

    return __make_table(  # type: ignore
        label_format,
        cell_format,
        "\\\\",
        *args,
        seplinefunc=separator,
        **kargs
    )

3378 

3379#################### 

3380# WHOIS CLIENT # 

3381#################### 

3382 

3383 

def _whois_clean_answer(answer):
    # type: (bytes) -> bytes
    """
    Clean the raw answer of the whois server.

    Lines starting with "remarks:" and empty lines at the bottom are
    removed, then the first three remaining lines are skipped.

    :param answer: the raw bytes received from the server
    :returns: the cleaned answer
    """
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [
        line for line in answer.split(b"\n")
        if not line.startswith(ignore_tag)
    ]
    # remove empty lines at the bottom. Always look at the last line:
    # advancing an index while deleting would skip lines.
    while lines and not lines[-1].strip():
        lines.pop()
    return b"\n".join(lines[3:])


def whois(ip_address):
    # type: (str) -> bytes
    """
    Whois client for Python

    The address is resolved with socket.gethostbyname() when possible, and
    sent as a query to whois.ripe.net on TCP port 43.

    :param ip_address: the IP address (or host name) to look up
    :returns: the answer of the server, without its "remarks:" lines, its
        trailing empty lines and its first three lines
    """
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except Exception:
        # Best effort: send what we were given
        query = whois_ip
    chunks = []
    # The context manager closes the socket even if an error occurs
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(("whois.ripe.net", 43))
        # sendall() keeps sending until the whole query is written
        s.sendall(query.encode("utf8") + b"\r\n")
        while True:
            d = s.recv(4096)
            if not d:
                break
            chunks.append(d)
    return _whois_clean_answer(b"".join(chunks))

3412 

3413#################### 

3414# CLI utils # 

3415#################### 

3416 

3417 

class CLIUtil:
    """
    Provides a Util class to easily create simple CLI tools in Scapy,
    that can still be used as an API.

    Doc:
        - override the ps1() function
        - register commands with the @CLIUtil.addcommand decorator
        - call the loop() function when ready
    """

    def _depcheck(self) -> None:
        """
        Check that all dependencies are installed
        """
        try:
            import prompt_toolkit  # noqa: F401
        except ImportError:
            # okay we lie but prompt_toolkit is a dependency...
            raise ImportError("You need to have IPython installed to use the CLI")

    # Okay let's do nice code
    commands: Dict[str, Callable[..., Any]] = {}
    # print output of command
    commands_output: Dict[str, Callable[..., str]] = {}
    # provides completion to command
    commands_complete: Dict[str, Callable[..., List[str]]] = {}

    @staticmethod
    def _inspectkwargs(func: DecoratorCallable) -> None:
        """
        Internal function to parse arguments from the kwargs of the functions

        The names of the keyword-only parameters of ``func`` are stored in
        ``func._flagnames``, and the matching CLI flags ("-x" for a single
        letter, "--name" otherwise) in ``func._flags``.
        """
        func._flagnames = [  # type: ignore
            x.name for x in
            inspect.signature(func).parameters.values()
            if x.kind == inspect.Parameter.KEYWORD_ONLY
        ]
        func._flags = [  # type: ignore
            ("-%s" % x) if len(x) == 1 else ("--%s" % x)
            for x in func._flagnames  # type: ignore
        ]

    @staticmethod
    def _parsekwargs(
        func: DecoratorCallable,
        args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of a function.

        Only the flags found at the beginning of ``args`` are consumed:
        parsing stops at the first argument that is not a flag of ``func``.

        :returns: the remaining arguments, and the flags that were set
        """
        kwargs: Dict[str, Literal[True]] = {}
        if func._flags:  # type: ignore
            i = 0
            for arg in args:
                if arg in func._flags:  # type: ignore
                    i += 1
                    kwargs[func._flagnames[func._flags.index(arg)]] = True  # type: ignore  # noqa: E501
                    continue
                break
            args = args[i:]
        return args, kwargs

    @classmethod
    def _parseallargs(
        cls,
        func: DecoratorCallable,
        cmd: str, args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of both the function
        and its output function.

        :returns: the remaining arguments, the flags of the function and
            the flags of its output function
        """
        args, kwargs = cls._parsekwargs(func, args)
        outkwargs: Dict[str, Literal[True]] = {}
        if cmd in cls.commands_output:
            args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
        return args, kwargs, outkwargs

    @classmethod
    def addcommand(
        cls,
        spaces: bool = False,
        globsupport: bool = False,
    ) -> Callable[[DecoratorCallable], DecoratorCallable]:
        """
        Decorator to register a command

        :param spaces: pass all the arguments to the command as a single
            string, joined with spaces
        :param globsupport: expand a glob star (*) found in the argument
            using the completer of the command. Requires ``spaces``.
        :raises ValueError: if globsupport is used without spaces
        """
        def func(cmd: DecoratorCallable) -> DecoratorCallable:
            cls.commands[cmd.__name__] = cmd
            cmd._spaces = spaces  # type: ignore
            cmd._globsupport = globsupport  # type: ignore
            cls._inspectkwargs(cmd)
            if cmd._globsupport and not cmd._spaces:  # type: ignore
                raise ValueError("Cannot use globsupport without spaces.")
            return cmd
        return func

    @classmethod
    def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command output processor
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            cls.commands_output[cmd.__name__] = processor
            cls._inspectkwargs(processor)
            return processor
        return func

    @classmethod
    def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command completor
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            cls.commands_complete[cmd.__name__] = processor
            return processor
        return func

    def ps1(self) -> str:
        """
        Return the PS1 of the shell
        """
        return "> "

    def close(self) -> None:
        """
        Function called on exiting
        """
        print("Exited")

    def help(self, cmd: Optional[str] = None) -> None:
        """
        Print the help related to this CLI util

        :param cmd: the command to describe. If empty, a table of all the
            commands is printed.
        """
        def _args(func: Any) -> str:
            # Flags of the command and of its output processor, followed by
            # the parameters (except the first one and the private ones).
            flags = func._flags.copy()
            if func.__name__ in self.commands_output:
                flags += self.commands_output[func.__name__]._flags  # type: ignore
            return " %s%s" % (
                (
                    "%s " % " ".join("[%s]" % x for x in flags)
                    if flags else ""
                ),
                " ".join(
                    "<%s%s>" % (
                        x.name,
                        "?" if
                        (x.default is None or x.default != inspect.Parameter.empty)
                        else ""
                    )
                    for x in list(inspect.signature(func).parameters.values())[1:]
                    if x.name not in func._flagnames and x.name[0] != "_"
                )
            )

        if cmd:
            if cmd not in self.commands:
                print("Unknown command '%s'" % cmd)
                return
            # help for one command
            func = self.commands[cmd]
            print("%s%s: %s" % (
                cmd,
                _args(func),
                func.__doc__ and func.__doc__.strip()
            ))
        else:
            header = "│ %s - Help │" % self.__class__.__name__
            print("┌" + "─" * (len(header) - 2) + "┐")
            print(header)
            print("└" + "─" * (len(header) - 2) + "┘")
            print(
                pretty_list(
                    [
                        (
                            cmd,
                            _args(func),
                            func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
                        )
                        for cmd, func in self.commands.items()
                    ],
                    [("Command", "Arguments", "Description")]
                )
            )

    def _completer(self) -> 'prompt_toolkit.completion.Completer':
        """
        Returns a prompt_toolkit custom completer
        """
        from prompt_toolkit.completion import Completer, Completion

        class CLICompleter(Completer):
            def get_completions(cmpl, document, complete_event):  # type: ignore
                if not complete_event.completion_requested:
                    # Only activate when the user does <TAB>
                    return
                parts = document.text.split(" ")
                cmd = parts[0].lower()
                if cmd not in self.commands:
                    # We are trying to complete the command
                    for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
                        yield Completion(possible_cmd, start_position=-len(cmd))
                else:
                    # We are trying to complete the command content
                    if len(parts) == 1:
                        return
                    args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
                    arg = " ".join(args)
                    if cmd in self.commands_complete:
                        for possible_arg in self.commands_complete[cmd](self, arg):
                            yield Completion(possible_arg, start_position=-len(arg))
                return
        return CLICompleter()

    def _runline(self, line: str, debug: int = 0) -> bool:
        """
        Process a single line typed at the prompt.

        :param line: the raw line
        :param debug: if set, print the traceback of failing commands
        :returns: False if the line asks to leave the loop, True otherwise
        """
        line = line.strip()
        args = line.split(" ")[1:]
        cmd = line.split(" ")[0].strip().lower()
        if not cmd:
            return True
        if cmd in ["help", "h", "?"]:
            self.help(" ".join(args))
            return True
        # Exact comparison: a substring test would also exit on "x", "it"...
        if cmd == "exit":
            return False
        if cmd not in self.commands:
            print("Unknown command. Type help or ?")
            return True
        # check the number of arguments
        func = self.commands[cmd]
        args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
        if func._spaces:  # type: ignore
            args = [" ".join(args)]
            # if globsupport is set, we might need to do several calls
            if func._globsupport and "*" in args[0]:  # type: ignore
                if args[0].count("*") > 1:
                    print("More than 1 glob star (*) is currently unsupported.")
                    return True
                before, after = args[0].split("*", 1)
                # Both sides of the star are literal text: escape them so
                # that characters such as "." are not read as regex syntax
                reg = re.compile(re.escape(before) + r".*" + re.escape(after))
                calls = [
                    [x] for x in
                    self.commands_complete[cmd](self, before)
                    if reg.match(x)
                ]
            else:
                calls = [args]
        else:
            calls = [args]
        # now iterate if required, call the function and print its output
        for args in calls:
            # Reset the result so that a failing call never re-prints the
            # output of the previous one
            res = None
            try:
                res = func(self, *args, **kwargs)
            except TypeError:
                print("Bad number of arguments !")
                self.help(cmd=cmd)
                continue
            except Exception as ex:
                print("Command failed with error: %s" % ex)
                if debug:
                    # Three-argument form: the single-argument one only
                    # exists since Python 3.10
                    traceback.print_exception(type(ex), ex, ex.__traceback__)
            try:
                if res and cmd in self.commands_output:
                    self.commands_output[cmd](self, res, **outkwargs)
            except Exception as ex:
                print("Output processor failed with error: %s" % ex)
        return True

    def loop(self, debug: int = 0) -> None:
        """
        Main command handling loop

        :param debug: if set, print the traceback of failing commands
        """
        from prompt_toolkit import PromptSession
        session = PromptSession(completer=self._completer())

        while True:
            try:
                line = session.prompt(self.ps1())
            except KeyboardInterrupt:
                continue
            except EOFError:
                self.close()
                break
            if not self._runline(line, debug=debug):
                break

3699 

3700 

def AutoArgparse(func: DecoratorCallable) -> None:
    """
    Generate an Argparse call from a function, then call this function.

    The command line arguments are read from sys.argv by argparse.

    Notes:

    - for the arguments to have a description, the sphinx docstring format
      must be used. See
      https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
    - the arguments must be typed in Python (we ignore Sphinx-specific types)
      untyped arguments are ignored.
    - only types that would be supported by argparse are supported. The others
      are omitted.

    :param func: the function to expose on the command line
    :raises ValueError: if the docstring of func has a ``:param`` field
        without a name
    """
    # Maps a parameter name to its description, taken from the docstring
    argsdoc = {}
    if func.__doc__:
        # Sphinx doc format parser
        # group(1) is the free text, group(2) starts at the first field
        m = re.match(
            r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
            func.__doc__.strip(),
        )
        if not m:
            # No field at all: the whole docstring is the description
            desc = func.__doc__.strip()
        else:
            desc = m.group(1)
            sphinxargs = re.findall(
                r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
                m.group(2),
            )
            for argtype, argparam, argdesc in sphinxargs:
                argparam = argparam.strip()
                argdesc = argdesc.strip()
                # Only the :param: fields are used, the others are ignored
                if argtype == "param":
                    if not argparam:
                        raise ValueError(":param: without a name !")
                    argsdoc[argparam] = argdesc
    else:
        desc = ""
    # Now build the argparse.ArgumentParser
    parser = argparse.ArgumentParser(
        prog=func.__name__,
        description=desc,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Process the parameters
    # Names of the parameters that are passed positionally to func
    positional = []
    for param in inspect.signature(func).parameters.values():
        # NOTE(review): unannotated parameters carry inspect.Parameter.empty,
        # which looks truthy, so they are probably skipped by the
        # 'else: continue' of the type dispatch below rather than here.
        if not param.annotation:
            continue
        parname = param.name
        paramkwargs = {}
        if param.annotation is bool:
            if param.default is True:
                # A flag that defaults to True is exposed as its negation
                parname = "no-" + parname
                paramkwargs["action"] = "store_false"
            else:
                paramkwargs["action"] = "store_true"
        elif param.annotation in [str, int, float]:
            paramkwargs["type"] = param.annotation
        else:
            # Unsupported type: the parameter is omitted
            continue
        if param.default != inspect.Parameter.empty:
            if param.kind == inspect.Parameter.POSITIONAL_ONLY:
                # Optional positional argument
                positional.append(param.name)
                paramkwargs["nargs"] = '?'
            else:
                # Parameters with a default value become options
                parname = "--" + parname
            paramkwargs["default"] = param.default
        else:
            # Parameters without a default value are positional arguments
            positional.append(param.name)
        if param.kind == inspect.Parameter.VAR_POSITIONAL:
            paramkwargs["action"] = "append"
        if param.name in argsdoc:
            paramkwargs["help"] = argsdoc[param.name]
        parser.add_argument(parname, **paramkwargs)  # type: ignore
    # Now parse the sys.argv parameters
    params = vars(parser.parse_args())
    # Act as in interactive mode
    conf.logLevel = 20
    from scapy.themes import DefaultTheme
    conf.color_theme = DefaultTheme()
    # And call the function
    try:
        func(
            # Positional values are removed from params, so that only the
            # options are left for the keyword arguments
            *[params.pop(x) for x in positional],
            **{
                # Strip the "no_" prefix to recover the parameter name.
                # NOTE(review): any remaining key starting with "no_" is
                # stripped, not only the negated flags added above.
                (k[3:] if k.startswith("no_") else k): v
                for k, v in params.items()
            }
        )
    except AssertionError as ex:
        # Assertion errors of func are reported along with the usage
        print("ERROR: " + str(ex))
        parser.print_help()

3794 

3795 

3796####################### 

3797# PERIODIC SENDER # 

3798####################### 

3799 

3800 

class PeriodicSenderThread(threading.Thread):
    """
    Thread that sends one or several packets on a socket, over and over,
    waiting for a fixed interval after each packet.
    """

    def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
        # type: (Any, _PacketIterable, float, bool) -> None
        """ Thread to send packets periodically

        Args:
            sock: socket where packet is sent periodically
            pkt: packet or list of packets to send
            interval: interval between two packets
            ignore_exceptions: if True, an OSError or TimeoutError raised
                while sending silently ends the thread. Otherwise, the
                exception is raised.
        """
        if not isinstance(pkt, list):
            # A single packet: wrap it so that run() can always iterate
            self._pkts = [cast("Packet", pkt)]  # type: _PacketIterable
        else:
            self._pkts = pkt
        self._socket = sock
        # Set by stop() to end the thread
        self._stopped = threading.Event()
        # Cleared by disable() to pause the sending. Enabled by default.
        self._enabled = threading.Event()
        self._enabled.set()
        self._interval = interval
        self._ignore_exceptions = ignore_exceptions
        threading.Thread.__init__(self)

    def enable(self):
        # type: () -> None
        """Resume the sending of packets."""
        self._enabled.set()

    def disable(self):
        # type: () -> None
        """Pause the sending of packets. The thread keeps running."""
        self._enabled.clear()

    def run(self):
        # type: () -> None
        """
        Send the packets in a loop until stop() is called or the socket
        is closed.
        """
        while not self._stopped.is_set() and not self._socket.closed:
            for p in self._pkts:
                try:
                    if self._enabled.is_set():
                        self._socket.send(p)
                except (OSError, TimeoutError):
                    if self._ignore_exceptions:
                        return
                    # Bare raise: keep the original traceback
                    raise
                # Sleep for the interval, but wake up as soon as stop()
                # is called
                self._stopped.wait(timeout=self._interval)
                if self._stopped.is_set() or self._socket.closed:
                    break

    def stop(self):
        # type: () -> None
        """
        Ask the thread to stop, and wait for it for at most twice the
        interval.
        """
        self._stopped.set()
        # join() raises RuntimeError on a thread that was not started, or
        # when a thread tries to join itself: only wait when it is possible
        if self.is_alive() and threading.current_thread() is not self:
            self.join(self._interval * 2)

3851 

3852 

3853class SingleConversationSocket(object): 

3854 def __init__(self, o): 

3855 # type: (Any) -> None 

3856 self._inner = o 

3857 self._tx_mutex = threading.RLock() 

3858 

3859 @property 

3860 def __dict__(self): # type: ignore 

3861 return self._inner.__dict__ 

3862 

3863 def __getattr__(self, name): 

3864 # type: (str) -> Any 

3865 return getattr(self._inner, name) 

3866 

3867 def sr1(self, *args, **kargs): 

3868 # type: (*Any, **Any) -> Any 

3869 with self._tx_mutex: 

3870 return self._inner.sr1(*args, **kargs) 

3871 

3872 def sr(self, *args, **kargs): 

3873 # type: (*Any, **Any) -> Any 

3874 with self._tx_mutex: 

3875 return self._inner.sr(*args, **kargs) 

3876 

3877 def send(self, x): 

3878 # type: (Packet) -> Any 

3879 with self._tx_mutex: 

3880 try: 

3881 return self._inner.send(x) 

3882 except (ConnectionError, OSError) as e: 

3883 self._inner.close() 

3884 raise e