Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1999 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7General utility functions. 

8""" 

9 

10 

11from decimal import Decimal 

12from io import StringIO 

13from itertools import zip_longest 

14from uuid import UUID 

15 

16import argparse 

17import array 

18import base64 

19import collections 

20import decimal 

21import difflib 

22import enum 

23import gzip 

24import inspect 

25import locale 

26import math 

27import os 

28import pickle 

29import random 

30import re 

31import shutil 

32import socket 

33import struct 

34import subprocess 

35import sys 

36import tempfile 

37import threading 

38import time 

39import traceback 

40import warnings 

41 

42from scapy.config import conf 

43from scapy.consts import DARWIN, OPENBSD, WINDOWS 

44from scapy.data import MTU, DLT_EN10MB, DLT_RAW 

45from scapy.compat import ( 

46 orb, 

47 plain_str, 

48 chb, 

49 hex_bytes, 

50 bytes_encode, 

51) 

52from scapy.error import ( 

53 log_interactive, 

54 log_runtime, 

55 Scapy_Exception, 

56 warning, 

57) 

58from scapy.pton_ntop import inet_pton 

59 

60# Typing imports 

61from typing import ( 

62 cast, 

63 Any, 

64 AnyStr, 

65 Callable, 

66 Dict, 

67 IO, 

68 Iterator, 

69 List, 

70 Optional, 

71 TYPE_CHECKING, 

72 Tuple, 

73 Type, 

74 Union, 

75 overload, 

76) 

77from scapy.compat import ( 

78 DecoratorCallable, 

79 Literal, 

80) 

81 

82if TYPE_CHECKING: 

83 from scapy.packet import Packet 

84 from scapy.plist import _PacketIterable, PacketList 

85 from scapy.supersocket import SuperSocket 

86 import prompt_toolkit 

87 

# Type alias for a binary stream: either a generic binary IO object or a
# gzip.GzipFile.
_ByteStream = Union[IO[bytes], gzip.GzipFile]

89 

90########### 

91# Tools # 

92########### 

93 

94 

def issubtype(x,  # type: Any
              t,  # type: Union[type, str]
              ):
    # type: (...) -> bool
    """issubtype(C, B) -> bool

    Return whether C is a class and if it is a subclass of class B.
    When using a tuple as the second argument issubtype(X, (A, B, ...)),
    is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).

    When B is given as a string, it is compared against the ``__name__`` of
    the *direct* base classes of C only.

    :param x: the object to test; anything that is not a class yields False
    :param t: a class, a tuple of classes, or a class name
    :return: True if x is a class matching t, False otherwise
    """
    # Guard first: a non-class has no __bases__, and issubclass() would raise.
    if not isinstance(x, type):
        return False
    if isinstance(t, str):
        return any(base.__name__ == t for base in x.__bases__)
    return issubclass(x, t)

110 

111 

_Decimal = Union[Decimal, int]


class EDecimal(Decimal):
    """Extended Decimal

    Arithmetic operators convert the other operand with ``Decimal(other)``
    and wrap the result back into an EDecimal. Equality with a non-Decimal
    value is evaluated after converting this value to float, for backward
    compatibility.
    """

    def __add__(self, other, context=None):
        # type: (_Decimal, Any) -> EDecimal
        return EDecimal(super().__add__(Decimal(other)))

    def __radd__(self, other):
        # type: (_Decimal) -> EDecimal
        # Addition is commutative, so the reflected form reuses __add__.
        return EDecimal(super().__add__(Decimal(other)))

    def __sub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__sub__(Decimal(other)))

    def __rsub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__rsub__(Decimal(other)))

    def __mul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__mul__(Decimal(other)))

    def __rmul__(self, other):
        # type: (_Decimal) -> EDecimal
        # Multiplication is commutative, so the reflected form reuses __mul__.
        return EDecimal(super().__mul__(Decimal(other)))

    def __truediv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__truediv__(Decimal(other)))

    def __floordiv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__floordiv__(Decimal(other)))

    def __divmod__(self, other):
        # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
        quotient, remainder = super().__divmod__(Decimal(other))
        return EDecimal(quotient), EDecimal(remainder)

    def __mod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__mod__(Decimal(other)))

    def __rmod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__rmod__(Decimal(other)))

    def __pow__(self, other, modulo=None):
        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
        return EDecimal(super().__pow__(Decimal(other), modulo))

    def __eq__(self, other):
        # type: (Any) -> bool
        if not isinstance(other, Decimal):
            # Non-Decimal operands (e.g. float) are compared as floats.
            return bool(float(self) == other)
        return super().__eq__(other)

    def normalize(self, precision):  # type: ignore
        # type: (int) -> EDecimal
        """Normalize the value inside a local context of the given precision.

        :param precision: number of significant digits of the local context
        :return: the normalized value, as an EDecimal
        """
        with decimal.localcontext() as ctx:
            ctx.prec = precision
            normalized = super().normalize(ctx)
            return EDecimal(normalized)

183 

184 

@overload
def get_temp_file(keep, autoext, fd):
    # type: (bool, str, Literal[True]) -> IO[bytes]
    pass


@overload
def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, Literal[False]) -> str
    pass


def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, bool) -> Union[IO[bytes], str]
    """Creates a temporary file whose name starts with "scapy".

    :param keep: If False, the file name is appended to ``conf.temp_files``.
    :param autoext: Suffix to add to the generated file name.
    :param fd: If True, return the still-open file object. If False
        (default), close the file and return its path.
    :return: an open file object or a file path, depending on ``fd``.
    """
    handle = tempfile.NamedTemporaryFile(
        prefix="scapy", suffix=autoext, delete=False
    )
    path = handle.name
    if not keep:
        conf.temp_files.append(path)

    if fd:
        return handle
    # Close the file so something else can take it.
    handle.close()
    return path

217 

218 

def get_temp_dir(keep=False):
    # type: (bool) -> str
    """Creates a temporary directory, and returns its path.

    :param keep: If False (default), the directory path is appended to
        ``conf.temp_files``.
    :return: A full path to a temporary directory.
    """
    path = tempfile.mkdtemp(prefix="scapy")
    if keep:
        return path
    conf.temp_files.append(path)
    return path

234 

235 

def _create_fifo() -> Tuple[str, Any]:
    """Creates a temporary fifo.

    The server side must later be opened with _open_fifo(), once the client
    is connected.

    :returns: (client_file, server_fd)
    """
    if not WINDOWS:
        # Reserve a temporary name, drop the regular file created for it and
        # put a fifo at the same path.
        path = get_temp_file()
        os.unlink(path)
        os.mkfifo(path)
        return path, path
    from scapy.arch.windows.structures import _get_win_fifo
    return _get_win_fifo()

252 

253 

def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
    """Open the server_fd returned by _create_fifo().

    :param fd: the server side of the fifo
    :param mode: mode passed to open(); not used on Windows
    """
    if not WINDOWS:
        return open(fd, mode)
    from scapy.arch.windows.structures import _win_fifo_open
    return _win_fifo_open(fd)

262 

263 

def sane(x, color=False):
    # type: (AnyStr, bool) -> str
    """Return a printable rendering of x.

    Every element whose ordinal is below 32 or at least 127 is replaced by a
    dot; the other ones are kept as characters.

    :param x: the data to render
    :param color: if True, replacement dots go through
        ``conf.color_theme.not_printable``
    """
    pieces = []
    for item in x:
        code = orb(item)
        if 32 <= code < 127:
            pieces.append(chr(code))
        elif color:
            pieces.append(conf.color_theme.not_printable("."))
        else:
            pieces.append(".")
    return "".join(pieces)

277 

278 

@conf.commands.register
def restart():
    # type: () -> None
    """Restarts scapy

    :raises OSError: if scapy is not interactive or ``sys.argv[0]`` is not
        an existing file
    """
    from_console = conf.interactive and os.path.isfile(sys.argv[0])
    if not from_console:
        raise OSError("Scapy was not started from console")
    command = [sys.executable] + sys.argv
    if WINDOWS:
        # Run the new instance as a child, then exit with its return code
        # (1 if the call itself failed).
        res_code = 1
        try:
            res_code = subprocess.call(command)
        finally:
            os._exit(res_code)
    os.execv(sys.executable, command)

292 

293 

def lhex(x):
    # type: (Any) -> str
    """Render x with integers in hexadecimal.

    VolatileValue instances are rendered with repr(), integers with hex(),
    tuples and lists element by element (recursively), anything else with
    str().
    """
    from scapy.volatile import VolatileValue
    if isinstance(x, VolatileValue):
        return repr(x)
    if isinstance(x, int):
        return hex(x)
    if isinstance(x, (tuple, list)):
        opener, closer = ("(", ")") if isinstance(x, tuple) else ("[", "]")
        return opener + ", ".join(map(lhex, x)) + closer
    return str(x)

306 

307 

@conf.commands.register
def hexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a tcpdump like hexadecimal view

    Each row shows the offset, up to 16 bytes in hexadecimal and their
    printable rendering.

    :param p: a Packet
    :param dump: define if the result must be printed or returned in a variable
    :return: a String only when dump=True
    """
    data = bytes_encode(p)
    rows = []
    for offset in range(0, len(data), 16):
        chunk = data[offset:offset + 16]
        cells = ["%02X " % orb(byte) for byte in chunk]
        # Pad a short final row with blank cells that are as wide as a hex
        # cell (3 characters), so that the printable column stays aligned.
        cells.extend(["   "] * (16 - len(chunk)))
        rows.append(
            "%04x %s %s" % (offset, "".join(cells), sane(chunk, color=True))
        )
    # Rows are joined rather than terminated: no trailing newline.
    s = "\n".join(rows)
    if dump:
        return s
    else:
        print(s)
        return None

337 

338 

@conf.commands.register
def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
    """Build an equivalent view of hexdump() on a single line

    Note that setting both onlyasc and onlyhex to 1 results in a empty output

    :param p: a Packet
    :param onlyasc: 1 to display only the ascii view
    :param onlyhex: 1 to display only the hexadecimal view
    :param dump: print the view if False
    :return: a String only when dump=True
    """
    # The view is only colored when it is going to be printed.
    view = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
    if not dump:
        print(view)
        return None
    return view

359 

360 

@conf.commands.register
def chexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a per byte hexadecimal representation

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501

    :param p: a Packet
    :param dump: print the view if False
    :return: a String only if dump=True
    """
    data = bytes_encode(p)
    rendered = ", ".join(["%#04x" % orb(byte) for byte in data])
    if not dump:
        print(rendered)
        return None
    return rendered

381 

382 

@conf.commands.register
def hexstr(p, onlyasc=0, onlyhex=0, color=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> str
    """Build a fancy tcpdump like hex from bytes.

    :param p: a Packet, or bytes
    :param onlyasc: if true, leave the hexadecimal part out
    :param onlyhex: if true, leave the printable part out
    :param color: passed to sane() for the printable part
    :return: the selected parts, joined by a space
    """
    data = bytes_encode(p)
    hex_part = [] if onlyasc else [" ".join("%02X" % orb(b) for b in data)]
    asc_part = [] if onlyhex else [sane(data, color=color)]
    return " ".join(hex_part + asc_part)

394 

395 

def repr_hex(s):
    # type: (bytes) -> str
    """Return s as a string of lowercase hex digits, two per element."""
    digits = ["%02x" % orb(item) for item in s]
    return "".join(digits)

400 

401 

@conf.commands.register
def hexdiff(
    a: Union['Packet', AnyStr],
    b: Union['Packet', AnyStr],
    algo: Optional[str] = None,
    autojunk: bool = False,
) -> None:
    """
    Show differences between 2 binary strings, Packets...

    The result is printed; nothing is returned.

    Available algorithms:
        - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
          Levenshtein distance between the strings then backtrack.
        - difflib: Use the difflib.SequenceMatcher implementation. This based on a
          modified version of the Ratcliff and Obershelp algorithm.
          This is much faster, but far less accurate.
          https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher

    :param a:
    :param b: The binary strings, packets... to compare
    :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
        By default, this is chosen depending on the complexity, optimistically
        preferring wagnerfischer unless really necessary.
    :param autojunk: (difflib only) See difflib documentation.
    :raises ValueError: if algo is neither None, 'wagnerfischer' nor 'difflib'
    """
    xb = bytes_encode(a)
    yb = bytes_encode(b)

    if algo is None:
        # Choose the best algorithm
        complexity = len(xb) * len(yb)
        if complexity < 1e7:
            # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
            # Anything much larger than this shouldn't be attempted by default.
            algo = "wagnerfischer"
            if complexity > 1e6:
                log_interactive.info(
                    "Complexity is a bit high. hexdiff will take a few seconds."
                )
        else:
            algo = "difflib"

    # backtrackx / backtracky are filled in parallel: entry k of each list
    # holds the piece of a / of b shown at position k of the diff. An empty
    # piece means that side has nothing at that position.
    backtrackx = []
    backtracky = []

    if algo == "wagnerfischer":
        # Both inputs are reversed: the backtracking below walks from the end
        # of these reversed strings, so pieces are appended in original order.
        xb = xb[::-1]
        yb = yb[::-1]

        # costs for the 3 operations
        INSERT = 1
        DELETE = 1
        SUBST = 1

        # Typically, d[i,j] will hold the distance between
        # the first i characters of xb and the first j characters of yb.
        # We change the Wagner Fischer to also store pointers to all
        # the intermediate steps taken while calculating the Levenshtein distance.
        # Each value is a (cost, (previous_i, previous_j)) tuple; index -1
        # stands for the empty prefix.
        d = {(-1, -1): (0, (-1, -1))}
        for j in range(len(yb)):
            d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
        # NOTE(review): this column uses INSERT plus an extra 1 although a
        # DELETE cost is defined above - confirm this asymmetry is intended.
        for i in range(len(xb)):
            d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)

        # Compute the Levenshtein distance between the two strings, but
        # store all the steps to be able to backtrack at the end.
        for j in range(len(yb)):
            for i in range(len(xb)):
                # min() compares the (cost, predecessor) tuples, so equal
                # costs are tie-broken by the predecessor coordinates.
                d[i, j] = min(
                    (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
                    (d[i - 1, j][0] + DELETE, (i - 1, j)),
                    (d[i, j - 1][0] + INSERT, (i, j - 1)),
                )

        # Iterate through the steps backwards to create the diff
        i = len(xb) - 1
        j = len(yb) - 1
        while not (i == j == -1):
            i2, j2 = d[i, j][1]
            # Each slice is empty when that index did not move in this step.
            backtrackx.append(xb[i2 + 1:i + 1])
            backtracky.append(yb[j2 + 1:j + 1])
            i, j = i2, j2
    elif algo == "difflib":
        sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
        # One single-byte bytes object per input byte.
        xarr = [xb[i:i + 1] for i in range(len(xb))]
        yarr = [yb[i:i + 1] for i in range(len(yb))]
        # Iterate through opcodes to build the backtrack
        for opcode in sm.get_opcodes():
            typ, x0, x1, y0, y1 = opcode
            if typ == 'delete':
                backtrackx += xarr[x0:x1]
                backtracky += [b''] * (x1 - x0)
            elif typ == 'insert':
                backtrackx += [b''] * (y1 - y0)
                backtracky += yarr[y0:y1]
            elif typ in ['equal', 'replace']:
                backtrackx += xarr[x0:x1]
                backtracky += yarr[y0:y1]
        # Some lines may have been considered as junk. Check the sizes
        if autojunk:
            # Pad the shorter list with empty pieces so both have equal length.
            lbx = len(backtrackx)
            lby = len(backtracky)
            backtrackx += [b''] * (max(lbx, lby) - lbx)
            backtracky += [b''] * (max(lbx, lby) - lby)
    else:
        raise ValueError("Unknown algorithm '%s'" % algo)

    # Print the diff

    # x and y accumulate the number of bytes of a and of b consumed so far;
    # i is the index of the current 16-piece row.
    x = y = i = 0
    # Indexed below by (doy - dox), or by 0 when both pieces are equal.
    colorize: Dict[int, Callable[[str], str]] = {
        0: lambda x: x,
        -1: conf.color_theme.left,
        1: conf.color_theme.right
    }

    # dox / doy select which side(s) the line being printed represents. A row
    # that differs is printed twice (x side, then y side) before i advances.
    dox = 1
    doy = 0
    btx_len = len(backtrackx)
    while i < btx_len:
        linex = backtrackx[i:i + 16]
        liney = backtracky[i:i + 16]
        xx = sum(len(k) for k in linex)
        yy = sum(len(k) for k in liney)
        if dox and not xx:
            # Nothing on the x side for this row: print the y side only.
            dox = 0
            doy = 1
        if dox and linex == liney:
            # Identical rows are printed once, with both offsets.
            doy = 1

        if dox:
            # NOTE(review): the x offset label starts from y, not x - confirm
            # whether this is intended.
            xd = y
            j = 0
            # Shift the label back by one for each leading empty piece.
            while not linex[j]:
                j += 1
                xd -= 1
            print(colorize[doy - dox]("%04x" % xd), end=' ')
            x += xx
            line = linex
        else:
            print(" ", end=' ')
        if doy:
            yd = y
            j = 0
            while not liney[j]:
                j += 1
                yd -= 1
            print(colorize[doy - dox]("%04x" % yd), end=' ')
            y += yy
            line = liney
        else:
            print(" ", end=' ')

        print(" ", end=' ')

        # cl accumulates the printable column of the current line.
        cl = ""
        for j in range(16):
            if i + j < min(len(backtrackx), len(backtracky)):
                if line[j]:
                    # Color index is 0 when both sides hold the same piece.
                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
                    print(col("%02X" % orb(line[j])), end=' ')
                    if linex[j] == liney[j]:
                        cl += sane(line[j], color=True)
                    else:
                        cl += col(sane(line[j]))
                else:
                    print(" ", end=' ')
                    cl += " "
            else:
                print(" ", end=' ')
            if j == 7:
                # Extra separator after the 8th column.
                print("", end=' ')

        print(" ", cl)

        if doy or not yy:
            # Row finished: go back to the x side and move to the next row.
            doy = 0
            dox = 1
            i += 16
        else:
            # Reached only with doy false and yy non-zero, so the inner
            # ``else`` below cannot run.
            if yy:
                dox = 0
                doy = 1
            else:
                i += 16

587 

588 

# checksum_endian_transform converts a sum of native-order 16-bit words into
# network order: identity on big endian hosts, byte swap otherwise. The swap
# does not mask its result; checksum() masks it to 16 bits afterwards.
if struct.pack("H", 1) == b"\x00\x01":  # big endian
    checksum_endian_transform = lambda chk: chk  # type: Callable[[int], int]
else:
    checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8


def checksum(pkt):
    # type: (bytes) -> int
    """Compute the 16-bit one's complement checksum of pkt.

    The data is summed as native-order 16-bit words, the carries are folded
    back into the low 16 bits, and the complemented result is returned in
    network byte order.

    :param pkt: the data to sum; an odd-length input is padded with one
        zero byte. The caller's buffer is never modified.
    :return: the checksum, in the range [0, 0xffff]
    """
    if len(pkt) % 2 == 1:
        # Build a new object instead of using ``+=``, which would extend a
        # mutable buffer (e.g. bytearray) passed by the caller in place.
        pkt = pkt + b"\0"
    s = sum(array.array("H", pkt))
    s = (s >> 16) + (s & 0xffff)
    s += s >> 16
    s = ~s
    return checksum_endian_transform(s) & 0xffff

604 

605 

606def _fletcher16(charbuf): 

607 # type: (bytes) -> Tuple[int, int] 

608 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 

609 c0 = c1 = 0 

610 for char in charbuf: 

611 c0 += char 

612 c1 += c0 

613 

614 c0 %= 255 

615 c1 %= 255 

616 return (c0, c1) 

617 

618 

@conf.commands.register
def fletcher16_checksum(binbuf):
    # type: (bytes) -> int
    """Calculates Fletcher-16 checksum of the given buffer.

    Note:
    If the buffer contains the two checkbytes derived from the Fletcher-16 checksum  # noqa: E501
    the result of this function has to be 0. Otherwise the buffer has been corrupted.  # noqa: E501

    :return: the second sum in the high byte and the first sum in the low byte
    """
    low, high = _fletcher16(binbuf)
    return low | (high << 8)

630 

631 

@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    # type: (bytes, int) -> bytes
    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.

    Including the bytes into the buffer (at the position marked by offset) the  # noqa: E501
    global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify  # noqa: E501
    the integrity of the buffer on the receiver side.

    For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501

    :param binbuf: the buffer to protect
    :param offset: position of the two checkbytes in the buffer
    :raises Exception: if the buffer is shorter than offset
    """

    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    if offset > len(binbuf):
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    # The sums are computed with the two checkbyte positions zeroed.
    zeroed = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    c0, c1 = _fletcher16(zeroed)

    # Python's modulo is never negative: a zero result is mapped to 255.
    x = ((len(zeroed) - offset - 1) * c0 - c1) % 255 or 255

    y = 510 - c0 - x
    y = y - 255 if y > 255 else y
    return chb(x) + chb(y)

661 

662 

def mac2str(mac):
    # type: (str) -> bytes
    """Convert a colon-separated hexadecimal string into bytes.

    Each colon-separated field is parsed as a base 16 integer and becomes
    one byte of the result.
    """
    fields = plain_str(mac).split(':')
    return b"".join([chb(int(field, 16)) for field in fields])

666 

667 

def valid_mac(mac):
    # type: (str) -> bool
    """Return True if mac2str() turns mac into exactly 6 bytes.

    A ValueError raised during the conversion yields False.
    """
    try:
        packed = mac2str(mac)
    except ValueError:
        return False
    return len(packed) == 6

675 

676 

def str2mac(s):
    # type: (bytes) -> str
    """Render s as colon-separated, two-digit lowercase hex values.

    :param s: bytes; a str is also accepted, in which case the code point of
        each character is used
    """
    values = map(ord, s) if isinstance(s, str) else s
    return ":".join(["%02x" % value for value in values])

682 

683 

def randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0)

    One ``random.randint(0, 255)`` draw is made per byte.
    """
    return bytes(random.randint(0, 255) for _ in range(length))

691 

692 

def zerofree_randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0) without zero in it.

    One ``random.randint(1, 255)`` draw is made per byte.
    """
    return bytes(random.randint(1, 255) for _ in range(length))

700 

701 

def stror(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.

    If the lengths differ, the result is as long as the shortest input.
    """
    return bytes(left | right for left, right in zip(s1, s2))

709 

710 

def strxor(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.

    If the lengths differ, the result is as long as the shortest input.
    """
    return bytes(left ^ right for left, right in zip(s1, s2))

718 

719 

def strand(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.

    If the lengths differ, the result is as long as the shortest input.
    """
    return bytes(left & right for left, right in zip(s1, s2))

727 

728 

def strrot(s1, count, right=True):
    # type: (bytes, int, bool) -> bytes
    """
    Rotate the binary by 'count' bytes

    :param s1: the bytes to rotate
    :param count: number of bytes to rotate by; reduced modulo len(s1)
    :param right: rotate to the right if True (default), to the left
        otherwise
    :return: the rotated bytes; an empty input is returned unchanged
    """
    if not s1:
        # Nothing to rotate, and ``count % 0`` would raise ZeroDivisionError.
        return s1
    off = count % len(s1)
    if right:
        return s1[-off:] + s1[:-off]
    else:
        return s1[off:] + s1[:off]

739 

740 

741# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 # noqa: E501 

try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    # The platform rejects the broadcast address: wrap socket.inet_aton.
    def inet_aton(ip_string):
        # type: (str) -> bytes
        """socket.inet_aton with "255.255.255.255" handled explicitly."""
        if ip_string != "255.255.255.255":
            return socket.inet_aton(ip_string)
        return b"\xff" * 4
else:
    inet_aton = socket.inet_aton  # type: ignore

inet_ntoa = socket.inet_ntoa

755 

756 

def atol(x):
    # type: (str) -> int
    """Convert a dotted IPv4 address into its 32-bit integer value.

    :raises ValueError: if inet_aton() rejects the address
    """
    try:
        packed = inet_aton(x)
    except socket.error:
        raise ValueError("Bad IP format: %s" % x)
    (value,) = struct.unpack("!I", packed)
    return cast(int, value)

764 

765 

def valid_ip(addr):
    # type: (str) -> bool
    """Return True if addr can be converted by atol(), False otherwise.

    An address that plain_str() cannot decode is not valid.
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        atol(text)
    except (OSError, ValueError, socket.error):
        return False
    else:
        return True

777 

778 

def valid_net(addr):
    # type: (str) -> bool
    """Return True if addr is a valid IPv4 address or IPv4 network.

    A network is written "ip/mask", where mask is a decimal number between
    0 and 32. Without a "/", this is the same as valid_ip().

    :param addr: the address or network to check
    :return: True if valid, False otherwise (never raises on a bad mask)
    """
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' not in addr:
        return valid_ip(addr)
    ip, mask = addr.split('/', 1)
    if not valid_ip(ip):
        return False
    # str.isdigit() alone also accepts characters such as superscript digits,
    # which int() cannot parse: restrict the mask to ASCII digits.
    if not (mask.isascii() and mask.isdigit()):
        return False
    try:
        return 0 <= int(mask) <= 32
    except ValueError:
        # int() can still refuse a digit string, e.g. an excessively long one.
        return False

789 

790 

def valid_ip6(addr):
    # type: (str) -> bool
    """Return True if inet_pton() accepts addr as an AF_INET6 address.

    An address that plain_str() cannot decode is not valid.
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        inet_pton(socket.AF_INET6, text)
    except socket.error:
        return False
    else:
        return True

802 

803 

def valid_net6(addr):
    # type: (str) -> bool
    """Return True if addr is a valid IPv6 address or IPv6 network.

    A network is written "ip/mask", where mask is a decimal number between
    0 and 128. Without a "/", this is the same as valid_ip6().

    :param addr: the address or network to check
    :return: True if valid, False otherwise (never raises on a bad mask)
    """
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' not in addr:
        return valid_ip6(addr)
    ip, mask = addr.split('/', 1)
    if not valid_ip6(ip):
        return False
    # str.isdigit() alone also accepts characters such as superscript digits,
    # which int() cannot parse: restrict the mask to ASCII digits.
    if not (mask.isascii() and mask.isdigit()):
        return False
    try:
        return 0 <= int(mask) <= 128
    except ValueError:
        # int() can still refuse a digit string, e.g. an excessively long one.
        return False

814 

815 

def ltoa(x):
    # type: (int) -> str
    """Convert an integer into a dotted IPv4 address.

    Only the low 32 bits of x are used.
    """
    packed = struct.pack("!I", x & 0xffffffff)
    return inet_ntoa(packed)

819 

820 

def itom(x):
    # type: (int) -> int
    """Return the 32-bit mask obtained by shifting 32 set bits right by x.

    For instance itom(24) is 0xffffff00.
    """
    shifted = 0xffffffff00000000 >> x
    return shifted & 0xffffffff

824 

825 

def in4_cidr2mask(m):
    # type: (int) -> bytes
    """
    Return the mask (bitstring) associated with provided length
    value. For instance if function is called on 20, return value is
    b'\\xff\\xff\\xf0\\x00'.

    :raises Scapy_Exception: if m is not within [0, 32]
    """
    if not 0 <= m <= 32:
        raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m)  # noqa: E501

    # The mask is the complement of the (32 - m) low "host" bits.
    host_bits = 2**(32 - m) - 1
    return strxor(b"\xff" * 4, struct.pack(">I", host_bits))

840 

841 

def in4_isincluded(addr, prefix, mask):
    # type: (str, str, int) -> bool
    """
    Returns True when 'addr' belongs to prefix/mask. False otherwise.

    The address is ANDed with the mask and compared to the packed prefix.
    """
    packed_addr = inet_pton(socket.AF_INET, addr)
    packed_mask = in4_cidr2mask(mask)
    packed_prefix = inet_pton(socket.AF_INET, prefix)
    return packed_prefix == strand(packed_addr, packed_mask)

851 

852 

def in4_ismaddr(str):
    # type: (str) -> bool
    """
    Returns True if provided address in printable format belongs to
    allocated Multicast address space (224.0.0.0/4).
    """
    prefix, length = "224.0.0.0", 4
    return in4_isincluded(str, prefix, length)

860 

861 

def in4_ismlladdr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to link-local multicast address
    space (224.0.0.0/24)
    """
    prefix, length = "224.0.0.0", 24
    return in4_isincluded(str, prefix, length)

869 

870 

def in4_ismgladdr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to global multicast address
    space (224.0.1.0-238.255.255.255).

    That is: inside 224.0.0.0/4, but neither in 224.0.0.0/24 nor in
    239.0.0.0/8.
    """
    if not in4_isincluded(str, "224.0.0.0", 4):
        return False
    if in4_isincluded(str, "224.0.0.0", 24):
        return False
    return not in4_isincluded(str, "239.0.0.0", 8)

882 

883 

def in4_ismlsaddr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to limited scope multicast address
    space (239.0.0.0/8).
    """
    prefix, length = "239.0.0.0", 8
    return in4_isincluded(str, prefix, length)

891 

892 

def in4_isaddrllallnodes(str):
    # type: (str) -> bool
    """
    Returns True if address is the link-local all-nodes multicast
    address (224.0.0.1).

    The comparison is made on the packed form of both addresses.
    """
    all_nodes = inet_pton(socket.AF_INET, "224.0.0.1")
    candidate = inet_pton(socket.AF_INET, str)
    return all_nodes == candidate

901 

902 

def in4_getnsmac(a):
    # type: (bytes) -> str
    """
    Return the multicast mac address associated with provided
    IPv4 address. Passed address must be in network format.

    The result is "01:00:5e:" followed by the low 7 bits of the second
    byte of the address, then its third and fourth bytes.
    """
    low_bits = (a[1] & 0x7f, a[2], a[3])
    return "01:00:5e:%.2x:%.2x:%.2x" % low_bits

911 

912 

def decode_locale_str(x):
    # type: (bytes) -> str
    """
    Decode bytes into a string using the system locale.
    Useful on Windows where it can be unusual (e.g. cp1252)

    Falls back to utf-8 when the locale reports no encoding. Undecodable
    bytes are replaced rather than raising.
    """
    encoding = locale.getlocale()[1] or "utf-8"
    return x.decode(encoding=encoding, errors="replace")

920 

921 

class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command, without
    crashing.

    Example:
        >>> with ContextManagerSubprocess("tcpdump"):
        >>>     subprocess.Popen(["tcpdump", "--version"])
        ERROR: Could not execute tcpdump, is it installed?

    """

    def __init__(self, prog, suppress=True):
        # type: (str, bool) -> None
        """
        :param prog: name of the program, used in the error messages
        :param suppress: if True (default), log the error and swallow the
            exception. If False, raise an exception of the same type
            carrying the error message.
        """
        self.prog = prog
        self.suppress = suppress

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self,
                 exc_type,  # type: Optional[type]
                 exc_value,  # type: Optional[Exception]
                 traceback,  # type: Optional[Any]
                 ):
        # type: (...) -> Optional[bool]
        if exc_value is None or exc_type is None:
            return None
        # Errored
        if isinstance(exc_value, EnvironmentError):
            msg = "Could not execute %s, is it installed?" % self.prog
        else:
            # exc_type is already the exception class: use its own name
            # (going through __class__ would name its metaclass instead).
            msg = "%s: execution failed (%s)" % (
                self.prog,
                exc_type.__name__
            )
        if not self.suppress:
            raise exc_type(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception

963 

964 

class ContextManagerCaptureOutput(object):
    """
    Context manager that intercept the console's output.

    While active, ``sys.stdout`` is replaced by a mock whose ``write``
    appends to an internal buffer. The buffer holds everything written,
    including the newline that print() adds.

    Example:
        >>> with ContextManagerCaptureOutput() as cmco:
        ...     print("hey")
        ...     assert cmco.get_output().strip() == "hey"
    """

    def __init__(self):
        # type: () -> None
        self.result_export_object = ""

    def __enter__(self):
        # type: () -> ContextManagerCaptureOutput
        from unittest import mock

        def _capture(text):
            # type: (str) -> None
            self.result_export_object += text

        fake_stdout = mock.Mock()
        fake_stdout.write = _capture
        # Keep the real stream so that __exit__ can restore it.
        self.bck_stdout = sys.stdout
        sys.stdout = fake_stdout
        return self

    def __exit__(self, *exc):
        # type: (*Any) -> Literal[False]
        sys.stdout = self.bck_stdout
        return False

    def get_output(self, eval_bytes=False):
        # type: (bool) -> str
        """Return the captured output.

        :param eval_bytes: if True and the output starts with ``b'``, the
            output is evaluated and the result is passed to plain_str()
        """
        captured = self.result_export_object
        if eval_bytes and captured.startswith("b'"):
            # NOTE(review): eval() runs whatever was captured; only safe when
            # the captured output is trusted.
            return plain_str(eval(captured))
        return captured

1002 

1003 

def do_graph(
    graph,  # type: str
    prog=None,  # type: Optional[str]
    format=None,  # type: Optional[str]
    target=None,  # type: Optional[Union[IO[bytes], str]]
    type=None,  # type: Optional[str]
    string=None,  # type: Optional[bool]
    options=None  # type: Optional[List[str]]
):
    # type: (...) -> Optional[str]
    """Processes graph description using an external software.
    This method is used to convert a graphviz format to an image.

    :param graph: GraphViz graph description
    :param prog: which graphviz program to use
    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
        option
    :param string: if not None, simply return the graph string
    :param target: filename or redirect. Defaults pipe to Imagemagick's
        display program
    :param type: deprecated name of ``format``
    :param options: options to be passed to prog. A list is joined with
        spaces; a plain string is passed as is.
    :return: the graph string if ``string`` is set, None otherwise
    :raises OSError: if the GraphViz program exits with a non-zero code
    """

    if format is None:
        format = "svg"
    if string:
        return graph
    if type is not None:
        warnings.warn(
            "type is deprecated, and was renamed format",
            DeprecationWarning
        )
        format = type
    if prog is None:
        prog = conf.prog.dot
    start_viewer = False
    if target is None:
        if WINDOWS:
            target = get_temp_file(autoext="." + format)
            start_viewer = True
        else:
            with ContextManagerSubprocess(conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    if format is not None:
        format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    target = cast(IO[bytes], target)
    # The command is a single shell string: a list of options has to be
    # joined, otherwise its repr ("['-x', ...]") would end up in the command.
    if not options:
        opts = ""
    elif isinstance(options, str):
        opts = options
    else:
        opts = " ".join(options)
    try:
        proc = subprocess.Popen(
            "\"%s\" %s %s" % (prog, opts, format or ""),
            shell=True, stdin=subprocess.PIPE, stdout=target,
            stderr=subprocess.PIPE
        )
        _, stderr = proc.communicate(bytes_encode(graph))
        if proc.returncode != 0:
            raise OSError(
                "GraphViz call failed (is it installed?):\n" +
                plain_str(stderr)
            )
    finally:
        # Close the target on failure too. Closing is best effort: the
        # target may be missing or already closed.
        try:
            target.close()
        except Exception:
            pass
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", target.name)  # noqa: E501
                break
        else:
            # Only reached when the file showed up (loop ended without break).
            if WINDOWS and conf.prog.display == conf.prog._default:
                os.startfile(target.name)
            else:
                with ContextManagerSubprocess(conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
    return None

1089 

1090 

# Replacement text for each character that tex_escape() rewrites.
_TEX_TR = {
    "{": "{\\tt\\char123}",
    "}": "{\\tt\\char125}",
    "\\": "{\\tt\\char92}",
    "^": "\\^{}",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "&": "\\&",
    "%": "\\%",
    "|": "{\\tt\\char124}",
    "~": "{\\tt\\char126}",
    "<": "{\\tt\\char60}",
    ">": "{\\tt\\char62}",
}


def tex_escape(x):
    # type: (str) -> str
    """Return x with every character listed in _TEX_TR replaced.

    Characters that are not in the table are kept unchanged.
    """
    return "".join(_TEX_TR.get(char, char) for char in x)

1114 

1115 

def colgen(*lstcol,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> Iterator[Any]
    """Returns a generator that mixes provided quantities forever

    :param lstcol: the quantities to mix. A single quantity is duplicated
        so that there are always at least two values to combine.
    :param trans: a function to convert the three arguments into a color.
        lambda x,y,z:(x,y,z) by default
    :raises ValueError: (on first iteration) if no quantity was provided
    """
    if not lstcol:
        # Without any quantity the loops below would never yield, and the
        # caller's next() would spin forever.
        raise ValueError("colgen() requires at least one quantity")
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
    n = len(lstcol)
    while True:
        for i in range(n):
            for j in range(n):
                for k in range(n):
                    # Skip the combinations where all three indices match
                    if not (i == j == k):
                        yield trans(lstcol[(i + j) % n],
                                    lstcol[(j + k) % n],
                                    lstcol[(k + i) % n])

1131 

1132 

def incremental_label(label="tag%05i", start=0):
    # type: (str, int) -> Iterator[str]
    """Yield ``label`` formatted with an ever increasing counter.

    :param label: a %-format string taking one integer
    :param start: the first counter value
    """
    counter = start
    while True:
        yield label % counter
        counter = counter + 1

1138 

1139 

def binrepr(val):
    # type: (int) -> str
    """Return the binary digits of ``val`` without the ``0b`` prefix.

    The sign of negative values is preserved (``-5`` gives ``"-101"``).
    """
    # bin() puts the sign before the prefix ("-0b101"), so the prefix cannot
    # simply be sliced off the front.
    return bin(val).replace("0b", "", 1)

1143 

1144 

def long_converter(s):
    # type: (str) -> int
    """Parse a hexadecimal string that may contain spaces and newlines."""
    # Drop only newlines and spaces, exactly as many as there are
    stripped = s.translate({ord("\n"): None, ord(" "): None})
    return int(stripped, 16)

1148 

1149######################### 

1150# Enum management # 

1151######################### 

1152 

1153 

class EnumElement:
    """A named integer value.

    The element stringifies as its key, and hashes and compares as its
    value. Attributes that are not found on the element itself are looked
    up on the wrapped value.
    """

    def __init__(self, key, value):
        # type: (str, int) -> None
        self._key = key
        self._value = value

    def __repr__(self):
        # type: () -> str
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501

    def __getattr__(self, attr):
        # type: (str) -> Any
        # __getattr__ only runs when normal lookup failed. If our own fields
        # are missing (e.g. an instance rebuilt by copy/pickle before its
        # state is restored), delegating to self._value would recurse
        # forever, so report the attribute as missing instead.
        if attr in ("_key", "_value"):
            raise AttributeError(attr)
        return getattr(self._value, attr)

    def __str__(self):
        # type: () -> str
        return self._key

    def __bytes__(self):
        # type: () -> bytes
        return bytes_encode(self.__str__())

    def __hash__(self):
        # type: () -> int
        # Hash like the wrapped value so that the element and the plain
        # integer select the same dictionary entry.
        return hash(self._value)

    def __int__(self):
        # type: () -> int
        return int(self._value)

    def __eq__(self, other):
        # type: (Any) -> bool
        try:
            return self._value == int(other)
        except (TypeError, ValueError):
            # 'other' is not convertible to an integer: let Python fall back
            # to its default handling (which ends up as "not equal")
            # instead of raising from a comparison.
            return NotImplemented

    def __ne__(self, other):
        # type: (Any) -> bool
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __neq__(self, other):
        # type: (Any) -> bool
        # Not a special method name; kept for callers that use it directly.
        return not (self == other)

1191 

1192 

class Enum_metaclass(type):
    """Metaclass turning the integer attributes of a class into elements.

    Every integer found in the class body is wrapped in ``element_class``
    and recorded in a reverse mapping (element -> attribute name) stored
    as ``__rdict__``.
    """

    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        reverse = {}
        # Iterate over a snapshot as the entries are replaced on the way
        for attr_name, attr_value in list(dct.items()):
            if not isinstance(attr_value, int):
                continue
            element = cls.element_class(attr_name, attr_value)
            dct[attr_name] = element
            reverse[element] = attr_name
        dct["__rdict__"] = reverse
        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        # type: (int) -> Any
        """Return the name recorded for the value ``attr``."""
        mapping = self.__rdict__  # type: ignore
        return mapping[attr]

    def __contains__(self, val):
        # type: (int) -> bool
        """Tell whether ``val`` is one of the recorded values."""
        mapping = self.__rdict__  # type: ignore
        return val in mapping

    def get(self, attr, val=None):
        # type: (str, Optional[Any]) -> Any
        """Return the name recorded for ``attr``, or ``val`` if unknown."""
        mapping = self.__rdict__  # type: ignore
        return mapping.get(attr, val)

    def __repr__(self):
        # type: () -> str
        shown = self.__dict__.get("name", self.__name__)
        return "<%s>" % shown

1222 

1223 

1224################### 

1225# Object saving # 

1226################### 

1227 

1228 

def export_object(obj):
    # type: (Any) -> None
    """Print ``obj`` as a base64 string.

    The object is pickled (protocol 2), compressed with zlib at the
    highest level, then base64 encoded.
    """
    import zlib
    pickled = pickle.dumps(obj, 2)
    packed = zlib.compress(pickled, 9)
    encoded = base64.b64encode(packed)
    print(encoded.decode())

1233 

1234 

def import_object(obj=None):
    # type: (Optional[str]) -> Any
    """Rebuild an object from a base64 string.

    The string is base64 decoded, zlib decompressed, then unpickled.

    WARNING: the data is unpickled, which can run arbitrary code. Only
    feed this function data coming from a trusted source.

    :param obj: the encoded object. Read from stdin when None.
    """
    import zlib
    text = sys.stdin.read() if obj is None else obj
    packed = base64.b64decode(text.strip())
    pickled = zlib.decompress(packed)
    return pickle.loads(pickled)

1241 

1242 

def save_object(fname, obj):
    # type: (str, Any) -> None
    """Pickle a Python object

    :param fname: path of the gzip compressed file to write
    :param obj: the object to pickle
    """
    # The context manager closes the file even when pickling fails
    with gzip.open(fname, "wb") as fd:
        pickle.dump(obj, fd)

1250 

1251 

def load_object(fname):
    # type: (str) -> Any
    """unpickle a Python object

    WARNING: unpickling can run arbitrary code. Only load files coming
    from a trusted source.

    :param fname: path of a gzip compressed pickle file
    :returns: the unpickled object
    """
    # Close the file once read instead of leaving it to the garbage
    # collector.
    with gzip.open(fname, "rb") as fd:
        return pickle.load(fd)

1256 

1257 

@conf.commands.register
def corrupt_bytes(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Corrupt a given percentage (at least one byte) or number of bytes
    from a string

    :param data: the data to corrupt
    :param p: the ratio of bytes to corrupt. Only used when n is None.
    :param n: the exact number of bytes to corrupt. random.sample() raises
        ValueError if it is larger than the data length.
    :returns: a corrupted copy of the data
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s)
    if n is None:
        # At least one byte, but never more than available: empty data
        # (or p > 1) would otherwise make random.sample() fail.
        n = min(s_len, max(1, int(s_len * p)))
    for i in random.sample(range(s_len), n):
        # Adding 1..255 modulo 256 guarantees that the byte changes
        s[i] = (s[i] + random.randint(1, 255)) % 256
    return s.tobytes()

1272 

1273 

@conf.commands.register
def corrupt_bits(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Flip a given percentage (at least one bit) or number of bits
    from a string

    :param data: the data to corrupt
    :param p: the ratio of bits to flip. Only used when n is None.
    :param n: the exact number of bits to flip. random.sample() raises
        ValueError if it is larger than the number of bits in the data.
    :returns: a corrupted copy of the data
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s) * 8
    if n is None:
        # At least one bit, but never more than available: empty data
        # (or p > 1) would otherwise make random.sample() fail.
        n = min(s_len, max(1, int(s_len * p)))
    for i in random.sample(range(s_len), n):
        # Bit i lives in byte i // 8, at position i % 8
        s[i // 8] ^= 1 << (i % 8)
    return s.tobytes()

1288 

1289 

1290############################# 

1291# pcap capture file stuff # 

1292############################# 

1293 

@conf.commands.register
def wrpcap(filename,  # type: Union[IO[bytes], str]
           pkt,  # type: _PacketIterable
           *args,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> None
    """Write a list of packets to a pcap file

    Extra positional and keyword arguments are handed to PcapWriter.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcap(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: the packets to write
    :param gz: set to 1 to save a gzipped capture
    :param linktype: force linktype value
    :param endianness: "<" or ">", force endianness
    :param sync: do not bufferize writes to the capture file
    """
    writer = PcapWriter(filename, *args, **kargs)
    # Leaving the context closes the capture file
    with writer as capture:
        capture.write(pkt)

1315 

1316 

@conf.commands.register
def wrpcapng(filename,  # type: str
             pkt,  # type: _PacketIterable
             ):
    # type: (...) -> None
    """Write a list of packets to a pcapng file

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcapng(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packets to write
    """
    writer = PcapNgWriter(filename)
    # Leaving the context closes the capture file
    with writer as capture:
        capture.write(pkt)

1333 

1334 

@conf.commands.register
def rdpcap(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read a pcap or pcapng file and return a packet list

    :param filename: the name of the capture file, or an open file-like
        object
    :param count: read only <count> packets
    """
    # Rant: Our complicated use of metaclasses and especially the
    # __call__ function is, of course, not supported by MyPy.
    # One day we should simplify this mess and use a much simpler
    # layout that will actually be supported and properly dissected.
    with PcapReader(filename) as reader:  # type: ignore
        packets = reader.read_all(count=count)
    return packets

1348 

1349 

# NOTE: Type hinting
# Mypy doesn't understand the following metaclass, and thinks each
# constructor (PcapReader...) needs 3 arguments. To avoid this, we give
# the last 2 arguments a fake default (=None), then force their type to
# be non-None in the signature, and wrap the whole thing in a
# "type: ignore". This avoids having to add "# type: ignore" every time
# we call those constructors.

1357 

class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers"""

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(
            cls, name, bases, dct
        )
        if 'alternative' in dct:
            # Link both ways: the alternative falls back on the new class
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Creates a cls instance, use the `alternative` if that
        fails.

        :param filename: the path of a capture file, or an open binary
            file-like object
        :raises Scapy_Exception: if no data could be read, or if neither
            the class nor its alternative supports the file. A descriptor
            opened by this call is closed before raising.
        """
        opened_here = isinstance(filename, str)
        filename, fdesc, magic = cls.open(filename)
        try:
            if not magic:
                raise Scapy_Exception(
                    "No data could be read!"
                )
            candidates = [cls]
            if "alternative" in cls.__dict__:
                candidates.append(cls.__dict__["alternative"])
            for candidate in candidates:
                i = candidate.__new__(
                    candidate,
                    candidate.__name__,
                    candidate.__bases__,
                    candidate.__dict__  # type: ignore
                )
                try:
                    i.__init__(filename, fdesc, magic)
                    return i
                except (Scapy_Exception, EOFError):
                    # Not this format: try the next candidate
                    pass
            raise Scapy_Exception("Not a supported capture file")
        except BaseException:
            # No reader took ownership of the descriptor. Release it if it
            # was opened here; a stream provided by the caller is left
            # alone.
            if opened_here:
                underlying = None
                if isinstance(fdesc, gzip.GzipFile):
                    # Grab the wrapped file before closing the GzipFile,
                    # which would otherwise leave it open.
                    underlying = fdesc.fileobj
                fdesc.close()
                if underlying is not None:
                    underlying.close()
            raise

    @staticmethod
    def open(fname  # type: Union[IO[bytes], str]
             ):
        # type: (...) -> Tuple[str, _ByteStream, bytes]
        """Open (if necessary) filename, and read the magic.

        :returns: a (filename, file object, magic) tuple, where magic is
            made of the first 4 bytes (fewer if the data is shorter). The
            data is transparently decompressed when a path is given and
            the file starts with a GZIP header.
        """
        if isinstance(fname, str):
            filename = fname
            fdesc = open(filename, "rb")  # type: _ByteStream
            magic = fdesc.read(2)
            if magic == b"\x1f\x8b":
                # GZIP header detected.
                fdesc.seek(0)
                fdesc = gzip.GzipFile(fileobj=fdesc)
                magic = fdesc.read(2)
            magic += fdesc.read(2)
        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic

1433 

1434 

class RawPcapReader(metaclass=PcapReader_metaclass):
    """A stateful pcap reader. Each packet is returned as a string"""

    # TODO: use Generics to properly type the various readers.
    # As of right now, RawPcapReader is typed as if it returned packets
    # because all of its child do. Fix that

    nonblocking_socket = True
    PacketMetadata = collections.namedtuple("PacketMetadata",
                                            ["sec", "usec", "wirelen", "caplen"])  # noqa: E501

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, _ByteStream, bytes) -> None
        """Select the byte order and timestamp precision from the magic,
        then read the rest of the global header from ``fdesc``.

        :raises Scapy_Exception: if the magic is not a pcap one, or if
            the header is shorter than 20 bytes
        """
        self.filename = filename
        self.f = fdesc
        # (magic, byte order, nanosecond-precision timestamps)
        known_magics = (
            (b"\xa1\xb2\xc3\xd4", ">", False),  # big endian
            (b"\xd4\xc3\xb2\xa1", "<", False),  # little endian
            (b"\xa1\xb2\x3c\x4d", ">", True),  # big endian, nanosecond
            (b"\x4d\x3c\xb2\xa1", "<", True),  # little endian, nanosecond
        )
        for candidate, byte_order, nanosecond in known_magics:
            if magic == candidate:
                self.endian = byte_order
                self.nano = nanosecond
                break
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        header = self.f.read(20)
        if len(header) < 20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        # Fields: major version, minor version, 2 unused 32 bit values,
        # snaplen, linktype
        fields = struct.unpack(self.endian + "HHIIII", header)
        self.linktype = fields[5]
        self.snaplen = fields[4]

    def __enter__(self):
        # type: () -> RawPcapReader
        return self

    def __iter__(self):
        # type: () -> RawPcapReader
        return self

    def __next__(self):
        # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """
        implement the iterator protocol on a set of packets in a pcap file
        """
        try:
            record = self._read_packet()
        except EOFError:
            raise StopIteration
        return record

    def _read_packet(self, size=MTU):
        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """return a single packet read from the file as a tuple containing
        (pkt_data, pkt_metadata)

        At most ``size`` bytes of packet data are returned.

        raise EOFError when no more packets are available
        """
        record_header = self.f.read(16)
        if len(record_header) < 16:
            raise EOFError
        sec, usec, caplen, wirelen = struct.unpack(
            self.endian + "IIII", record_header
        )

        try:
            payload = self.f.read(caplen)[:size]
        except OverflowError as e:
            warning(f"Pcap: {e}")
            raise EOFError

        metadata = RawPcapReader.PacketMetadata(sec=sec, usec=usec,
                                                wirelen=wirelen,
                                                caplen=caplen)
        return (payload, metadata)

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        raise Exception(
            "Cannot call read_packet() in RawPcapReader. Use "
            "_read_packet()"
        )

    def dispatch(self,
                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
                 ):
        # type: (...) -> None
        """call the specified callback routine for each packet read

        This is just a convenience function for the main loop
        that allows for easy launching of packet processing in a
        thread.
        """
        for record in self:
            callback(record)

    def _read_all(self, count=-1):
        # type: (int) -> List[Packet]
        """return a list of all packets in the pcap file

        :param count: the maximum number of packets to read. A negative
            value reads until the end of the file.
        """
        packets = []  # type: List[Packet]
        remaining = count
        while remaining != 0:
            remaining -= 1
            try:
                packet = self.read_packet()  # type: Packet
            except EOFError:
                break
            packets.append(packet)
        return packets

    def recv(self, size=MTU):
        # type: (int) -> bytes
        """ Emulate a socket
        """
        payload, _ = self._read_packet(size=size)
        return payload

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def close(self):
        # type: () -> None
        """Close the capture file, as well as the file wrapped by a
        GzipFile.
        """
        if isinstance(self.f, gzip.GzipFile):
            self.f.fileobj.close()  # type: ignore
        self.f.close()

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.close()

    # emulate SuperSocket
    @staticmethod
    def select(sockets,  # type: List[SuperSocket]
               remain=None,  # type: Optional[float]
               ):
        # type: (...) -> List[SuperSocket]
        """Return ``sockets`` unchanged."""
        return sockets

1576 

1577 

class PcapReader(RawPcapReader):
    """A pcap reader returning dissected packets."""

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        """Read the pcap header, then pick the layer class matching the
        capture linktype (the raw layer if the linktype is unknown).
        """
        RawPcapReader.__init__(self, filename, fdesc, magic)
        layers = conf.l2types.num2layer
        try:
            self.LLcls = layers[self.linktype]  # type: Type[Packet]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype))  # noqa: E501
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            self.LLcls = conf.raw_layer

    def __enter__(self):
        # type: () -> PcapReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next record and dissect it.

        Extra keyword arguments are given to the layer class. When the
        dissection fails, the data is returned as a raw layer, unless
        conf.debug_dissector is set, in which case the error is raised.

        :raises EOFError: when no more packets are available
        """
        record = super()._read_packet(size=size)
        if record is None:
            raise EOFError
        data, metadata = record

        try:
            packet = self.LLcls(data, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (self.LLcls, data)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            packet = conf.raw_layer(data)
        # The sub-second field counts nanoseconds or microseconds
        if self.nano:
            exponent = -9
        else:
            exponent = -6
        unit = Decimal(10) ** Decimal(exponent)
        packet.time = EDecimal(metadata.sec + unit * metadata.usec)
        packet.wirelen = metadata.wirelen
        return packet

    def recv(self, size=MTU, **kwargs):  # type: ignore
        # type: (int, **Any) -> Packet
        return self.read_packet(size=size, **kwargs)

    def __next__(self):  # type: ignore
        # type: () -> Packet
        try:
            packet = self.read_packet()
        except EOFError:
            raise StopIteration
        return packet

    def read_all(self, count=-1):
        # type: (int) -> PacketList
        """Return a PacketList named after the capture file.

        :param count: read only <count> packets
        """
        packets = self._read_all(count)
        from scapy import plist
        list_name = os.path.basename(self.filename)
        return plist.PacketList(packets, name=list_name)

1638 

1639 

class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as
    bytes.

    """

    alternative = RawPcapReader  # type: Type[Any]

    PacketMetadata = collections.namedtuple("PacketMetadataNg",  # type: ignore
                                            ["linktype", "tsresol",
                                             "tshigh", "tslow", "wirelen",
                                             "comment", "ifname", "direction",
                                             "process_information"])

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        """Check the magic and read the first Section Header Block.

        :raises Scapy_Exception: if the magic is not the pcapng one, or
            if the first Section Header Block is malformed
        """
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, options); will be populated by IDBs.
        self.interfaces = []  # type: List[Tuple[int, int, Dict[str, Any]]]
        self.default_options = {
            "tsresol": 1000000
        }
        # Block type -> parser. Blocks of any other type are skipped.
        self.blocktypes: Dict[
            int,
            Callable[
                [bytes, int],
                Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
            ]] = {
                1: self._read_block_idb,
                2: self._read_block_pkt,
                3: self._read_block_spb,
                6: self._read_block_epb,
                10: self._read_block_dsb,
                0x80000001: self._read_block_pib,
        }
        self.endian = "!"  # Will be overwritten by first SHB
        self.process_information = []  # type: List[Dict[str, Any]]

        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )

        try:
            self._read_block_shb()
        except EOFError:
            raise Scapy_Exception(
                "The first SHB of the pcapng file is malformed !"
            )

    def _read_block(self, size=MTU):
        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
        """Read one block and hand its body to the matching parser.

        :returns: what the parser returned, or None for a Section Header
            Block or a block of unknown type
        :raises EOFError: when no more blocks are available, or on a
            malformed block
        """
        try:
            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocktype == 0x0A0D0D0A:
            # This function updates the endianness based on the block content.
            self._read_block_shb()
            return None
        try:
            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            warning("PcapNg: Error reading blocklen before block body")
            raise EOFError
        if blocklen < 12:
            warning("PcapNg: Invalid block length !")
            raise EOFError

        # The length covers the type and both length fields (12 bytes)
        _block_body_length = blocklen - 12
        block = self.f.read(_block_body_length)
        if len(block) != _block_body_length:
            raise Scapy_Exception("PcapNg: Invalid Block body length "
                                  "(too short)")
        self._read_block_tail(blocklen)
        if blocktype in self.blocktypes:
            return self.blocktypes[blocktype](block, size)
        return None

    def _read_block_tail(self, blocklen):
        # type: (int) -> None
        """Read the trailing block length, and check that it matches the
        leading one.

        :raises EOFError: if the trailing length is missing or different
        """
        if blocklen % 4:
            pad = self.f.read(-blocklen % 4)
            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                    "Ignored padding %r" % (blocklen, pad))
        try:
            if blocklen != struct.unpack(self.endian + 'I',
                                         self.f.read(4))[0]:
                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
        except struct.error:
            warning("PcapNg: Could not read blocklen after block body")
            raise EOFError

    def _read_block_shb(self):
        # type: () -> None
        """Section Header Block"""
        _blocklen = self.f.read(4)
        endian = self.f.read(4)
        if endian == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif endian == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            warning("PcapNg: Bad magic in Section Header Block"
                    " (not a pcapng file?)")
            raise EOFError

        # The length can only be decoded once the byte order is known
        try:
            blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
        except struct.error:
            warning("PcapNg: Could not read blocklen")
            raise EOFError
        if blocklen < 28:
            warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!")  # noqa: E501
            raise EOFError

        # Major version must be 1
        _major = self.f.read(2)
        try:
            major = struct.unpack(self.endian + "H", _major)[0]
        except struct.error:
            warning("PcapNg: Could not read major value")
            raise EOFError
        if major != 1:
            warning(f"PcapNg: SHB Major version {major} unsupported !")
            raise EOFError

        # Skip minor version & section length
        skipped = self.f.read(10)
        if len(skipped) != 10:
            warning("PcapNg: Could not read minor value & section length")
            raise EOFError

        _options_len = blocklen - 28
        options = self.f.read(_options_len)
        if len(options) != _options_len:
            raise Scapy_Exception("PcapNg: Invalid Section Header Block "
                                  " options (too short)")
        self._read_block_tail(blocklen)
        self._read_options(options)

    def _read_packet(self, size=MTU):  # type: ignore
        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Read blocks until one of them holds a packet, and return
        (packet, metadata), where packet is bytes and metadata is a
        RawPcapNgReader.PacketMetadata.

        raise EOFError when no more packets are available
        """
        while True:
            res = self._read_block(size=size)
            if res is not None:
                return res

    def _read_options(self, options):
        # type: (bytes) -> Dict[int, bytes]
        """Parse a sequence of options.

        :returns: a dictionary mapping each option code to its raw value.
            Parsing stops at the end-of-option (code 0).
        """
        opts = dict()
        while len(options) >= 4:
            try:
                code, length = struct.unpack(self.endian + "HH", options[:4])
            except struct.error:
                warning("PcapNg: options header is too small "
                        "%d !" % len(options))
                raise EOFError
            if code != 0 and 4 + length <= len(options):
                opts[code] = options[4:4 + length]
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option "
                            "length %d for end-of-option" % length)
                break
            # Option values are padded to a multiple of 4 bytes
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        return opts

    def _read_block_idb(self, block, _):
        # type: (bytes, int) -> None
        """Interface Description Block"""
        # 2 bytes LinkType + 2 bytes Reserved
        # 4 bytes Snaplen
        options_raw = self._read_options(block[8:])
        options = self.default_options.copy()  # type: Dict[str, Any]
        for c, v in options_raw.items():
            if c == 9:
                length = len(v)
                if length == 1:
                    tsresol = orb(v)
                    # The high bit selects base 2 rather than base 10,
                    # the other bits are the exponent.
                    options["tsresol"] = (2 if tsresol & 128 else 10) ** (
                        tsresol & 127
                    )
                else:
                    warning("PcapNg: invalid options "
                            "length %d for IDB tsresol" % length)
            elif c == 2:
                options["name"] = v
            elif c == 1:
                options["comment"] = v
        try:
            interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
                self.endian + "HxxI",
                block[:8]
            ) + (options,)
        except struct.error:
            warning("PcapNg: IDB is too small %d/8 !" % len(block))
            raise EOFError
        self.interfaces.append(interface)

    def _check_interface_id(self, intid):
        # type: (int) -> None
        """Check the interface id value and raise EOFError if invalid."""
        tmp_len = len(self.interfaces)
        if intid >= tmp_len:
            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
            raise EOFError

    def _read_block_epb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Enhanced Packet Block"""
        try:
            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "5I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: EPB is too small %d/20 !" % len(block))
            raise EOFError

        # Compute the options offset taking padding into account
        opt_offset = 20 + caplen + (-caplen) % 4

        # Parse options
        options = self._read_options(block[opt_offset:])

        process_information = {}
        for code, value in options.items():
            if code in [0x8001, 0x8003]:  # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX  # noqa: E501
                try:
                    proc_index = struct.unpack(self.endian + "I", value)[0]
                except struct.error:
                    warning("PcapNg: EPB invalid proc index"
                            "(expected 4 bytes, got %d) !" % len(value))
                    raise EOFError
                if proc_index < len(self.process_information):
                    key = "proc" if code == 0x8001 else "eproc"
                    process_information[key] = self.process_information[proc_index]  # noqa: E501
                else:
                    warning("PcapNg: EPB invalid process information index "
                            "(%d/%d) !" % (proc_index, len(self.process_information)))  # noqa: E501

        comment = options.get(1, None)
        epb_flags_raw = options.get(2, None)
        if epb_flags_raw:
            try:
                epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
            except struct.error:
                warning("PcapNg: EPB invalid flags size"
                        "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
                raise EOFError
            # The direction is stored in the two lowest bits of the flags
            direction = epb_flags & 3

        else:
            direction = None

        self._check_interface_id(intid)
        ifname = self.interfaces[intid][2].get('name', None)

        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=comment,
                                               ifname=ifname,
                                               direction=direction,
                                               process_information=process_information))  # noqa: E501

    def _read_block_spb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Simple Packet Block"""
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        self._check_interface_id(intid)

        try:
            wirelen, = struct.unpack(self.endian + "I", block[:4])
        except struct.error:
            warning("PcapNg: SPB is too small %d/4 !" % len(block))
            raise EOFError

        # An SPB has no captured length field: it is the smallest of the
        # original length and of the interface snaplen. A snaplen of 0
        # means "no limit", and must not truncate the packet to nothing.
        snaplen = self.interfaces[intid][1]
        caplen = min(wirelen, snaplen) if snaplen else wirelen
        return (block[4:4 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=None,
                                               tslow=None,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_pkt(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """(Obsolete) Packet Block"""
        try:
            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "HH4I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: PKT is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_dsb(self, block, size):
        # type: (bytes, int) -> None
        """Decryption Secrets Block"""

        # Parse the secrets type and length fields
        try:
            secrets_type, secrets_length = struct.unpack(
                self.endian + "II",
                block[:8],
            )
            block = block[8:]
        except struct.error:
            warning("PcapNg: DSB is too small %d!", len(block))
            raise EOFError

        # Compute the secrets length including the padding
        padded_secrets_length = secrets_length + (-secrets_length) % 4
        if len(block) < padded_secrets_length:
            warning("PcapNg: invalid DSB secrets length!")
            raise EOFError

        # Extract secrets data and options
        secrets_data = block[:padded_secrets_length][:secrets_length]
        if block[padded_secrets_length:]:
            warning("PcapNg: DSB options are not supported!")

        # TLS Key Log
        if secrets_type == 0x544c534b:
            if getattr(conf, "tls_sessions", False) is False:
                warning("PcapNg: TLS Key Log available, but "
                        "the TLS layer is not loaded! Scapy won't be able "
                        "to decrypt the packets.")
            else:
                from scapy.layers.tls.session import load_nss_keys

                # Write Key Log to a file and parse it
                filename = get_temp_file()
                with open(filename, "wb") as fd:
                    fd.write(secrets_data)

                keys = load_nss_keys(filename)
                if not keys:
                    warning("PcapNg: invalid TLS Key Log in DSB!")
                else:
                    # Note: these attributes are only available when the TLS
                    #       layer is loaded.
                    conf.tls_nss_keys = keys
                    conf.tls_session_enable = True
        else:
            warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)

    def _read_block_pib(self, block, _):
        # type: (bytes, int) -> None
        """Apple Process Information Block"""

        # Get the Process ID
        try:
            dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0]
            process_information = {"id": dpeb_pid}
            block = block[4:]
        except struct.error:
            warning("PcapNg: DPEB is too small (%d). Cannot get PID!",
                    len(block))
            raise EOFError

        # Get Options
        options = self._read_options(block)
        for code, value in options.items():
            if code == 2:
                process_information["name"] = value.decode("ascii", "backslashreplace")  # noqa: E501
            elif code == 4:
                if len(value) == 16:
                    process_information["uuid"] = str(UUID(bytes=value))
                else:
                    warning("PcapNg: DPEB UUID length is invalid (%d)!",
                            len(value))

        # Store process information
        self.process_information.append(process_information)

2052 

2053 

class PcapNgReader(RawPcapNgReader, PcapReader):
    """A pcapng reader returning dissected packets."""

    alternative = PcapReader

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def __enter__(self):
        # type: () -> PcapNgReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next packet and dissect it.

        The layer class is selected from the linktype of the packet.
        Extra keyword arguments are given to the layer class. When the
        dissection fails, the data is returned as a raw layer, unless
        conf.debug_dissector is set, in which case the error is raised.

        :raises EOFError: when no more packets are available
        """
        record = super()._read_packet(size=size)
        if record is None:
            raise EOFError
        data, metadata = record
        (linktype, tsresol, tshigh, tslow, wirelen,
         comment, ifname, direction, process_information) = metadata
        try:
            layer_cls = conf.l2types.num2layer[linktype]  # type: Type[Packet]
            packet = layer_cls(data, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            packet = conf.raw_layer(data)
        if tshigh is not None:
            # The timestamp is split in two 32 bit halves
            ticks = (tshigh << 32) + tslow
            packet.time = EDecimal(ticks) / tsresol
        packet.wirelen = wirelen
        packet.comment = comment
        packet.direction = direction
        packet.process_information = process_information.copy()
        if ifname is not None:
            packet.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
        return packet

    def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
        return self.read_packet(size=size, **kwargs)

2096 

2097 

class GenericPcapWriter(object):
    """Logic shared by the capture file writers.

    Subclasses must implement :meth:`_write_header` and
    :meth:`_write_packet`; this class takes care of guessing the linktype
    and of extracting the metadata (time, lengths, comment, interface,
    direction) from the packets.
    """
    # When True, the sub-second part of timestamps is in nanoseconds
    # instead of microseconds
    nano = False
    linktype: int

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        raise NotImplementedError

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        raise NotImplementedError

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Compute the timestamp to store for a packet.

        When ``sec`` is None and the packet has a ``time`` attribute, both
        values are derived from it. When ``sec`` is known but ``usec`` is
        not, ``usec`` is 0. ``(None, None)`` is returned when nothing is
        known.

        :param packet: packet (or bytes) being written
        :param sec: seconds since epoch, or None
        :param usec: micro/nanoseconds after the second, or None
        :return: the (sec, usec) tuple
        """
        if hasattr(packet, "time"):
            if sec is None:
                packet_time = packet.time
                whole = int(packet_time)
                scale = 1000000000 if self.nano else 1000000
                usec = int(round((packet_time - whole) * scale))
                sec = float(packet_time)
                if usec >= scale:
                    # Rounding the fraction reached a full second: carry it
                    # into the seconds, otherwise the sub-second field
                    # would hold an out of range value.
                    usec -= scale
                    sec = float(whole + 1)
        if sec is not None and usec is None:
            usec = 0
        return sec, usec  # type: ignore

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Guess the linktype (unless already set) and write the header.

        The linktype is looked up from the class of ``pkt``. When it cannot
        be guessed (None, bytes or unregistered class), a warning is issued
        and Ethernet is used.

        :param pkt: first packet to be written, if any
        """
        if not hasattr(self, 'linktype'):
            try:
                if pkt is None or isinstance(pkt, bytes):
                    # Can't guess LL
                    raise KeyError
                self.linktype = conf.l2types.layer2num[
                    pkt.__class__
                ]
            except KeyError:
                msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
                warning(msg, self.__class__.__name__, pkt.__class__.__name__)
                self.linktype = DLT_EN10MB
        self._write_header(pkt)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[float]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: Packet, or bytes for a single packet
        :type packet: scapy.packet.Packet or bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, ``packet.time`` is used when available.
        :type sec: float
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured. If ``sec`` is not specified,
                     this value is ignored.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(raw(packet))``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, tries ``packet.wirelen``, otherwise uses
                        ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        f_sec, usec = self._get_time(packet, sec, usec)

        rawpkt = bytes_encode(packet)
        caplen = len(rawpkt) if caplen is None else caplen

        if wirelen is None:
            if hasattr(packet, "wirelen"):
                wirelen = packet.wirelen
        if wirelen is None:
            wirelen = caplen

        comment = getattr(packet, "comment", None)
        ifname = getattr(packet, "sniffed_on", None)
        direction = getattr(packet, "direction", None)
        if isinstance(packet, bytes):
            linktype = self.linktype  # type: int
        else:
            try:
                linktype = conf.l2types.layer2num[packet.__class__]
            except KeyError:
                # Same fallback as write_header(): a class that is not
                # registered as a link layer uses the writer's linktype
                # instead of aborting the write.
                linktype = self.linktype
        if ifname is not None:
            ifname = str(ifname).encode('utf-8')
        self._write_packet(
            rawpkt,
            sec=f_sec, usec=usec,
            caplen=caplen, wirelen=wirelen,
            comment=comment,
            ifname=ifname,
            direction=direction,
            linktype=linktype
        )

2216 

2217 

class GenericRawPcapWriter(GenericPcapWriter):
    """File handling shared by the writers: flush, close, context manager
    support and the generic :meth:`write` entry point."""
    # Set once the file header has been handled
    header_present = False
    nano = False
    sync = False
    f = None  # type: Union[IO[bytes], gzip.GzipFile]

    def fileno(self):
        # type: () -> int
        """Return the descriptor of the underlying file (-1 on Windows)."""
        if WINDOWS:
            return -1
        return self.f.fileno()

    def flush(self):
        # type: () -> Optional[Any]
        """Flush the underlying file."""
        return self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the underlying file, writing the header first if no
        packet triggered it."""
        if not self.header_present:
            self.write_header(None)
        return self.f.close()

    def __enter__(self):
        # type: () -> GenericRawPcapWriter
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.flush()
        self.close()

    def write(self, pkt):
        # type: (Union[_PacketIterable, bytes]) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a pcap file.

        :param pkt: Packet(s) to write (one record for each Packet), or raw
                    bytes to write (as one record).
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
        """
        # Raw bytes are written as a single record
        if isinstance(pkt, bytes):
            if not self.header_present:
                self.write_header(pkt)
            self.write_packet(pkt)
            return

        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for item in IterSocket(pkt).iter:
            if not self.header_present:
                self.write_header(item)

            is_packet = not isinstance(item, bytes)
            if is_packet and \
                    self.linktype != conf.l2types.get(type(item), None):
                warning("Inconsistent linktypes detected!"
                        " The resulting file might contain"
                        " invalid packets."
                        )

            self.write_packet(item)

2275 

2276 

class RawPcapWriter(GenericRawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 linktype=None,  # type: Optional[int]
                 gz=False,  # type: bool
                 endianness="",  # type: str
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 nano=False,  # type: bool
                 snaplen=MTU,  # type: int
                 bufsz=4096,  # type: int
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param linktype: force linktype to a given value. If None, linktype is
            taken from the first written packet
        :param gz: compress the capture on the fly
        :param endianness: force an endianness (little:"<", big:">").
            Default is native
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
        :param snaplen: value stored in the snaplen field of the file header
        :param bufsz: buffer size used to open ``filename`` (uncompressed
            files only). Forced to 0 when ``sync`` is set.

        """

        # Compare against None rather than relying on truthiness: 0
        # (LINKTYPE_NULL) is a valid linktype and must not be discarded.
        if linktype is not None:
            self.linktype = linktype
        self.snaplen = snaplen
        self.append = append
        self.gz = gz
        self.endian = endianness
        self.sync = sync
        self.nano = nano
        if sync:
            bufsz = 0

        if isinstance(filename, str):
            self.filename = filename
            mode = "ab" if append else "wb"
            if gz:
                self.f = cast(_ByteStream, gzip.open(filename, mode, 9))
            else:
                self.f = open(filename, mode, bufsz)
        else:
            self.f = filename
            self.filename = getattr(filename, "name", "No name")

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the pcap file header.

        In append mode, nothing is written when the file already contains
        data.

        :param pkt: unused
        :raises ValueError: if no linktype is known
        """
        self.header_present = True

        if self.append:
            # Even if prone to race conditions, this seems to be
            # safest way to tell whether the header is already present
            # because we have to handle compressed streams that
            # are not as flexible as basic files
            opener = gzip.open if self.gz else open
            with opener(self.filename, "rb") as g:
                if g.read(16):
                    return

        if not hasattr(self, 'linktype'):
            raise ValueError(
                "linktype could not be guessed. "
                "Please pass a linktype while creating the writer"
            )

        # magic, version major, version minor, thiszone, sigfigs, snaplen,
        # linktype. The magic tells readers the timestamp resolution.
        magic = 0xa1b23c4d if self.nano else 0xa1b2c3d4
        self.f.write(struct.pack(self.endian + "IHHIIII", magic,
                                 2, 4, 0, 0, self.snaplen, self.linktype))
        self.f.flush()

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
            (not stored in pcap records)
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: number of microseconds (nanoseconds if ``nano=True``)
                     after the second that the packet was captured. Ignored
                     and taken from the current time when ``sec`` is not
                     supplied; defaults to 0 otherwise.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: unused, pcap records cannot carry it
        :param ifname: unused, pcap records cannot carry it
        :param direction: unused, pcap records cannot carry it
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen
        if sec is None or usec is None:
            t = time.time()
            it = int(t)
            if sec is None:
                # No timestamp at all: use the current time for both parts
                sec = it
                usec = int(round((t - it) *
                                 (1000000000 if self.nano else 1000000)))
            elif usec is None:
                usec = 0

        self.f.write(struct.pack(self.endian + "IIII",
                                 int(sec), usec, caplen, wirelen))
        self.f.write(bytes(packet))
        if self.sync:
            self.f.flush()

2412 

2413 

class RawPcapNgWriter(GenericRawPcapWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def __init__(self,
                 filename,  # type: str
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to. It is
            opened in ``"wb"`` mode.
        """

        self.header_present = False
        # Timestamps are written in units of 1/tsresol seconds
        self.tsresol = 1000000
        # A dict to keep if_name to IDB id mapping.
        # unknown if_name(None) id=0
        self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}

        # tcpdump only support little-endian in PCAPng files
        self.endian = "<"
        self.endian_magic = b"\x4d\x3c\x2b\x1a"

        self.filename = filename
        self.f = open(filename, "wb", 4096)

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return the (sec, usec) tuple to use for a packet.

        ``sec`` defaults to ``packet.time`` (as a float, fraction included)
        when available, ``usec`` defaults to 0.
        """
        if hasattr(packet, "time"):
            if sec is None:
                sec = float(packet.time)

        if usec is None:
            usec = 0

        return sec, usec  # type: ignore

    def _add_padding(self, raw_data):
        # type: (bytes) -> bytes
        """Pad ``raw_data`` with null bytes to a multiple of 4 bytes."""
        raw_data += ((-len(raw_data)) % 4) * b"\x00"
        return raw_data

    def build_block(self, block_type, block_body, options=None):
        # type: (bytes, bytes, Optional[bytes]) -> bytes
        """Assemble a complete block.

        :param block_type: the 4 bytes of the block type
        :param block_body: body of the block, padded here to 32 bits
        :param options: already encoded (and padded) options, appended
            after the body
        :return: the block, including both total length fields
        """

        # Pad Block Body to 32 bits
        block_body = self._add_padding(block_body)

        if options:
            block_body += options

        # An empty block is 12 bytes long
        block_total_length = 12 + len(block_body)

        # Block Type
        block = block_type
        # Block Total Length
        block += struct.pack(self.endian + "I", block_total_length)
        # Block Body
        block += block_body
        # Block Total Length (repeated at the end of the block)
        block += struct.pack(self.endian + "I", block_total_length)

        return block

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the Section Header Block and the first Interface
        Description Block, unless already done.

        :param pkt: unused
        """
        if not self.header_present:
            self.header_present = True
            self._write_block_shb()
            self._write_block_idb(linktype=self.linktype)

    def _write_block_shb(self):
        # type: () -> None
        """Write a Section Header Block."""

        # Block Type
        block_type = b"\x0A\x0D\x0D\x0A"
        # Byte-Order Magic
        block_shb = self.endian_magic
        # Major Version
        block_shb += struct.pack(self.endian + "H", 1)
        # Minor Version
        block_shb += struct.pack(self.endian + "H", 0)
        # Section Length
        block_shb += struct.pack(self.endian + "q", -1)

        self.f.write(self.build_block(block_type, block_shb))

    def _write_block_idb(self,
                         linktype,  # type: int
                         ifname=None  # type: Optional[bytes]
                         ):
        # type: (...) -> None
        """Write an Interface Description Block.

        :param linktype: linktype of the interface
        :param ifname: name of the interface, stored in an if_name option
            when not None
        """

        # Block Type
        block_type = struct.pack(self.endian + "I", 1)
        # LinkType
        block_idb = struct.pack(self.endian + "H", linktype)
        # Reserved
        block_idb += struct.pack(self.endian + "H", 0)
        # SnapLen
        block_idb += struct.pack(self.endian + "I", 262144)

        # if_name option
        opts = None
        if ifname is not None:
            opts = struct.pack(self.endian + "HH", 2, len(ifname))
            # Pad Option Value to 32 bits
            opts += self._add_padding(ifname)
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_idb, options=opts))

    def _write_block_spb(self, raw_pkt):
        # type: (bytes) -> None
        """Write a Simple Packet Block holding ``raw_pkt``."""

        # Block Type
        block_type = struct.pack(self.endian + "I", 3)
        # Original Packet Length
        block_spb = struct.pack(self.endian + "I", len(raw_pkt))
        # Packet Data
        block_spb += raw_pkt

        self.f.write(self.build_block(block_type, block_spb))

    def _write_block_epb(self,
                         raw_pkt,  # type: bytes
                         ifid,  # type: int
                         timestamp=None,  # type: Optional[Union[EDecimal, float]]  # noqa: E501
                         caplen=None,  # type: Optional[int]
                         orglen=None,  # type: Optional[int]
                         comment=None,  # type: Optional[bytes]
                         flags=None,  # type: Optional[int]
                         ):
        # type: (...) -> None
        """Write an Enhanced Packet Block.

        :param raw_pkt: packet data
        :param ifid: ID of the interface the packet belongs to
        :param timestamp: capture time in seconds; 0 is written when unset
        :param caplen: captured length, ``len(raw_pkt)`` when unset
        :param orglen: original length, ``len(raw_pkt)`` when unset
        :param comment: stored in a comment option when not None
        :param flags: stored in a flags option when it is an integer
        """

        if timestamp:
            # The timestamp is stored as two 32-bit halves
            tmp_ts = int(timestamp * self.tsresol)
            ts_high = tmp_ts >> 32
            ts_low = tmp_ts & 0xFFFFFFFF
        else:
            ts_high = ts_low = 0

        if not caplen:
            caplen = len(raw_pkt)

        if not orglen:
            orglen = len(raw_pkt)

        # Block Type
        block_type = struct.pack(self.endian + "I", 6)
        # Interface ID
        block_epb = struct.pack(self.endian + "I", ifid)
        # Timestamp (High)
        block_epb += struct.pack(self.endian + "I", ts_high)
        # Timestamp (Low)
        block_epb += struct.pack(self.endian + "I", ts_low)
        # Captured Packet Length
        block_epb += struct.pack(self.endian + "I", caplen)
        # Original Packet Length
        block_epb += struct.pack(self.endian + "I", orglen)
        # Packet Data
        block_epb += raw_pkt

        # Options
        opts = b''
        if comment is not None:
            comment = bytes_encode(comment)
            opts += struct.pack(self.endian + "HH", 1, len(comment))
            # Pad Option Value to 32 bits
            opts += self._add_padding(comment)
        # isinstance() so that int subclasses (e.g. IntEnum/IntFlag values)
        # are written too instead of being silently dropped
        if isinstance(flags, int):
            opts += struct.pack(self.endian + "HH", 2, 4)
            opts += struct.pack(self.endian + "I", flags)
        if opts:
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_epb,
                                      options=opts))

    def _write_packet(self,  # type: ignore
                      packet,  # type: bytes
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcapng file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch
                    (fraction included). If not supplied, a null timestamp
                    is written.
        :type sec: float
        :param usec: not used with pcapng
        :type usec: int
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: UTF-8 string containing human-readable comment text
                        that is associated to the current block. Line
                        separators SHOULD be a carriage-return + linefeed
                        ('\\r\\n') or just linefeed ('\\n'); either form may
                        appear and be considered a line separator. The
                        string is not zero-terminated.
        :type comment: bytes
        :param ifname: UTF-8 string containing the
                       name of the device used to capture data.
                       The string is not zero-terminated.
        :type ifname: bytes
        :param direction: 0 = information not available,
                          1 = inbound,
                          2 = outbound
        :type direction: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen

        # Interfaces are declared lazily: the first packet seen on a new
        # interface name triggers the matching IDB
        ifid = self.interfaces2id.get(ifname, None)
        if ifid is None:
            ifid = max(self.interfaces2id.values()) + 1
            self.interfaces2id[ifname] = ifid
            self._write_block_idb(linktype=linktype, ifname=ifname)

        # EPB flags (32 bits).
        # currently only direction is implemented (least 2 significant bits)
        if isinstance(direction, int):
            flags = direction & 0x3
        else:
            flags = None

        self._write_block_epb(packet, timestamp=sec, caplen=caplen,
                              orglen=wirelen, comment=comment, ifid=ifid,
                              flags=flags)
        if self.sync:
            self.f.flush()

2661 

2662 

class PcapWriter(RawPcapWriter):
    """
    A stream PCAP writer with more control than wrpcap().

    Adds nothing on top of :class:`RawPcapWriter`.
    """

2666 

2667 

class PcapNgWriter(RawPcapNgWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return the (sec, usec) tuple to use for ``packet``.

        A missing ``sec`` is taken from ``packet.time`` when the packet has
        one, a missing ``usec`` is replaced by 0.
        """
        if hasattr(packet, "time") and sec is None:
            sec = float(packet.time)
        return sec, (0 if usec is None else usec)  # type: ignore

2685 

2686 

@conf.commands.register
def rderf(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read an ERF file and return its content as a packet list.

    :param filename: name of the file, or file-like object, to read
    :param count: read only <count> packets
    """
    with ERFEthernetReader(filename) as reader:
        packets = reader.read_all(count=count)
        return packets

2696 

2697 

class ERFEthernetReader_metaclass(PcapReader_metaclass):
    """Metaclass opening the capture before initializing the reader."""

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Create a reader for ``filename``.

        The class itself is tried first, then the class found under its
        ``alternative`` attribute, if any.

        :raises Scapy_Exception: if no class could be initialized
        """
        reader = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)  # type: ignore
        filename, fdesc = cls.open(filename)
        try:
            reader.__init__(filename, fdesc)
        except (Scapy_Exception, EOFError):
            pass
        else:
            return reader

        if "alternative" in cls.__dict__:
            alt_cls = cls.__dict__["alternative"]
            reader = alt_cls.__new__(
                alt_cls,
                alt_cls.__name__,
                alt_cls.__bases__,
                alt_cls.__dict__  # type: ignore
            )
            try:
                reader.__init__(filename, fdesc)
            except (Scapy_Exception, EOFError):
                pass
            else:
                return reader

        raise Scapy_Exception("Not a supported capture file")

    @staticmethod
    def open(fname  # type: ignore
             ):
        # type: (...) -> Tuple[str, _ByteStream]
        """Open (if necessary) filename

        :param fname: a file name, or an already opened file-like object
        :return: the (file name, file object) tuple
        """
        if not isinstance(fname, str):
            # Already a file object: only its name has to be found
            return getattr(fname, "name", "No name"), fname

        try:
            # Reading a byte fails when the file is not gzip-compressed
            with gzip.open(fname, "rb") as probe:
                probe.read(1)
            fdesc = gzip.open(fname, "rb")  # type: _ByteStream
        except IOError:
            fdesc = open(fname, "rb")
        return fname, fdesc

2743 

2744 

class ERFEthernetReader(PcapReader,
                        metaclass=ERFEthernetReader_metaclass):
    """Reader for ERF files containing Ethernet records."""

    def __init__(self, filename, fdesc=None):  # type: ignore
        # type: (Union[IO[bytes], str], IO[bytes]) -> None
        self.filename = filename  # type: ignore
        self.f = fdesc
        # One nanosecond, used to convert the timestamps
        self.power = Decimal(10) ** Decimal(-9)

    # time is in 64-bits Endace's format which can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def _convert_erf_timestamp(self, t):
        # type: (int) -> EDecimal
        """Convert a 64-bit ERF timestamp to seconds.

        The upper 32 bits are the seconds, the lower 32 bits the fraction
        of a second in units of 2**-32 s. The fraction is converted to
        nanoseconds.

        :param t: the ERF timestamp
        :return: the time in seconds
        """
        sec = t >> 32
        frac_sec = t & 0xffffffff
        frac_sec *= 10**9
        # Round according to the highest bit that is about to be dropped
        frac_sec += (frac_sec & 0x80000000) << 1
        frac_sec >>= 32
        return EDecimal(sec + self.power * frac_sec)

    # The details of ERF Packet format can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next record and dissect it as an Ethernet frame.

        If the dissection fails (and ``conf.debug_dissector`` is not set),
        the frame is wrapped in ``conf.raw_layer`` instead.

        :param size: maximum number of bytes kept from the record payload
            (the 2 padding bytes that precede the frame are counted in)
        :param kwargs: forwarded to the packet class
        :raises EOFError: when less than a full record header can be read
        :raises Scapy_Exception: on an unsupported record type, or when the
            record length is smaller than the record headers
        """

        # General ERF Header have exactly 16 bytes
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError

        # The timestamp is in little-endian byte-order.
        timestamp = struct.unpack('<Q', hdr[:8])[0]
        # The rest is in big-endian byte-order.
        # Ignoring flags and lctr (loss counter) since they are ERF specific
        # header fields which Packet object does not support.
        erf_type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
        # Check if the type != 0x02, type Ethernet
        # NOTE(review): this only tests bit 1, so any type with that bit
        # set is accepted - confirm whether a strict comparison is wanted.
        if erf_type & 0x02 == 0:
            raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")

        # If there are extended headers, ignore it because Packet object does
        # not support it. Extended headers size is 8 bytes before the payload.
        headers_len = 16
        if erf_type & 0x80:
            self.f.read(8)
            headers_len += 8

        payload_len = rlen - headers_len
        if payload_len < 0:
            # A negative size would make read() return the whole rest of
            # the file as a single packet
            raise Scapy_Exception(
                "Invalid ERF record length (%d)" % rlen
            )
        s = self.f.read(payload_len)

        # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
        # of the fields are disregarded by Endace.
        pb = s[2:size]
        from scapy.layers.l2 import Ether
        try:
            p = Ether(pb, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                # Keep the exact bytes that made the dissection fail
                debug.crashed_on = (Ether, pb)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            # Fall back on the same bytes that were given to Ether, i.e.
            # without the 2 padding bytes and truncated to size
            p = conf.raw_layer(pb)

        p.time = self._convert_erf_timestamp(timestamp)
        p.wirelen = wlen

        return p

2815 

2816 

@conf.commands.register
def wrerf(filename,  # type: Union[IO[bytes], str]
          pkt,  # type: _PacketIterable
          *args,  # type: Any
          **kargs  # type: Any
          ):
    # type: (...) -> None
    """Write a list of packets to an ERF file.

    Extra arguments are passed to :class:`ERFEthernetWriter`.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrerf(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: the packet(s) to write
    :param gz: set to 1 to save a gzipped capture
    :param append: append packets to the capture file instead of
        truncating it
    :param sync: do not bufferize writes to the capture file
    """
    with ERFEthernetWriter(filename, *args, **kargs) as writer:
        writer.write(pkt)

2838 

2839 

class ERFEthernetWriter(PcapWriter):
    """A stream ERF Ethernet writer with more control than wrerf()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 gz=False,  # type: bool
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
                         writable file-like object.
        :param gz: compress the capture on the fly
        :param append: append packets to the capture file instead of
                       truncating it
        :param sync: do not bufferize writes to the capture file
        """
        super(ERFEthernetWriter, self).__init__(filename,
                                                gz=gz,
                                                append=append,
                                                sync=sync)

    def write(self, pkt):  # type: ignore
        # type: (_PacketIterable) -> None
        """
        Writes a Packet or a SndRcvList object to a ERF file.

        :param pkt: Packet(s) to write (one record for each Packet)
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
        """
        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for p in IterSocket(pkt).iter:
            self.write_packet(p)

    def write_packet(self, pkt):  # type: ignore
        # type: (Packet) -> None
        """Write a single record.

        The timestamp comes from ``pkt.time`` when the attribute exists,
        from the current time (whole seconds) otherwise. The wire length
        comes from ``pkt.wirelen`` and defaults to the record length.

        :param pkt: the packet to write
        """

        # The timestamp is a 64-bit value: seconds in the upper half,
        # fraction of a second (in units of 2**-32 s) in the lower half
        if hasattr(pkt, "time"):
            sec = int(pkt.time)
            frac = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9)
            t = (sec << 32) + frac
        else:
            t = int(time.time()) << 32

        # There are 16 bytes of headers + 2 bytes of padding before the packets
        # payload.
        rlen = len(pkt) + 18

        # getattr() with a default: objects without a wirelen attribute
        # would otherwise leave the variable unbound
        wirelen = getattr(pkt, "wirelen", None)
        if wirelen is None:
            wirelen = rlen

        self.f.write(struct.pack("<Q", t))
        # type (2), flags, rlen, lctr, wlen and the 2 bytes of padding
        self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
        self.f.write(bytes(pkt))
        self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the underlying file. Unlike the parent classes, no file
        header is written."""
        return self.f.close()

2903 

2904 

@conf.commands.register
def import_hexcap(input_string=None):
    # type: (Optional[str]) -> bytes
    """Parse a tcpdump-like hexadecimal view into bytes.

    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"

    Lines are consumed until an empty line or the end of the input.

    :param input_string: String containing the hexdump input to parse. If None,
        read from standard input.
    """
    # Optional offset, then up to 16 hexadecimal bytes (third group)
    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501
    if input_string:
        read_line = StringIO(input_string).readline
    else:
        read_line = input

    hex_parts = []
    try:
        while True:
            line = read_line().strip()
            if not line:
                break
            try:
                hex_parts.append(
                    re_extract_hexcap.match(line).groups()[2]  # type: ignore
                )
            except Exception:
                warning("Parsing error during hexcap")
                continue
    except EOFError:
        # End of the standard input
        pass

    return hex_bytes("".join(hex_parts).replace(" ", ""))

2936 

2937 

@conf.commands.register
def wireshark(pktlist, wait=False, **kwargs):
    # type: (List[Packet], bool, **Any) -> Optional[Any]
    """
    Open a list of packets in Wireshark.

    This calls :func:`tcpdump` with ``conf.prog.wireshark`` as program:
    see it for the description of the other parameters.

    Note: this defaults to wait=False, to run Wireshark in the background.
    """
    program = conf.prog.wireshark
    return tcpdump(pktlist, prog=program, wait=wait, **kwargs)

2949 

2950 

@conf.commands.register
def tdecode(
    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
    args=None,  # type: Optional[List[str]]
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """
    Decode a list of packets with tshark.

    :param args: arguments given to tshark. If not specified, defaults
        to ``-V``.

    See :func:`tcpdump` for more parameters.
    """
    tshark_args = ["-V"] if args is None else args
    return tcpdump(pktlist, prog=conf.prog.tshark, args=tshark_args, **kwargs)

2968 

2969 

def _guess_linktype_name(value):
    # type: (int) -> str
    """Return the DLT name matching a DLT value.

    The lookup is delegated to ``pcap_datalink_val_to_name``.
    """
    from scapy.libs.winpcapy import pcap_datalink_val_to_name
    raw_name = cast(bytes, pcap_datalink_val_to_name(value))
    return raw_name.decode()

2975 

2976 

def _guess_linktype_value(name):
    # type: (str) -> int
    """Return the DLT value matching a DLT name.

    The lookup is delegated to ``pcap_datalink_name_to_val``. When it
    reports an unknown name (-1), a warning is issued and the Ethernet
    value is returned.
    """
    from scapy.libs.winpcapy import pcap_datalink_name_to_val
    val = cast(int, pcap_datalink_name_to_val(name.encode()))
    if val != -1:
        return val
    warning("Unknown linktype: %s. Using EN10MB", name)
    return DLT_EN10MB

2986 

2987 

2988@conf.commands.register 

2989def tcpdump( 

2990 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable] 

2991 dump=False, # type: bool 

2992 getfd=False, # type: bool 

2993 args=None, # type: Optional[List[str]] 

2994 flt=None, # type: Optional[str] 

2995 prog=None, # type: Optional[Any] 

2996 getproc=False, # type: bool 

2997 quiet=False, # type: bool 

2998 use_tempfile=None, # type: Optional[Any] 

2999 read_stdin_opts=None, # type: Optional[Any] 

3000 linktype=None, # type: Optional[Any] 

3001 wait=True, # type: bool 

3002 _suppress=False # type: bool 

3003): 

3004 # type: (...) -> Any 

3005 """Run tcpdump or tshark on a list of packets. 

3006 

3007 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a 

3008 temporary file to store the packets. This works around a bug in Apple's 

3009 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/ 

3010 

3011 Otherwise, the packets are passed in stdin. 

3012 

3013 This function can be explicitly enabled or disabled with the 

3014 ``use_tempfile`` parameter. 

3015 

3016 When using ``wireshark``, it will be called with ``-ki -`` to start 

3017 immediately capturing packets from stdin. 

3018 

3019 Otherwise, the command will be run with ``-r -`` (which is correct for 

3020 ``tcpdump`` and ``tshark``). 

3021 

3022 This can be overridden with ``read_stdin_opts``. This has no effect when 

3023 ``use_tempfile=True``, or otherwise reading packets from a regular file. 

3024 

3025 :param pktlist: a Packet instance, a PacketList instance or a list of 

3026 Packet instances. Can also be a filename (as a string), an open 

3027 file-like object that must be a file format readable by 

3028 tshark (Pcap, PcapNg, etc.) or None (to sniff) 

3029 :param flt: a filter to use with tcpdump 

3030 :param dump: when set to True, returns a string instead of displaying it. 

3031 :param getfd: when set to True, returns a file-like object to read data 

3032 from tcpdump or tshark from. 

3033 :param getproc: when set to True, the subprocess.Popen object is returned 

3034 :param args: arguments (as a list) to pass to tshark (example for tshark: 

3035 args=["-T", "json"]). 

3036 :param prog: program to use (defaults to tcpdump, will work with tshark) 

3037 :param quiet: when set to True, the process stderr is discarded 

3038 :param use_tempfile: When set to True, always use a temporary file to store 

3039 packets. 

3040 When set to False, pipe packets through stdin. 

3041 When set to None (default), only use a temporary file with 

3042 ``tcpdump`` on OSX. 

3043 :param read_stdin_opts: When set, a list of arguments needed to capture 

3044 from stdin. Otherwise, attempts to guess. 

3045 :param linktype: A custom DLT value or name, to overwrite the default 

3046 values. 

3047 :param wait: If True (default), waits for the process to terminate before 

3048 returning to Scapy. If False, the process will be detached to the 

3049 background. If dump, getproc or getfd is True, these have the same 

3050 effect as ``wait=False``. 

3051 

3052 Examples:: 

3053 

3054 >>> tcpdump([IP()/TCP(), IP()/UDP()]) 

3055 reading from file -, link-type RAW (Raw IP) 

3056 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501 

3057 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain] 

3058 

3059 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark) 

3060 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501 

3061 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0 

3062 

3063 To get a JSON representation of a tshark-parsed PacketList(), one can:: 

3064 

3065 >>> import json, pprint 

3066 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", 

3067 ... dst="45.33.32.156"), 

3068 ... prog=conf.prog.tshark, 

3069 ... args=["-T", "json"], 

3070 ... getfd=True)) 

3071 >>> pprint.pprint(json_data) 

3072 [{u'_index': u'packets-2016-12-23', 

3073 u'_score': None, 

3074 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20', 

3075 u'frame.encap_type': u'7', 

3076 [...] 

3077 }, 

3078 u'ip': {u'ip.addr': u'45.33.32.156', 

3079 u'ip.checksum': u'0x0000a20d', 

3080 [...] 

3081 u'ip.ttl': u'64', 

3082 u'ip.version': u'4'}, 

3083 u'raw': u'Raw packet data'}}, 

3084 u'_type': u'pcap_file'}] 

3085 >>> json_data[0]['_source']['layers']['ip']['ip.ttl'] 

3086 u'64' 

3087 """ 

3088 getfd = getfd or getproc 

3089 if prog is None: 

3090 if not conf.prog.tcpdump: 

3091 raise Scapy_Exception( 

3092 "tcpdump is not available" 

3093 ) 

3094 prog = [conf.prog.tcpdump] 

3095 elif isinstance(prog, str): 

3096 prog = [prog] 

3097 else: 

3098 raise ValueError("prog must be a string") 

3099 

3100 if linktype is not None: 

3101 if isinstance(linktype, int): 

3102 # Guess name from value 

3103 try: 

3104 linktype_name = _guess_linktype_name(linktype) 

3105 except StopIteration: 

3106 linktype = -1 

3107 else: 

3108 # Guess value from name 

3109 if linktype.startswith("DLT_"): 

3110 linktype = linktype[4:] 

3111 linktype_name = linktype 

3112 try: 

3113 linktype = _guess_linktype_value(linktype) 

3114 except KeyError: 

3115 linktype = -1 

3116 if linktype == -1: 

3117 raise ValueError( 

3118 "Unknown linktype. Try passing its datalink name instead" 

3119 ) 

3120 prog += ["-y", linktype_name] 

3121 

3122 # Build Popen arguments 

3123 if args is None: 

3124 args = [] 

3125 else: 

3126 # Make a copy of args 

3127 args = list(args) 

3128 

3129 if flt is not None: 

3130 # Check the validity of the filter 

3131 if linktype is None and isinstance(pktlist, str): 

3132 # linktype is unknown but required. Read it from file 

3133 with PcapReader(pktlist) as rd: 

3134 if isinstance(rd, PcapNgReader): 

3135 # Get the linktype from the first packet 

3136 try: 

3137 _, metadata = rd._read_packet() 

3138 linktype = metadata.linktype 

3139 if OPENBSD and linktype == 228: 

3140 linktype = DLT_RAW 

3141 except EOFError: 

3142 raise ValueError( 

3143 "Cannot get linktype from a PcapNg packet." 

3144 ) 

3145 else: 

3146 linktype = rd.linktype 

3147 from scapy.arch.common import compile_filter 

3148 compile_filter(flt, linktype=linktype) 

3149 args.append(flt) 

3150 

3151 stdout = subprocess.PIPE if dump or getfd else None 

3152 stderr = open(os.devnull) if quiet else None 

3153 proc = None 

3154 

3155 if use_tempfile is None: 

3156 # Apple's tcpdump cannot read from stdin, see: 

3157 # http://apple.stackexchange.com/questions/152682/ 

3158 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump 

3159 

3160 if read_stdin_opts is None: 

3161 if prog[0] == conf.prog.wireshark: 

3162 # Start capturing immediately (-k) from stdin (-i -) 

3163 read_stdin_opts = ["-ki", "-"] 

3164 elif prog[0] == conf.prog.tcpdump and not OPENBSD: 

3165 # Capture in packet-buffered mode (-U) from stdin (-r -) 

3166 read_stdin_opts = ["-U", "-r", "-"] 

3167 else: 

3168 read_stdin_opts = ["-r", "-"] 

3169 else: 

3170 # Make a copy of read_stdin_opts 

3171 read_stdin_opts = list(read_stdin_opts) 

3172 

3173 if pktlist is None: 

3174 # sniff 

3175 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3176 proc = subprocess.Popen( 

3177 prog + args, 

3178 stdout=stdout, 

3179 stderr=stderr, 

3180 ) 

3181 elif isinstance(pktlist, str): 

3182 # file 

3183 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3184 proc = subprocess.Popen( 

3185 prog + ["-r", pktlist] + args, 

3186 stdout=stdout, 

3187 stderr=stderr, 

3188 ) 

3189 elif use_tempfile: 

3190 tmpfile = get_temp_file( # type: ignore 

3191 autoext=".pcap", 

3192 fd=True 

3193 ) # type: IO[bytes] 

3194 try: 

3195 tmpfile.writelines( 

3196 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3197 ) 

3198 except AttributeError: 

3199 pktlist = cast("_PacketIterable", pktlist) 

3200 wrpcap(tmpfile, pktlist, linktype=linktype) 

3201 else: 

3202 tmpfile.close() 

3203 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3204 proc = subprocess.Popen( 

3205 prog + ["-r", tmpfile.name] + args, 

3206 stdout=stdout, 

3207 stderr=stderr, 

3208 ) 

3209 else: 

3210 try: 

3211 pktlist.fileno() # type: ignore 

3212 # pass the packet stream 

3213 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3214 proc = subprocess.Popen( 

3215 prog + read_stdin_opts + args, 

3216 stdin=pktlist, # type: ignore 

3217 stdout=stdout, 

3218 stderr=stderr, 

3219 ) 

3220 except (AttributeError, ValueError): 

3221 # write the packet stream to stdin 

3222 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3223 proc = subprocess.Popen( 

3224 prog + read_stdin_opts + args, 

3225 stdin=subprocess.PIPE, 

3226 stdout=stdout, 

3227 stderr=stderr, 

3228 ) 

3229 if proc is None: 

3230 # An error has occurred 

3231 return 

3232 try: 

3233 proc.stdin.writelines( # type: ignore 

3234 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3235 ) 

3236 except AttributeError: 

3237 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore 

3238 except UnboundLocalError: 

3239 # The error was handled by ContextManagerSubprocess 

3240 pass 

3241 else: 

3242 proc.stdin.close() # type: ignore 

3243 if proc is None: 

3244 # An error has occurred 

3245 return 

3246 if dump: 

3247 data = b"".join( 

3248 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore 

3249 ) 

3250 proc.terminate() 

3251 return data 

3252 if getproc: 

3253 return proc 

3254 if getfd: 

3255 return proc.stdout 

3256 if wait: 

3257 proc.wait() 

3258 

3259 

@conf.commands.register
def hexedit(pktlist):
    # type: (_PacketIterable) -> PacketList
    """Run hexedit on a list of packets, then return the edited packets.

    The packets are written to a temporary file, ``conf.prog.hexedit`` is
    called on that file, and the file is read back afterwards.

    :param pktlist: the packets to edit
    :returns: the packets read back from the temporary file
    """
    f = get_temp_file()
    try:
        wrpcap(f, pktlist)
        with ContextManagerSubprocess(conf.prog.hexedit):
            subprocess.call([conf.prog.hexedit, f])
        return rdpcap(f)
    finally:
        # Remove the temporary capture even when writing, editing or
        # re-reading it raised, so failed runs do not leave files behind.
        os.unlink(f)

3271 

3272 

def get_terminal_width():
    # type: () -> Optional[int]
    """Get terminal width (number of characters) if in a window.

    Notice: several methods are tried in turn in order to support as many
    terminals and OS as possible: ``shutil.get_terminal_size``, then the
    Windows console API on Windows, otherwise the ``COLUMNS`` environment
    variable and finally the ``TIOCGWINSZ`` ioctl. When the ioctl is not
    usable, 79 is returned.
    """
    width = shutil.get_terminal_size(fallback=(0, 0))[0]
    if width != 0:
        return width
    # Backups
    if WINDOWS:
        from ctypes import windll, create_string_buffer
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        handle = windll.kernel32.GetStdHandle(-12)
        info = create_string_buffer(22)
        if windll.kernel32.GetConsoleScreenBufferInfo(handle, info):
            # Unpacked order: bufx, bufy, curx, cury, wattr,
            # left, top, right, bottom, maxx, maxy
            fields = struct.unpack("hhhhHhhhhhh", info.raw)
            left = fields[5]
            right = fields[7]
            width = right - left + 1
        return width
    # We have various methods
    # COLUMNS is set on some terminals
    try:
        width = int(os.environ['COLUMNS'])
    except Exception:
        pass
    if width:
        return width
    # We can query TIOCGWINSZ
    try:
        import fcntl
        import termios
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(1, termios.TIOCGWINSZ, request)
        width = struct.unpack('HHHH', reply)[1]
    except (IOError, ModuleNotFoundError):
        # If everything failed, return default terminal size
        width = 79
    return width

3316 

3317 

def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
                header,  # type: List[Tuple[str, ...]]
                sortBy=0,  # type: Optional[int]
                borders=False,  # type: bool
                ):
    # type: (...) -> str
    """
    Pretty list to fit the terminal, and add header.

    The input list is left untouched: sorting and multi-value expansion are
    done on a copy.

    :param rtlst: a list of tuples. each tuple contains a value which can
        be either a string or a list of string.
    :param header: a list of tuples holding the header line(s); the length
        of its first tuple gives the number of columns
    :param sortBy: the column id (starting with 0) which will be used for
        ordering, or None to keep the input order
    :param borders: whether to put borders on the table or not
    :returns: the formatted table, one row per line
    """
    # NOTE(review): the separator/padding literals below are copied from the
    # reviewed text, where whitespace may have been collapsed - confirm
    # their exact width against the upstream file.
    if borders:
        _space = "|"
    else:
        _space = " "
    cols = len(header[0])
    # Windows has a fat terminal border
    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
    _croped = False
    # Work on a copy so that the caller's list is neither reordered nor
    # expanded.
    rows = list(rtlst)
    if sortBy is not None:
        # Sort correctly
        rows.sort(key=lambda x: x[sortBy])
    # Resolve multi-values: a row holding lists is expanded into one row per
    # list item, the extra rows being blank outside of the list columns.
    expanded = []  # type: List[Tuple[Any, ...]]
    for line in rows:
        ids = [j for j, val in enumerate(line) if isinstance(val, list)]
        if not ids:
            expanded.append(line)
            continue
        values = [line[j] or " " for j in ids]
        for k, ex_vals in enumerate(zip_longest(*values, fillvalue=" ")):
            if k:
                extra_line = [" "] * cols
            else:
                extra_line = list(line)  # type: ignore
            for j, h in enumerate(ids):
                extra_line[h] = ex_vals[j]
            expanded.append(tuple(extra_line))
    # Append tag
    rtslst = header + cast(List[Tuple[str, ...]], expanded)
    # Detect column's width
    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
    # Make text fit in box (if required)
    width = get_terminal_width()
    if conf.auto_crop_tables and width:
        width = width - _spacelen
        while sum(colwidth) > width:
            # Needs to be cropped
            # Get the widest column
            i = colwidth.index(max(colwidth))
            if colwidth[i] <= 1:
                # A one-character cell cannot be shortened any further
                # ("a"[:-2] + "_" is still one character): stop instead of
                # looping forever on a terminal narrower than the table.
                break
            _croped = True
            # Get all elements of this column
            row = [len(x[i]) for x in rtslst]
            # Get biggest element of this column: biggest of the array
            j = row.index(max(row))
            # Re-build the row tuple with the edited element
            t = list(rtslst[j])
            t[i] = t[i][:-2] + "_"
            rtslst[j] = tuple(t)
            # Update max size
            row[j] = len(t[i])
            colwidth[i] = max(row)
    if _croped:
        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
    # Generate padding scheme
    fmt = _space.join(["%%-%ds" % x for x in colwidth])
    # Append separation line if needed
    if borders:
        rtslst.insert(1, tuple("-" * x for x in colwidth))
    # Compile
    return "\n".join(fmt % x for x in rtslst)

3398 

3399 

def human_size(x, fmt=".1f"):
    # type: (int, str) -> str
    """
    Convert a size in octets to a human string representation

    The size is scaled by powers of 1024 and suffixed with the matching
    unit letter (K, M, G, T, P or E). Sizes below 1024, and sizes too large
    for the biggest unit, are returned unscaled with a "B" suffix.

    :param x: the size, in octets
    :param fmt: the format spec applied to the scaled value
    :returns: the human readable size
    :raises ValueError: if the size is negative
    """
    units = ['K', 'M', 'G', 'T', 'P', 'E']
    if not x:
        return "0B"
    if x < 0:
        raise ValueError("human_size() expects a non-negative size")
    # Find the power of 1024 with plain comparisons rather than a floating
    # point logarithm, so that exact powers of 1024 cannot be misclassified
    # by rounding. The loop is bounded so that it always terminates.
    i = 0
    while i <= len(units) and x >= 1024 ** (i + 1):
        i += 1
    # units[i - 1] is valid up to and including i == len(units) ('E').
    if 0 < i <= len(units):
        return format(x / 2**(10 * i), fmt) + units[i - 1]
    return str(x) + "B"

3412 

3413 

def __make_table(
    yfmtfunc,  # type: Callable[[int], str]
    fmtfunc,  # type: Callable[[int], str]
    endline,  # type: str
    data,  # type: List[Tuple[Packet, Packet]]
    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
    dump=False  # type: bool
):
    # type: (...) -> Optional[str]
    """Core function of the make_table suite, which generates the table

    :param yfmtfunc: given the width of the widest row label, returns the
        format string used for the first column
    :param fmtfunc: given the width of a column, returns the format string
        used for the cells of that column
    :param endline: text appended at the end of each table line
    :param data: the elements to tabulate; each one is unpacked into fxyz
    :param fxyz: returns the (column, row, cell) values of an element
    :param sortx: optional sort key for the column labels
    :param sorty: optional sort key for the row labels
    :param seplinefunc: optional function building the separation line from
        the row label width and the list of column widths
    :param dump: when True the table is returned, otherwise it is printed
    """

    def _order(labels, keyfunc):
        # type: (List[str], Optional[Callable[[str], Any]]) -> None
        """Sort labels in place: explicit key, else int, atol, then str."""
        if keyfunc:
            labels.sort(key=keyfunc)
            return
        try:
            labels.sort(key=int)
        except Exception:
            try:
                labels.sort(key=atol)
            except Exception:
                labels.sort()

    widths = {}  # type: Dict[str, int]
    rows = {}  # type: Dict[str, Optional[int]]
    cells = {}  # type: Dict[Tuple[str, str], str]
    colfmt = {}  # type: Dict[str, str]

    label_width = 0
    for element in data:
        xx, yy, zz = (str(v) for v in fxyz(*element))
        label_width = max(len(yy), label_width)
        widths[xx] = max(widths.get(xx, 0), len(xx), len(zz))
        # dict used as an insertion-ordered set of row labels
        rows[yy] = None
        cells[(xx, yy)] = zz

    xlabels = list(widths)
    ylabels = list(rows)
    _order(xlabels, sortx)
    _order(ylabels, sorty)

    out = []  # type: List[str]
    if seplinefunc:
        sepline = seplinefunc(label_width, [widths[x] for x in xlabels])
        out.append(sepline + "\n")

    # Header line: empty row label followed by the column labels
    fmt = yfmtfunc(label_width)
    out.append(fmt % "")
    out.append(' ')
    for x in xlabels:
        colfmt[x] = fmtfunc(widths[x])
        out.append(colfmt[x] % x)
        out.append(' ')
    out.append(endline + "\n")
    if seplinefunc:
        out.append(sepline + "\n")
    # One line per row label; missing cells are shown as "-"
    for y in ylabels:
        out.append(fmt % y)
        out.append(' ')
        for x in xlabels:
            out.append(colfmt[x] % cells.get((x, y), "-"))
            out.append(' ')
        out.append(endline + "\n")
    if seplinefunc:
        out.append(sepline + "\n")

    table = "".join(out)
    if dump:
        return table
    print(table, end="")
    return None

3493 

3494 

def make_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[Any]
    """Build a plain table: left-aligned padded cells, no line ending text.

    All arguments are forwarded to __make_table after the two format
    functions and the end-of-line text.
    """
    def _padded(width):
        # type: (int) -> str
        return "%%-%is" % width

    return __make_table(_padded, _padded, "", *args, **kargs)

3504 

3505 

def make_lined_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """Build a table whose cells end with "|" and whose lines are separated
    by dashed rules.

    All arguments are forwarded to __make_table after the two format
    functions and the end-of-line text; ``seplinefunc`` is provided here.
    """
    def _cell(width):
        # type: (int) -> str
        return "%%-%is |" % width

    def _rule(a, x):
        # type: (int, List[int]) -> str
        return "+".join(
            '-' * (y + 2) for y in [a - 1] + x + [-2]
        )

    return __make_table(  # type: ignore
        _cell,
        _cell,
        "",
        *args,
        seplinefunc=_rule,
        **kargs
    )

3518 

3519 

def make_tex_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """Build a table using "&" before each cell, a double backslash at the
    end of each line and ``\\hline`` as separation line.

    All arguments are forwarded to __make_table after the two format
    functions and the end-of-line text; ``seplinefunc`` is provided here.
    """
    def _label(width):
        # type: (int) -> str
        return "%s"

    def _cell(width):
        # type: (int) -> str
        return "& %s"

    def _rule(a, x):
        # type: (int, List[int]) -> str
        return "\\hline"

    return __make_table(  # type: ignore
        _label,
        _cell,
        "\\\\",
        *args,
        seplinefunc=_rule,
        **kargs
    )

3530 

3531#################### 

3532# WHOIS CLIENT # 

3533#################### 

3534 

3535 

def whois(ip_address):
    # type: (str) -> bytes
    """Whois client for Python

    The argument is resolved with ``socket.gethostbyname`` when possible
    (otherwise it is sent as given), then queried on ``whois.ripe.net``
    port 43.

    :param ip_address: the address (or name) to look up
    :returns: the raw answer, without the lines starting with "remarks:",
        without the trailing blank lines and without its first three lines
    """
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except Exception:
        # Best effort: fall back to sending the value unresolved
        query = whois_ip
    chunks = []  # type: List[bytes]
    # The context manager closes the socket even if the exchange fails
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(("whois.ripe.net", 43))
        # sendall: a plain send() may transmit only part of the query
        s.sendall(query.encode("utf8") + b"\r\n")
        while True:
            d = s.recv(4096)
            if not d:
                break
            chunks.append(d)
    answer = b"".join(chunks)
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [
        line for line in answer.split(b"\n")
        if not line.startswith(ignore_tag)
    ]
    # remove empty lines at the bottom. Always test the current last line:
    # indexing from the end with a growing offset while deleting skips
    # entries and leaves blank lines behind.
    while lines and not lines[-1].strip():
        lines.pop()
    return b"\n".join(lines[3:])

3564 

3565#################### 

3566# CLI utils # 

3567#################### 

3568 

3569 

class _CLIUtilMetaclass(type):
    """
    Metaclass collecting the functions tagged with a ``cliutil_type``
    attribute in a class body into the ``commands``, ``commands_output``
    and ``commands_complete`` class dictionaries.
    """

    class TYPE(enum.Enum):
        COMMAND = 0
        OUTPUT = 1
        COMPLETE = 2

    def __new__(cls,  # type: Type[_CLIUtilMetaclass]
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[CLIUtil]
        kinds = _CLIUtilMetaclass.TYPE
        commands = {}  # type: Dict[str, Any]
        outputs = {}  # type: Dict[str, Any]
        completers = {}  # type: Dict[str, Any]
        for member in list(dct.values()):
            kind = getattr(member, "cliutil_type", None)
            if kind == kinds.COMMAND:
                # commands are indexed by their own name
                commands[member.__name__] = member
            elif kind == kinds.OUTPUT:
                # helpers are indexed by the name of the command they serve
                outputs[member.cliutil_ref.__name__] = member
            elif kind == kinds.COMPLETE:
                completers[member.cliutil_ref.__name__] = member
        dct["commands"] = commands
        dct["commands_output"] = outputs
        dct["commands_complete"] = completers
        return cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct))

3599 

3600 

class CLIUtil(metaclass=_CLIUtilMetaclass):
    """
    Provides a Util class to easily create simple CLI tools in Scapy,
    that can still be used as an API.

    Doc:
        - override the ps1() function
        - register commands with the @CLIUtil.addcommand decorator
        - call the loop() function when ready
    """

    def _depcheck(self) -> None:
        """
        Check that all dependencies are installed
        """
        try:
            import prompt_toolkit  # noqa: F401
        except ImportError:
            # okay we lie but prompt_toolkit is a dependency...
            raise ImportError("You need to have IPython installed to use the CLI")

    # Okay let's do nice code
    commands: Dict[str, Callable[..., Any]] = {}
    # print output of command
    commands_output: Dict[str, Callable[..., str]] = {}
    # provides completion to command
    commands_complete: Dict[str, Callable[..., List[str]]] = {}

    def __init__(self, cli: bool = True, debug: bool = False) -> None:
        """
        DEV: overwrite

        :param cli: when True, check the dependencies and enter loop()
        :param debug: forwarded to loop()
        """
        if cli:
            self._depcheck()
            self.loop(debug=debug)

    @staticmethod
    def _inspectkwargs(func: DecoratorCallable) -> None:
        """
        Internal function to parse arguments from the kwargs of the functions

        The keyword-only parameters of func are stored in func._flagnames,
        and the matching CLI flags ("-x" for one-letter names, "--name"
        otherwise) in func._flags.
        """
        func._flagnames = [  # type: ignore
            x.name for x in
            inspect.signature(func).parameters.values()
            if x.kind == inspect.Parameter.KEYWORD_ONLY
        ]
        func._flags = [  # type: ignore
            ("-%s" % x) if len(x) == 1 else ("--%s" % x)
            for x in func._flagnames  # type: ignore
        ]

    @staticmethod
    def _parsekwargs(
        func: DecoratorCallable,
        args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of a function.

        Only the flags found at the beginning of args are consumed.

        :returns: the remaining arguments and the flags that were set
        """
        kwargs: Dict[str, Literal[True]] = {}
        if func._flags:  # type: ignore
            i = 0
            for arg in args:
                if arg in func._flags:  # type: ignore
                    i += 1
                    kwargs[func._flagnames[func._flags.index(arg)]] = True  # type: ignore # noqa: E501
                    continue
                break
            args = args[i:]
        return args, kwargs

    @classmethod
    def _parseallargs(
        cls,
        func: DecoratorCallable,
        cmd: str, args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of both the function
        and its output function.

        :returns: the remaining arguments, the flags of the command and the
            flags of its output function
        """
        args, kwargs = cls._parsekwargs(func, args)
        outkwargs: Dict[str, Literal[True]] = {}
        if cmd in cls.commands_output:
            args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
        return args, kwargs, outkwargs

    @classmethod
    def addcommand(
        cls,
        spaces: bool = False,
        globsupport: bool = False,
    ) -> Callable[[DecoratorCallable], DecoratorCallable]:
        """
        Decorator to register a command

        :param spaces: when set, loop() passes all the arguments joined by
            spaces as a single argument
        :param globsupport: when set, loop() expands a single "*" using the
            completer of the command. Requires spaces.
        """
        def func(cmd: DecoratorCallable) -> DecoratorCallable:
            cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND  # type: ignore
            cmd._spaces = spaces  # type: ignore
            cmd._globsupport = globsupport  # type: ignore
            cls._inspectkwargs(cmd)
            if cmd._globsupport and not cmd._spaces:  # type: ignore
                raise ValueError("Cannot use globsupport without spaces.")
            return cmd
        return func

    @classmethod
    def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command output processor

        :param cmd: the command whose result is handled
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            cls._inspectkwargs(processor)
            return processor
        return func

    @classmethod
    def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command completor

        :param cmd: the command that is completed
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            return processor
        return func

    def ps1(self) -> str:
        """
        Return the PS1 of the shell
        """
        return "> "

    def close(self) -> None:
        """
        Function called on exiting
        """
        print("Exited")

    def help(self, cmd: Optional[str] = None) -> None:
        """
        Print the help related to this CLI util

        :param cmd: a command name to get the help of this command only.
            Otherwise all the commands are listed.
        """
        def _args(func: Any) -> str:
            # Flags of the command and of its output function, followed by
            # the other parameters ("?" marks the ones having a default)
            flags = func._flags.copy()
            if func.__name__ in self.commands_output:
                flags += self.commands_output[func.__name__]._flags  # type: ignore
            return " %s%s" % (
                (
                    "%s " % " ".join("[%s]" % x for x in flags)
                    if flags else ""
                ),
                " ".join(
                    "<%s%s>" % (
                        x.name,
                        "?" if
                        (x.default is None or x.default != inspect.Parameter.empty)
                        else ""
                    )
                    for x in list(inspect.signature(func).parameters.values())[1:]
                    if x.name not in func._flagnames and x.name[0] != "_"
                )
            )

        if cmd:
            if cmd not in self.commands:
                print("Unknown command '%s'" % cmd)
                return
            # help for one command
            func = self.commands[cmd]
            print("%s%s: %s" % (
                cmd,
                _args(func),
                func.__doc__ and func.__doc__.strip()
            ))
        else:
            header = "│ %s - Help │" % self.__class__.__name__
            print("┌" + "─" * (len(header) - 2) + "┐")
            print(header)
            print("└" + "─" * (len(header) - 2) + "┘")
            print(
                pretty_list(
                    [
                        (
                            cmd,
                            _args(func),
                            func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
                        )
                        for cmd, func in self.commands.items()
                    ],
                    [("Command", "Arguments", "Description")]
                )
            )

    def _completer(self) -> 'prompt_toolkit.completion.Completer':
        """
        Returns a prompt_toolkit custom completer
        """
        from prompt_toolkit.completion import Completer, Completion

        class CLICompleter(Completer):
            def get_completions(cmpl, document, complete_event):  # type: ignore
                if not complete_event.completion_requested:
                    # Only activate when the user does <TAB>
                    return
                parts = document.text.split(" ")
                cmd = parts[0].lower()
                if cmd not in self.commands:
                    # We are trying to complete the command
                    for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
                        yield Completion(possible_cmd, start_position=-len(cmd))
                else:
                    # We are trying to complete the command content
                    if len(parts) == 1:
                        return
                    args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
                    arg = " ".join(args)
                    if cmd in self.commands_complete:
                        for possible_arg in self.commands_complete[cmd](self, arg):
                            yield Completion(possible_arg, start_position=-len(arg))
                return
        return CLICompleter()

    def loop(self, debug: int = 0) -> None:
        """
        Main command handling loop

        Reads commands until "exit" or EOF. close() is called on EOF only.

        :param debug: when set, the traceback of failing commands is printed
        """
        from prompt_toolkit import PromptSession
        session = PromptSession(completer=self._completer())

        while True:
            try:
                cmd = session.prompt(self.ps1()).strip()
            except KeyboardInterrupt:
                continue
            except EOFError:
                self.close()
                break
            args = cmd.split(" ")[1:]
            cmd = cmd.split(" ")[0].strip().lower()
            if not cmd:
                continue
            if cmd in ["help", "h", "?"]:
                self.help(" ".join(args))
                continue
            # "exit" and its abbreviations ("e", "ex", "exi") leave the loop.
            # This is a prefix test, not a substring one: "x", "i" or "it"
            # must not quit, and a registered command is never shadowed by
            # an abbreviation.
            if cmd == "exit" or (
                cmd not in self.commands and "exit".startswith(cmd)
            ):
                break
            if cmd not in self.commands:
                print("Unknown command. Type help or ?")
            else:
                # check the number of arguments
                func = self.commands[cmd]
                args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
                if func._spaces:  # type: ignore
                    args = [" ".join(args)]
                    # if globsupport is set, we might need to do several calls
                    if func._globsupport and "*" in args[0]:  # type: ignore
                        if args[0].count("*") > 1:
                            print("More than 1 glob star (*) is currently unsupported.")
                            continue
                        if cmd not in self.commands_complete:
                            # The expansion relies on the completer
                            print("Glob star (*) requires a completer for '%s'." % cmd)
                            continue
                        before, after = args[0].split("*", 1)
                        # Both sides of the star are literal text: escape
                        # them, and match the whole candidate.
                        reg = re.compile(
                            re.escape(before) + r".*" + re.escape(after)
                        )
                        calls = [
                            [x] for x in
                            self.commands_complete[cmd](self, before)
                            if reg.fullmatch(x)
                        ]
                    else:
                        calls = [args]
                else:
                    calls = [args]
                # now iterate if required, call the function and print its output
                for args in calls:
                    # Reset for each call, so that a failing call never
                    # re-prints the result of the previous one.
                    res = None
                    try:
                        res = func(self, *args, **kwargs)
                    except TypeError:
                        print("Bad number of arguments !")
                        self.help(cmd=cmd)
                        continue
                    except Exception as ex:
                        print("Command failed with error: %s" % ex)
                        if debug:
                            # 3-argument form: the single argument one only
                            # exists since Python 3.10
                            traceback.print_exception(
                                type(ex), ex, ex.__traceback__
                            )
                        continue
                    try:
                        if res and cmd in self.commands_output:
                            self.commands_output[cmd](self, res, **outkwargs)
                    except Exception as ex:
                        print("Output processor failed with error: %s" % ex)

3892 

3893 

def AutoArgparse(func: DecoratorCallable) -> None:
    """
    Generate an Argparse call from a function, then call this function.

    Parameters without a default become positional arguments, the others
    become ``--name`` options. A boolean parameter defaulting to True is
    exposed as ``--no-name``. The arguments are read from sys.argv.

    Notes:

    - for the arguments to have a description, the sphinx docstring format
      must be used. See
      https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
    - the arguments must be typed in Python (we ignore Sphinx-specific types)
      untyped arguments are ignored.
    - only types that would be supported by argparse are supported. The others
      are omitted.

    :param func: the function to expose and call
    """
    argsdoc = {}
    if func.__doc__:
        # Sphinx doc format parser
        m = re.match(
            r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
            func.__doc__.strip(),
        )
        if not m:
            desc = func.__doc__.strip()
        else:
            desc = m.group(1)
            sphinxargs = re.findall(
                r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
                m.group(2),
            )
            for argtype, argparam, argdesc in sphinxargs:
                argparam = argparam.strip()
                argdesc = argdesc.strip()
                if argtype == "param":
                    if not argparam:
                        raise ValueError(":param: without a name !")
                    argsdoc[argparam] = argdesc
    else:
        desc = ""
    # Now build the argparse.ArgumentParser
    parser = argparse.ArgumentParser(
        prog=func.__name__,
        description=desc,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Process the parameters
    positional = []
    for param in inspect.signature(func).parameters.values():
        if not param.annotation:
            continue
        parname = param.name
        paramkwargs = {}  # type: Dict[str, Any]
        if param.annotation is bool:
            if param.default is True:
                parname = "no-" + parname
                paramkwargs["action"] = "store_false"
            else:
                paramkwargs["action"] = "store_true"
        elif param.annotation in [str, int, float]:
            paramkwargs["type"] = param.annotation
        else:
            continue
        if param.default != inspect.Parameter.empty:
            if param.kind == inspect.Parameter.POSITIONAL_ONLY:
                positional.append(param.name)
                paramkwargs["nargs"] = '?'
            else:
                parname = "--" + parname
                # Store the value under the real parameter name. Guessing it
                # back from the option name (stripping a "no_" prefix) breaks
                # parameters that are really called "no_something".
                paramkwargs["dest"] = param.name
            paramkwargs["default"] = param.default
        else:
            positional.append(param.name)
            if param.kind == inspect.Parameter.VAR_POSITIONAL:
                paramkwargs["action"] = "append"
        if param.name in argsdoc:
            paramkwargs["help"] = argsdoc[param.name]
        parser.add_argument(parname, **paramkwargs)  # type: ignore
    # Now parse the sys.argv parameters
    params = vars(parser.parse_args())
    # Act as in interactive mode
    conf.logLevel = 20
    from scapy.themes import DefaultTheme
    conf.color_theme = DefaultTheme()
    # And call the function
    try:
        # The positional values are popped first; what is left in params
        # are the options, already keyed by parameter name.
        func(
            *[params.pop(x) for x in positional],
            **params
        )
    except AssertionError as ex:
        print("ERROR: " + str(ex))
        parser.print_help()

3987 

3988 

3989####################### 

3990# PERIODIC SENDER # 

3991####################### 

3992 

3993 

class PeriodicSenderThread(threading.Thread):
    """Thread sending one or several packets on a socket, in a loop."""

    def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
        # type: (Any, _PacketIterable, float, bool) -> None
        """ Thread to send packets periodically

        Args:
            sock: socket where packet is sent periodically
            pkt: packet or list of packets to send
            interval: interval between two packets
            ignore_exceptions: when True, an OSError or TimeoutError raised
                while sending silently ends the thread. Otherwise the
                exception is raised.
        """
        if not isinstance(pkt, list):
            self._pkts = [cast("Packet", pkt)]  # type: _PacketIterable
        else:
            self._pkts = pkt
        self._socket = sock
        self._stopped = threading.Event()
        self._enabled = threading.Event()
        self._enabled.set()
        self._interval = interval
        self._ignore_exceptions = ignore_exceptions
        threading.Thread.__init__(self)

    def enable(self):
        # type: () -> None
        """Resume sending (sending is enabled by default)."""
        self._enabled.set()

    def disable(self):
        # type: () -> None
        """Pause sending; the thread keeps running."""
        self._enabled.clear()

    def run(self):
        # type: () -> None
        """Send the packets in a loop until stopped or the socket is closed."""
        while not self._stopped.is_set() and not self._socket.closed:
            for p in self._pkts:
                try:
                    if self._enabled.is_set():
                        self._socket.send(p)
                except (OSError, TimeoutError):
                    if self._ignore_exceptions:
                        return
                    raise
                # Wait on the stop event, so that stop() interrupts the
                # pause between two packets.
                self._stopped.wait(timeout=self._interval)
                if self._stopped.is_set() or self._socket.closed:
                    break

    def stop(self):
        # type: () -> None
        """Ask the thread to stop and wait up to two intervals for it."""
        self._stopped.set()
        # join() raises RuntimeError on a thread that was never started and
        # on the current thread: only wait when it is meaningful.
        if self.is_alive() and threading.current_thread() is not self:
            self.join(self._interval * 2)

4044 

4045 

class SingleConversationSocket(object):
    """
    Wrapper around a socket-like object: sr1(), sr() and send() are run
    while holding a re-entrant lock, every other attribute is looked up on
    the wrapped object.
    """

    def __init__(self, o):
        # type: (Any) -> None
        self._inner = o
        self._tx_mutex = threading.RLock()

    @property
    def __dict__(self):  # type: ignore
        # Expose the attributes of the wrapped object
        return self._inner.__dict__

    def __getattr__(self, name):
        # type: (str) -> Any
        # Only called when the normal lookup fails: delegate
        return getattr(self._inner, name)

    def sr1(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        self._tx_mutex.acquire()
        try:
            return self._inner.sr1(*args, **kargs)
        finally:
            self._tx_mutex.release()

    def sr(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        self._tx_mutex.acquire()
        try:
            return self._inner.sr(*args, **kargs)
        finally:
            self._tx_mutex.release()

    def send(self, x):
        # type: (Packet) -> Any
        self._tx_mutex.acquire()
        try:
            try:
                result = self._inner.send(x)
            except (ConnectionError, OSError):
                # Close the wrapped object before reporting the failure
                self._inner.close()
                raise
            return result
        finally:
            self._tx_mutex.release()