Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7General utility functions.
8"""
11from decimal import Decimal
12from io import StringIO
13from itertools import zip_longest
14from uuid import UUID
16import argparse
17import array
18import base64
19import collections
20import decimal
21import difflib
22import enum
23import gzip
24import inspect
25import locale
26import math
27import os
28import pickle
29import random
30import re
31import shutil
32import socket
33import struct
34import subprocess
35import sys
36import tempfile
37import threading
38import time
39import traceback
40import warnings
42from scapy.config import conf
43from scapy.consts import DARWIN, OPENBSD, WINDOWS
44from scapy.data import MTU, DLT_EN10MB, DLT_RAW
45from scapy.compat import (
46 orb,
47 plain_str,
48 chb,
49 hex_bytes,
50 bytes_encode,
51)
52from scapy.error import (
53 log_interactive,
54 log_runtime,
55 Scapy_Exception,
56 warning,
57)
58from scapy.pton_ntop import inet_pton
60# Typing imports
61from typing import (
62 cast,
63 Any,
64 AnyStr,
65 Callable,
66 Dict,
67 IO,
68 Iterator,
69 List,
70 Optional,
71 TYPE_CHECKING,
72 Tuple,
73 Type,
74 Union,
75 overload,
76)
77from scapy.compat import (
78 DecoratorCallable,
79 Literal,
80)
82if TYPE_CHECKING:
83 from scapy.packet import Packet
84 from scapy.plist import _PacketIterable, PacketList
85 from scapy.supersocket import SuperSocket
86 import prompt_toolkit
88_ByteStream = Union[IO[bytes], gzip.GzipFile]
90###########
91# Tools #
92###########
def issubtype(x,  # type: Any
              t,  # type: Union[type, str]
              ):
    # type: (...) -> bool
    """issubtype(C, B) -> bool

    Return whether C is a class and if it is a subclass of class B.
    When using a tuple as the second argument issubtype(X, (A, B, ...)),
    is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).

    When B is given as a string, it is matched against the ``__name__`` of
    the *direct* bases of C only (no walk through the whole MRO).

    :param x: the candidate object (anything; non-classes yield False)
    :param t: a class, a tuple of classes, or a class name
    :return: True if x is a class deriving from t, False otherwise
    """
    if not isinstance(x, type):
        # Instances have no __bases__: never a subtype of anything.
        return False
    if isinstance(t, str):
        return any(base.__name__ == t for base in x.__bases__)
    return issubclass(x, t)
112_Decimal = Union[Decimal, int]
class EDecimal(Decimal):
    """Extended Decimal

    This implements arithmetic and comparison with float for
    backward compatibility

    Every arithmetic operand is first converted with ``Decimal(other)``
    (which accepts int, float and Decimal) and every result is re-wrapped
    into an EDecimal.
    """

    def __add__(self, other, context=None):
        # type: (_Decimal, Any) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __radd__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __sub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__sub__(self, Decimal(other)))

    def __rsub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rsub__(self, Decimal(other)))

    def __mul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __rmul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __truediv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__truediv__(self, Decimal(other)))

    def __floordiv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__floordiv__(self, Decimal(other)))

    def __divmod__(self, other):
        # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
        r = Decimal.__divmod__(self, Decimal(other))
        return EDecimal(r[0]), EDecimal(r[1])

    def __mod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mod__(self, Decimal(other)))

    def __rmod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rmod__(self, Decimal(other)))

    def __pow__(self, other, modulo=None):
        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
        return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))

    def __eq__(self, other):
        # type: (Any) -> bool
        # Decimal operands are compared exactly; anything else is compared
        # after converting self to float (legacy float-timestamp behaviour).
        if isinstance(other, Decimal):
            return super(EDecimal, self).__eq__(other)
        else:
            return bool(float(self) == other)

    def __ne__(self, other):
        # type: (Any) -> bool
        # Keep '!=' the exact negation of '=='. Without this, the inherited
        # Decimal.__ne__ compares exactly, so both `x == 0.1` and `x != 0.1`
        # could be True for EDecimal("0.1").
        return not self.__eq__(other)

    # Overriding __eq__ implicitly sets __hash__ to None; restore the Decimal
    # hash so instances can be used in sets and as dict keys again.
    __hash__ = Decimal.__hash__

    def normalize(self, precision):  # type: ignore
        # type: (int) -> EDecimal
        """Return a normalized copy rounded to ``precision`` significant
        digits, using a temporary local decimal context."""
        with decimal.localcontext() as ctx:
            ctx.prec = precision
            return EDecimal(super(EDecimal, self).normalize(ctx))
@overload
def get_temp_file(keep, autoext, fd):
    # type: (bool, str, Literal[True]) -> IO[bytes]
    pass


@overload
def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, Literal[False]) -> str
    pass


def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, bool) -> Union[IO[bytes], str]
    """Create a temporary file whose name starts with "scapy".

    :param keep: If False, the path is recorded in ``conf.temp_files``
        (so that it can be deleted when Scapy exits).
    :param autoext: Suffix appended to the generated file name.
    :param fd: If True, return the still-open file object. If False
        (default), close the file and return its path.
    """
    handle = tempfile.NamedTemporaryFile(
        prefix="scapy",
        suffix=autoext,
        delete=False,
    )
    path = handle.name
    if not keep:
        conf.temp_files.append(path)

    if not fd:
        # Release the handle so that another program can open the path.
        handle.close()
        return path
    return handle
def get_temp_dir(keep=False):
    # type: (bool) -> str
    """Create a temporary directory and return its path.

    :param keep: If False (default), the path is recorded in
        ``conf.temp_files`` so that it is removed when Scapy exits.
    :return: A full path to a temporary directory.
    """
    path = tempfile.mkdtemp(prefix="scapy")
    if keep:
        return path
    conf.temp_files.append(path)
    return path
def _create_fifo() -> Tuple[str, Any]:
    """Create a temporary fifo.

    The returned server handle must be passed to _open_fifo() once the
    client is connected.

    :returns: (client_file, server_fd)
    """
    if not WINDOWS:
        # Reserve a unique name, then replace the regular file by a fifo.
        path = get_temp_file()
        os.unlink(path)
        os.mkfifo(path)
        return path, path
    from scapy.arch.windows.structures import _get_win_fifo
    return _get_win_fifo()
def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
    """Open the server side of a fifo returned by _create_fifo().

    ``mode`` is only used on non-Windows platforms.
    """
    if not WINDOWS:
        return open(fd, mode)
    from scapy.arch.windows.structures import _win_fifo_open
    return _win_fifo_open(fd)
def sane(x, color=False):
    # type: (AnyStr, bool) -> str
    """Return a printable rendering of ``x``.

    Characters whose code is in [32, 127) are kept; every other one is
    replaced by "." (passed through the theme's ``not_printable`` style
    when ``color`` is true).
    """
    rendered = ""
    for item in x:
        code = orb(item)
        if 32 <= code < 127:
            rendered += chr(code)
        elif color:
            rendered += conf.color_theme.not_printable(".")
        else:
            rendered += "."
    return rendered
@conf.commands.register
def restart():
    # type: () -> None
    """Restarts scapy"""
    from_console = conf.interactive and os.path.isfile(sys.argv[0])
    if not from_console:
        raise OSError("Scapy was not started from console")
    command = [sys.executable] + sys.argv
    if WINDOWS:
        # Run the new instance as a child and exit with its return code
        # (1 if it could not even be started).
        res_code = 1
        try:
            res_code = subprocess.call(command)
        finally:
            os._exit(res_code)
    os.execv(sys.executable, command)
def lhex(x):
    # type: (Any) -> str
    """Render ``x`` with integers in hexadecimal.

    Tuples and lists are rendered recursively, VolatileValue instances
    through repr(), and anything else through str().
    """
    from scapy.volatile import VolatileValue
    if isinstance(x, VolatileValue):
        return repr(x)
    if isinstance(x, int):
        return hex(x)
    for container, template in ((tuple, "(%s)"), (list, "[%s]")):
        if isinstance(x, container):
            return template % ", ".join(map(lhex, x))
    return str(x)
@conf.commands.register
def hexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a tcpdump like hexadecimal view

    Each row shows the offset, up to 16 bytes in hexadecimal and their
    printable rendering (see sane()).

    :param p: a Packet
    :param dump: define if the result must be printed or returned in a variable
    :return: a String only when dump=True
    """
    data = bytes_encode(p)
    rows = []
    for offset in range(0, len(data), 16):
        chunk = data[offset:offset + 16]
        cells = ["%02X " % orb(b) for b in chunk]
        # A missing byte on a short last row must take as much room as a
        # real "%02X " cell (3 characters) to keep the text column aligned.
        cells.extend([" " * 3] * (16 - len(chunk)))
        rows.append(
            "%04x %s %s" % (offset, "".join(cells), sane(chunk, color=True))
        )
    # Rows are joined without a trailing newline
    s = "\n".join(rows)
    if dump:
        return s
    else:
        print(s)
        return None
@conf.commands.register
def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
    """Build an equivalent view of hexdump() on a single line

    Note that setting both onlyasc and onlyhex to 1 results in an empty
    output. Colors are only used when the view is printed.

    :param p: a Packet
    :param onlyasc: 1 to display only the ascii view
    :param onlyhex: 1 to display only the hexadecimal view
    :param dump: print the view if False
    :return: a String only when dump=True
    """
    view = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
    if not dump:
        print(view)
        return None
    return view
@conf.commands.register
def chexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a per byte hexadecimal representation

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501

    :param p: a Packet
    :param dump: print the view if False
    :return: a String only if dump=True
    """
    raw = bytes_encode(p)
    literals = ["%#04x" % orb(byte) for byte in raw]
    text = ", ".join(literals)
    if not dump:
        print(text)
        return None
    return text
@conf.commands.register
def hexstr(p, onlyasc=0, onlyhex=0, color=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> str
    """Build a fancy tcpdump like hex from bytes.

    The hexadecimal view comes first (unless ``onlyasc``), followed by the
    printable view (unless ``onlyhex``).
    """
    raw = bytes_encode(p)
    views = []
    if not onlyasc:
        hex_view = " ".join("%02X" % orb(byte) for byte in raw)
        views.append(hex_view)
    if not onlyhex:
        views.append(sane(raw, color=color))
    return " ".join(views)
def repr_hex(s):
    # type: (bytes) -> str
    """ Convert provided bitstring to a simple string of hex digits """
    digits = ["%02x" % orb(item) for item in s]
    return "".join(digits)
@conf.commands.register
def hexdiff(
    a: Union['Packet', AnyStr],
    b: Union['Packet', AnyStr],
    algo: Optional[str] = None,
    autojunk: bool = False,
) -> None:
    """
    Show differences between 2 binary strings, Packets...

    Available algorithms:
        - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
          Levenshtein distance between the strings then backtrack.
        - difflib: Use the difflib.SequenceMatcher implementation. This based on a
          modified version of the Ratcliff and Obershelp algorithm.
          This is much faster, but far less accurate.
          https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher

    :param a:
    :param b: The binary strings, packets... to compare
    :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
        By default, this is chosen depending on the complexity, optimistically
        preferring wagnerfischer unless really necessary.
    :param autojunk: (difflib only) See difflib documentation.
    :raises ValueError: if ``algo`` is not a known algorithm
    """
    xb = bytes_encode(a)
    yb = bytes_encode(b)

    if algo is None:
        # Choose the best algorithm
        complexity = len(xb) * len(yb)
        if complexity < 1e7:
            # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
            # Anything much larger than this shouldn't be attempted by default.
            algo = "wagnerfischer"
            if complexity > 1e6:
                log_interactive.info(
                    "Complexity is a bit high. hexdiff will take a few seconds."
                )
        else:
            algo = "difflib"

    # Aligned lists of byte chunks: an empty chunk on one side marks a byte
    # that only exists on the other side.
    backtrackx = []
    backtracky = []

    if algo == "wagnerfischer":
        # The backtrack below walks from the end to the start, so work on
        # reversed inputs to get the chunks in forward order.
        xb = xb[::-1]
        yb = yb[::-1]

        # costs for the 3 operations
        INSERT = 1
        DELETE = 1
        SUBST = 1

        # Typically, d[i,j] will hold the distance between
        # the first i characters of xb and the first j characters of yb.
        # We change the Wagner Fischer to also store pointers to all
        # the intermediate steps taken while calculating the Levenshtein distance.
        d = {(-1, -1): (0, (-1, -1))}
        for j in range(len(yb)):
            d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
        for i in range(len(xb)):
            d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)

        # Compute the Levenshtein distance between the two strings, but
        # store all the steps to be able to backtrack at the end.
        for j in range(len(yb)):
            for i in range(len(xb)):
                d[i, j] = min(
                    (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
                    (d[i - 1, j][0] + DELETE, (i - 1, j)),
                    (d[i, j - 1][0] + INSERT, (i, j - 1)),
                )

        # Iterate through the steps backwards to create the diff
        i = len(xb) - 1
        j = len(yb) - 1
        while not (i == j == -1):
            i2, j2 = d[i, j][1]
            backtrackx.append(xb[i2 + 1:i + 1])
            backtracky.append(yb[j2 + 1:j + 1])
            i, j = i2, j2
    elif algo == "difflib":
        sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
        xarr = [xb[i:i + 1] for i in range(len(xb))]
        yarr = [yb[i:i + 1] for i in range(len(yb))]
        # Iterate through opcodes to build the backtrack
        for opcode in sm.get_opcodes():
            typ, x0, x1, y0, y1 = opcode
            if typ == 'delete':
                backtrackx += xarr[x0:x1]
                backtracky += [b''] * (x1 - x0)
            elif typ == 'insert':
                backtrackx += [b''] * (y1 - y0)
                backtracky += yarr[y0:y1]
            elif typ in ['equal', 'replace']:
                backtrackx += xarr[x0:x1]
                backtracky += yarr[y0:y1]
        # Some lines may have been considered as junk. Check the sizes
        if autojunk:
            lbx = len(backtrackx)
            lby = len(backtracky)
            backtrackx += [b''] * (max(lbx, lby) - lbx)
            backtracky += [b''] * (max(lbx, lby) - lby)
    else:
        raise ValueError("Unknown algorithm '%s'" % algo)

    # Print the diff

    # Placeholders printed instead of an absent cell. They must be exactly
    # as wide as the "%04x" offset and "%02X" byte cells they replace,
    # otherwise the columns of the diff are misaligned.
    no_offset = " " * 4
    no_byte = " " * 2

    x = y = i = 0
    colorize: Dict[int, Callable[[str], str]] = {
        0: lambda x: x,
        -1: conf.color_theme.left,
        1: conf.color_theme.right
    }

    # dox / doy select which side(s) the current row displays
    dox = 1
    doy = 0
    btx_len = len(backtrackx)
    while i < btx_len:
        linex = backtrackx[i:i + 16]
        liney = backtracky[i:i + 16]
        xx = sum(len(k) for k in linex)
        yy = sum(len(k) for k in liney)
        if dox and not xx:
            # Nothing on the x side for this row: show y directly
            dox = 0
            doy = 1
        if dox and linex == liney:
            # Identical rows are displayed once, for both sides
            doy = 1

        if dox:
            xd = y
            j = 0
            while not linex[j]:
                j += 1
                xd -= 1
            print(colorize[doy - dox]("%04x" % xd), end=' ')
            x += xx
            line = linex
        else:
            print(no_offset, end=' ')
        if doy:
            yd = y
            j = 0
            while not liney[j]:
                j += 1
                yd -= 1
            print(colorize[doy - dox]("%04x" % yd), end=' ')
            y += yy
            line = liney
        else:
            print(no_offset, end=' ')

        print(" ", end=' ')

        cl = ""
        for j in range(16):
            if i + j < min(len(backtrackx), len(backtracky)):
                if line[j]:
                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
                    print(col("%02X" % orb(line[j])), end=' ')
                    if linex[j] == liney[j]:
                        cl += sane(line[j], color=True)
                    else:
                        cl += col(sane(line[j]))
                else:
                    print(no_byte, end=' ')
                    cl += " "
            else:
                print(no_byte, end=' ')
            if j == 7:
                print("", end=' ')

        print(" ", cl)

        if doy or not yy:
            # Row fully handled: go to the next 16 chunks, x side first
            doy = 0
            dox = 1
            i += 16
        else:
            if yy:
                # Same chunks again, this time for the y side
                dox = 0
                doy = 1
            else:
                i += 16
# The 16-bit words are summed in native byte order (fast path through
# array.array); the final result is byte-swapped on little endian hosts.
if struct.pack("H", 1) == b"\x00\x01":  # big endian
    checksum_endian_transform = lambda chk: chk  # type: Callable[[int], int]
else:
    checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8


def checksum(pkt):
    # type: (bytes) -> int
    """Compute the 16-bit one's complement checksum of ``pkt``.

    An odd-length buffer is padded with one trailing zero byte for the
    computation (the caller's buffer is left untouched).

    :param pkt: the bytes to checksum
    :return: the checksum, as an integer in [0, 0xffff]
    """
    if len(pkt) % 2 == 1:
        # Build a new object: '+=' would grow a caller-owned bytearray.
        pkt = pkt + b"\0"
    s = sum(array.array("H", pkt))
    # Fold the carries until the sum fits in 16 bits. A fixed two-step fold
    # is not enough once the raw sum reaches 2**32 (buffers over 128 KiB).
    while s >> 16:
        s = (s & 0xffff) + (s >> 16)
    s = ~s
    return checksum_endian_transform(s) & 0xffff
606def _fletcher16(charbuf):
607 # type: (bytes) -> Tuple[int, int]
608 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501
609 c0 = c1 = 0
610 for char in charbuf:
611 c0 += char
612 c1 += c0
614 c0 %= 255
615 c1 %= 255
616 return (c0, c1)
@conf.commands.register
def fletcher16_checksum(binbuf):
    # type: (bytes) -> int
    """Calculates Fletcher-16 checksum of the given buffer.

       Note:
       If the buffer contains the two checkbytes derived from the Fletcher-16 checksum  # noqa: E501
       the result of this function has to be 0. Otherwise the buffer has been corrupted.  # noqa: E501
    """
    low, high = _fletcher16(binbuf)
    # c1 is the most significant byte of the 16-bit result
    return low | (high << 8)
@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    # type: (bytes, int) -> bytes
    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.

       Including the bytes into the buffer (at the position marked by offset) the  # noqa: E501
       global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify  # noqa: E501
       the integrity of the buffer on the receiver side.

       For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501
    """
    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    if len(binbuf) < offset:
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    # The sums are computed with the two checkbytes zeroed
    zeroed = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    c0, c1 = _fletcher16(zeroed)

    # A Python modulo is never negative: only 0 has to be mapped to 255
    x = ((len(zeroed) - offset - 1) * c0 - c1) % 255 or 255

    y = 510 - c0 - x
    if y > 255:
        y -= 255
    return chb(x) + chb(y)
def mac2str(mac):
    # type: (str) -> bytes
    """Convert a colon-separated hexadecimal MAC address to raw bytes."""
    octets = plain_str(mac).split(':')
    return b"".join(map(chb, (int(octet, 16) for octet in octets)))
def valid_mac(mac):
    # type: (str) -> bool
    """Return True if ``mac`` converts (see mac2str) to exactly 6 bytes."""
    try:
        packed = mac2str(mac)
    except ValueError:
        return False
    return len(packed) == 6
def str2mac(s):
    # type: (bytes) -> str
    """Render each item of ``s`` as two hex digits, joined by colons.

    A str input is accepted as well: its characters are converted with ord().
    """
    values = map(ord, s) if isinstance(s, str) else s
    return ":".join("%02x" % value for value in values)
def randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0)
    """
    # One random.randint() draw per byte
    return bytes(random.randint(0, 255) for _ in range(length))
def zerofree_randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0) without zero in it.
    """
    # One random.randint() draw per byte, never 0
    return bytes(random.randint(1, 255) for _ in range(length))
def stror(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return bytes(left | right for left, right in zip(s1, s2))
def strxor(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return bytes(left ^ right for left, right in zip(s1, s2))
def strand(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return bytes(left & right for left, right in zip(s1, s2))
def strrot(s1, count, right=True):
    # type: (bytes, int, bool) -> bytes
    """
    Rotate the binary by 'count' bytes

    :param s1: the bytes to rotate (an empty value is returned unchanged)
    :param count: number of bytes to rotate by, taken modulo len(s1)
    :param right: rotate to the right if True (default), else to the left
    """
    if not s1:
        # Nothing to rotate, and 'count % 0' would raise ZeroDivisionError
        return s1
    off = count % len(s1)
    if right:
        return s1[-off:] + s1[:-off]
    return s1[off:] + s1[:off]
# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
# Probe the platform once: if it rejects the broadcast address, wrap
# socket.inet_aton() to special-case it.
try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    def inet_aton(ip_string):
        # type: (str) -> bytes
        if ip_string != "255.255.255.255":
            return socket.inet_aton(ip_string)
        return b"\xff" * 4
else:
    inet_aton = socket.inet_aton  # type: ignore

inet_ntoa = socket.inet_ntoa
def atol(x):
    # type: (str) -> int
    """Convert a dotted IPv4 address into its 32-bit integer value.

    :raises ValueError: if inet_aton() rejects the address
    """
    try:
        packed = inet_aton(x)
    except socket.error:
        raise ValueError("Bad IP format: %s" % x)
    (value,) = struct.unpack("!I", packed)
    return cast(int, value)
def valid_ip(addr):
    # type: (str) -> bool
    """Return True if ``addr`` is accepted by atol() as an IPv4 address."""
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        atol(text)
    except (OSError, ValueError, socket.error):
        return False
    else:
        return True
def valid_net(addr):
    # type: (str) -> bool
    """Return True if ``addr`` is an IPv4 address, optionally followed by
    a "/prefix-length" between 0 and 32."""
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' in addr:
        ip, mask = addr.split('/', 1)
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as "²" that int() rejects with a ValueError.
        return valid_ip(ip) and mask.isdecimal() and 0 <= int(mask) <= 32
    return valid_ip(addr)
def valid_ip6(addr):
    # type: (str) -> bool
    """Return True if ``addr`` is accepted by inet_pton() as an IPv6
    address."""
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        inet_pton(socket.AF_INET6, text)
    except socket.error:
        return False
    else:
        return True
def valid_net6(addr):
    # type: (str) -> bool
    """Return True if ``addr`` is an IPv6 address, optionally followed by
    a "/prefix-length" between 0 and 128."""
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' in addr:
        ip, mask = addr.split('/', 1)
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as "²" that int() rejects with a ValueError.
        return valid_ip6(ip) and mask.isdecimal() and 0 <= int(mask) <= 128
    return valid_ip6(addr)
def ltoa(x):
    # type: (int) -> str
    """Convert the low 32 bits of ``x`` into a dotted IPv4 address."""
    packed = struct.pack("!I", x & 0xffffffff)
    return inet_ntoa(packed)
def itom(x):
    # type: (int) -> int
    """Return the 32-bit integer netmask for a prefix length ``x``."""
    all_ones = 0xffffffff
    # Shift a block of 32 ones into the low 32-bit window
    return ((all_ones << 32) >> x) & all_ones
def in4_cidr2mask(m):
    # type: (int) -> bytes
    """
    Return the mask (bitstring) associated with provided length
    value. For instance if function is called on 20, return value is
    b'\\xff\\xff\\xf0\\x00'.
    """
    if m < 0 or m > 32:
        raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m)  # noqa: E501

    # The mask is the complement of the (32 - m) low "host" bits
    host_bits = struct.pack(">I", 2**(32 - m) - 1)
    return strxor(b"\xff" * 4, host_bits)
def in4_isincluded(addr, prefix, mask):
    # type: (str, str, int) -> bool
    """
    Returns True when 'addr' belongs to prefix/mask. False otherwise.
    """
    packed_addr = inet_pton(socket.AF_INET, addr)
    packed_mask = in4_cidr2mask(mask)
    packed_prefix = inet_pton(socket.AF_INET, prefix)
    return packed_prefix == strand(packed_addr, packed_mask)
def in4_ismaddr(str):
    # type: (str) -> bool
    """
    Returns True if provided address in printable format belongs to
    allocated Multicast address space (224.0.0.0/4).
    """
    is_multicast = in4_isincluded(str, "224.0.0.0", 4)
    return is_multicast
def in4_ismlladdr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to link-local multicast address
    space (224.0.0.0/24)
    """
    is_link_local = in4_isincluded(str, "224.0.0.0", 24)
    return is_link_local
def in4_ismgladdr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to global multicast address
    space (224.0.1.0-238.255.255.255).
    """
    # Must be multicast ...
    if not in4_isincluded(str, "224.0.0.0", 4):
        return False
    # ... but not link-local multicast ...
    if in4_isincluded(str, "224.0.0.0", 24):
        return False
    # ... nor limited scope multicast
    return not in4_isincluded(str, "239.0.0.0", 8)
def in4_ismlsaddr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to limited scope multicast address
    space (239.0.0.0/8).
    """
    is_limited_scope = in4_isincluded(str, "239.0.0.0", 8)
    return is_limited_scope
def in4_isaddrllallnodes(str):
    # type: (str) -> bool
    """
    Returns True if address is the link-local all-nodes multicast
    address (224.0.0.1).
    """
    all_nodes = inet_pton(socket.AF_INET, "224.0.0.1")
    candidate = inet_pton(socket.AF_INET, str)
    return all_nodes == candidate
def in4_getnsmac(a):
    # type: (bytes) -> str
    """
    Return the multicast mac address associated with provided
    IPv4 address. Passed address must be in network format.
    """
    # Fixed 01:00:5e prefix followed by the low 23 bits of the address
    low_bytes = (a[1] & 0x7f, a[2], a[3])
    return "01:00:5e:" + ":".join("%.2x" % byte for byte in low_bytes)
def decode_locale_str(x):
    # type: (bytes) -> str
    """
    Decode bytes into a string using the system locale.
    Useful on Windows where it can be unusual (e.g. cp1252)
    """
    encoding = locale.getlocale()[1]
    if not encoding:
        # No encoding reported by the locale
        encoding = "utf-8"
    return x.decode(encoding=encoding, errors="replace")
class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command, without
    crashing.

    Example:
        >>> with ContextManagerSubprocess("tcpdump"):
        >>>     subprocess.Popen(["tcpdump", "--version"])
        ERROR: Could not execute tcpdump, is it installed?

    """

    def __init__(self, prog, suppress=True):
        # type: (str, bool) -> None
        # prog: name of the program, only used in the messages
        # suppress: log and swallow the error instead of re-raising it
        self.prog = prog
        self.suppress = suppress

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self,
                 exc_type,  # type: Optional[type]
                 exc_value,  # type: Optional[Exception]
                 traceback,  # type: Optional[Any]
                 ):
        # type: (...) -> Optional[bool]
        if exc_value is None or exc_type is None:
            return None
        # Errored
        if isinstance(exc_value, EnvironmentError):
            msg = "Could not execute %s, is it installed?" % self.prog
        else:
            # exc_type is the exception class itself: use its own name
            # (exc_type.__class__ is the metaclass, i.e. always "type").
            msg = "%s: execution failed (%s)" % (
                self.prog,
                exc_type.__name__
            )
        if not self.suppress:
            # Re-raise with the same exception type and a clearer message
            raise exc_type(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception
class ContextManagerCaptureOutput(object):
    """
    Context manager that intercept the console's output.

    Example:
        >>> with ContextManagerCaptureOutput() as cmco:
        ...     print("hey")
        ...     assert cmco.get_output() == "hey"
    """

    def __init__(self):
        # type: () -> None
        # Everything written to sys.stdout while the manager is active
        self.result_export_object = ""

    def __enter__(self):
        # type: () -> ContextManagerCaptureOutput
        from unittest import mock

        def _collect(text):
            # type: (str) -> None
            self.result_export_object += text

        fake_stdout = mock.Mock()
        fake_stdout.write = _collect
        # Remember the real stream so that __exit__ can restore it
        self.bck_stdout, sys.stdout = sys.stdout, fake_stdout
        return self

    def __exit__(self, *exc):
        # type: (*Any) -> Literal[False]
        sys.stdout = self.bck_stdout
        return False

    def get_output(self, eval_bytes=False):
        # type: (bool) -> str
        """Return the captured text.

        :param eval_bytes: if True and the capture starts with "b'", the
            capture is evaluated and the result converted with plain_str().
        """
        captured = self.result_export_object
        if eval_bytes and captured.startswith("b'"):
            # NOTE(security): eval() runs whatever was printed as Python
            # code; only use eval_bytes on output produced by trusted code.
            return plain_str(eval(captured))
        return captured
def do_graph(
    graph,  # type: str
    prog=None,  # type: Optional[str]
    format=None,  # type: Optional[str]
    target=None,  # type: Optional[Union[IO[bytes], str]]
    type=None,  # type: Optional[str]
    string=None,  # type: Optional[bool]
    options=None  # type: Optional[List[str]]
):
    # type: (...) -> Optional[str]
    """Processes graph description using an external software.
    This method is used to convert a graphviz format to an image.

    :param graph: GraphViz graph description
    :param prog: which graphviz program to use (default: conf.prog.dot)
    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
        option (default: svg)
    :param string: if not None, simply return the graph string
    :param target: filename or redirect: a string starting with "|" is run
        as a shell command fed with the output, a string starting with ">"
        or a plain string is a file name, anything else is used as an open
        binary file. Defaults pipe to Imagemagick's display program
    :param type: deprecated alias of format
    :param options: options to be passed to prog, as a list of strings or
        as a single preformatted string
    :raises OSError: if the GraphViz program exits with a non-zero status
    """

    if format is None:
        format = "svg"
    if string:
        return graph
    if type is not None:
        warnings.warn(
            "type is deprecated, and was renamed format",
            DeprecationWarning
        )
        format = type
    if prog is None:
        prog = conf.prog.dot
    start_viewer = False
    if target is None:
        if WINDOWS:
            # No pipe to the viewer: render to a temporary file, open it after
            target = get_temp_file(autoext="." + format)
            start_viewer = True
        else:
            with ContextManagerSubprocess(conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    if format is not None:
        format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    target = cast(IO[bytes], target)
    if isinstance(options, (list, tuple)):
        # A list must be flattened: "%s" would insert its repr ("['-x']")
        options = " ".join(options)
    proc = subprocess.Popen(
        "\"%s\" %s %s" % (prog, options or "", format or ""),
        shell=True, stdin=subprocess.PIPE, stdout=target,
        stderr=subprocess.PIPE
    )
    try:
        _, stderr = proc.communicate(bytes_encode(graph))
        if proc.returncode != 0:
            raise OSError(
                "GraphViz call failed (is it installed?):\n" +
                plain_str(stderr)
            )
    finally:
        # Best-effort close, also when GraphViz failed
        try:
            target.close()
        except Exception:
            pass
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                # Report the file name (not the 'tempfile' module)
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", target.name)  # noqa: E501
                break
        else:
            if WINDOWS and conf.prog.display == conf.prog._default:
                os.startfile(target.name)
            else:
                with ContextManagerSubprocess(conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
    return None
# Replacement text for every character that is special in TeX
_TEX_TR = {
    "{": "{\\tt\\char123}",
    "}": "{\\tt\\char125}",
    "\\": "{\\tt\\char92}",
    "^": "\\^{}",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "&": "\\&",
    "%": "\\%",
    "|": "{\\tt\\char124}",
    "~": "{\\tt\\char126}",
    "<": "{\\tt\\char60}",
    ">": "{\\tt\\char62}",
}


def tex_escape(x):
    # type: (str) -> str
    """Return ``x`` with every character listed in _TEX_TR replaced by
    its TeX-safe equivalent."""
    return "".join(_TEX_TR.get(char, char) for char in x)
def colgen(*lstcol,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> Iterator[Any]
    """Returns a generator that mixes provided quantities forever
    trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default

    A single quantity is duplicated so that there is something to mix.

    :raises ValueError: (on first iteration) if no quantity is provided
    """  # noqa: E501
    if not lstcol:
        # Without this check the loops below never yield: next() would hang.
        raise ValueError("colgen() requires at least one value")
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
    size = len(lstcol)
    while True:
        for i in range(size):
            for j in range(size):
                for k in range(size):
                    # Skip the triples where the three indices are equal
                    if i != j or j != k or k != i:
                        yield trans(lstcol[(i + j) % size],
                                    lstcol[(j + k) % size],
                                    lstcol[(k + i) % size])
def incremental_label(label="tag%05i", start=0):
    # type: (str, int) -> Iterator[str]
    """Yield ``label % n`` forever, for n = start, start + 1, ..."""
    counter = start
    while True:
        yield label % counter
        counter += 1
def binrepr(val):
    # type: (int) -> str
    """Return the binary digits of ``val`` without the "0b" prefix.

    Negative values keep their sign ("-101" for -5).
    """
    # format() rather than bin(val)[2:]: the slice drops "-0" instead of
    # "0b" for negative values and leaves a stray "b" in the result.
    return format(val, "b")
def long_converter(s):
    # type: (str) -> int
    """Parse a hexadecimal number that may be split by spaces and
    newlines."""
    separators = {ord('\n'): None, ord(' '): None}
    return int(s.translate(separators), 16)
1149#########################
1150# Enum management #
1151#########################
class EnumElement:
    """A named integer constant.

    Unknown attributes are looked up on the wrapped value; str() gives the
    name, int() and hash() give the value.
    """

    def __init__(self, key, value):
        # type: (str, int) -> None
        self._key = key
        self._value = value

    def __repr__(self):
        # type: () -> str
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501

    def __getattr__(self, attr):
        # type: (str) -> Any
        if attr == "_value":
            # Only reached when _value is not set yet (e.g. on an instance
            # created without __init__): delegating would recurse forever.
            raise AttributeError(attr)
        return getattr(self._value, attr)

    def __str__(self):
        # type: () -> str
        return self._key

    def __bytes__(self):
        # type: () -> bytes
        return bytes_encode(self.__str__())

    def __hash__(self):
        # type: () -> int
        return self._value

    def __int__(self):
        # type: () -> int
        return int(self._value)

    def __eq__(self, other):
        # type: (Any) -> bool
        try:
            return self._value == int(other)
        except (TypeError, ValueError):
            # Not convertible to an integer (None, arbitrary text, ...):
            # simply not equal, instead of raising from a comparison.
            return False

    def __ne__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)

    # Historical (misspelled) name, kept for code calling it explicitly
    __neq__ = __ne__
class Enum_metaclass(type):
    """Metaclass wrapping every integer class attribute into an
    ``element_class`` instance, and building the reverse (value to name)
    ``__rdict__`` mapping."""
    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        reverse = {}
        for key in list(dct):
            value = dct[key]
            if not isinstance(value, int):
                continue
            element = cls.element_class(key, value)
            dct[key] = element
            reverse[element] = key
        dct["__rdict__"] = reverse
        return super().__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        # type: (int) -> Any
        return self.__rdict__[attr]  # type: ignore

    def __contains__(self, val):
        # type: (int) -> bool
        return val in self.__rdict__  # type: ignore

    def get(self, attr, val=None):
        # type: (str, Optional[Any]) -> Any
        return self.__rdict__.get(attr, val)  # type: ignore

    def __repr__(self):
        # type: () -> str
        label = self.__dict__.get("name", self.__name__)
        return "<%s>" % label
1224###################
1225# Object saving #
1226###################
def export_object(obj):
    # type: (Any) -> None
    """Print a text form of a Python object.

    The object is pickled (protocol 2), compressed with zlib (level 9),
    base64-encoded and printed on the standard output.
    """
    import zlib
    pickled = pickle.dumps(obj, 2)
    packed = zlib.compress(pickled, 9)
    encoded = base64.b64encode(packed)
    print(encoded.decode())
def import_object(obj=None):
    # type: (Optional[str]) -> Any
    """Rebuild a Python object from its base64 / zlib / pickle text form.

    :param obj: the encoded text; when None, it is read from the standard
                input
    :return: the unpickled object

    NOTE: the decoded data is passed to pickle.loads(), which can execute
    arbitrary code. Only use this on input you trust.
    """
    import zlib
    encoded = sys.stdin.read() if obj is None else obj
    packed = base64.b64decode(encoded.strip())
    pickled = zlib.decompress(packed)
    return pickle.loads(pickled)
def save_object(fname, obj):
    # type: (str, Any) -> None
    """Pickle a Python object into a gzip-compressed file.

    :param fname: path of the file to write
    :param obj: the object to pickle
    """
    # The context manager closes the file even when pickling fails.
    with gzip.open(fname, "wb") as fd:
        pickle.dump(obj, fd)
def load_object(fname):
    # type: (str) -> Any
    """Unpickle a Python object from a gzip-compressed file.

    :param fname: path of the file to read
    :return: the unpickled object

    NOTE: the file content is passed to pickle.load(), which can execute
    arbitrary code. Only load files you trust.
    """
    # The context manager closes the file once the object has been read.
    with gzip.open(fname, "rb") as fd:
        return pickle.load(fd)
@conf.commands.register
def corrupt_bytes(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Corrupt a given percentage (at least one byte) or number of bytes
    from a string

    :param data: the data to corrupt
    :param p: ratio of bytes to corrupt, used when n is None
    :param n: number of bytes to corrupt; capped to the data length
    :return: the corrupted data. Empty data is returned unchanged.
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s)
    if n is None:
        n = max(1, int(s_len * p))
    # random.sample() raises ValueError when asked for more positions than
    # there are bytes, which happens for empty data ("at least one byte")
    # or when p > 1.
    n = min(n, s_len)
    for i in random.sample(range(s_len), n):
        # Adding 1..255 modulo 256 guarantees the byte actually changes
        s[i] = (s[i] + random.randint(1, 255)) % 256
    return s.tobytes()
@conf.commands.register
def corrupt_bits(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Flip a given percentage (at least one bit) or number of bits
    from a string

    :param data: the data to corrupt
    :param p: ratio of bits to flip, used when n is None
    :param n: number of bits to flip; capped to the number of bits in data
    :return: the corrupted data. Empty data is returned unchanged.
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s) * 8
    if n is None:
        n = max(1, int(s_len * p))
    # random.sample() raises ValueError when asked for more positions than
    # there are bits, which happens for empty data ("at least one bit")
    # or when p > 1.
    n = min(n, s_len)
    for i in random.sample(range(s_len), n):
        # i is a bit index: byte i // 8, bit i % 8
        s[i // 8] ^= 1 << (i % 8)
    return s.tobytes()
1290#############################
1291# pcap capture file stuff #
1292#############################
@conf.commands.register
def wrpcap(filename,  # type: Union[IO[bytes], str]
           pkt,  # type: _PacketIterable
           *args,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> None
    """Write a list of packets to a pcap file

    Extra positional and keyword arguments are passed to PcapWriter.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcap(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packets to write
    :param gz: set to 1 to save a gzipped capture
    :param linktype: force linktype value
    :param endianness: "<" or ">", force endianness
    :param sync: do not bufferize writes to the capture file
    """
    writer = PcapWriter(filename, *args, **kargs)
    # Leaving the "with" block flushes and closes the writer
    with writer as fdesc:
        fdesc.write(pkt)
@conf.commands.register
def wrpcapng(filename,  # type: str
             pkt,  # type: _PacketIterable
             ):
    # type: (...) -> None
    """Write a list of packets to a pcapng file

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcapng(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packets to write
    """
    writer = PcapNgWriter(filename)
    # Leaving the "with" block closes the writer
    with writer as fdesc:
        fdesc.write(pkt)
@conf.commands.register
def rdpcap(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read a pcap or pcapng file and return a packet list

    :param filename: the name of the file to read, or an open, readable
        file-like object
    :param count: read only <count> packets
    """
    # PcapReader instances are built by the __call__ of its metaclass,
    # which MyPy does not understand: hence the "type: ignore".
    reader = PcapReader(filename)  # type: ignore
    with reader as fdesc:
        packets = fdesc.read_all(count=count)
    return packets
1350# NOTE: Type hinting
1351# Mypy doesn't understand the following metaclass, and thinks each
1352# constructor (PcapReader...) needs 3 arguments each. To avoid this,
1353# we add a fake (=None) to the last 2 arguments then force the value
1354# to not be None in the signature and pack the whole thing in an ignore.
1355# This allows to not have # type: ignore every time we call those
1356# constructors.
class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers

    Calling a reader class opens the capture, reads its 4-byte magic and
    tries to initialize the class with it. If that fails, the class
    registered as `alternative` is tried with the same descriptor.
    """

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(
            cls, name, bases, dct
        )
        if 'alternative' in dct:
            # Make the link symmetric: the alternative points back to us
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Creates a cls instance, use the `alternative` if that
        fails.

        :param filename: path of the capture, or an open binary file object
        :return: an initialized reader instance
        :raises Scapy_Exception: if no data could be read or if no reader
            accepts the file. In every failure case, a descriptor that was
            opened here from a path is closed before the exception
            propagates; a file object supplied by the caller is left open.
        """
        # open() only creates a descriptor when it is given a path.
        owns_fdesc = isinstance(filename, str)
        i = cls.__new__(
            cls,
            cls.__name__,
            cls.__bases__,
            cls.__dict__  # type: ignore
        )
        filename, fdesc, magic = cls.open(filename)
        try:
            if not magic:
                raise Scapy_Exception(
                    "No data could be read!"
                )
            try:
                i.__init__(filename, fdesc, magic)
                return i
            except (Scapy_Exception, EOFError):
                pass

            if "alternative" in cls.__dict__:
                cls = cls.__dict__["alternative"]
                i = cls.__new__(
                    cls,
                    cls.__name__,
                    cls.__bases__,
                    cls.__dict__  # type: ignore
                )
                try:
                    i.__init__(filename, fdesc, magic)
                    return i
                except (Scapy_Exception, EOFError):
                    pass

            raise Scapy_Exception("Not a supported capture file")
        except BaseException:
            # No reader took ownership of the descriptor: do not leak it.
            if owns_fdesc:
                PcapReader_metaclass._close_fdesc(fdesc)
            raise

    @staticmethod
    def _close_fdesc(fdesc):
        # type: (_ByteStream) -> None
        """Close a descriptor returned by open(), including the file
        wrapped by a GzipFile (GzipFile.close() does not close a file
        object it was given).
        """
        if isinstance(fdesc, gzip.GzipFile) and fdesc.fileobj is not None:
            fdesc.fileobj.close()
        fdesc.close()

    @staticmethod
    def open(fname  # type: Union[IO[bytes], str]
             ):
        # type: (...) -> Tuple[str, _ByteStream, bytes]
        """Open (if necessary) filename, and read the magic.

        :param fname: path of the capture, or an open binary file object
        :return: a (filename, fdesc, magic) tuple, magic being the first
            4 bytes of the (decompressed) capture. For file objects, the
            filename is their "name" attribute, or "No name".
        """
        if isinstance(fname, str):
            filename = fname
            fdesc = open(filename, "rb")  # type: _ByteStream
            magic = fdesc.read(2)
            if magic == b"\x1f\x8b":
                # GZIP header detected.
                fdesc.seek(0)
                fdesc = gzip.GzipFile(fileobj=fdesc)
                magic = fdesc.read(2)
            magic += fdesc.read(2)
        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic
class RawPcapReader(metaclass=PcapReader_metaclass):
    """A stateful pcap reader. Each packet is returned as raw data,
    together with the metadata of its record header.

    The reader can be iterated over, used as a context manager (the file
    is closed on exit), and exposes recv() / fileno() / select().
    """

    # TODO: use Generics to properly type the various readers.
    # As of right now, RawPcapReader is typed as if it returned packets
    # because all of its child do. Fix that

    nonblocking_socket = True
    PacketMetadata = collections.namedtuple("PacketMetadata",
                                            ["sec", "usec", "wirelen", "caplen"])  # noqa: E501

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, _ByteStream, bytes) -> None
        """Parse the pcap file header.

        :param filename: name of the capture
        :param fdesc: open binary file, positioned right after the magic
        :param magic: the 4 magic bytes read from the file
        :raises Scapy_Exception: if the magic is not a pcap one, or if the
            file header is truncated
        """
        self.filename = filename
        self.f = fdesc
        if magic == b"\xa1\xb2\xc3\xd4":  # big endian
            self.endian = ">"
            self.nano = False
        elif magic == b"\xd4\xc3\xb2\xa1":  # little endian
            self.endian = "<"
            self.nano = False
        elif magic == b"\xa1\xb2\x3c\x4d":  # big endian, nanosecond-precision
            self.endian = ">"
            self.nano = True
        elif magic == b"\x4d\x3c\xb2\xa1":  # little endian, nanosecond-precision  # noqa: E501
            self.endian = "<"
            self.nano = True
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        # Remaining 20 bytes of the file header
        hdr = self.f.read(20)
        if len(hdr) < 20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
            self.endian + "HHIIII", hdr
        )
        self.linktype = linktype
        self.snaplen = snaplen

    def __enter__(self):
        # type: () -> RawPcapReader
        return self

    def __iter__(self):
        # type: () -> RawPcapReader
        return self

    def __next__(self):
        # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """
        implement the iterator protocol on a set of packets in a pcap file
        """
        try:
            return self._read_packet()
        except EOFError:
            raise StopIteration

    def _read_packet(self, size=MTU):
        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """return a single packet read from the file as a tuple containing
        (pkt_data, pkt_metadata)

        :param size: maximum number of data bytes returned

        raise EOFError when no more packets are available
        """
        # 16-byte record header
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError
        sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr)

        try:
            # The whole record is consumed, then truncated to size
            data = self.f.read(caplen)[:size]
        except OverflowError as e:
            warning(f"Pcap: {e}")
            raise EOFError

        return (data,
                RawPcapReader.PacketMetadata(sec=sec, usec=usec,
                                             wirelen=wirelen, caplen=caplen))

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        """Not available on raw readers: always raises."""
        raise Exception(
            "Cannot call read_packet() in RawPcapReader. Use "
            "_read_packet()"
        )

    def dispatch(self,
                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
                 ):
        # type: (...) -> None
        """call the specified callback routine for each packet read

        This is just a convenience function for the main loop
        that allows for easy launching of packet processing in a
        thread.
        """
        for p in self:
            callback(p)

    def _read_all(self, count=-1):
        # type: (int) -> List[Packet]
        """return a list of all packets in the pcap file

        :param count: maximum number of packets to read; a negative value
            reads until the end of the file

        Packets are obtained through read_packet().
        """
        res = []  # type: List[Packet]
        while count != 0:
            count -= 1
            try:
                p = self.read_packet()  # type: Packet
            except EOFError:
                break
            res.append(p)
        return res

    def recv(self, size=MTU):
        # type: (int) -> bytes
        """ Emulate a socket
        """
        return self._read_packet(size=size)[0]

    def fileno(self):
        # type: () -> int
        return -1 if WINDOWS else self.f.fileno()

    def close(self):
        # type: () -> None
        """Close the capture file. May be called several times."""
        if isinstance(self.f, gzip.GzipFile):
            # GzipFile.close() does not close a file object it was given,
            # and resets its fileobj attribute to None: check it so that
            # a second close() does not fail.
            wrapped = self.f.fileobj
            if wrapped is not None:
                wrapped.close()
        self.f.close()

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.close()

    # emulate SuperSocket
    @staticmethod
    def select(sockets,  # type: List[SuperSocket]
               remain=None,  # type: Optional[float]
               ):
        # type: (...) -> List[SuperSocket]
        return sockets
class PcapReader(RawPcapReader):
    """A pcap reader that returns dissected packets.

    The link type found in the file header selects the class used to
    dissect each record (conf.l2types); unknown link types fall back to
    conf.raw_layer.
    """

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapReader.__init__(self, filename, fdesc, magic)
        linktype = self.linktype
        try:
            self.LLcls = conf.l2types.num2layer[
                linktype
            ]  # type: Type[Packet]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (linktype, linktype))  # noqa: E501
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            self.LLcls = conf.raw_layer

    def __enter__(self):
        # type: () -> PcapReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next record and dissect it.

        :param size: maximum number of data bytes used
        :param kwargs: passed to the dissecting class
        :return: the packet, with its time and wirelen attributes set
            from the record header
        :raises EOFError: when no more packets are available
        """
        record = super(PcapReader, self)._read_packet(size=size)
        if record is None:
            raise EOFError
        raw_data, metadata = record

        # KeyboardInterrupt is not an Exception subclass: it is never
        # caught below and always propagates.
        try:
            pkt = self.LLcls(raw_data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (self.LLcls, raw_data)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(raw_data)
        # The sub-second field counts nanoseconds or microseconds
        exponent = -9 if self.nano else -6
        unit = Decimal(10) ** Decimal(exponent)
        pkt.time = EDecimal(metadata.sec + unit * metadata.usec)
        pkt.wirelen = metadata.wirelen
        return pkt

    def recv(self, size=MTU, **kwargs):  # type: ignore
        # type: (int, **Any) -> Packet
        """Emulate a socket: same as read_packet()."""
        return self.read_packet(size=size, **kwargs)

    def __next__(self):  # type: ignore
        # type: () -> Packet
        try:
            pkt = self.read_packet()
        except EOFError:
            raise StopIteration
        return pkt

    def read_all(self, count=-1):
        # type: (int) -> PacketList
        """Return the packets of the file as a PacketList named after the
        base name of the file.

        :param count: maximum number of packets to read
        """
        packets = self._read_all(count)
        from scapy import plist
        list_name = os.path.basename(self.filename)
        return plist.PacketList(packets, name=list_name)
class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as
    bytes.

    Blocks are read one at a time and dispatched on their type through
    self.blocktypes. Blocks of unknown type are skipped.
    """

    alternative = RawPcapReader  # type: Type[Any]

    PacketMetadata = collections.namedtuple("PacketMetadataNg",  # type: ignore
                                            ["linktype", "tsresol",
                                             "tshigh", "tslow", "wirelen",
                                             "comment", "ifname", "direction",
                                             "process_information"])

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        """Check the magic and parse the first Section Header Block.

        :param filename: name of the capture
        :param fdesc: open binary file, positioned right after the magic
        :param magic: the 4 magic bytes read from the file
        :raises Scapy_Exception: if the magic is not the pcapng one, or if
            the first Section Header Block is malformed
        """
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, options); will be populated by IDBs.
        self.interfaces = []  # type: List[Tuple[int, int, Dict[str, Any]]]
        self.default_options = {
            "tsresol": 1000000
        }
        # Block type -> parser. A parser returns a (data, metadata) tuple
        # for packet blocks, and None for the other blocks.
        self.blocktypes: Dict[
            int,
            Callable[
                [bytes, int],
                Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
            ]] = {
                1: self._read_block_idb,
                2: self._read_block_pkt,
                3: self._read_block_spb,
                6: self._read_block_epb,
                10: self._read_block_dsb,
                0x80000001: self._read_block_pib,
        }
        self.endian = "!"  # Will be overwritten by first SHB
        self.process_information = []  # type: List[Dict[str, Any]]

        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )

        try:
            self._read_block_shb()
        except EOFError:
            raise Scapy_Exception(
                "The first SHB of the pcapng file is malformed !"
            )

    def _read_block(self, size=MTU):
        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
        """Read one block and hand it to its parser.

        :param size: maximum number of packet data bytes returned
        :return: what the parser returns; None for Section Header Blocks
            and for blocks of unknown type
        :raises EOFError: when the block header or trailer cannot be read,
            or is invalid
        :raises Scapy_Exception: when the block body is truncated
        """
        try:
            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocktype == 0x0A0D0D0A:
            # This function updates the endianness based on the block content.
            self._read_block_shb()
            return None
        try:
            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            warning("PcapNg: Error reading blocklen before block body")
            raise EOFError
        if blocklen < 12:
            warning("PcapNg: Invalid block length !")
            raise EOFError

        # The total length counts the type and both length fields (12 bytes)
        _block_body_length = blocklen - 12
        block = self.f.read(_block_body_length)
        if len(block) != _block_body_length:
            raise Scapy_Exception("PcapNg: Invalid Block body length "
                                  "(too short)")
        self._read_block_tail(blocklen)
        if blocktype in self.blocktypes:
            return self.blocktypes[blocktype](block, size)
        return None

    def _read_block_tail(self, blocklen):
        # type: (int) -> None
        """Read the trailing block length and check that it matches the
        leading one.

        :raises EOFError: if it is missing or different
        """
        if blocklen % 4:
            pad = self.f.read(-blocklen % 4)
            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                    "Ignored padding %r" % (blocklen, pad))
        try:
            if blocklen != struct.unpack(self.endian + 'I',
                                         self.f.read(4))[0]:
                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
        except struct.error:
            warning("PcapNg: Could not read blocklen after block body")
            raise EOFError

    def _read_block_shb(self):
        # type: () -> None
        """Section Header Block

        Sets self.endian from the byte-order magic. The block type is
        expected to have been consumed already.

        :raises EOFError: if the block is malformed or unsupported
        :raises Scapy_Exception: if the options are truncated
        """
        # The length cannot be decoded before the byte order is known
        _blocklen = self.f.read(4)
        endian = self.f.read(4)
        if endian == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif endian == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            warning("PcapNg: Bad magic in Section Header Block"
                    " (not a pcapng file?)")
            raise EOFError

        try:
            blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
        except struct.error:
            warning("PcapNg: Could not read blocklen")
            raise EOFError
        if blocklen < 28:
            warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!")  # noqa: E501
            raise EOFError

        # Major version must be 1
        _major = self.f.read(2)
        try:
            major = struct.unpack(self.endian + "H", _major)[0]
        except struct.error:
            warning("PcapNg: Could not read major value")
            raise EOFError
        if major != 1:
            warning(f"PcapNg: SHB Major version {major} unsupported !")
            raise EOFError

        # Skip minor version & section length
        skipped = self.f.read(10)
        if len(skipped) != 10:
            warning("PcapNg: Could not read minor value & section length")
            raise EOFError

        _options_len = blocklen - 28
        options = self.f.read(_options_len)
        if len(options) != _options_len:
            raise Scapy_Exception("PcapNg: Invalid Section Header Block "
                                  " options (too short)")
        self._read_block_tail(blocklen)
        self._read_options(options)

    def _read_packet(self, size=MTU):  # type: ignore
        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Read blocks until one of them yields a packet, and return it
        as a (pkt_data, pkt_metadata) tuple.

        :param size: maximum number of data bytes returned
        :raises EOFError: raised by _read_block() when no more block can
            be read
        """
        while True:
            res = self._read_block(size=size)
            if res is not None:
                return res

    def _read_options(self, options):
        # type: (bytes) -> Dict[int, bytes]
        """Parse a sequence of options.

        :param options: the raw options
        :return: a dictionary mapping option codes to their raw values.
            Parsing stops at the end-of-options marker (code 0).
        """
        opts = dict()
        while len(options) >= 4:
            try:
                code, length = struct.unpack(self.endian + "HH", options[:4])
            except struct.error:
                warning("PcapNg: options header is too small "
                        "%d !" % len(options))
                raise EOFError
            # Options whose value would run past the buffer are ignored
            if code != 0 and 4 + length <= len(options):
                opts[code] = options[4:4 + length]
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option "
                            "length %d for end-of-option" % length)
                break
            # Values are padded to a multiple of 4 bytes
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        return opts

    def _read_block_idb(self, block, _):
        # type: (bytes, int) -> None
        """Interface Description Block

        Appends a (linktype, snaplen, options) tuple to self.interfaces.
        """
        # 2 bytes LinkType + 2 bytes Reserved
        # 4 bytes Snaplen
        options_raw = self._read_options(block[8:])
        options = self.default_options.copy()  # type: Dict[str, Any]
        for c, v in options_raw.items():
            if c == 9:
                length = len(v)
                if length == 1:
                    tsresol = orb(v)
                    # High bit set: power of 2, otherwise power of 10
                    options["tsresol"] = (2 if tsresol & 128 else 10) ** (
                        tsresol & 127
                    )
                else:
                    warning("PcapNg: invalid options "
                            "length %d for IDB tsresol" % length)
            elif c == 2:
                options["name"] = v
            elif c == 1:
                options["comment"] = v
        try:
            interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
                self.endian + "HxxI",
                block[:8]
            ) + (options,)
        except struct.error:
            warning("PcapNg: IDB is too small %d/8 !" % len(block))
            raise EOFError
        self.interfaces.append(interface)

    def _check_interface_id(self, intid):
        # type: (int) -> None
        """Check the interface id value and raise EOFError if invalid."""
        tmp_len = len(self.interfaces)
        if intid >= tmp_len:
            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
            raise EOFError

    def _read_block_epb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Enhanced Packet Block"""
        try:
            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "5I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: EPB is too small %d/20 !" % len(block))
            raise EOFError

        # Compute the options offset taking padding into account
        # ((-caplen) % 4 is 0 when caplen is already a multiple of 4)
        opt_offset = 20 + caplen + (-caplen) % 4

        # Parse options
        options = self._read_options(block[opt_offset:])

        process_information = {}
        for code, value in options.items():
            if code in [0x8001, 0x8003]:  # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX
                try:
                    proc_index = struct.unpack(self.endian + "I", value)[0]
                except struct.error:
                    warning("PcapNg: EPB invalid proc index"
                            "(expected 4 bytes, got %d) !" % len(value))
                    raise EOFError
                if proc_index < len(self.process_information):
                    key = "proc" if code == 0x8001 else "eproc"
                    process_information[key] = self.process_information[proc_index]
                else:
                    warning("PcapNg: EPB invalid process information index "
                            "(%d/%d) !" % (proc_index, len(self.process_information)))

        comment = options.get(1, None)
        epb_flags_raw = options.get(2, None)
        if epb_flags_raw:
            try:
                epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
            except struct.error:
                warning("PcapNg: EPB invalid flags size"
                        "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
                raise EOFError
            # The direction is stored in the two lowest bits
            direction = epb_flags & 3

        else:
            direction = None

        self._check_interface_id(intid)
        ifname = self.interfaces[intid][2].get('name', None)

        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=comment,
                                               ifname=ifname,
                                               direction=direction,
                                               process_information=process_information))

    def _read_block_spb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Simple Packet Block"""
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        self._check_interface_id(intid)

        try:
            wirelen, = struct.unpack(self.endian + "I", block[:4])
        except struct.error:
            warning("PcapNg: SPB is too small %d/4 !" % len(block))
            raise EOFError

        # The block has no captured length field: it is the original
        # length capped by the snaplen of the interface. A snaplen of 0
        # means "no limit" and must not truncate the packet to nothing.
        snaplen = self.interfaces[intid][1]
        caplen = min(wirelen, snaplen) if snaplen else wirelen
        return (block[4:4 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=None,
                                               tslow=None,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_pkt(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """(Obsolete) Packet Block"""
        try:
            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "HH4I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: PKT is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_dsb(self, block, size):
        # type: (bytes, int) -> None
        """Decryption Secrets Block

        Only TLS Key Logs are handled: when the TLS layer is loaded, the
        keys are stored in conf.tls_nss_keys and conf.tls_session_enable
        is set.
        """

        # Parse the secrets type and length fields
        try:
            secrets_type, secrets_length = struct.unpack(
                self.endian + "II",
                block[:8],
            )
            block = block[8:]
        except struct.error:
            warning("PcapNg: DSB is too small %d!", len(block))
            raise EOFError

        # Compute the secrets length including the padding
        padded_secrets_length = secrets_length + (-secrets_length) % 4
        if len(block) < padded_secrets_length:
            warning("PcapNg: invalid DSB secrets length!")
            raise EOFError

        # Extract secrets data and options
        secrets_data = block[:padded_secrets_length][:secrets_length]
        if block[padded_secrets_length:]:
            warning("PcapNg: DSB options are not supported!")

        # TLS Key Log
        if secrets_type == 0x544c534b:
            if getattr(conf, "tls_sessions", False) is False:
                warning("PcapNg: TLS Key Log available, but "
                        "the TLS layer is not loaded! Scapy won't be able "
                        "to decrypt the packets.")
            else:
                from scapy.layers.tls.session import load_nss_keys

                # Write Key Log to a file and parse it
                # (the "with" statement closes the file)
                filename = get_temp_file()
                with open(filename, "wb") as fd:
                    fd.write(secrets_data)

                keys = load_nss_keys(filename)
                if not keys:
                    warning("PcapNg: invalid TLS Key Log in DSB!")
                else:
                    # Note: these attributes are only available when the TLS
                    #       layer is loaded.
                    conf.tls_nss_keys = keys
                    conf.tls_session_enable = True
        else:
            warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)

    def _read_block_pib(self, block, _):
        # type: (bytes, int) -> None
        """Apple Process Information Block

        Appends a dictionary ("id", and "name" / "uuid" when present) to
        self.process_information.
        """

        # Get the Process ID
        try:
            dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0]
            process_information = {"id": dpeb_pid}
            block = block[4:]
        except struct.error:
            warning("PcapNg: DPEB is too small (%d). Cannot get PID!",
                    len(block))
            raise EOFError

        # Get Options
        options = self._read_options(block)
        for code, value in options.items():
            if code == 2:
                process_information["name"] = value.decode("ascii", "backslashreplace")
            elif code == 4:
                if len(value) == 16:
                    process_information["uuid"] = str(UUID(bytes=value))
                else:
                    warning("PcapNg: DPEB UUID length is invalid (%d)!",
                            len(value))

        # Store process information
        self.process_information.append(process_information)
class PcapNgReader(RawPcapNgReader, PcapReader):
    """A pcapng reader that returns dissected packets."""

    alternative = PcapReader

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def __enter__(self):
        # type: () -> PcapNgReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next packet block and dissect it.

        The dissecting class is selected by the link type of the block
        (conf.l2types), with a fallback on conf.raw_layer when the
        dissection fails.

        :param size: maximum number of data bytes used
        :param kwargs: passed to the dissecting class
        :return: the packet, with the metadata of the block copied to its
            attributes
        :raises EOFError: when no more packets are available
        """
        record = super(PcapNgReader, self)._read_packet(size=size)
        if record is None:
            raise EOFError
        raw_data, metadata = record
        (linktype, tsresol, tshigh, tslow, wirelen,
         comment, ifname, direction, process_information) = metadata
        # KeyboardInterrupt is not an Exception subclass: it is never
        # caught below and always propagates.
        try:
            layer = conf.l2types.num2layer[linktype]  # type: Type[Packet]
            pkt = layer(raw_data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(raw_data)
        if tshigh is not None:
            # 64-bit timestamp split into two 32-bit halves
            pkt.time = EDecimal((tshigh << 32) + tslow) / tsresol
        pkt.wirelen = wirelen
        pkt.comment = comment
        pkt.direction = direction
        pkt.process_information = process_information.copy()
        if ifname is not None:
            pkt.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
        return pkt

    def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
        """Emulate a socket: same as read_packet()."""
        return self.read_packet(size=size, **kwargs)
class GenericPcapWriter(object):
    """Base class of the capture file writers.

    Subclasses implement _write_header() and _write_packet(); this class
    computes the values they are called with.
    """
    # True when the sub-second field is written in nanoseconds
    nano = False
    # Set on the instance by write_header() (or by subclasses)
    linktype: int

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        raise NotImplementedError

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        raise NotImplementedError

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Compute the timestamp to write for a packet.

        :param packet: the packet; its "time" attribute is used when sec
            is None
        :param sec: seconds, or None to take them from the packet
        :param usec: micro/nanoseconds (depending on self.nano)
        :return: a (sec, usec) tuple; both are None when sec is None and
            the packet has no "time" attribute
        """
        if hasattr(packet, "time"):
            if sec is None:
                packet_time = packet.time
                tmp = int(packet_time)
                scale = 1000000000 if self.nano else 1000000
                usec = int(round((packet_time - tmp) * scale))
                if usec >= scale:
                    # The fraction was rounded up to a full second: carry
                    # it, the sub-second field must stay below the scale.
                    usec -= scale
                    sec = float(tmp + 1)
                else:
                    sec = float(packet_time)
        if sec is not None and usec is None:
            usec = 0
        return sec, usec  # type: ignore

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Select the link type if needed, then write the file header.

        :param pkt: first packet to be written, used to guess the link
            type; with None or bytes, Ethernet is used
        """
        if not hasattr(self, 'linktype'):
            try:
                if pkt is None or isinstance(pkt, bytes):
                    # Can't guess LL
                    raise KeyError
                self.linktype = conf.l2types.layer2num[
                    pkt.__class__
                ]
            except KeyError:
                msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
                warning(msg, self.__class__.__name__, pkt.__class__.__name__)
                self.linktype = DLT_EN10MB
        self._write_header(pkt)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[float]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: Packet, or bytes for a single packet
        :type packet: scapy.packet.Packet or bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured. If ``sec`` is not specified,
                     this value is ignored.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(raw(packet))``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, tries ``packet.wirelen``, otherwise uses
                        ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        f_sec, usec = self._get_time(packet, sec, usec)

        rawpkt = bytes_encode(packet)
        caplen = len(rawpkt) if caplen is None else caplen

        if wirelen is None:
            if hasattr(packet, "wirelen"):
                wirelen = packet.wirelen
        if wirelen is None:
            wirelen = caplen

        comment = getattr(packet, "comment", None)
        ifname = getattr(packet, "sniffed_on", None)
        direction = getattr(packet, "direction", None)
        if isinstance(packet, bytes):
            linktype = self.linktype
        else:
            try:
                linktype = conf.l2types.layer2num[
                    packet.__class__
                ]
            except KeyError:
                # Classes without a registered link type are accepted by
                # write_header(): write them with the link type of the
                # file instead of failing here.
                linktype = getattr(self, "linktype", DLT_EN10MB)
        if ifname is not None:
            ifname = str(ifname).encode('utf-8')
        self._write_packet(
            rawpkt,
            sec=f_sec, usec=usec,
            caplen=caplen, wirelen=wirelen,
            comment=comment,
            ifname=ifname,
            direction=direction,
            linktype=linktype
        )
class GenericRawPcapWriter(GenericPcapWriter):
    """Capture writer on top of a file object (self.f).

    The file header is written when the first packet is written, or when
    the writer is closed if nothing was written.
    """
    header_present = False
    nano = False
    sync = False
    f = None  # type: Union[IO[bytes], gzip.GzipFile]

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def flush(self):
        # type: () -> Optional[Any]
        return self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Write the header if it is still missing, then close the file."""
        if not self.header_present:
            self.write_header(None)
        return self.f.close()

    def __enter__(self):
        # type: () -> GenericRawPcapWriter
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.flush()
        self.close()

    def write(self, pkt):
        # type: (Union[_PacketIterable, bytes]) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a pcap file.

        :param pkt: Packet(s) to write (one record for each Packet), or raw
                    bytes to write (as one record).
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
        """
        if isinstance(pkt, bytes):
            # Raw bytes: a single record
            if not self.header_present:
                self.write_header(pkt)
            self.write_packet(pkt)
            return

        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for item in IterSocket(pkt).iter:
            if not self.header_present:
                self.write_header(item)

            if not isinstance(item, bytes) and \
                    self.linktype != conf.l2types.get(type(item), None):
                warning("Inconsistent linktypes detected!"
                        " The resulting file might contain"
                        " invalid packets."
                        )

            self.write_packet(item)
class RawPcapWriter(GenericRawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 linktype=None,  # type: Optional[int]
                 gz=False,  # type: bool
                 endianness="",  # type: str
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 nano=False,  # type: bool
                 snaplen=MTU,  # type: int
                 bufsz=4096,  # type: int
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param linktype: force linktype to a given value. If None, linktype is
            taken from the first writer packet
        :param gz: compress the capture on the fly
        :param endianness: force an endianness (little:"<", big:">").
            Default is native
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
        :param snaplen: snapshot length stored in the file header
        :param bufsz: buffering argument passed to ``open()`` for
            uncompressed files; forced to 0 when ``sync`` is set

        """

        # Compare against None: 0 is a legitimate linktype value and must
        # not be mistaken for "not provided".
        if linktype is not None:
            self.linktype = linktype
        self.snaplen = snaplen
        self.append = append
        self.gz = gz
        self.endian = endianness
        self.sync = sync
        self.nano = nano
        if sync:
            bufsz = 0

        if isinstance(filename, str):
            self.filename = filename
            mode = "ab" if append else "wb"
            if gz:
                self.f = cast(_ByteStream, gzip.open(filename, mode, 9))
            else:
                self.f = open(filename, mode, bufsz)
        else:
            # Already opened file-like object
            self.f = filename
            self.filename = getattr(filename, "name", "No name")

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the pcap global header.

        In append mode nothing is written when the target file already
        holds data.

        :raises ValueError: if no linktype is known at this point
        """
        self.header_present = True

        if self.append:
            # Even if prone to race conditions, this seems to be
            # safest way to tell whether the header is already present
            # because we have to handle compressed streams that
            # are not as flexible as basic files
            if self.gz:
                g = gzip.open(self.filename, "rb")  # type: _ByteStream
            else:
                g = open(self.filename, "rb")
            try:
                if g.read(16):
                    return
            finally:
                g.close()

        if not hasattr(self, 'linktype'):
            raise ValueError(
                "linktype could not be guessed. "
                "Please pass a linktype while creating the writer"
            )

        # The magic number tells readers which timestamp precision is used
        magic = 0xa1b23c4d if self.nano else 0xa1b2c3d4
        self.f.write(struct.pack(self.endian + "IHHIIII", magic,
                                 2, 4, 0, 0, self.snaplen, self.linktype))
        self.f.flush()

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        ``linktype``, ``comment``, ``ifname`` and ``direction`` are accepted
        for signature compatibility but are not used by this writer.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: sub-second part of the capture time, in microseconds
                     (nanoseconds when ``nano`` is set). Recomputed from the
                     current time when ``sec`` is not supplied; defaults to
                     0 when only ``sec`` is supplied.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen
        if sec is None or usec is None:
            t = time.time()
            it = int(t)
            if sec is None:
                # No timestamp at all: use the current time for both parts
                sec = it
                usec = int(round((t - it) *
                                 (1000000000 if self.nano else 1000000)))
            elif usec is None:
                usec = 0

        # Per-record header followed by the packet bytes
        self.f.write(struct.pack(self.endian + "IIII",
                                 int(sec), usec, caplen, wirelen))
        self.f.write(bytes(packet))
        if self.sync:
            self.f.flush()
class RawPcapNgWriter(GenericRawPcapWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def __init__(self,
                 filename,  # type: str
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to
        """

        self.header_present = False
        # Timestamps are written as integer multiples of 1/tsresol seconds
        self.tsresol = 1000000
        # A dict to keep if_name to IDB id mapping.
        # unknown if_name(None) id=0
        self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}

        # tcpdump only support little-endian in PCAPng files
        self.endian = "<"
        self.endian_magic = b"\x4d\x3c\x2b\x1a"

        self.filename = filename
        self.f = open(filename, "wb", 4096)

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return ``(sec, usec)``, filling ``sec`` from ``packet.time`` when
        it is missing and defaulting ``usec`` to 0."""
        if hasattr(packet, "time"):
            if sec is None:
                sec = float(packet.time)

        if usec is None:
            usec = 0

        return sec, usec  # type: ignore

    def _add_padding(self, raw_data):
        # type: (bytes) -> bytes
        """Return ``raw_data`` zero-padded to a multiple of 4 bytes."""
        raw_data += ((-len(raw_data)) % 4) * b"\x00"
        return raw_data

    def build_block(self, block_type, block_body, options=None):
        # type: (bytes, bytes, Optional[bytes]) -> bytes
        """Assemble a complete block.

        :param block_type: packed block type
        :param block_body: block content, padded here to 32 bits
        :param options: already encoded options, appended after the body
        :return: type + total length + body + total length
        """

        # Pad Block Body to 32 bits
        block_body = self._add_padding(block_body)

        if options:
            block_body += options

        # An empty block is 12 bytes long
        block_total_length = 12 + len(block_body)

        # Block Type
        block = block_type
        # Block Total Length
        block += struct.pack(self.endian + "I", block_total_length)
        # Block Body
        block += block_body
        # Block Total Length
        block += struct.pack(self.endian + "I", block_total_length)

        return block

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the section header and the first interface block, once."""
        if not self.header_present:
            self.header_present = True
            self._write_block_shb()
            self._write_block_idb(linktype=self.linktype)

    def _write_block_shb(self):
        # type: () -> None
        """Write the Section Header Block."""

        # Block Type
        block_type = b"\x0A\x0D\x0D\x0A"
        # Byte-Order Magic
        block_shb = self.endian_magic
        # Major Version
        block_shb += struct.pack(self.endian + "H", 1)
        # Minor Version
        block_shb += struct.pack(self.endian + "H", 0)
        # Section Length
        block_shb += struct.pack(self.endian + "q", -1)

        self.f.write(self.build_block(block_type, block_shb))

    def _write_block_idb(self,
                         linktype,  # type: int
                         ifname=None  # type: Optional[bytes]
                         ):
        # type: (...) -> None
        """Write an Interface Description Block.

        :param linktype: linktype of the interface
        :param ifname: interface name, stored as an option when given
        """

        # Block Type
        block_type = struct.pack(self.endian + "I", 1)
        # LinkType
        block_idb = struct.pack(self.endian + "H", linktype)
        # Reserved
        block_idb += struct.pack(self.endian + "H", 0)
        # SnapLen
        block_idb += struct.pack(self.endian + "I", 262144)

        # if_name option
        opts = None
        if ifname is not None:
            opts = struct.pack(self.endian + "HH", 2, len(ifname))
            # Pad Option Value to 32 bits
            opts += self._add_padding(ifname)
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_idb, options=opts))

    def _write_block_spb(self, raw_pkt):
        # type: (bytes) -> None
        """Write a Simple Packet Block holding ``raw_pkt``."""

        # Block Type
        block_type = struct.pack(self.endian + "I", 3)
        # Original Packet Length
        block_spb = struct.pack(self.endian + "I", len(raw_pkt))
        # Packet Data
        block_spb += raw_pkt

        self.f.write(self.build_block(block_type, block_spb))

    def _write_block_epb(self,
                         raw_pkt,  # type: bytes
                         ifid,  # type: int
                         timestamp=None,  # type: Optional[Union[EDecimal, float]]  # noqa: E501
                         caplen=None,  # type: Optional[int]
                         orglen=None,  # type: Optional[int]
                         comment=None,  # type: Optional[bytes]
                         flags=None,  # type: Optional[int]
                         ):
        # type: (...) -> None
        """Write an Enhanced Packet Block.

        :param raw_pkt: packet bytes
        :param ifid: id of the interface block the packet belongs to
        :param timestamp: capture time in seconds; 0 is written if unset
        :param caplen: captured length, ``len(raw_pkt)`` if unset or 0
        :param orglen: original length, ``len(raw_pkt)`` if unset or 0
        :param comment: optional comment, stored as an option
        :param flags: optional integer flags word, stored as an option
        """

        if timestamp:
            # Split the scaled timestamp into two 32-bit halves
            tmp_ts = int(timestamp * self.tsresol)
            ts_high = tmp_ts >> 32
            ts_low = tmp_ts & 0xFFFFFFFF
        else:
            ts_high = ts_low = 0

        if not caplen:
            caplen = len(raw_pkt)

        if not orglen:
            orglen = len(raw_pkt)

        # Block Type
        block_type = struct.pack(self.endian + "I", 6)
        # Interface ID
        block_epb = struct.pack(self.endian + "I", ifid)
        # Timestamp (High)
        block_epb += struct.pack(self.endian + "I", ts_high)
        # Timestamp (Low)
        block_epb += struct.pack(self.endian + "I", ts_low)
        # Captured Packet Length
        block_epb += struct.pack(self.endian + "I", caplen)
        # Original Packet Length
        block_epb += struct.pack(self.endian + "I", orglen)
        # Packet Data
        block_epb += raw_pkt

        # Options
        opts = b''
        if comment is not None:
            comment = bytes_encode(comment)
            opts += struct.pack(self.endian + "HH", 1, len(comment))
            # Pad Option Value to 32 bits
            opts += self._add_padding(comment)
        # isinstance() so that int subclasses (e.g. IntEnum) are accepted
        if isinstance(flags, int):
            opts += struct.pack(self.endian + "HH", 2, 4)
            opts += struct.pack(self.endian + "I", flags)
        if opts:
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_epb,
                                      options=opts))

    def _write_packet(self,  # type: ignore
                      packet,  # type: bytes
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch.
                    Written as 0 when not supplied.
        :type sec: float
        :param usec: not used by this writer
        :type usec: int
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: UTF-8 string containing human-readable comment text
                        that is associated to the current block. Line
                        separators SHOULD be a carriage-return + linefeed
                        ('\\r\\n') or just linefeed ('\\n'); either form may
                        appear and be considered a line separator. The
                        string is not zero-terminated.
        :type comment: bytes
        :param ifname: UTF-8 string containing the
                       name of the device used to capture data.
                       The string is not zero-terminated.
        :type ifname: bytes
        :param direction: 0 = information not available,
                          1 = inbound,
                          2 = outbound
        :type direction: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen

        ifid = self.interfaces2id.get(ifname, None)
        if ifid is None:
            # First packet seen on this interface: declare it
            ifid = max(self.interfaces2id.values()) + 1
            self.interfaces2id[ifname] = ifid
            self._write_block_idb(linktype=linktype, ifname=ifname)

        # EPB flags (32 bits).
        # currently only direction is implemented (least 2 significant bits)
        if isinstance(direction, int):
            flags = direction & 0x3
        else:
            flags = None

        self._write_block_epb(packet, timestamp=sec, caplen=caplen,
                              orglen=wirelen, comment=comment, ifid=ifid,
                              flags=flags)
        if self.sync:
            self.f.flush()
class PcapWriter(RawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""
    # No additional behaviour: everything is inherited from RawPcapWriter.
class PcapNgWriter(RawPcapNgWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return the ``(sec, usec)`` pair to record for ``packet``.

        A missing ``sec`` is taken from ``packet.time`` when the packet has
        that attribute; a missing ``usec`` becomes 0.
        """
        has_time = hasattr(packet, "time")
        if has_time and sec is None:
            sec = float(packet.time)

        usec = 0 if usec is None else usec

        return sec, usec  # type: ignore
@conf.commands.register
def rderf(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read an ERF file and return its content as a packet list

    :param filename: name of the file to read, or an open file-like object
    :param count: read only <count> packets
    """
    with ERFEthernetReader(filename) as reader:
        packets = reader.read_all(count=count)
    return packets
class ERFEthernetReader_metaclass(PcapReader_metaclass):
    """Metaclass that opens the capture before instantiating the reader."""

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Build a reader for ``filename``.

        The class itself is tried first, then its ``alternative`` class
        attribute if one is defined.

        :raises Scapy_Exception: if no candidate could be initialised
        """
        reader = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)  # type: ignore
        filename, fdesc = cls.open(filename)
        try:
            reader.__init__(filename, fdesc)
        except (Scapy_Exception, EOFError):
            pass
        else:
            return reader

        if "alternative" in cls.__dict__:
            fallback = cls.__dict__["alternative"]
            reader = fallback.__new__(
                fallback,
                fallback.__name__,
                fallback.__bases__,
                fallback.__dict__  # type: ignore
            )
            try:
                reader.__init__(filename, fdesc)
            except (Scapy_Exception, EOFError):
                pass
            else:
                return reader

        raise Scapy_Exception("Not a supported capture file")

    @staticmethod
    def open(fname  # type: ignore
             ):
        # type: (...) -> Tuple[str, _ByteStream]
        """Open (if necessary) filename

        :return: a ``(name, file object)`` tuple
        """
        if not isinstance(fname, str):
            # Already a file-like object: only work out a display name
            return getattr(fname, "name", "No name"), fname

        try:
            # Reading one byte fails if the file is not gzip-compressed
            with gzip.open(fname, "rb") as probe:
                probe.read(1)
            stream = gzip.open(fname, "rb")  # type: _ByteStream
        except IOError:
            stream = open(fname, "rb")
        return fname, stream
class ERFEthernetReader(PcapReader,
                        metaclass=ERFEthernetReader_metaclass):
    """Reader for ERF capture files holding Ethernet records."""

    def __init__(self, filename, fdesc=None):  # type: ignore
        # type: (Union[IO[bytes], str], IO[bytes]) -> None
        self.filename = filename  # type: ignore
        self.f = fdesc
        # Scale factor used to turn nanoseconds into seconds
        self.power = Decimal(10) ** Decimal(-9)

    # time is in 64-bits Endace's format which can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def _convert_erf_timestamp(self, t):
        # type: (int) -> EDecimal
        """Convert a 64-bit ERF timestamp into seconds.

        The upper 32 bits are whole seconds, the lower 32 bits a binary
        fraction of a second, which is rounded to the nearest nanosecond.
        """
        sec = t >> 32
        frac_sec = t & 0xffffffff
        frac_sec *= 10**9
        # Round to nearest: carry bit 31 into bit 32 before shifting
        frac_sec += (frac_sec & 0x80000000) << 1
        frac_sec >>= 32
        return EDecimal(sec + self.power * frac_sec)

    # The details of ERF Packet format can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read and dissect the next record of the file.

        :param size: maximum number of payload bytes kept for dissection
        :param kwargs: passed to the ``Ether`` constructor
        :raises EOFError: if a full record header cannot be read
        :raises Scapy_Exception: if the record is not an Ethernet record or
            its length is shorter than its own header
        """

        # General ERF Header have exactly 16 bytes
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError

        # The timestamp is in little-endian byte-order.
        timestamp = struct.unpack('<Q', hdr[:8])[0]
        # The rest is in big-endian byte-order.
        # Ignoring flags and lctr (loss counter) since they are ERF specific
        # header fields which Packet object does not support.
        rec_type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
        # The top bit flags an extension header; the remaining 7 bits are
        # the record type, which must be exactly 0x02 (Ethernet). A plain
        # bit test would also let other record types through.
        if rec_type & 0x7f != 0x02:
            raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")

        # If there are extended headers, ignore it because Packet object does
        # not support it. Extended headers size is 8 bytes before the payload.
        hdr_len = 24 if rec_type & 0x80 else 16
        if rlen < hdr_len:
            # A negative size would make read() consume the rest of the file
            raise Scapy_Exception("Invalid ERF record length")
        if rec_type & 0x80:
            self.f.read(8)
        s = self.f.read(rlen - hdr_len)

        # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
        # of the fields are disregarded by Endace.
        pb = s[2:size]
        from scapy.layers.l2 import Ether
        try:
            p = Ether(pb, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (Ether, s)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            # NOTE(review): the fallback wraps ``s`` (padding included)
            # rather than ``pb`` - confirm this is intended.
            p = conf.raw_layer(s)

        p.time = self._convert_erf_timestamp(timestamp)
        p.wirelen = wlen

        return p
@conf.commands.register
def wrerf(filename,  # type: Union[IO[bytes], str]
          pkt,  # type: _PacketIterable
          *args,  # type: Any
          **kargs  # type: Any
          ):
    # type: (...) -> None
    """Write a list of packets to an ERF file

    Extra positional and keyword arguments are handed to
    :class:`ERFEthernetWriter`.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrerf(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: the packet(s) to write
    :param gz: set to 1 to save a gzipped capture
    :param append: append packets to the capture file instead of
        truncating it
    :param sync: do not bufferize writes to the capture file
    """
    writer = ERFEthernetWriter(filename, *args, **kargs)
    with writer as erf_out:
        erf_out.write(pkt)
class ERFEthernetWriter(PcapWriter):
    """A stream ERF Ethernet writer with more control than wrerf()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 gz=False,  # type: bool
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
                         writable file-like object.
        :param gz: compress the capture on the fly
        :param append: append packets to the capture file instead of
                       truncating it
        :param sync: do not bufferize writes to the capture file
        """
        super(ERFEthernetWriter, self).__init__(filename,
                                                gz=gz,
                                                append=append,
                                                sync=sync)

    def write(self, pkt):  # type: ignore
        # type: (_PacketIterable) -> None
        """
        Writes a Packet or a SndRcvList object to a ERF file.

        :param pkt: Packet(s) to write (one record for each Packet)
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
        """
        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for p in IterSocket(pkt).iter:
            self.write_packet(p)

    def write_packet(self, pkt):  # type: ignore
        # type: (Packet) -> None
        """Write one packet as an ERF Ethernet record and flush the file.

        The timestamp comes from ``pkt.time`` when available, otherwise
        from the current time. The wire length comes from ``pkt.wirelen``
        when it is set, otherwise the record length is used.
        """

        if hasattr(pkt, "time"):
            # Upper 32 bits: whole seconds. Lower 32 bits: binary fraction
            # of a second, derived from the nanosecond part with exact
            # integer arithmetic.
            sec = int(pkt.time)
            nsec = int(round((pkt.time - sec) * 10**9))
            frac = (nsec << 32) // 10**9
            t = (sec << 32) + frac
        else:
            t = int(time.time()) << 32

        # There are 16 bytes of headers + 2 bytes of padding before the packets
        # payload.
        rlen = len(pkt) + 18

        # getattr() keeps ``wirelen`` bound for objects that have no such
        # attribute
        wirelen = getattr(pkt, "wirelen", None)
        if wirelen is None:
            wirelen = rlen

        self.f.write(struct.pack("<Q", t))
        self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
        self.f.write(bytes(pkt))
        self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the output file without writing any pcap header."""
        return self.f.close()
@conf.commands.register
def import_hexcap(input_string=None):
    # type: (Optional[str]) -> bytes
    """Parse a tcpdump-like hexadecimal dump and return its bytes

    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"

    Parsing stops at the first empty line or at end of input.

    :param input_string: String containing the hexdump input to parse. If None,
        read from standard input.
    """
    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501
    hex_chunks = []
    try:
        read_line = StringIO(input_string).readline if input_string else input
        while True:
            line = read_line().strip()
            if not line:
                break
            try:
                # Third group: the hex byte columns of the line
                hex_chunks.append(
                    re_extract_hexcap.match(line).groups()[2]  # type: ignore
                )
            except Exception:
                warning("Parsing error during hexcap")
                continue
    except EOFError:
        # Standard input was exhausted
        pass

    return hex_bytes("".join(hex_chunks).replace(" ", ""))
@conf.commands.register
def wireshark(pktlist, wait=False, **kwargs):
    # type: (List[Packet], bool, **Any) -> Optional[Any]
    """
    Open a list of packets in Wireshark.

    This is :func:`tcpdump` with Wireshark as the program; see it for the
    other parameters.

    Note: ``wait`` defaults to False here, so Wireshark runs in the
    background.
    """
    program = conf.prog.wireshark
    return tcpdump(pktlist, prog=program, wait=wait, **kwargs)
@conf.commands.register
def tdecode(
    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
    args=None,  # type: Optional[List[str]]
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """
    Decode a list of packets with tshark.

    :param args: arguments passed to tshark. Defaults to ``["-V"]``.

    See :func:`tcpdump` for more parameters.
    """
    tshark_args = ["-V"] if args is None else args
    return tcpdump(pktlist, prog=conf.prog.tshark, args=tshark_args, **kwargs)
def _guess_linktype_name(value):
    # type: (int) -> str
    """Return the DLT name matching a DLT value."""
    from scapy.libs.winpcapy import pcap_datalink_val_to_name
    raw_name = cast(bytes, pcap_datalink_val_to_name(value))
    return raw_name.decode()
def _guess_linktype_value(name):
    # type: (str) -> int
    """Return the DLT value matching a DLT name.

    Unknown names are reported with a warning and mapped to DLT_EN10MB.
    """
    from scapy.libs.winpcapy import pcap_datalink_name_to_val
    value = cast(int, pcap_datalink_name_to_val(name.encode()))
    if value != -1:
        return value
    warning("Unknown linktype: %s. Using EN10MB", name)
    return DLT_EN10MB
@conf.commands.register
def tcpdump(
    pktlist=None,  # type: Union[IO[bytes], None, str, _PacketIterable]
    dump=False,  # type: bool
    getfd=False,  # type: bool
    args=None,  # type: Optional[List[str]]
    flt=None,  # type: Optional[str]
    prog=None,  # type: Optional[Any]
    getproc=False,  # type: bool
    quiet=False,  # type: bool
    use_tempfile=None,  # type: Optional[Any]
    read_stdin_opts=None,  # type: Optional[Any]
    linktype=None,  # type: Optional[Any]
    wait=True,  # type: bool
    _suppress=False  # type: bool
):
    # type: (...) -> Any
    """Run tcpdump or tshark on a list of packets.

    When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
    temporary file to store the packets. This works around a bug in Apple's
    version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/

    Otherwise, the packets are passed in stdin.

    This function can be explicitly enabled or disabled with the
    ``use_tempfile`` parameter.

    When using ``wireshark``, it will be called with ``-ki -`` to start
    immediately capturing packets from stdin.

    Otherwise, the command will be run with ``-r -`` (which is correct for
    ``tcpdump`` and ``tshark``).

    This can be overridden with ``read_stdin_opts``. This has no effect when
    ``use_tempfile=True``, or otherwise reading packets from a regular file.

    :param pktlist: a Packet instance, a PacketList instance or a list of
        Packet instances. Can also be a filename (as a string), an open
        file-like object that must be a file format readable by
        tshark (Pcap, PcapNg, etc.) or None (to sniff)
    :param flt: a filter to use with tcpdump
    :param dump: when set to True, returns a string instead of displaying it.
    :param getfd: when set to True, returns a file-like object to read data
        from tcpdump or tshark from.
    :param getproc: when set to True, the subprocess.Popen object is returned
    :param args: arguments (as a list) to pass to tshark (example for tshark:
        args=["-T", "json"]).
    :param prog: program to use (defaults to tcpdump, will work with tshark)
    :param quiet: when set to True, the process stderr is discarded
    :param use_tempfile: When set to True, always use a temporary file to store
        packets.
        When set to False, pipe packets through stdin.
        When set to None (default), only use a temporary file with
        ``tcpdump`` on OSX.
    :param read_stdin_opts: When set, a list of arguments needed to capture
        from stdin. Otherwise, attempts to guess.
    :param linktype: A custom DLT value or name, to overwrite the default
        values.
    :param wait: If True (default), waits for the process to terminate before
        returning to Scapy. If False, the process will be detached to the
        background. If dump, getproc or getfd is True, these have the same
        effect as ``wait=False``.
    :raises Scapy_Exception: if no program is given and tcpdump is not
        available
    :raises ValueError: if ``prog`` is not a string, or if the linktype
        cannot be resolved

    Examples::

        >>> tcpdump([IP()/TCP(), IP()/UDP()])
        reading from file -, link-type RAW (Raw IP)
        16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501
        16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]

        >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
          1   0.000000    127.0.0.1 -> 127.0.0.1    TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501
          2   0.000459    127.0.0.1 -> 127.0.0.1    UDP 28 53->53 Len=0

    To get a JSON representation of a tshark-parsed PacketList(), one can::

        >>> import json, pprint
        >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
        ...                                  dst="45.33.32.156"),
        ...                               prog=conf.prog.tshark,
        ...                               args=["-T", "json"],
        ...                               getfd=True))
        >>> pprint.pprint(json_data)
        [{u'_index': u'packets-2016-12-23',
          u'_score': None,
          u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
                                              u'frame.encap_type': u'7',
        [...]
                                              },
                                   u'ip': {u'ip.addr': u'45.33.32.156',
                                           u'ip.checksum': u'0x0000a20d',
        [...]
                                           u'ip.ttl': u'64',
                                           u'ip.version': u'4'},
                                   u'raw': u'Raw packet data'}},
          u'_type': u'pcap_file'}]
        >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
        u'64'
    """
    # Returning the process implies that its stdout must be piped
    getfd = getfd or getproc
    if prog is None:
        if not conf.prog.tcpdump:
            raise Scapy_Exception(
                "tcpdump is not available"
            )
        prog = [conf.prog.tcpdump]
    elif isinstance(prog, str):
        prog = [prog]
    else:
        raise ValueError("prog must be a string")

    if linktype is not None:
        if isinstance(linktype, int):
            # Guess name from value
            try:
                linktype_name = _guess_linktype_name(linktype)
            except StopIteration:
                linktype = -1
        else:
            # Guess value from name
            if linktype.startswith("DLT_"):
                linktype = linktype[4:]
            linktype_name = linktype
            try:
                linktype = _guess_linktype_value(linktype)
            except KeyError:
                linktype = -1
        if linktype == -1:
            raise ValueError(
                "Unknown linktype. Try passing its datalink name instead"
            )
        prog += ["-y", linktype_name]

    # Build Popen arguments
    if args is None:
        args = []
    else:
        # Make a copy of args
        args = list(args)

    if flt is not None:
        # Check the validity of the filter
        if linktype is None and isinstance(pktlist, str):
            # linktype is unknown but required. Read it from file
            with PcapReader(pktlist) as rd:
                if isinstance(rd, PcapNgReader):
                    # Get the linktype from the first packet
                    try:
                        _, metadata = rd._read_packet()
                        linktype = metadata.linktype
                        if OPENBSD and linktype == 228:
                            linktype = DLT_RAW
                    except EOFError:
                        raise ValueError(
                            "Cannot get linktype from a PcapNg packet."
                        )
                else:
                    linktype = rd.linktype
        from scapy.arch.common import compile_filter
        compile_filter(flt, linktype=linktype)
        args.append(flt)

    stdout = subprocess.PIPE if dump or getfd else None
    # subprocess.DEVNULL discards the child's stderr without this function
    # having to open (and later close) a file object on os.devnull itself.
    stderr = subprocess.DEVNULL if quiet else None
    proc = None

    if use_tempfile is None:
        # Apple's tcpdump cannot read from stdin, see:
        # http://apple.stackexchange.com/questions/152682/
        use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump

    if read_stdin_opts is None:
        if prog[0] == conf.prog.wireshark:
            # Start capturing immediately (-k) from stdin (-i -)
            read_stdin_opts = ["-ki", "-"]
        elif prog[0] == conf.prog.tcpdump and not OPENBSD:
            # Capture in packet-buffered mode (-U) from stdin (-r -)
            read_stdin_opts = ["-U", "-r", "-"]
        else:
            read_stdin_opts = ["-r", "-"]
    else:
        # Make a copy of read_stdin_opts
        read_stdin_opts = list(read_stdin_opts)

    if pktlist is None:
        # sniff
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif isinstance(pktlist, str):
        # file
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", pktlist] + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif use_tempfile:
        tmpfile = get_temp_file(  # type: ignore
            autoext=".pcap",
            fd=True
        )  # type: IO[bytes]
        try:
            # File-like input: copy it to the temporary file chunk by chunk
            tmpfile.writelines(
                iter(lambda: pktlist.read(1048576), b"")  # type: ignore
            )
        except AttributeError:
            # No read() method: treat the input as packets
            pktlist = cast("_PacketIterable", pktlist)
            wrpcap(tmpfile, pktlist, linktype=linktype)
        else:
            tmpfile.close()
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", tmpfile.name] + args,
                stdout=stdout,
                stderr=stderr,
            )
    else:
        try:
            pktlist.fileno()  # type: ignore
            # pass the packet stream
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=pktlist,  # type: ignore
                    stdout=stdout,
                    stderr=stderr,
                )
        except (AttributeError, ValueError):
            # write the packet stream to stdin
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=subprocess.PIPE,
                    stdout=stdout,
                    stderr=stderr,
                )
            if proc is None:
                # An error has occurred
                return
            try:
                proc.stdin.writelines(  # type: ignore
                    iter(lambda: pktlist.read(1048576), b"")  # type: ignore
                )
            except AttributeError:
                wrpcap(proc.stdin, pktlist, linktype=linktype)  # type: ignore
            except UnboundLocalError:
                # The error was handled by ContextManagerSubprocess
                pass
            else:
                proc.stdin.close()  # type: ignore
    if proc is None:
        # An error has occurred
        return
    if dump:
        # Read the whole output before terminating the process
        data = b"".join(
            iter(lambda: proc.stdout.read(1048576), b"")  # type: ignore
        )
        proc.terminate()
        return data
    if getproc:
        return proc
    if getfd:
        return proc.stdout
    if wait:
        proc.wait()
@conf.commands.register
def hexedit(pktlist):
    # type: (_PacketIterable) -> PacketList
    """Run hexedit on a list of packets, then return the edited packets.

    The packets are written to a temporary capture file, which is removed
    before returning, even if writing, editing or re-reading fails.
    """
    f = get_temp_file()
    try:
        wrpcap(f, pktlist)
        with ContextManagerSubprocess(conf.prog.hexedit):
            subprocess.call([conf.prog.hexedit, f])
        return rdpcap(f)
    finally:
        # Do not leave the temporary capture behind on failure
        os.unlink(f)
def get_terminal_width():
    # type: () -> Optional[int]
    """Return the terminal width, in characters.

    Notice: several methods are tried in turn in order to
    support as many terminals and OS as possible.
    """
    width = shutil.get_terminal_size(fallback=(0, 0))[0]
    if width != 0:
        return width
    # Backups
    if WINDOWS:
        from ctypes import windll, create_string_buffer
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        handle = windll.kernel32.GetStdHandle(-12)
        info = create_string_buffer(22)
        if windll.kernel32.GetConsoleScreenBufferInfo(handle, info):
            fields = struct.unpack("hhhhHhhhhhh", info.raw)
            # Items 5 and 7 are the left and right edges of the window
            width = fields[7] - fields[5] + 1
        return width
    # COLUMNS is set on some terminals
    try:
        width = int(os.environ['COLUMNS'])
    except Exception:
        pass
    if width:
        return width
    # We can query TIOCGWINSZ
    try:
        import fcntl
        import termios
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(1, termios.TIOCGWINSZ, request)
        width = struct.unpack('HHHH', reply)[1]
    except (IOError, ModuleNotFoundError):
        # If everything failed, return default terminal size
        width = 79
    return width
def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
                header,  # type: List[Tuple[str, ...]]
                sortBy=0,  # type: Optional[int]
                borders=False,  # type: bool
                ):
    # type: (...) -> str
    """
    Pretty list to fit the terminal, and add header.

    The input list is left untouched: sorting and multi-value expansion are
    performed on a copy.

    :param rtlst: a list of tuples. each tuple contains a value which can
        be either a string or a list of string.
    :param header: a list of tuples holding the header line(s). The first
        one defines the number of columns.
    :param sortBy: the column id (starting with 0) which will be used for
        ordering, or None to keep the input order
    :param borders: whether to put borders on the table or not
    :returns: the formatted table, one line per row
    """
    _space = "|" if borders else " "
    cols = len(header[0])
    # Windows has a fat terminal border
    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)

    # Work on a copy so that the caller's list is neither reordered nor
    # expanded as a side effect of rendering it.
    rows = list(rtlst)
    if sortBy is not None:
        # Sort correctly
        rows.sort(key=lambda x: x[sortBy])

    # Resolve multi-values: a cell holding a list is spread over as many
    # lines as its longest list requires.
    expanded = []  # type: List[Any]
    for line in rows:
        ids = [j for j, val in enumerate(line) if isinstance(val, list)]
        if not ids:
            expanded.append(line)
            continue
        # An empty list is rendered as a single blank cell
        values = [line[j] or " " for j in ids]
        for k, ex_vals in enumerate(zip_longest(*values, fillvalue=" ")):
            # Only the first line keeps the single-valued cells
            extra_line = list(line) if k == 0 else [" "] * cols
            for h, ex_val in zip(ids, ex_vals):
                extra_line[h] = ex_val
            expanded.append(tuple(extra_line))

    # Append tag
    rtslst = header + cast(List[Tuple[str, ...]], expanded)
    # Detect column's width
    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
    # Make text fit in box (if required)
    width = get_terminal_width()
    _croped = False
    if conf.auto_crop_tables and width:
        width = width - _spacelen
        while sum(colwidth) > width:
            # Get the widest column
            i = colwidth.index(max(colwidth))
            if colwidth[i] < 2:
                # A cell of length 0 or 1 does not get shorter when cropped
                # ("x"[:-2] + "_" is still one character long): stop here
                # instead of looping forever on a very narrow terminal.
                break
            _croped = True
            # Get the length of every cell of this column
            row = [len(x[i]) for x in rtslst]
            # Get the biggest cell of this column
            j = row.index(max(row))
            # Re-build the line tuple with the edited cell
            t = list(rtslst[j])
            t[i] = t[i][:-2] + "_"
            rtslst[j] = tuple(t)
            # Update max size
            row[j] = len(t[i])
            colwidth[i] = max(row)
    if _croped:
        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
    # Generate padding scheme
    fmt = _space.join(["%%-%ds" % x for x in colwidth])
    # Append separation line if needed
    if borders:
        rtslst.insert(1, tuple("-" * x for x in colwidth))
    # Compile
    return "\n".join(fmt % x for x in rtslst)
def human_size(x, fmt=".1f"):
    # type: (int, str) -> str
    """
    Convert a size in octets to a human string representation

    :param x: the size, in octets
    :param fmt: the format specification applied to the scaled value
    :returns: "0B" for a null size, "<x>B" below 1024 octets, otherwise the
        value scaled by the largest fitting power of 1024 followed by its
        unit (K, M, G, T, P or E). Sizes beyond the last unit are expressed
        in that last unit.
    """
    units = ['K', 'M', 'G', 'T', 'P', 'E']
    if not x:
        return "0B"
    # Find the largest power of 1024 not exceeding the size. Plain
    # comparisons are used rather than math.log() so that exact powers of
    # 1024 are never misplaced by floating point rounding, and the last
    # unit stays reachable.
    magnitude = abs(x)
    i = 0
    while i < len(units) and magnitude >= 1024 ** (i + 1):
        i += 1
    if i:
        return format(x / 1024 ** i, fmt) + units[i - 1]
    return str(x) + "B"
def __make_table(
    yfmtfunc,  # type: Callable[[int], str]
    fmtfunc,  # type: Callable[[int], str]
    endline,  # type: str
    data,  # type: List[Tuple[Packet, Packet]]
    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
    dump=False  # type: bool
):
    # type: (...) -> Optional[str]
    """Core function of the make_table suite, which generates the table

    :param yfmtfunc: builds the %-format of the row label column from its
        width
    :param fmtfunc: builds the %-format of a data column from its width
    :param endline: text appended at the end of every table line
    :param data: the entries; each one is unpacked as arguments of fxyz
    :param fxyz: returns the (column, row, cell) values of an entry; all
        three are converted with str()
    :param sortx: optional sort key for the column labels
    :param sorty: optional sort key for the row labels
    :param seplinefunc: optional builder of the separation line, called with
        the row label width and the list of column widths
    :param dump: return the table instead of printing it
    :returns: the table if dump is set, None otherwise
    """
    def _order(labels, keyfunc):
        # type: (List[str], Optional[Callable[[str], Any]]) -> List[str]
        """Sort labels in place: explicit key, else int, else atol, else
        plain string order."""
        if keyfunc:
            labels.sort(key=keyfunc)
            return labels
        try:
            labels.sort(key=int)
        except Exception:
            try:
                labels.sort(key=atol)
            except Exception:
                labels.sort()
        return labels

    widths = {}  # type: Dict[str, int]
    rows = {}  # type: Dict[str, Optional[int]]
    cells = {}  # type: Dict[Tuple[str, str], str]
    colfmts = {}  # type: Dict[str, str]

    # Collect the labels and cells, tracking the width each column needs
    label_width = 0
    for entry in data:
        col, row, cell = [str(part) for part in fxyz(*entry)]
        label_width = max(len(row), label_width)
        widths[col] = max(widths.get(col, 0), len(col), len(cell))
        rows[row] = None
        cells[(col, row)] = cell

    columns = _order(list(widths), sortx)
    lines = _order(list(rows), sorty)

    pieces = []  # type: List[str]
    if seplinefunc:
        sepline = seplinefunc(label_width, [widths[col] for col in columns])
        pieces.append(sepline + "\n")

    # Header line: empty label cell followed by the column labels
    rowfmt = yfmtfunc(label_width)
    pieces.append(rowfmt % "")
    pieces.append(' ')
    for col in columns:
        colfmts[col] = fmtfunc(widths[col])
        pieces.append(colfmts[col] % col)
        pieces.append(' ')
    pieces.append(endline + "\n")
    if seplinefunc:
        pieces.append(sepline + "\n")

    # One line per row label, "-" standing for a missing cell
    for row in lines:
        pieces.append(rowfmt % row)
        pieces.append(' ')
        for col in columns:
            pieces.append(colfmts[col] % cells.get((col, row), "-"))
            pieces.append(' ')
        pieces.append(endline + "\n")
    if seplinefunc:
        pieces.append(sepline + "\n")

    table = "".join(pieces)
    if dump:
        return table
    print(table, end="")
    return None
def make_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[Any]
    """Generate a plain table: every cell is left-aligned and padded to the
    width of its column. Arguments are forwarded to __make_table after the
    two format builders and the (empty) end of line."""
    def _padded(width):
        # type: (int) -> str
        return "%%-%is" % width

    return __make_table(_padded, _padded, "", *args, **kargs)
def make_lined_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """Generate a table whose cells are followed by " |" and whose header and
    body are framed by "+"-joined dashed lines. Arguments are forwarded to
    __make_table after the two format builders and the (empty) end of line."""
    def _cell(width):
        # type: (int) -> str
        return "%%-%is |" % width

    def _rule(labelwidth, colwidths):
        # type: (int, List[int]) -> str
        # The trailing -2 yields an empty last segment, so that the line
        # ends with a "+"
        segments = [labelwidth - 1] + colwidths + [-2]
        return "+".join('-' * (size + 2) for size in segments)

    return __make_table(  # type: ignore
        _cell,
        _cell,
        "",
        *args,
        seplinefunc=_rule,
        **kargs
    )
def make_tex_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """Generate a table using LaTeX tabular syntax: cells are introduced by
    "&", lines end with a double backslash and "\\hline" is used as separation
    line. Column widths are ignored. Arguments are forwarded to __make_table
    after the two format builders and the end of line."""
    def _label(_width):
        # type: (int) -> str
        return "%s"

    def _cell(_width):
        # type: (int) -> str
        return "& %s"

    def _rule(_labelwidth, _colwidths):
        # type: (int, List[int]) -> str
        return "\\hline"

    return __make_table(  # type: ignore
        _label,
        _cell,
        "\\\\",
        *args,
        seplinefunc=_rule,
        **kargs
    )
3531####################
3532# WHOIS CLIENT #
3533####################
def whois(ip_address):
    # type: (str) -> bytes
    """Whois client for Python

    Resolve ``ip_address`` (the value is used as is when the resolution
    fails), send it to ``whois.ripe.net`` on TCP port 43 and read the answer
    until the server closes the connection.

    :param ip_address: the address or host name to look up
    :returns: the answer without its first three lines, without the lines
        starting with "remarks:" and without trailing blank lines
    """
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except Exception:
        query = whois_ip
    chunks = []
    # The context manager closes the socket even when connecting, sending
    # or receiving fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(("whois.ripe.net", 43))
        # sendall() keeps writing until the whole query has been sent
        s.sendall(query.encode("utf8") + b"\r\n")
        while True:
            d = s.recv(4096)
            if not d:
                break
            chunks.append(d)
    answer = b"".join(chunks)
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [
        line for line in answer.split(b"\n")
        if not line.startswith(ignore_tag)
    ]
    # remove empty lines at the bottom: always look at the current last
    # line, so that none is skipped after a removal
    while lines and not lines[-1].strip():
        lines.pop()
    return b"\n".join(lines[3:])
3565####################
3566# CLI utils #
3567####################
class _CLIUtilMetaclass(type):
    """
    Metaclass collecting, at class creation, the members tagged with a
    ``cliutil_type`` attribute into the ``commands``, ``commands_output`` and
    ``commands_complete`` class attributes.
    """

    class TYPE(enum.Enum):
        COMMAND = 0
        OUTPUT = 1
        COMPLETE = 2

    def __new__(cls,  # type: Type[_CLIUtilMetaclass]
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[CLIUtil]
        kinds = _CLIUtilMetaclass.TYPE
        commands = {}  # type: Dict[str, Any]
        outputs = {}  # type: Dict[str, Any]
        completers = {}  # type: Dict[str, Any]
        for member in dct.values():
            kind = getattr(member, "cliutil_type", None)
            if kind == kinds.COMMAND:
                # Commands are indexed by their own name
                commands[member.__name__] = member
            elif kind == kinds.OUTPUT:
                # Output processors are indexed by the name of the command
                # they refer to
                outputs[member.cliutil_ref.__name__] = member
            elif kind == kinds.COMPLETE:
                # Same indexing for the completers
                completers[member.cliutil_ref.__name__] = member
        dct["commands"] = commands
        dct["commands_output"] = outputs
        dct["commands_complete"] = completers
        return cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct))
class CLIUtil(metaclass=_CLIUtilMetaclass):
    """
    Provides a Util class to easily create simple CLI tools in Scapy,
    that can still be used as an API.

    Doc:
        - override the ps1() function
        - register commands with the @CLIUtil.addcommand decorator
        - call the loop() function when ready
    """

    def _depcheck(self) -> None:
        """
        Check that all dependencies are installed

        :raises ImportError: if prompt_toolkit cannot be imported
        """
        try:
            import prompt_toolkit  # noqa: F401
        except ImportError:
            # okay we lie but prompt_toolkit is a dependency...
            raise ImportError("You need to have IPython installed to use the CLI")

    # Okay let's do nice code
    commands: Dict[str, Callable[..., Any]] = {}
    # print output of command
    commands_output: Dict[str, Callable[..., str]] = {}
    # provides completion to command
    commands_complete: Dict[str, Callable[..., List[str]]] = {}

    def __init__(self, cli: bool = True, debug: bool = False) -> None:
        """
        DEV: overwrite

        :param cli: check the dependencies and enter the command loop
        :param debug: passed to loop()
        """
        if cli:
            self._depcheck()
            self.loop(debug=debug)

    @staticmethod
    def _inspectkwargs(func: DecoratorCallable) -> None:
        """
        Internal function to parse arguments from the kwargs of the functions

        The names of the keyword-only parameters are stored in
        ``func._flagnames`` and the matching command line flags ("-x" for a
        one-letter name, "--name" otherwise) in ``func._flags``.
        """
        func._flagnames = [  # type: ignore
            x.name for x in
            inspect.signature(func).parameters.values()
            if x.kind == inspect.Parameter.KEYWORD_ONLY
        ]
        func._flags = [  # type: ignore
            ("-%s" % x) if len(x) == 1 else ("--%s" % x)
            for x in func._flagnames  # type: ignore
        ]

    @staticmethod
    def _parsekwargs(
        func: DecoratorCallable,
        args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of a function.

        Only the flags located before the first non-flag argument are
        consumed.

        :returns: the remaining arguments and the flags that were set
        """
        kwargs: Dict[str, Literal[True]] = {}
        if func._flags:  # type: ignore
            i = 0
            for arg in args:
                if arg in func._flags:  # type: ignore
                    i += 1
                    kwargs[func._flagnames[func._flags.index(arg)]] = True  # type: ignore  # noqa: E501
                    continue
                break
            args = args[i:]
        return args, kwargs

    @classmethod
    def _parseallargs(
        cls,
        func: DecoratorCallable,
        cmd: str, args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of both the function
        and its output function.

        :returns: the remaining arguments, the flags of the function and the
            flags of its output function
        """
        args, kwargs = cls._parsekwargs(func, args)
        outkwargs: Dict[str, Literal[True]] = {}
        if cmd in cls.commands_output:
            args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
        return args, kwargs, outkwargs

    @classmethod
    def addcommand(
        cls,
        spaces: bool = False,
        globsupport: bool = False,
    ) -> Callable[[DecoratorCallable], DecoratorCallable]:
        """
        Decorator to register a command

        :param spaces: hand all the arguments to the command as a single
            space-joined string
        :param globsupport: expand an argument containing a "*" using the
            completer of the command. Requires spaces.
        :raises ValueError: if globsupport is set without spaces
        """
        def func(cmd: DecoratorCallable) -> DecoratorCallable:
            cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND  # type: ignore
            cmd._spaces = spaces  # type: ignore
            cmd._globsupport = globsupport  # type: ignore
            cls._inspectkwargs(cmd)
            if cmd._globsupport and not cmd._spaces:  # type: ignore
                raise ValueError("Cannot use globsupport without spaces.")
            return cmd
        return func

    @classmethod
    def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command output processor

        :param cmd: the command whose result is processed
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            cls._inspectkwargs(processor)
            return processor
        return func

    @classmethod
    def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command completor

        :param cmd: the command whose arguments are completed
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            return processor
        return func

    def ps1(self) -> str:
        """
        Return the PS1 of the shell
        """
        return "> "

    def close(self) -> None:
        """
        Function called on exiting
        """
        print("Exited")

    def help(self, cmd: Optional[str] = None) -> None:
        """
        Print the help related to this CLI util

        :param cmd: the command to describe. If unset, the list of all
            commands is printed.
        """
        def _args(func: Any) -> str:
            # Flags of the command, then flags of its output processor
            flags = func._flags.copy()
            if func.__name__ in self.commands_output:
                flags += self.commands_output[func.__name__]._flags  # type: ignore
            return " %s%s" % (
                (
                    "%s " % " ".join("[%s]" % x for x in flags)
                    if flags else ""
                ),
                " ".join(
                    "<%s%s>" % (
                        x.name,
                        # "?" marks the parameters having a default value
                        "?" if
                        (x.default is None or x.default != inspect.Parameter.empty)
                        else ""
                    )
                    # The first parameter (self) is skipped
                    for x in list(inspect.signature(func).parameters.values())[1:]
                    if x.name not in func._flagnames and x.name[0] != "_"
                )
            )

        if cmd:
            if cmd not in self.commands:
                print("Unknown command '%s'" % cmd)
                return
            # help for one command
            func = self.commands[cmd]
            print("%s%s: %s" % (
                cmd,
                _args(func),
                func.__doc__ and func.__doc__.strip()
            ))
        else:
            header = "│ %s - Help │" % self.__class__.__name__
            print("┌" + "─" * (len(header) - 2) + "┐")
            print(header)
            print("└" + "─" * (len(header) - 2) + "┘")
            print(
                pretty_list(
                    [
                        (
                            cmd,
                            _args(func),
                            func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
                        )
                        for cmd, func in self.commands.items()
                    ],
                    [("Command", "Arguments", "Description")]
                )
            )

    def _completer(self) -> 'prompt_toolkit.completion.Completer':
        """
        Returns a prompt_toolkit custom completer
        """
        from prompt_toolkit.completion import Completer, Completion

        class CLICompleter(Completer):
            def get_completions(cmpl, document, complete_event):  # type: ignore
                if not complete_event.completion_requested:
                    # Only activate when the user does <TAB>
                    return
                parts = document.text.split(" ")
                cmd = parts[0].lower()
                if cmd not in self.commands:
                    # We are trying to complete the command
                    for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
                        yield Completion(possible_cmd, start_position=-len(cmd))
                else:
                    # We are trying to complete the command content
                    if len(parts) == 1:
                        return
                    args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
                    arg = " ".join(args)
                    if cmd in self.commands_complete:
                        for possible_arg in self.commands_complete[cmd](self, arg):
                            yield Completion(possible_arg, start_position=-len(arg))
                return
        return CLICompleter()

    def loop(self, debug: int = 0) -> None:
        """
        Main command handling loop

        Commands are read until "exit" is entered or the end of the input is
        reached (in which case close() is called).

        :param debug: print the traceback of the failing commands
        """
        from prompt_toolkit import PromptSession
        session = PromptSession(completer=self._completer())

        while True:
            try:
                cmd = session.prompt(self.ps1()).strip()
            except KeyboardInterrupt:
                continue
            except EOFError:
                self.close()
                break
            args = cmd.split(" ")[1:]
            cmd = cmd.split(" ")[0].strip().lower()
            if not cmd:
                continue
            if cmd in ["help", "h", "?"]:
                self.help(" ".join(args))
                continue
            # Exact comparison: a substring test against "exit" would also
            # leave the shell on "x", "it" or "t", shadowing any command
            # bearing such a name.
            if cmd == "exit":
                break
            if cmd not in self.commands:
                print("Unknown command. Type help or ?")
                continue
            # check the number of arguments
            func = self.commands[cmd]
            args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
            if func._spaces:  # type: ignore
                args = [" ".join(args)]
            calls = [args]
            # if globsupport is set, we might need to do several calls.
            # Without a completer there is nothing to expand the glob
            # against: the argument is then passed through literally.
            completer = self.commands_complete.get(cmd)
            if (
                func._spaces and  # type: ignore
                func._globsupport and  # type: ignore
                "*" in args[0] and
                completer is not None
            ):
                if args[0].count("*") > 1:
                    print("More than 1 glob star (*) is currently unsupported.")
                    continue
                before, after = args[0].split("*", 1)
                # Both sides of the star are literal text: escape them so
                # that e.g. the "." of "*.txt" only matches a dot.
                reg = re.compile(re.escape(before) + r".*" + re.escape(after))
                calls = [
                    [x] for x in
                    completer(self, before)
                    if reg.match(x)
                ]
            # now iterate if required, call the function and print its output
            for call_args in calls:
                try:
                    res = func(self, *call_args, **kwargs)
                except TypeError:
                    print("Bad number of arguments !")
                    self.help(cmd=cmd)
                    continue
                except Exception as ex:
                    print("Command failed with error: %s" % ex)
                    if debug:
                        traceback.print_exc()
                    # Nothing to output for this call: do not fall through
                    # with the result of a previous one.
                    continue
                try:
                    if res and cmd in self.commands_output:
                        self.commands_output[cmd](self, res, **outkwargs)
                except Exception as ex:
                    print("Output processor failed with error: %s" % ex)
def AutoArgparse(func: DecoratorCallable) -> None:
    """
    Build an argparse parser out of the signature of a function, parse the
    command line with it, then call the function with the result.

    Notes:

    - the description of the arguments is taken from the docstring, which
      must use the sphinx format. See
      https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
    - only the Python annotations are considered (Sphinx-specific types are
      ignored): bool parameters become flags, str, int and float parameters
      become typed arguments.
    - parameters with any other annotation, or without annotation, are
      omitted.
    - an AssertionError raised by the function is printed, followed by the
      help of the parser.
    """
    # Split the docstring between the description and the sphinx fields
    description = ""
    param_help = {}
    if func.__doc__:
        docstring = func.__doc__.strip()
        split = re.match(
            r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
            docstring,
        )
        if split:
            description = split.group(1)
            fields = re.findall(
                r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
                split.group(2),
            )
            for kind, name, text in fields:
                if kind != "param":
                    continue
                name = name.strip()
                if not name:
                    raise ValueError(":param: without a name !")
                param_help[name] = text.strip()
        else:
            description = docstring

    # Now build the argparse.ArgumentParser
    parser = argparse.ArgumentParser(
        prog=func.__name__,
        description=description,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Process the parameters
    positional = []
    for param in inspect.signature(func).parameters.values():
        annotation = param.annotation
        if not annotation:
            continue
        argname = param.name
        options = {}
        if annotation is bool:
            if param.default is True:
                # Enabled by default: expose the flag that disables it
                argname = "no-" + argname
                options["action"] = "store_false"
            else:
                options["action"] = "store_true"
        elif annotation in [str, int, float]:
            options["type"] = annotation
        else:
            continue
        if param.default != inspect.Parameter.empty:
            if param.kind == inspect.Parameter.POSITIONAL_ONLY:
                # Optional positional argument
                positional.append(param.name)
                options["nargs"] = '?'
            else:
                argname = "--" + argname
            options["default"] = param.default
        else:
            positional.append(param.name)
            if param.kind == inspect.Parameter.VAR_POSITIONAL:
                options["action"] = "append"
        if param.name in param_help:
            options["help"] = param_help[param.name]
        parser.add_argument(argname, **options)  # type: ignore

    # Now parse the sys.argv parameters
    params = vars(parser.parse_args())
    # Act as in interactive mode
    conf.logLevel = 20
    from scapy.themes import DefaultTheme
    conf.color_theme = DefaultTheme()
    # And call the function
    try:
        # The positional values are popped first, what is left is passed by
        # keyword (without the "no_" prefix of the negated flags)
        posargs = [params.pop(name) for name in positional]
        kwargs = {
            (key[3:] if key.startswith("no_") else key): value
            for key, value in params.items()
        }
        func(*posargs, **kwargs)
    except AssertionError as ex:
        print("ERROR: " + str(ex))
        parser.print_help()
3989#######################
3990# PERIODIC SENDER #
3991#######################
class PeriodicSenderThread(threading.Thread):
    """Thread sending one or several packets on a socket at a fixed pace."""

    def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
        # type: (Any, _PacketIterable, float, bool) -> None
        """ Thread to send packets periodically

        Args:
            sock: socket where packet is sent periodically
            pkt: packet or list of packets to send
            interval: interval between two packets
            ignore_exceptions: silently end the thread, instead of raising,
                               when sending fails
        """
        # Anything which is not a list is handled as a single packet
        if isinstance(pkt, list):
            self._pkts = pkt  # type: _PacketIterable
        else:
            self._pkts = [cast("Packet", pkt)]
        self._socket = sock
        self._stopped = threading.Event()
        self._enabled = threading.Event()
        self._enabled.set()
        self._interval = interval
        self._ignore_exceptions = ignore_exceptions
        threading.Thread.__init__(self)

    def enable(self):
        # type: () -> None
        """Resume the emission of the packets."""
        self._enabled.set()

    def disable(self):
        # type: () -> None
        """Suspend the emission of the packets, the thread keeps running."""
        self._enabled.clear()

    def run(self):
        # type: () -> None
        """Send all the packets, wait for the interval, and start again until
        the thread is stopped or the socket is closed."""
        while not (self._stopped.is_set() or self._socket.closed):
            for pkt in self._pkts:
                try:
                    if self._enabled.is_set():
                        self._socket.send(pkt)
                except (OSError, TimeoutError):
                    if not self._ignore_exceptions:
                        raise
                    return
            # Returns early when stop() is called during the wait
            self._stopped.wait(timeout=self._interval)
            if self._stopped.is_set() or self._socket.closed:
                break

    def stop(self):
        # type: () -> None
        """Ask the thread to end and wait for it, at most twice the
        interval."""
        self._stopped.set()
        self.join(self._interval * 2)
class SingleConversationSocket(object):
    """
    Wrapper around a socket-like object: sr1(), sr() and send() are executed
    while holding a re-entrant lock, every other attribute is looked up on
    the wrapped object.
    """

    def __init__(self, o):
        # type: (Any) -> None
        self._inner = o
        self._tx_mutex = threading.RLock()

    @property
    def __dict__(self):  # type: ignore
        """Expose the attribute dictionary of the wrapped object."""
        return self._inner.__dict__

    def __getattr__(self, name):
        # type: (str) -> Any
        """Delegate the attributes not found on the wrapper to the wrapped
        object."""
        inner = self._inner
        return getattr(inner, name)

    def sr1(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call sr1() of the wrapped object while holding the lock."""
        with self._tx_mutex:
            answer = self._inner.sr1(*args, **kargs)
            return answer

    def sr(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call sr() of the wrapped object while holding the lock."""
        with self._tx_mutex:
            answers = self._inner.sr(*args, **kargs)
            return answers

    def send(self, x):
        # type: (Packet) -> Any
        """Call send() of the wrapped object while holding the lock. The
        wrapped object is closed before a ConnectionError or OSError is
        propagated."""
        with self._tx_mutex:
            try:
                return self._inner.send(x)
            except (ConnectionError, OSError):
                self._inner.close()
                raise