Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7General utility functions.
8"""
11from decimal import Decimal
12from io import StringIO
13from itertools import zip_longest
14from uuid import UUID
16import argparse
17import array
18import base64
19import collections
20import decimal
21import difflib
22import enum
23import gzip
24import inspect
25import locale
26import math
27import os
28import pickle
29import random
30import re
31import shutil
32import socket
33import struct
34import subprocess
35import sys
36import tempfile
37import threading
38import time
39import traceback
40import warnings
42from scapy.config import conf
43from scapy.consts import DARWIN, OPENBSD, WINDOWS
44from scapy.data import MTU, DLT_EN10MB, DLT_RAW
45from scapy.compat import (
46 orb,
47 plain_str,
48 chb,
49 hex_bytes,
50 bytes_encode,
51)
52from scapy.error import (
53 log_interactive,
54 log_runtime,
55 Scapy_Exception,
56 warning,
57)
58from scapy.pton_ntop import inet_pton
60# Typing imports
61from typing import (
62 cast,
63 Any,
64 AnyStr,
65 Callable,
66 Dict,
67 IO,
68 Iterator,
69 List,
70 Optional,
71 TYPE_CHECKING,
72 Tuple,
73 Type,
74 Union,
75 overload,
76)
77from scapy.compat import (
78 DecoratorCallable,
79 Literal,
80)
82if TYPE_CHECKING:
83 from scapy.packet import Packet
84 from scapy.plist import _PacketIterable, PacketList
85 from scapy.supersocket import SuperSocket
86 import prompt_toolkit
88_ByteStream = Union[IO[bytes], gzip.GzipFile]
90###########
91# Tools #
92###########
95def issubtype(x, # type: Any
96 t, # type: Union[type, str]
97 ):
98 # type: (...) -> bool
99 """issubtype(C, B) -> bool
101 Return whether C is a class and if it is a subclass of class B.
102 When using a tuple as the second argument issubtype(X, (A, B, ...)),
103 is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).
104 """
105 if isinstance(t, str):
106 return t in (z.__name__ for z in x.__bases__)
107 if isinstance(x, type) and issubclass(x, t):
108 return True
109 return False
112_Decimal = Union[Decimal, int]
115class EDecimal(Decimal):
116 """Extended Decimal
118 This implements arithmetic and comparison with float for
119 backward compatibility
120 """
122 def __add__(self, other, context=None):
123 # type: (_Decimal, Any) -> EDecimal
124 return EDecimal(Decimal.__add__(self, Decimal(other)))
126 def __radd__(self, other):
127 # type: (_Decimal) -> EDecimal
128 return EDecimal(Decimal.__add__(self, Decimal(other)))
130 def __sub__(self, other):
131 # type: (_Decimal) -> EDecimal
132 return EDecimal(Decimal.__sub__(self, Decimal(other)))
134 def __rsub__(self, other):
135 # type: (_Decimal) -> EDecimal
136 return EDecimal(Decimal.__rsub__(self, Decimal(other)))
138 def __mul__(self, other):
139 # type: (_Decimal) -> EDecimal
140 return EDecimal(Decimal.__mul__(self, Decimal(other)))
142 def __rmul__(self, other):
143 # type: (_Decimal) -> EDecimal
144 return EDecimal(Decimal.__mul__(self, Decimal(other)))
146 def __truediv__(self, other):
147 # type: (_Decimal) -> EDecimal
148 return EDecimal(Decimal.__truediv__(self, Decimal(other)))
150 def __floordiv__(self, other):
151 # type: (_Decimal) -> EDecimal
152 return EDecimal(Decimal.__floordiv__(self, Decimal(other)))
154 def __divmod__(self, other):
155 # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
156 r = Decimal.__divmod__(self, Decimal(other))
157 return EDecimal(r[0]), EDecimal(r[1])
159 def __mod__(self, other):
160 # type: (_Decimal) -> EDecimal
161 return EDecimal(Decimal.__mod__(self, Decimal(other)))
163 def __rmod__(self, other):
164 # type: (_Decimal) -> EDecimal
165 return EDecimal(Decimal.__rmod__(self, Decimal(other)))
167 def __pow__(self, other, modulo=None):
168 # type: (_Decimal, Optional[_Decimal]) -> EDecimal
169 return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))
171 def __eq__(self, other):
172 # type: (Any) -> bool
173 if isinstance(other, Decimal):
174 return super(EDecimal, self).__eq__(other)
175 else:
176 return bool(float(self) == other)
178 def normalize(self, precision): # type: ignore
179 # type: (int) -> EDecimal
180 with decimal.localcontext() as ctx:
181 ctx.prec = precision
182 return EDecimal(super(EDecimal, self).normalize(ctx))
185@overload
186def get_temp_file(keep, autoext, fd):
187 # type: (bool, str, Literal[True]) -> IO[bytes]
188 pass
191@overload
192def get_temp_file(keep=False, autoext="", fd=False):
193 # type: (bool, str, Literal[False]) -> str
194 pass
197def get_temp_file(keep=False, autoext="", fd=False):
198 # type: (bool, str, bool) -> Union[IO[bytes], str]
199 """Creates a temporary file.
201 :param keep: If False, automatically delete the file when Scapy exits.
202 :param autoext: Suffix to add to the generated file name.
203 :param fd: If True, this returns a file-like object with the temporary
204 file opened. If False (default), this returns a file path.
205 """
206 f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
207 delete=False)
208 if not keep:
209 conf.temp_files.append(f.name)
211 if fd:
212 return f
213 else:
214 # Close the file so something else can take it.
215 f.close()
216 return f.name
219def get_temp_dir(keep=False):
220 # type: (bool) -> str
221 """Creates a temporary file, and returns its name.
223 :param keep: If False (default), the directory will be recursively
224 deleted when Scapy exits.
225 :return: A full path to a temporary directory.
226 """
228 dname = tempfile.mkdtemp(prefix="scapy")
230 if not keep:
231 conf.temp_files.append(dname)
233 return dname
236def _create_fifo() -> Tuple[str, Any]:
237 """Creates a temporary fifo.
239 You must then use open_fifo() on the server_fd once
240 the client is connected to use it.
242 :returns: (client_file, server_fd)
243 """
244 if WINDOWS:
245 from scapy.arch.windows.structures import _get_win_fifo
246 return _get_win_fifo()
247 else:
248 f = get_temp_file()
249 os.unlink(f)
250 os.mkfifo(f)
251 return f, f
254def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
255 """Open the server_fd (see create_fifo)
256 """
257 if WINDOWS:
258 from scapy.arch.windows.structures import _win_fifo_open
259 return _win_fifo_open(fd)
260 else:
261 return open(fd, mode)
264def sane(x, color=False):
265 # type: (AnyStr, bool) -> str
266 r = ""
267 for i in x:
268 j = orb(i)
269 if (j < 32) or (j >= 127):
270 if color:
271 r += conf.color_theme.not_printable(".")
272 else:
273 r += "."
274 else:
275 r += chr(j)
276 return r
279@conf.commands.register
280def restart():
281 # type: () -> None
282 """Restarts scapy"""
283 if not conf.interactive or not os.path.isfile(sys.argv[0]):
284 raise OSError("Scapy was not started from console")
285 if WINDOWS:
286 res_code = 1
287 try:
288 res_code = subprocess.call([sys.executable] + sys.argv)
289 finally:
290 os._exit(res_code)
291 os.execv(sys.executable, [sys.executable] + sys.argv)
294def lhex(x):
295 # type: (Any) -> str
296 from scapy.volatile import VolatileValue
297 if isinstance(x, VolatileValue):
298 return repr(x)
299 if isinstance(x, int):
300 return hex(x)
301 if isinstance(x, tuple):
302 return "(%s)" % ", ".join(lhex(v) for v in x)
303 if isinstance(x, list):
304 return "[%s]" % ", ".join(lhex(v) for v in x)
305 return str(x)
308@conf.commands.register
309def hexdump(p, dump=False):
310 # type: (Union[Packet, AnyStr], bool) -> Optional[str]
311 """Build a tcpdump like hexadecimal view
313 :param p: a Packet
314 :param dump: define if the result must be printed or returned in a variable
315 :return: a String only when dump=True
316 """
317 s = ""
318 x = bytes_encode(p)
319 x_len = len(x)
320 i = 0
321 while i < x_len:
322 s += "%04x " % i
323 for j in range(16):
324 if i + j < x_len:
325 s += "%02X " % orb(x[i + j])
326 else:
327 s += " "
328 s += " %s\n" % sane(x[i:i + 16], color=True)
329 i += 16
330 # remove trailing \n
331 s = s[:-1] if s.endswith("\n") else s
332 if dump:
333 return s
334 else:
335 print(s)
336 return None
339@conf.commands.register
340def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
341 # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
342 """Build an equivalent view of hexdump() on a single line
344 Note that setting both onlyasc and onlyhex to 1 results in a empty output
346 :param p: a Packet
347 :param onlyasc: 1 to display only the ascii view
348 :param onlyhex: 1 to display only the hexadecimal view
349 :param dump: print the view if False
350 :return: a String only when dump=True
351 """
352 s = ""
353 s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
354 if dump:
355 return s
356 else:
357 print(s)
358 return None
361@conf.commands.register
362def chexdump(p, dump=False):
363 # type: (Union[Packet, AnyStr], bool) -> Optional[str]
364 """Build a per byte hexadecimal representation
366 Example:
367 >>> chexdump(IP())
368 0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01 # noqa: E501
370 :param p: a Packet
371 :param dump: print the view if False
372 :return: a String only if dump=True
373 """
374 x = bytes_encode(p)
375 s = ", ".join("%#04x" % orb(x) for x in x)
376 if dump:
377 return s
378 else:
379 print(s)
380 return None
383@conf.commands.register
384def hexstr(p, onlyasc=0, onlyhex=0, color=False):
385 # type: (Union[Packet, AnyStr], int, int, bool) -> str
386 """Build a fancy tcpdump like hex from bytes."""
387 x = bytes_encode(p)
388 s = []
389 if not onlyasc:
390 s.append(" ".join("%02X" % orb(b) for b in x))
391 if not onlyhex:
392 s.append(sane(x, color=color))
393 return " ".join(s)
396def repr_hex(s):
397 # type: (bytes) -> str
398 """ Convert provided bitstring to a simple string of hex digits """
399 return "".join("%02x" % orb(x) for x in s)
402@conf.commands.register
403def hexdiff(
404 a: Union['Packet', AnyStr],
405 b: Union['Packet', AnyStr],
406 algo: Optional[str] = None,
407 autojunk: bool = False,
408) -> None:
409 """
410 Show differences between 2 binary strings, Packets...
412 Available algorithms:
413 - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
414 Levenstein distance between the strings then backtrack.
415 - difflib: Use the difflib.SequenceMatcher implementation. This based on a
416 modified version of the Ratcliff and Obershelp algorithm.
417 This is much faster, but far less accurate.
418 https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher
420 :param a:
421 :param b: The binary strings, packets... to compare
422 :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
423 By default, this is chosen depending on the complexity, optimistically
424 preferring wagnerfischer unless really necessary.
425 :param autojunk: (difflib only) See difflib documentation.
426 """
427 xb = bytes_encode(a)
428 yb = bytes_encode(b)
430 if algo is None:
431 # Choose the best algorithm
432 complexity = len(xb) * len(yb)
433 if complexity < 1e7:
434 # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
435 # Anything much larger than this shouldn't be attempted by default.
436 algo = "wagnerfischer"
437 if complexity > 1e6:
438 log_interactive.info(
439 "Complexity is a bit high. hexdiff will take a few seconds."
440 )
441 else:
442 algo = "difflib"
444 backtrackx = []
445 backtracky = []
447 if algo == "wagnerfischer":
448 xb = xb[::-1]
449 yb = yb[::-1]
451 # costs for the 3 operations
452 INSERT = 1
453 DELETE = 1
454 SUBST = 1
456 # Typically, d[i,j] will hold the distance between
457 # the first i characters of xb and the first j characters of yb.
458 # We change the Wagner Fischer to also store pointers to all
459 # the intermediate steps taken while calculating the Levenstein distance.
460 d = {(-1, -1): (0, (-1, -1))}
461 for j in range(len(yb)):
462 d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
463 for i in range(len(xb)):
464 d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)
466 # Compute the Levenstein distance between the two strings, but
467 # store all the steps to be able to backtrack at the end.
468 for j in range(len(yb)):
469 for i in range(len(xb)):
470 d[i, j] = min(
471 (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
472 (d[i - 1, j][0] + DELETE, (i - 1, j)),
473 (d[i, j - 1][0] + INSERT, (i, j - 1)),
474 )
476 # Iterate through the steps backwards to create the diff
477 i = len(xb) - 1
478 j = len(yb) - 1
479 while not (i == j == -1):
480 i2, j2 = d[i, j][1]
481 backtrackx.append(xb[i2 + 1:i + 1])
482 backtracky.append(yb[j2 + 1:j + 1])
483 i, j = i2, j2
484 elif algo == "difflib":
485 sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
486 xarr = [xb[i:i + 1] for i in range(len(xb))]
487 yarr = [yb[i:i + 1] for i in range(len(yb))]
488 # Iterate through opcodes to build the backtrack
489 for opcode in sm.get_opcodes():
490 typ, x0, x1, y0, y1 = opcode
491 if typ == 'delete':
492 backtrackx += xarr[x0:x1]
493 backtracky += [b''] * (x1 - x0)
494 elif typ == 'insert':
495 backtrackx += [b''] * (y1 - y0)
496 backtracky += yarr[y0:y1]
497 elif typ in ['equal', 'replace']:
498 backtrackx += xarr[x0:x1]
499 backtracky += yarr[y0:y1]
500 # Some lines may have been considered as junk. Check the sizes
501 if autojunk:
502 lbx = len(backtrackx)
503 lby = len(backtracky)
504 backtrackx += [b''] * (max(lbx, lby) - lbx)
505 backtracky += [b''] * (max(lbx, lby) - lby)
506 else:
507 raise ValueError("Unknown algorithm '%s'" % algo)
509 # Print the diff
511 x = y = i = 0
512 colorize: Dict[int, Callable[[str], str]] = {
513 0: lambda x: x,
514 -1: conf.color_theme.left,
515 1: conf.color_theme.right
516 }
518 dox = 1
519 doy = 0
520 btx_len = len(backtrackx)
521 while i < btx_len:
522 linex = backtrackx[i:i + 16]
523 liney = backtracky[i:i + 16]
524 xx = sum(len(k) for k in linex)
525 yy = sum(len(k) for k in liney)
526 if dox and not xx:
527 dox = 0
528 doy = 1
529 if dox and linex == liney:
530 doy = 1
532 if dox:
533 xd = y
534 j = 0
535 while not linex[j]:
536 j += 1
537 xd -= 1
538 print(colorize[doy - dox]("%04x" % xd), end=' ')
539 x += xx
540 line = linex
541 else:
542 print(" ", end=' ')
543 if doy:
544 yd = y
545 j = 0
546 while not liney[j]:
547 j += 1
548 yd -= 1
549 print(colorize[doy - dox]("%04x" % yd), end=' ')
550 y += yy
551 line = liney
552 else:
553 print(" ", end=' ')
555 print(" ", end=' ')
557 cl = ""
558 for j in range(16):
559 if i + j < min(len(backtrackx), len(backtracky)):
560 if line[j]:
561 col = colorize[(linex[j] != liney[j]) * (doy - dox)]
562 print(col("%02X" % orb(line[j])), end=' ')
563 if linex[j] == liney[j]:
564 cl += sane(line[j], color=True)
565 else:
566 cl += col(sane(line[j]))
567 else:
568 print(" ", end=' ')
569 cl += " "
570 else:
571 print(" ", end=' ')
572 if j == 7:
573 print("", end=' ')
575 print(" ", cl)
577 if doy or not yy:
578 doy = 0
579 dox = 1
580 i += 16
581 else:
582 if yy:
583 dox = 0
584 doy = 1
585 else:
586 i += 16
589if struct.pack("H", 1) == b"\x00\x01": # big endian
590 checksum_endian_transform = lambda chk: chk # type: Callable[[int], int]
591else:
592 checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8
595def checksum(pkt):
596 # type: (bytes) -> int
597 if len(pkt) % 2 == 1:
598 pkt += b"\0"
599 s = sum(array.array("H", pkt))
600 s = (s >> 16) + (s & 0xffff)
601 s += s >> 16
602 s = ~s
603 return checksum_endian_transform(s) & 0xffff
606def _fletcher16(charbuf):
607 # type: (bytes) -> Tuple[int, int]
608 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501
609 c0 = c1 = 0
610 for char in charbuf:
611 c0 += char
612 c1 += c0
614 c0 %= 255
615 c1 %= 255
616 return (c0, c1)
619@conf.commands.register
620def fletcher16_checksum(binbuf):
621 # type: (bytes) -> int
622 """Calculates Fletcher-16 checksum of the given buffer.
624 Note:
625 If the buffer contains the two checkbytes derived from the Fletcher-16 checksum # noqa: E501
626 the result of this function has to be 0. Otherwise the buffer has been corrupted. # noqa: E501
627 """
628 (c0, c1) = _fletcher16(binbuf)
629 return (c1 << 8) | c0
632@conf.commands.register
633def fletcher16_checkbytes(binbuf, offset):
634 # type: (bytes, int) -> bytes
635 """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.
637 Including the bytes into the buffer (at the position marked by offset) the # noqa: E501
638 global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify # noqa: E501
639 the integrity of the buffer on the receiver side.
641 For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B. # noqa: E501
642 """
644 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501
645 if len(binbuf) < offset:
646 raise Exception("Packet too short for checkbytes %d" % len(binbuf))
648 binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
649 (c0, c1) = _fletcher16(binbuf)
651 x = ((len(binbuf) - offset - 1) * c0 - c1) % 255
653 if (x <= 0):
654 x += 255
656 y = 510 - c0 - x
658 if (y > 255):
659 y -= 255
660 return chb(x) + chb(y)
663def mac2str(mac):
664 # type: (str) -> bytes
665 return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':'))
668def valid_mac(mac):
669 # type: (str) -> bool
670 try:
671 return len(mac2str(mac)) == 6
672 except ValueError:
673 pass
674 return False
677def str2mac(s):
678 # type: (bytes) -> str
679 if isinstance(s, str):
680 return ("%02x:" * len(s))[:-1] % tuple(map(ord, s))
681 return ("%02x:" * len(s))[:-1] % tuple(s)
684def randstring(length):
685 # type: (int) -> bytes
686 """
687 Returns a random string of length (length >= 0)
688 """
689 return b"".join(struct.pack('B', random.randint(0, 255))
690 for _ in range(length))
693def zerofree_randstring(length):
694 # type: (int) -> bytes
695 """
696 Returns a random string of length (length >= 0) without zero in it.
697 """
698 return b"".join(struct.pack('B', random.randint(1, 255))
699 for _ in range(length))
702def stror(s1, s2):
703 # type: (bytes, bytes) -> bytes
704 """
705 Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2
706 must be of same length.
707 """
708 return b"".join(map(lambda x, y: struct.pack("!B", x | y), s1, s2))
711def strxor(s1, s2):
712 # type: (bytes, bytes) -> bytes
713 """
714 Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
715 must be of same length.
716 """
717 return b"".join(map(lambda x, y: struct.pack("!B", x ^ y), s1, s2))
720def strand(s1, s2):
721 # type: (bytes, bytes) -> bytes
722 """
723 Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
724 must be of same length.
725 """
726 return b"".join(map(lambda x, y: struct.pack("!B", x & y), s1, s2))
729def strrot(s1, count, right=True):
730 # type: (bytes, int, bool) -> bytes
731 """
732 Rotate the binary by 'count' bytes
733 """
734 off = count % len(s1)
735 if right:
736 return s1[-off:] + s1[:-off]
737 else:
738 return s1[off:] + s1[:off]
741# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 # noqa: E501
742try:
743 socket.inet_aton("255.255.255.255")
744except socket.error:
745 def inet_aton(ip_string):
746 # type: (str) -> bytes
747 if ip_string == "255.255.255.255":
748 return b"\xff" * 4
749 else:
750 return socket.inet_aton(ip_string)
751else:
752 inet_aton = socket.inet_aton # type: ignore
754inet_ntoa = socket.inet_ntoa
757def atol(x):
758 # type: (str) -> int
759 try:
760 ip = inet_aton(x)
761 except socket.error:
762 raise ValueError("Bad IP format: %s" % x)
763 return cast(int, struct.unpack("!I", ip)[0])
766def valid_ip(addr):
767 # type: (str) -> bool
768 try:
769 addr = plain_str(addr)
770 except UnicodeDecodeError:
771 return False
772 try:
773 atol(addr)
774 except (OSError, ValueError, socket.error):
775 return False
776 return True
779def valid_net(addr):
780 # type: (str) -> bool
781 try:
782 addr = plain_str(addr)
783 except UnicodeDecodeError:
784 return False
785 if '/' in addr:
786 ip, mask = addr.split('/', 1)
787 return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32
788 return valid_ip(addr)
791def valid_ip6(addr):
792 # type: (str) -> bool
793 try:
794 addr = plain_str(addr)
795 except UnicodeDecodeError:
796 return False
797 try:
798 inet_pton(socket.AF_INET6, addr)
799 except socket.error:
800 return False
801 return True
804def valid_net6(addr):
805 # type: (str) -> bool
806 try:
807 addr = plain_str(addr)
808 except UnicodeDecodeError:
809 return False
810 if '/' in addr:
811 ip, mask = addr.split('/', 1)
812 return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128
813 return valid_ip6(addr)
816def ltoa(x):
817 # type: (int) -> str
818 return inet_ntoa(struct.pack("!I", x & 0xffffffff))
821def itom(x):
822 # type: (int) -> int
823 return (0xffffffff00000000 >> x) & 0xffffffff
826def in4_cidr2mask(m):
827 # type: (int) -> bytes
828 """
829 Return the mask (bitstring) associated with provided length
830 value. For instance if function is called on 20, return value is
831 b'\xff\xff\xf0\x00'.
832 """
833 if m > 32 or m < 0:
834 raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m) # noqa: E501
836 return strxor(
837 b"\xff" * 4,
838 struct.pack(">I", 2**(32 - m) - 1)
839 )
842def in4_isincluded(addr, prefix, mask):
843 # type: (str, str, int) -> bool
844 """
845 Returns True when 'addr' belongs to prefix/mask. False otherwise.
846 """
847 temp = inet_pton(socket.AF_INET, addr)
848 pref = in4_cidr2mask(mask)
849 zero = inet_pton(socket.AF_INET, prefix)
850 return zero == strand(temp, pref)
853def in4_ismaddr(str):
854 # type: (str) -> bool
855 """
856 Returns True if provided address in printable format belongs to
857 allocated Multicast address space (224.0.0.0/4).
858 """
859 return in4_isincluded(str, "224.0.0.0", 4)
862def in4_ismlladdr(str):
863 # type: (str) -> bool
864 """
865 Returns True if address belongs to link-local multicast address
866 space (224.0.0.0/24)
867 """
868 return in4_isincluded(str, "224.0.0.0", 24)
871def in4_ismgladdr(str):
872 # type: (str) -> bool
873 """
874 Returns True if address belongs to global multicast address
875 space (224.0.1.0-238.255.255.255).
876 """
877 return (
878 in4_isincluded(str, "224.0.0.0", 4) and
879 not in4_isincluded(str, "224.0.0.0", 24) and
880 not in4_isincluded(str, "239.0.0.0", 8)
881 )
884def in4_ismlsaddr(str):
885 # type: (str) -> bool
886 """
887 Returns True if address belongs to limited scope multicast address
888 space (239.0.0.0/8).
889 """
890 return in4_isincluded(str, "239.0.0.0", 8)
893def in4_isaddrllallnodes(str):
894 # type: (str) -> bool
895 """
896 Returns True if address is the link-local all-nodes multicast
897 address (224.0.0.1).
898 """
899 return (inet_pton(socket.AF_INET, "224.0.0.1") ==
900 inet_pton(socket.AF_INET, str))
903def in4_getnsmac(a):
904 # type: (bytes) -> str
905 """
906 Return the multicast mac address associated with provided
907 IPv4 address. Passed address must be in network format.
908 """
910 return "01:00:5e:%.2x:%.2x:%.2x" % (a[1] & 0x7f, a[2], a[3])
913def decode_locale_str(x):
914 # type: (bytes) -> str
915 """
916 Decode bytes into a string using the system locale.
917 Useful on Windows where it can be unusual (e.g. cp1252)
918 """
919 return x.decode(encoding=locale.getlocale()[1] or "utf-8", errors="replace")
922class ContextManagerSubprocess(object):
923 """
924 Context manager that eases checking for unknown command, without
925 crashing.
927 Example:
928 >>> with ContextManagerSubprocess("tcpdump"):
929 >>> subprocess.Popen(["tcpdump", "--version"])
930 ERROR: Could not execute tcpdump, is it installed?
932 """
934 def __init__(self, prog, suppress=True):
935 # type: (str, bool) -> None
936 self.prog = prog
937 self.suppress = suppress
939 def __enter__(self):
940 # type: () -> None
941 pass
943 def __exit__(self,
944 exc_type, # type: Optional[type]
945 exc_value, # type: Optional[Exception]
946 traceback, # type: Optional[Any]
947 ):
948 # type: (...) -> Optional[bool]
949 if exc_value is None or exc_type is None:
950 return None
951 # Errored
952 if isinstance(exc_value, EnvironmentError):
953 msg = "Could not execute %s, is it installed?" % self.prog
954 else:
955 msg = "%s: execution failed (%s)" % (
956 self.prog,
957 exc_type.__class__.__name__
958 )
959 if not self.suppress:
960 raise exc_type(msg)
961 log_runtime.error(msg, exc_info=True)
962 return True # Suppress the exception
965class ContextManagerCaptureOutput(object):
966 """
967 Context manager that intercept the console's output.
969 Example:
970 >>> with ContextManagerCaptureOutput() as cmco:
971 ... print("hey")
972 ... assert cmco.get_output() == "hey"
973 """
975 def __init__(self):
976 # type: () -> None
977 self.result_export_object = ""
979 def __enter__(self):
980 # type: () -> ContextManagerCaptureOutput
981 from unittest import mock
983 def write(s, decorator=self):
984 # type: (str, ContextManagerCaptureOutput) -> None
985 decorator.result_export_object += s
986 mock_stdout = mock.Mock()
987 mock_stdout.write = write
988 self.bck_stdout = sys.stdout
989 sys.stdout = mock_stdout
990 return self
992 def __exit__(self, *exc):
993 # type: (*Any) -> Literal[False]
994 sys.stdout = self.bck_stdout
995 return False
997 def get_output(self, eval_bytes=False):
998 # type: (bool) -> str
999 if self.result_export_object.startswith("b'") and eval_bytes:
1000 return plain_str(eval(self.result_export_object))
1001 return self.result_export_object
1004def do_graph(
1005 graph, # type: str
1006 prog=None, # type: Optional[str]
1007 format=None, # type: Optional[str]
1008 target=None, # type: Optional[Union[IO[bytes], str]]
1009 type=None, # type: Optional[str]
1010 string=None, # type: Optional[bool]
1011 options=None # type: Optional[List[str]]
1012):
1013 # type: (...) -> Optional[str]
1014 """Processes graph description using an external software.
1015 This method is used to convert a graphviz format to an image.
1017 :param graph: GraphViz graph description
1018 :param prog: which graphviz program to use
1019 :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
1020 option
1021 :param string: if not None, simply return the graph string
1022 :param target: filename or redirect. Defaults pipe to Imagemagick's
1023 display program
1024 :param options: options to be passed to prog
1025 """
1027 if format is None:
1028 format = "svg"
1029 if string:
1030 return graph
1031 if type is not None:
1032 warnings.warn(
1033 "type is deprecated, and was renamed format",
1034 DeprecationWarning
1035 )
1036 format = type
1037 if prog is None:
1038 prog = conf.prog.dot
1039 start_viewer = False
1040 if target is None:
1041 if WINDOWS:
1042 target = get_temp_file(autoext="." + format)
1043 start_viewer = True
1044 else:
1045 with ContextManagerSubprocess(conf.prog.display):
1046 target = subprocess.Popen([conf.prog.display],
1047 stdin=subprocess.PIPE).stdin
1048 if format is not None:
1049 format = "-T%s" % format
1050 if isinstance(target, str):
1051 if target.startswith('|'):
1052 target = subprocess.Popen(target[1:].lstrip(), shell=True,
1053 stdin=subprocess.PIPE).stdin
1054 elif target.startswith('>'):
1055 target = open(target[1:].lstrip(), "wb")
1056 else:
1057 target = open(os.path.abspath(target), "wb")
1058 target = cast(IO[bytes], target)
1059 proc = subprocess.Popen(
1060 "\"%s\" %s %s" % (prog, options or "", format or ""),
1061 shell=True, stdin=subprocess.PIPE, stdout=target,
1062 stderr=subprocess.PIPE
1063 )
1064 _, stderr = proc.communicate(bytes_encode(graph))
1065 if proc.returncode != 0:
1066 raise OSError(
1067 "GraphViz call failed (is it installed?):\n" +
1068 plain_str(stderr)
1069 )
1070 try:
1071 target.close()
1072 except Exception:
1073 pass
1074 if start_viewer:
1075 # Workaround for file not found error: We wait until tempfile is written. # noqa: E501
1076 waiting_start = time.time()
1077 while not os.path.exists(target.name):
1078 time.sleep(0.1)
1079 if time.time() - waiting_start > 3:
1080 warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile) # noqa: E501
1081 break
1082 else:
1083 if WINDOWS and conf.prog.display == conf.prog._default:
1084 os.startfile(target.name)
1085 else:
1086 with ContextManagerSubprocess(conf.prog.display):
1087 subprocess.Popen([conf.prog.display, target.name])
1088 return None
1091_TEX_TR = {
1092 "{": "{\\tt\\char123}",
1093 "}": "{\\tt\\char125}",
1094 "\\": "{\\tt\\char92}",
1095 "^": "\\^{}",
1096 "$": "\\$",
1097 "#": "\\#",
1098 "_": "\\_",
1099 "&": "\\&",
1100 "%": "\\%",
1101 "|": "{\\tt\\char124}",
1102 "~": "{\\tt\\char126}",
1103 "<": "{\\tt\\char60}",
1104 ">": "{\\tt\\char62}",
1105}
1108def tex_escape(x):
1109 # type: (str) -> str
1110 s = ""
1111 for c in x:
1112 s += _TEX_TR.get(c, c)
1113 return s
1116def colgen(*lstcol, # type: Any
1117 **kargs # type: Any
1118 ):
1119 # type: (...) -> Iterator[Any]
1120 """Returns a generator that mixes provided quantities forever
1121 trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" # noqa: E501
1122 if len(lstcol) < 2:
1123 lstcol *= 2
1124 trans = kargs.get("trans", lambda x, y, z: (x, y, z))
1125 while True:
1126 for i in range(len(lstcol)):
1127 for j in range(len(lstcol)):
1128 for k in range(len(lstcol)):
1129 if i != j or j != k or k != i:
1130 yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)]) # noqa: E501
1133def incremental_label(label="tag%05i", start=0):
1134 # type: (str, int) -> Iterator[str]
1135 while True:
1136 yield label % start
1137 start += 1
1140def binrepr(val):
1141 # type: (int) -> str
1142 return bin(val)[2:]
1145def long_converter(s):
1146 # type: (str) -> int
1147 return int(s.replace('\n', '').replace(' ', ''), 16)
1149#########################
1150# Enum management #
1151#########################
1154class EnumElement:
1155 def __init__(self, key, value):
1156 # type: (str, int) -> None
1157 self._key = key
1158 self._value = value
1160 def __repr__(self):
1161 # type: () -> str
1162 return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) # noqa: E501
1164 def __getattr__(self, attr):
1165 # type: (str) -> Any
1166 return getattr(self._value, attr)
1168 def __str__(self):
1169 # type: () -> str
1170 return self._key
1172 def __bytes__(self):
1173 # type: () -> bytes
1174 return bytes_encode(self.__str__())
1176 def __hash__(self):
1177 # type: () -> int
1178 return self._value
1180 def __int__(self):
1181 # type: () -> int
1182 return int(self._value)
1184 def __eq__(self, other):
1185 # type: (Any) -> bool
1186 return self._value == int(other)
1188 def __neq__(self, other):
1189 # type: (Any) -> bool
1190 return not self.__eq__(other)
1193class Enum_metaclass(type):
1194 element_class = EnumElement
1196 def __new__(cls, name, bases, dct):
1197 # type: (Any, str, Any, Dict[str, Any]) -> Any
1198 rdict = {}
1199 for k, v in dct.items():
1200 if isinstance(v, int):
1201 v = cls.element_class(k, v)
1202 dct[k] = v
1203 rdict[v] = k
1204 dct["__rdict__"] = rdict
1205 return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)
1207 def __getitem__(self, attr):
1208 # type: (int) -> Any
1209 return self.__rdict__[attr] # type: ignore
1211 def __contains__(self, val):
1212 # type: (int) -> bool
1213 return val in self.__rdict__ # type: ignore
1215 def get(self, attr, val=None):
1216 # type: (str, Optional[Any]) -> Any
1217 return self.__rdict__.get(attr, val) # type: ignore
1219 def __repr__(self):
1220 # type: () -> str
1221 return "<%s>" % self.__dict__.get("name", self.__name__)
1224###################
1225# Object saving #
1226###################
1229def export_object(obj):
1230 # type: (Any) -> None
1231 import zlib
1232 print(base64.b64encode(zlib.compress(pickle.dumps(obj, 2), 9)).decode())
1235def import_object(obj=None):
1236 # type: (Optional[str]) -> Any
1237 import zlib
1238 if obj is None:
1239 obj = sys.stdin.read()
1240 return pickle.loads(zlib.decompress(base64.b64decode(obj.strip())))
1243def save_object(fname, obj):
1244 # type: (str, Any) -> None
1245 """Pickle a Python object"""
1247 fd = gzip.open(fname, "wb")
1248 pickle.dump(obj, fd)
1249 fd.close()
1252def load_object(fname):
1253 # type: (str) -> Any
1254 """unpickle a Python object"""
1255 return pickle.load(gzip.open(fname, "rb"))
1258@conf.commands.register
1259def corrupt_bytes(data, p=0.01, n=None):
1260 # type: (str, float, Optional[int]) -> bytes
1261 """
1262 Corrupt a given percentage (at least one byte) or number of bytes
1263 from a string
1264 """
1265 s = array.array("B", bytes_encode(data))
1266 s_len = len(s)
1267 if n is None:
1268 n = max(1, int(s_len * p))
1269 for i in random.sample(range(s_len), n):
1270 s[i] = (s[i] + random.randint(1, 255)) % 256
1271 return s.tobytes()
1274@conf.commands.register
1275def corrupt_bits(data, p=0.01, n=None):
1276 # type: (str, float, Optional[int]) -> bytes
1277 """
1278 Flip a given percentage (at least one bit) or number of bits
1279 from a string
1280 """
1281 s = array.array("B", bytes_encode(data))
1282 s_len = len(s) * 8
1283 if n is None:
1284 n = max(1, int(s_len * p))
1285 for i in random.sample(range(s_len), n):
1286 s[i // 8] ^= 1 << (i % 8)
1287 return s.tobytes()
1290#############################
1291# pcap capture file stuff #
1292#############################
1294@conf.commands.register
1295def wrpcap(filename, # type: Union[IO[bytes], str]
1296 pkt, # type: _PacketIterable
1297 *args, # type: Any
1298 **kargs # type: Any
1299 ):
1300 # type: (...) -> None
1301 """Write a list of packets to a pcap file
1303 :param filename: the name of the file to write packets to, or an open,
1304 writable file-like object. The file descriptor will be
1305 closed at the end of the call, so do not use an object you
1306 do not want to close (e.g., running wrpcap(sys.stdout, [])
1307 in interactive mode will crash Scapy).
1308 :param gz: set to 1 to save a gzipped capture
1309 :param linktype: force linktype value
1310 :param endianness: "<" or ">", force endianness
1311 :param sync: do not bufferize writes to the capture file
1312 """
1313 with PcapWriter(filename, *args, **kargs) as fdesc:
1314 fdesc.write(pkt)
1317@conf.commands.register
1318def wrpcapng(filename, # type: str
1319 pkt, # type: _PacketIterable
1320 ):
1321 # type: (...) -> None
1322 """Write a list of packets to a pcapng file
1324 :param filename: the name of the file to write packets to, or an open,
1325 writable file-like object. The file descriptor will be
1326 closed at the end of the call, so do not use an object you
1327 do not want to close (e.g., running wrpcapng(sys.stdout, [])
1328 in interactive mode will crash Scapy).
1329 :param pkt: packets to write
1330 """
1331 with PcapNgWriter(filename) as fdesc:
1332 fdesc.write(pkt)
1335@conf.commands.register
1336def rdpcap(filename, count=-1):
1337 # type: (Union[IO[bytes], str], int) -> PacketList
1338 """Read a pcap or pcapng file and return a packet list
1340 :param count: read only <count> packets
1341 """
1342 # Rant: Our complicated use of metaclasses and especially the
1343 # __call__ function is, of course, not supported by MyPy.
1344 # One day we should simplify this mess and use a much simpler
1345 # layout that will actually be supported and properly dissected.
1346 with PcapReader(filename) as fdesc: # type: ignore
1347 return fdesc.read_all(count=count)
1350# NOTE: Type hinting
1351# Mypy doesn't understand the following metaclass, and thinks each
1352# constructor (PcapReader...) needs 3 arguments each. To avoid this,
1353# we add a fake (=None) to the last 2 arguments then force the value
1354# to not be None in the signature and pack the whole thing in an ignore.
1355# This allows to not have # type: ignore every time we call those
1356# constructors.
1358class PcapReader_metaclass(type):
1359 """Metaclass for (Raw)Pcap(Ng)Readers"""
1361 def __new__(cls, name, bases, dct):
1362 # type: (Any, str, Any, Dict[str, Any]) -> Any
1363 """The `alternative` class attribute is declared in the PcapNg
1364 variant, and set here to the Pcap variant.
1366 """
1367 newcls = super(PcapReader_metaclass, cls).__new__(
1368 cls, name, bases, dct
1369 )
1370 if 'alternative' in dct:
1371 dct['alternative'].alternative = newcls
1372 return newcls
1374 def __call__(cls, filename):
1375 # type: (Union[IO[bytes], str]) -> Any
1376 """Creates a cls instance, use the `alternative` if that
1377 fails.
1379 """
1380 i = cls.__new__(
1381 cls,
1382 cls.__name__,
1383 cls.__bases__,
1384 cls.__dict__ # type: ignore
1385 )
1386 filename, fdesc, magic = cls.open(filename)
1387 if not magic:
1388 raise Scapy_Exception(
1389 "No data could be read!"
1390 )
1391 try:
1392 i.__init__(filename, fdesc, magic)
1393 return i
1394 except (Scapy_Exception, EOFError):
1395 pass
1397 if "alternative" in cls.__dict__:
1398 cls = cls.__dict__["alternative"]
1399 i = cls.__new__(
1400 cls,
1401 cls.__name__,
1402 cls.__bases__,
1403 cls.__dict__ # type: ignore
1404 )
1405 try:
1406 i.__init__(filename, fdesc, magic)
1407 return i
1408 except (Scapy_Exception, EOFError):
1409 pass
1411 raise Scapy_Exception("Not a supported capture file")
1413 @staticmethod
1414 def open(fname # type: Union[IO[bytes], str]
1415 ):
1416 # type: (...) -> Tuple[str, _ByteStream, bytes]
1417 """Open (if necessary) filename, and read the magic."""
1418 if isinstance(fname, str):
1419 filename = fname
1420 fdesc = open(filename, "rb") # type: _ByteStream
1421 magic = fdesc.read(2)
1422 if magic == b"\x1f\x8b":
1423 # GZIP header detected.
1424 fdesc.seek(0)
1425 fdesc = gzip.GzipFile(fileobj=fdesc)
1426 magic = fdesc.read(2)
1427 magic += fdesc.read(2)
1428 else:
1429 fdesc = fname
1430 filename = getattr(fdesc, "name", "No name")
1431 magic = fdesc.read(4)
1432 return filename, fdesc, magic
1435class RawPcapReader(metaclass=PcapReader_metaclass):
1436 """A stateful pcap reader. Each packet is returned as a string"""
1438 # TODO: use Generics to properly type the various readers.
1439 # As of right now, RawPcapReader is typed as if it returned packets
1440 # because all of its child do. Fix that
1442 nonblocking_socket = True
1443 PacketMetadata = collections.namedtuple("PacketMetadata",
1444 ["sec", "usec", "wirelen", "caplen"]) # noqa: E501
1446 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
1447 # type: (str, _ByteStream, bytes) -> None
1448 self.filename = filename
1449 self.f = fdesc
1450 if magic == b"\xa1\xb2\xc3\xd4": # big endian
1451 self.endian = ">"
1452 self.nano = False
1453 elif magic == b"\xd4\xc3\xb2\xa1": # little endian
1454 self.endian = "<"
1455 self.nano = False
1456 elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision
1457 self.endian = ">"
1458 self.nano = True
1459 elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision # noqa: E501
1460 self.endian = "<"
1461 self.nano = True
1462 else:
1463 raise Scapy_Exception(
1464 "Not a pcap capture file (bad magic: %r)" % magic
1465 )
1466 hdr = self.f.read(20)
1467 if len(hdr) < 20:
1468 raise Scapy_Exception("Invalid pcap file (too short)")
1469 vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
1470 self.endian + "HHIIII", hdr
1471 )
1472 self.linktype = linktype
1473 self.snaplen = snaplen
1475 def __enter__(self):
1476 # type: () -> RawPcapReader
1477 return self
1479 def __iter__(self):
1480 # type: () -> RawPcapReader
1481 return self
1483 def __next__(self):
1484 # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
1485 """
1486 implement the iterator protocol on a set of packets in a pcap file
1487 """
1488 try:
1489 return self._read_packet()
1490 except EOFError:
1491 raise StopIteration
1493 def _read_packet(self, size=MTU):
1494 # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
1495 """return a single packet read from the file as a tuple containing
1496 (pkt_data, pkt_metadata)
1498 raise EOFError when no more packets are available
1499 """
1500 hdr = self.f.read(16)
1501 if len(hdr) < 16:
1502 raise EOFError
1503 sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr)
1505 try:
1506 data = self.f.read(caplen)[:size]
1507 except OverflowError as e:
1508 warning(f"Pcap: {e}")
1509 raise EOFError
1511 return (data,
1512 RawPcapReader.PacketMetadata(sec=sec, usec=usec,
1513 wirelen=wirelen, caplen=caplen))
1515 def read_packet(self, size=MTU):
1516 # type: (int) -> Packet
1517 raise Exception(
1518 "Cannot call read_packet() in RawPcapReader. Use "
1519 "_read_packet()"
1520 )
1522 def dispatch(self,
1523 callback # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any] # noqa: E501
1524 ):
1525 # type: (...) -> None
1526 """call the specified callback routine for each packet read
1528 This is just a convenience function for the main loop
1529 that allows for easy launching of packet processing in a
1530 thread.
1531 """
1532 for p in self:
1533 callback(p)
1535 def _read_all(self, count=-1):
1536 # type: (int) -> List[Packet]
1537 """return a list of all packets in the pcap file
1538 """
1539 res = [] # type: List[Packet]
1540 while count != 0:
1541 count -= 1
1542 try:
1543 p = self.read_packet() # type: Packet
1544 except EOFError:
1545 break
1546 res.append(p)
1547 return res
1549 def recv(self, size=MTU):
1550 # type: (int) -> bytes
1551 """ Emulate a socket
1552 """
1553 return self._read_packet(size=size)[0]
1555 def fileno(self):
1556 # type: () -> int
1557 return -1 if WINDOWS else self.f.fileno()
1559 def close(self):
1560 # type: () -> None
1561 if isinstance(self.f, gzip.GzipFile):
1562 self.f.fileobj.close() # type: ignore
1563 self.f.close()
1565 def __exit__(self, exc_type, exc_value, tracback):
1566 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
1567 self.close()
1569 # emulate SuperSocket
1570 @staticmethod
1571 def select(sockets, # type: List[SuperSocket]
1572 remain=None, # type: Optional[float]
1573 ):
1574 # type: (...) -> List[SuperSocket]
1575 return sockets
1578class PcapReader(RawPcapReader):
1579 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
1580 # type: (str, IO[bytes], bytes) -> None
1581 RawPcapReader.__init__(self, filename, fdesc, magic)
1582 try:
1583 self.LLcls = conf.l2types.num2layer[
1584 self.linktype
1585 ] # type: Type[Packet]
1586 except KeyError:
1587 warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype)) # noqa: E501
1588 if conf.raw_layer is None:
1589 # conf.raw_layer is set on import
1590 import scapy.packet # noqa: F401
1591 self.LLcls = conf.raw_layer
1593 def __enter__(self):
1594 # type: () -> PcapReader
1595 return self
1597 def read_packet(self, size=MTU, **kwargs):
1598 # type: (int, **Any) -> Packet
1599 rp = super(PcapReader, self)._read_packet(size=size)
1600 if rp is None:
1601 raise EOFError
1602 s, pkt_info = rp
1604 try:
1605 p = self.LLcls(s, **kwargs) # type: Packet
1606 except KeyboardInterrupt:
1607 raise
1608 except Exception:
1609 if conf.debug_dissector:
1610 from scapy.sendrecv import debug
1611 debug.crashed_on = (self.LLcls, s)
1612 raise
1613 if conf.raw_layer is None:
1614 # conf.raw_layer is set on import
1615 import scapy.packet # noqa: F401
1616 p = conf.raw_layer(s)
1617 power = Decimal(10) ** Decimal(-9 if self.nano else -6)
1618 p.time = EDecimal(pkt_info.sec + power * pkt_info.usec)
1619 p.wirelen = pkt_info.wirelen
1620 return p
1622 def recv(self, size=MTU, **kwargs): # type: ignore
1623 # type: (int, **Any) -> Packet
1624 return self.read_packet(size=size, **kwargs)
1626 def __iter__(self):
1627 # type: () -> PcapReader
1628 return self
1630 def __next__(self): # type: ignore
1631 # type: () -> Packet
1632 try:
1633 return self.read_packet()
1634 except EOFError:
1635 raise StopIteration
1637 def read_all(self, count=-1):
1638 # type: (int) -> PacketList
1639 res = self._read_all(count)
1640 from scapy import plist
1641 return plist.PacketList(res, name=os.path.basename(self.filename))
1644class RawPcapNgReader(RawPcapReader):
1645 """A stateful pcapng reader. Each packet is returned as
1646 bytes.
1648 """
1650 alternative = RawPcapReader # type: Type[Any]
1652 PacketMetadata = collections.namedtuple("PacketMetadataNg", # type: ignore
1653 ["linktype", "tsresol",
1654 "tshigh", "tslow", "wirelen",
1655 "comments", "ifname", "direction",
1656 "process_information"])
1658 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
1659 # type: (str, IO[bytes], bytes) -> None
1660 self.filename = filename
1661 self.f = fdesc
1662 # A list of (linktype, snaplen, tsresol); will be populated by IDBs.
1663 self.interfaces = [] # type: List[Tuple[int, int, Dict[str, Any]]]
1664 self.default_options = {
1665 "tsresol": 1000000
1666 }
1667 self.blocktypes: Dict[
1668 int,
1669 Callable[
1670 [bytes, int],
1671 Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
1672 ]] = {
1673 1: self._read_block_idb,
1674 2: self._read_block_pkt,
1675 3: self._read_block_spb,
1676 6: self._read_block_epb,
1677 10: self._read_block_dsb,
1678 0x80000001: self._read_block_pib,
1679 }
1680 self.endian = "!" # Will be overwritten by first SHB
1681 self.process_information = [] # type: List[Dict[str, Any]]
1683 if magic != b"\x0a\x0d\x0d\x0a": # PcapNg:
1684 raise Scapy_Exception(
1685 "Not a pcapng capture file (bad magic: %r)" % magic
1686 )
1688 try:
1689 self._read_block_shb()
1690 except EOFError:
1691 raise Scapy_Exception(
1692 "The first SHB of the pcapng file is malformed !"
1693 )
1695 def _read_block(self, size=MTU):
1696 # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] # noqa: E501
1697 try:
1698 blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
1699 except struct.error:
1700 raise EOFError
1701 if blocktype == 0x0A0D0D0A:
1702 # This function updates the endianness based on the block content.
1703 self._read_block_shb()
1704 return None
1705 try:
1706 blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
1707 except struct.error:
1708 warning("PcapNg: Error reading blocklen before block body")
1709 raise EOFError
1710 if blocklen < 12:
1711 warning("PcapNg: Invalid block length !")
1712 raise EOFError
1714 _block_body_length = blocklen - 12
1715 block = self.f.read(_block_body_length)
1716 if len(block) != _block_body_length:
1717 raise Scapy_Exception("PcapNg: Invalid Block body length "
1718 "(too short)")
1719 self._read_block_tail(blocklen)
1720 if blocktype in self.blocktypes:
1721 return self.blocktypes[blocktype](block, size)
1722 return None
1724 def _read_block_tail(self, blocklen):
1725 # type: (int) -> None
1726 if blocklen % 4:
1727 pad = self.f.read(-blocklen % 4)
1728 warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
1729 "Ignored padding %r" % (blocklen, pad))
1730 try:
1731 if blocklen != struct.unpack(self.endian + 'I',
1732 self.f.read(4))[0]:
1733 raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
1734 except struct.error:
1735 warning("PcapNg: Could not read blocklen after block body")
1736 raise EOFError
1738 def _read_block_shb(self):
1739 # type: () -> None
1740 """Section Header Block"""
1741 _blocklen = self.f.read(4)
1742 endian = self.f.read(4)
1743 if endian == b"\x1a\x2b\x3c\x4d":
1744 self.endian = ">"
1745 elif endian == b"\x4d\x3c\x2b\x1a":
1746 self.endian = "<"
1747 else:
1748 warning("PcapNg: Bad magic in Section Header Block"
1749 " (not a pcapng file?)")
1750 raise EOFError
1752 try:
1753 blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
1754 except struct.error:
1755 warning("PcapNg: Could not read blocklen")
1756 raise EOFError
1757 if blocklen < 28:
1758 warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!") # noqa: E501
1759 raise EOFError
1761 # Major version must be 1
1762 _major = self.f.read(2)
1763 try:
1764 major = struct.unpack(self.endian + "H", _major)[0]
1765 except struct.error:
1766 warning("PcapNg: Could not read major value")
1767 raise EOFError
1768 if major != 1:
1769 warning(f"PcapNg: SHB Major version {major} unsupported !")
1770 raise EOFError
1772 # Skip minor version & section length
1773 skipped = self.f.read(10)
1774 if len(skipped) != 10:
1775 warning("PcapNg: Could not read minor value & section length")
1776 raise EOFError
1778 _options_len = blocklen - 28
1779 options = self.f.read(_options_len)
1780 if len(options) != _options_len:
1781 raise Scapy_Exception("PcapNg: Invalid Section Header Block "
1782 " options (too short)")
1783 self._read_block_tail(blocklen)
1784 self._read_options(options)
1786 def _read_packet(self, size=MTU): # type: ignore
1787 # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1788 """Read blocks until it reaches either EOF or a packet, and
1789 returns None or (packet, (linktype, sec, usec, wirelen)),
1790 where packet is a string.
1792 """
1793 while True:
1794 res = self._read_block(size=size)
1795 if res is not None:
1796 return res
1798 def _read_options(self, options):
1799 # type: (bytes) -> Dict[int, Union[bytes, List[bytes]]]
1800 opts = dict() # type: Dict[int, Union[bytes, List[bytes]]]
1801 while len(options) >= 4:
1802 try:
1803 code, length = struct.unpack(self.endian + "HH", options[:4])
1804 except struct.error:
1805 warning("PcapNg: options header is too small "
1806 "%d !" % len(options))
1807 raise EOFError
1808 if code != 0 and 4 + length <= len(options):
1809 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#name-options-format
1810 if code in [1, 2988, 2989, 19372, 19373]:
1811 if code not in opts:
1812 opts[code] = []
1813 opts[code].append(options[4:4 + length]) # type: ignore
1814 else:
1815 opts[code] = options[4:4 + length]
1816 if code == 0:
1817 if length != 0:
1818 warning("PcapNg: invalid option "
1819 "length %d for end-of-option" % length)
1820 break
1821 if length % 4:
1822 length += (4 - (length % 4))
1823 options = options[4 + length:]
1824 return opts
1826 def _read_block_idb(self, block, _):
1827 # type: (bytes, int) -> None
1828 """Interface Description Block"""
1829 # 2 bytes LinkType + 2 bytes Reserved
1830 # 4 bytes Snaplen
1831 options_raw = self._read_options(block[8:])
1832 options = self.default_options.copy() # type: Dict[str, Any]
1833 for c, v in options_raw.items():
1834 if isinstance(v, list):
1835 # Spec allows multiple occurrences (see
1836 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#section-4.2-8.6)
1837 # but does not define which to use. We take the first for
1838 # backward compatibility.
1839 v = v[0]
1840 if c == 9:
1841 length = len(v)
1842 if length == 1:
1843 tsresol = orb(v)
1844 options["tsresol"] = (2 if tsresol & 128 else 10) ** (
1845 tsresol & 127
1846 )
1847 else:
1848 warning("PcapNg: invalid options "
1849 "length %d for IDB tsresol" % length)
1850 elif c == 2:
1851 options["name"] = v
1852 elif c == 1:
1853 options["comment"] = v
1854 try:
1855 interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
1856 self.endian + "HxxI",
1857 block[:8]
1858 ) + (options,)
1859 except struct.error:
1860 warning("PcapNg: IDB is too small %d/8 !" % len(block))
1861 raise EOFError
1862 self.interfaces.append(interface)
1864 def _check_interface_id(self, intid):
1865 # type: (int) -> None
1866 """Check the interface id value and raise EOFError if invalid."""
1867 tmp_len = len(self.interfaces)
1868 if intid >= tmp_len:
1869 warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
1870 raise EOFError
1872 def _read_block_epb(self, block, size):
1873 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1874 """Enhanced Packet Block"""
1875 try:
1876 intid, tshigh, tslow, caplen, wirelen = struct.unpack(
1877 self.endian + "5I",
1878 block[:20],
1879 )
1880 except struct.error:
1881 warning("PcapNg: EPB is too small %d/20 !" % len(block))
1882 raise EOFError
1884 # Compute the options offset taking padding into account
1885 if caplen % 4:
1886 opt_offset = 20 + caplen + (-caplen) % 4
1887 else:
1888 opt_offset = 20 + caplen
1890 # Parse options
1891 options = self._read_options(block[opt_offset:])
1893 process_information = {}
1894 for code, value in options.items():
1895 # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX
1896 if code in [0x8001, 0x8003]:
1897 try:
1898 proc_index = struct.unpack(
1899 self.endian + "I", value)[0] # type: ignore
1900 except struct.error:
1901 warning("PcapNg: EPB invalid proc index "
1902 "(expected 4 bytes, got %d) !" % len(value))
1903 raise EOFError
1904 if proc_index < len(self.process_information):
1905 key = "proc" if code == 0x8001 else "eproc"
1906 process_information[key] = self.process_information[proc_index]
1907 else:
1908 warning("PcapNg: EPB invalid process information index "
1909 "(%d/%d) !" % (proc_index, len(self.process_information)))
1911 comments = options.get(1, None)
1912 epb_flags_raw = options.get(2, None)
1913 if epb_flags_raw and isinstance(epb_flags_raw, bytes):
1914 try:
1915 epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
1916 except struct.error:
1917 warning("PcapNg: EPB invalid flags size"
1918 "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
1919 raise EOFError
1920 direction = epb_flags & 3
1922 else:
1923 direction = None
1925 self._check_interface_id(intid)
1926 ifname = self.interfaces[intid][2].get('name', None)
1928 return (block[20:20 + caplen][:size],
1929 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501
1930 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501
1931 tshigh=tshigh,
1932 tslow=tslow,
1933 wirelen=wirelen,
1934 ifname=ifname,
1935 direction=direction,
1936 process_information=process_information,
1937 comments=comments))
1939 def _read_block_spb(self, block, size):
1940 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1941 """Simple Packet Block"""
1942 # "it MUST be assumed that all the Simple Packet Blocks have
1943 # been captured on the interface previously specified in the
1944 # first Interface Description Block."
1945 intid = 0
1946 self._check_interface_id(intid)
1948 try:
1949 wirelen, = struct.unpack(self.endian + "I", block[:4])
1950 except struct.error:
1951 warning("PcapNg: SPB is too small %d/4 !" % len(block))
1952 raise EOFError
1954 caplen = min(wirelen, self.interfaces[intid][1])
1955 return (block[4:4 + caplen][:size],
1956 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501
1957 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501
1958 tshigh=None,
1959 tslow=None,
1960 wirelen=wirelen,
1961 ifname=None,
1962 direction=None,
1963 process_information={},
1964 comments=None))
1966 def _read_block_pkt(self, block, size):
1967 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1968 """(Obsolete) Packet Block"""
1969 try:
1970 intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
1971 self.endian + "HH4I",
1972 block[:20],
1973 )
1974 except struct.error:
1975 warning("PcapNg: PKT is too small %d/20 !" % len(block))
1976 raise EOFError
1978 self._check_interface_id(intid)
1979 return (block[20:20 + caplen][:size],
1980 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501
1981 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501
1982 tshigh=tshigh,
1983 tslow=tslow,
1984 wirelen=wirelen,
1985 ifname=None,
1986 direction=None,
1987 process_information={},
1988 comments=None))
1990 def _read_block_dsb(self, block, size):
1991 # type: (bytes, int) -> None
1992 """Decryption Secrets Block"""
1994 # Parse the secrets type and length fields
1995 try:
1996 secrets_type, secrets_length = struct.unpack(
1997 self.endian + "II",
1998 block[:8],
1999 )
2000 block = block[8:]
2001 except struct.error:
2002 warning("PcapNg: DSB is too small %d!", len(block))
2003 raise EOFError
2005 # Compute the secrets length including the padding
2006 padded_secrets_length = secrets_length + (-secrets_length) % 4
2007 if len(block) < padded_secrets_length:
2008 warning("PcapNg: invalid DSB secrets length!")
2009 raise EOFError
2011 # Extract secrets data and options
2012 secrets_data = block[:padded_secrets_length][:secrets_length]
2013 if block[padded_secrets_length:]:
2014 warning("PcapNg: DSB options are not supported!")
2016 # TLS Key Log
2017 if secrets_type == 0x544c534b:
2018 if getattr(conf, "tls_sessions", False) is False:
2019 warning("PcapNg: TLS Key Log available, but "
2020 "the TLS layer is not loaded! Scapy won't be able "
2021 "to decrypt the packets.")
2022 else:
2023 from scapy.layers.tls.session import load_nss_keys
2025 # Write Key Log to a file and parse it
2026 filename = get_temp_file()
2027 with open(filename, "wb") as fd:
2028 fd.write(secrets_data)
2029 fd.close()
2031 keys = load_nss_keys(filename)
2032 if not keys:
2033 warning("PcapNg: invalid TLS Key Log in DSB!")
2034 else:
2035 # Note: these attributes are only available when the TLS
2036 # layer is loaded.
2037 conf.tls_nss_keys = keys
2038 conf.tls_session_enable = True
2039 else:
2040 warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)
2042 def _read_block_pib(self, block, _):
2043 # type: (bytes, int) -> None
2044 """Apple Process Information Block"""
2046 # Get the Process ID
2047 try:
2048 dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0]
2049 process_information = {"id": dpeb_pid}
2050 block = block[4:]
2051 except struct.error:
2052 warning("PcapNg: DPEB is too small (%d). Cannot get PID!",
2053 len(block))
2054 raise EOFError
2056 # Get Options
2057 options = self._read_options(block)
2058 for code, value in options.items():
2059 if code == 2:
2060 process_information["name"] = value.decode( # type: ignore
2061 "ascii", "backslashreplace")
2062 elif code == 4:
2063 if len(value) == 16:
2064 process_information["uuid"] = str(UUID(bytes=value)) # type: ignore
2065 else:
2066 warning("PcapNg: DPEB UUID length is invalid (%d)!",
2067 len(value))
2069 # Store process information
2070 self.process_information.append(process_information)
2073class PcapNgReader(RawPcapNgReader, PcapReader):
2075 alternative = PcapReader
2077 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
2078 # type: (str, IO[bytes], bytes) -> None
2079 RawPcapNgReader.__init__(self, filename, fdesc, magic)
2081 def __enter__(self):
2082 # type: () -> PcapNgReader
2083 return self
2085 def read_packet(self, size=MTU, **kwargs):
2086 # type: (int, **Any) -> Packet
2087 rp = super(PcapNgReader, self)._read_packet(size=size)
2088 if rp is None:
2089 raise EOFError
2090 s, (linktype, tsresol, tshigh, tslow, wirelen, comments, ifname, direction, process_information) = rp # noqa: E501
2091 try:
2092 cls = conf.l2types.num2layer[linktype] # type: Type[Packet]
2093 p = cls(s, **kwargs) # type: Packet
2094 except KeyboardInterrupt:
2095 raise
2096 except Exception:
2097 if conf.debug_dissector:
2098 raise
2099 if conf.raw_layer is None:
2100 # conf.raw_layer is set on import
2101 import scapy.packet # noqa: F401
2102 p = conf.raw_layer(s)
2103 if tshigh is not None:
2104 p.time = EDecimal((tshigh << 32) + tslow) / tsresol
2105 p.wirelen = wirelen
2106 p.comments = comments
2107 p.direction = direction
2108 p.process_information = process_information.copy()
2109 if ifname is not None:
2110 p.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
2111 return p
2113 def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet': # type: ignore
2114 return self.read_packet(size=size, **kwargs)
2117class GenericPcapWriter(object):
2118 nano = False
2119 linktype: int
2121 def _write_header(self, pkt):
2122 # type: (Optional[Union[Packet, bytes]]) -> None
2123 raise NotImplementedError
2125 def _write_packet(self,
2126 packet, # type: Union[bytes, Packet]
2127 linktype, # type: int
2128 sec=None, # type: Optional[float]
2129 usec=None, # type: Optional[int]
2130 caplen=None, # type: Optional[int]
2131 wirelen=None, # type: Optional[int]
2132 ifname=None, # type: Optional[bytes]
2133 direction=None, # type: Optional[int]
2134 comments=None, # type: Optional[List[bytes]]
2135 ):
2136 # type: (...) -> None
2137 raise NotImplementedError
2139 def _get_time(self,
2140 packet, # type: Union[bytes, Packet]
2141 sec, # type: Optional[float]
2142 usec # type: Optional[int]
2143 ):
2144 # type: (...) -> Tuple[float, int]
2145 if hasattr(packet, "time"):
2146 if sec is None:
2147 packet_time = packet.time
2148 tmp = int(packet_time)
2149 usec = int(round((packet_time - tmp) *
2150 (1000000000 if self.nano else 1000000)))
2151 sec = float(packet_time)
2152 if sec is not None and usec is None:
2153 usec = 0
2154 return sec, usec # type: ignore
2156 def write_header(self, pkt):
2157 # type: (Optional[Union[Packet, bytes]]) -> None
2158 if not hasattr(self, 'linktype'):
2159 try:
2160 if pkt is None or isinstance(pkt, bytes):
2161 # Can't guess LL
2162 raise KeyError
2163 self.linktype = conf.l2types.layer2num[
2164 pkt.__class__
2165 ]
2166 except KeyError:
2167 msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
2168 warning(msg, self.__class__.__name__, pkt.__class__.__name__)
2169 self.linktype = DLT_EN10MB
2170 self._write_header(pkt)
2172 def write_packet(self,
2173 packet, # type: Union[bytes, Packet]
2174 sec=None, # type: Optional[float]
2175 usec=None, # type: Optional[int]
2176 caplen=None, # type: Optional[int]
2177 wirelen=None, # type: Optional[int]
2178 ):
2179 # type: (...) -> None
2180 """
2181 Writes a single packet to the pcap file.
2183 :param packet: Packet, or bytes for a single packet
2184 :type packet: scapy.packet.Packet or bytes
2185 :param sec: time the packet was captured, in seconds since epoch. If
2186 not supplied, defaults to now.
2187 :type sec: float
2188 :param usec: If ``nano=True``, then number of nanoseconds after the
2189 second that the packet was captured. If ``nano=False``,
2190 then the number of microseconds after the second the
2191 packet was captured. If ``sec`` is not specified,
2192 this value is ignored.
2193 :type usec: int or long
2194 :param caplen: The length of the packet in the capture file. If not
2195 specified, uses ``len(raw(packet))``.
2196 :type caplen: int
2197 :param wirelen: The length of the packet on the wire. If not
2198 specified, tries ``packet.wirelen``, otherwise uses
2199 ``caplen``.
2200 :type wirelen: int
2201 :return: None
2202 :rtype: None
2203 """
2204 f_sec, usec = self._get_time(packet, sec, usec)
2206 rawpkt = bytes_encode(packet)
2207 caplen = len(rawpkt) if caplen is None else caplen
2209 if wirelen is None:
2210 if hasattr(packet, "wirelen"):
2211 wirelen = packet.wirelen
2212 if wirelen is None:
2213 wirelen = caplen
2215 comments = getattr(packet, "comments", None)
2216 ifname = getattr(packet, "sniffed_on", None)
2217 direction = getattr(packet, "direction", None)
2218 if not isinstance(packet, bytes):
2219 linktype: int = conf.l2types.layer2num[
2220 packet.__class__
2221 ]
2222 else:
2223 linktype = self.linktype
2224 if ifname is not None:
2225 ifname = str(ifname).encode('utf-8')
2226 self._write_packet(
2227 rawpkt,
2228 sec=f_sec, usec=usec,
2229 caplen=caplen, wirelen=wirelen,
2230 ifname=ifname,
2231 direction=direction,
2232 linktype=linktype,
2233 comments=comments,
2234 )
2237class GenericRawPcapWriter(GenericPcapWriter):
2238 header_present = False
2239 nano = False
2240 sync = False
2241 f = None # type: Union[IO[bytes], gzip.GzipFile]
2243 def fileno(self):
2244 # type: () -> int
2245 return -1 if WINDOWS else self.f.fileno()
2247 def flush(self):
2248 # type: () -> Optional[Any]
2249 return self.f.flush()
2251 def close(self):
2252 # type: () -> Optional[Any]
2253 if not self.header_present:
2254 self.write_header(None)
2255 return self.f.close()
2257 def __enter__(self):
2258 # type: () -> GenericRawPcapWriter
2259 return self
2261 def __exit__(self, exc_type, exc_value, tracback):
2262 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
2263 self.flush()
2264 self.close()
2266 def write(self, pkt):
2267 # type: (Union[_PacketIterable, bytes]) -> None
2268 """
2269 Writes a Packet, a SndRcvList object, or bytes to a pcap file.
2271 :param pkt: Packet(s) to write (one record for each Packet), or raw
2272 bytes to write (as one record).
2273 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
2274 """
2275 if isinstance(pkt, bytes):
2276 if not self.header_present:
2277 self.write_header(pkt)
2278 self.write_packet(pkt)
2279 else:
2280 # Import here to avoid circular dependency
2281 from scapy.supersocket import IterSocket
2282 for p in IterSocket(pkt).iter:
2283 if not self.header_present:
2284 self.write_header(p)
2286 if not isinstance(p, bytes) and \
2287 self.linktype != conf.l2types.get(type(p), None):
2288 warning("Inconsistent linktypes detected!"
2289 " The resulting file might contain"
2290 " invalid packets."
2291 )
2293 self.write_packet(p)
2296class RawPcapWriter(GenericRawPcapWriter):
2297 """A stream PCAP writer with more control than wrpcap()"""
2299 def __init__(self,
2300 filename, # type: Union[IO[bytes], str]
2301 linktype=None, # type: Optional[int]
2302 gz=False, # type: bool
2303 endianness="", # type: str
2304 append=False, # type: bool
2305 sync=False, # type: bool
2306 nano=False, # type: bool
2307 snaplen=MTU, # type: int
2308 bufsz=4096, # type: int
2309 ):
2310 # type: (...) -> None
2311 """
2312 :param filename: the name of the file to write packets to, or an open,
2313 writable file-like object.
2314 :param linktype: force linktype to a given value. If None, linktype is
2315 taken from the first writer packet
2316 :param gz: compress the capture on the fly
2317 :param endianness: force an endianness (little:"<", big:">").
2318 Default is native
2319 :param append: append packets to the capture file instead of
2320 truncating it
2321 :param sync: do not bufferize writes to the capture file
2322 :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
2324 """
2326 if linktype:
2327 self.linktype = linktype
2328 self.snaplen = snaplen
2329 self.append = append
2330 self.gz = gz
2331 self.endian = endianness
2332 self.sync = sync
2333 self.nano = nano
2334 if sync:
2335 bufsz = 0
2337 if isinstance(filename, str):
2338 self.filename = filename
2339 if gz:
2340 self.f = cast(_ByteStream, gzip.open(
2341 filename, append and "ab" or "wb", 9
2342 ))
2343 else:
2344 self.f = open(filename, append and "ab" or "wb", bufsz)
2345 else:
2346 self.f = filename
2347 self.filename = getattr(filename, "name", "No name")
2349 def _write_header(self, pkt):
2350 # type: (Optional[Union[Packet, bytes]]) -> None
2351 self.header_present = True
2353 if self.append:
2354 # Even if prone to race conditions, this seems to be
2355 # safest way to tell whether the header is already present
2356 # because we have to handle compressed streams that
2357 # are not as flexible as basic files
2358 if self.gz:
2359 g = gzip.open(self.filename, "rb") # type: _ByteStream
2360 else:
2361 g = open(self.filename, "rb")
2362 try:
2363 if g.read(16):
2364 return
2365 finally:
2366 g.close()
2368 if not hasattr(self, 'linktype'):
2369 raise ValueError(
2370 "linktype could not be guessed. "
2371 "Please pass a linktype while creating the writer"
2372 )
2374 self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, # noqa: E501
2375 2, 4, 0, 0, self.snaplen, self.linktype))
2376 self.f.flush()
2378 def _write_packet(self,
2379 packet, # type: Union[bytes, Packet]
2380 linktype, # type: int
2381 sec=None, # type: Optional[float]
2382 usec=None, # type: Optional[int]
2383 caplen=None, # type: Optional[int]
2384 wirelen=None, # type: Optional[int]
2385 ifname=None, # type: Optional[bytes]
2386 direction=None, # type: Optional[int]
2387 comments=None, # type: Optional[List[bytes]]
2388 ):
2389 # type: (...) -> None
2390 """
2391 Writes a single packet to the pcap file.
2393 :param packet: bytes for a single packet
2394 :type packet: bytes
2395 :param linktype: linktype value associated with the packet
2396 :type linktype: int
2397 :param sec: time the packet was captured, in seconds since epoch. If
2398 not supplied, defaults to now.
2399 :type sec: float
2400 :param usec: not used with pcapng
2401 packet was captured
2402 :type usec: int or long
2403 :param caplen: The length of the packet in the capture file. If not
2404 specified, uses ``len(packet)``.
2405 :type caplen: int
2406 :param wirelen: The length of the packet on the wire. If not
2407 specified, uses ``caplen``.
2408 :type wirelen: int
2409 :return: None
2410 :rtype: None
2411 """
2412 if caplen is None:
2413 caplen = len(packet)
2414 if wirelen is None:
2415 wirelen = caplen
2416 if sec is None or usec is None:
2417 t = time.time()
2418 it = int(t)
2419 if sec is None:
2420 sec = it
2421 usec = int(round((t - it) *
2422 (1000000000 if self.nano else 1000000)))
2423 elif usec is None:
2424 usec = 0
2426 self.f.write(struct.pack(self.endian + "IIII",
2427 int(sec), usec, caplen, wirelen))
2428 self.f.write(bytes(packet))
2429 if self.sync:
2430 self.f.flush()
2433class RawPcapNgWriter(GenericRawPcapWriter):
2434 """A stream pcapng writer with more control than wrpcapng()"""
2436 def __init__(self,
2437 filename, # type: str
2438 ):
2439 # type: (...) -> None
2441 self.header_present = False
2442 self.tsresol = 1000000
2443 # A dict to keep if_name to IDB id mapping.
2444 # unknown if_name(None) id=0
2445 self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}
2447 # tcpdump only support little-endian in PCAPng files
2448 self.endian = "<"
2449 self.endian_magic = b"\x4d\x3c\x2b\x1a"
2451 self.filename = filename
2452 self.f = open(filename, "wb", 4096)
2454 def _get_time(self,
2455 packet, # type: Union[bytes, Packet]
2456 sec, # type: Optional[float]
2457 usec # type: Optional[int]
2458 ):
2459 # type: (...) -> Tuple[float, int]
2460 if hasattr(packet, "time"):
2461 if sec is None:
2462 sec = float(packet.time)
2464 if usec is None:
2465 usec = 0
2467 return sec, usec # type: ignore
2469 def _add_padding(self, raw_data):
2470 # type: (bytes) -> bytes
2471 raw_data += ((-len(raw_data)) % 4) * b"\x00"
2472 return raw_data
2474 def build_block(self, block_type, block_body, options=None):
2475 # type: (bytes, bytes, Optional[bytes]) -> bytes
2477 # Pad Block Body to 32 bits
2478 block_body = self._add_padding(block_body)
2480 if options:
2481 block_body += options
2483 # An empty block is 12 bytes long
2484 block_total_length = 12 + len(block_body)
2486 # Block Type
2487 block = block_type
2488 # Block Total Length$
2489 block += struct.pack(self.endian + "I", block_total_length)
2490 # Block Body
2491 block += block_body
2492 # Block Total Length$
2493 block += struct.pack(self.endian + "I", block_total_length)
2495 return block
2497 def _write_header(self, pkt):
2498 # type: (Optional[Union[Packet, bytes]]) -> None
2499 if not self.header_present:
2500 self.header_present = True
2501 self._write_block_shb()
2502 self._write_block_idb(linktype=self.linktype)
2504 def _write_block_shb(self):
2505 # type: () -> None
2507 # Block Type
2508 block_type = b"\x0A\x0D\x0D\x0A"
2509 # Byte-Order Magic
2510 block_shb = self.endian_magic
2511 # Major Version
2512 block_shb += struct.pack(self.endian + "H", 1)
2513 # Minor Version
2514 block_shb += struct.pack(self.endian + "H", 0)
2515 # Section Length
2516 block_shb += struct.pack(self.endian + "q", -1)
2518 self.f.write(self.build_block(block_type, block_shb))
2520 def _write_block_idb(self,
2521 linktype, # type: int
2522 ifname=None # type: Optional[bytes]
2523 ):
2524 # type: (...) -> None
2526 # Block Type
2527 block_type = struct.pack(self.endian + "I", 1)
2528 # LinkType
2529 block_idb = struct.pack(self.endian + "H", linktype)
2530 # Reserved
2531 block_idb += struct.pack(self.endian + "H", 0)
2532 # SnapLen
2533 block_idb += struct.pack(self.endian + "I", 262144)
2535 # if_name option
2536 opts = None
2537 if ifname is not None:
2538 opts = struct.pack(self.endian + "HH", 2, len(ifname))
2539 # Pad Option Value to 32 bits
2540 opts += self._add_padding(ifname)
2541 opts += struct.pack(self.endian + "HH", 0, 0)
2543 self.f.write(self.build_block(block_type, block_idb, options=opts))
2545 def _write_block_spb(self, raw_pkt):
2546 # type: (bytes) -> None
2548 # Block Type
2549 block_type = struct.pack(self.endian + "I", 3)
2550 # Original Packet Length
2551 block_spb = struct.pack(self.endian + "I", len(raw_pkt))
2552 # Packet Data
2553 block_spb += raw_pkt
2555 self.f.write(self.build_block(block_type, block_spb))
2557 def _write_block_epb(self,
2558 raw_pkt, # type: bytes
2559 ifid, # type: int
2560 timestamp=None, # type: Optional[Union[EDecimal, float]] # noqa: E501
2561 caplen=None, # type: Optional[int]
2562 orglen=None, # type: Optional[int]
2563 comments=None, # type: Optional[List[bytes]]
2564 flags=None, # type: Optional[int]
2565 ):
2566 # type: (...) -> None
2568 if timestamp:
2569 tmp_ts = int(timestamp * self.tsresol)
2570 ts_high = tmp_ts >> 32
2571 ts_low = tmp_ts & 0xFFFFFFFF
2572 else:
2573 ts_high = ts_low = 0
2575 if not caplen:
2576 caplen = len(raw_pkt)
2578 if not orglen:
2579 orglen = len(raw_pkt)
2581 # Block Type
2582 block_type = struct.pack(self.endian + "I", 6)
2583 # Interface ID
2584 block_epb = struct.pack(self.endian + "I", ifid)
2585 # Timestamp (High)
2586 block_epb += struct.pack(self.endian + "I", ts_high)
2587 # Timestamp (Low)
2588 block_epb += struct.pack(self.endian + "I", ts_low)
2589 # Captured Packet Length
2590 block_epb += struct.pack(self.endian + "I", caplen)
2591 # Original Packet Length
2592 block_epb += struct.pack(self.endian + "I", orglen)
2593 # Packet Data
2594 block_epb += raw_pkt
2596 # Options
2597 opts = b''
2598 if comments and len(comments):
2599 for c in comments:
2600 comment = bytes_encode(c)
2601 opts += struct.pack(self.endian + "HH", 1, len(comment))
2602 # Pad Option Value to 32 bits
2603 opts += self._add_padding(comment)
2604 if type(flags) == int:
2605 opts += struct.pack(self.endian + "HH", 2, 4)
2606 opts += struct.pack(self.endian + "I", flags)
2607 if opts:
2608 opts += struct.pack(self.endian + "HH", 0, 0)
2610 self.f.write(self.build_block(block_type, block_epb,
2611 options=opts))
2613 def _write_packet(self, # type: ignore
2614 packet, # type: bytes
2615 linktype, # type: int
2616 sec=None, # type: Optional[float]
2617 usec=None, # type: Optional[int]
2618 caplen=None, # type: Optional[int]
2619 wirelen=None, # type: Optional[int]
2620 ifname=None, # type: Optional[bytes]
2621 direction=None, # type: Optional[int]
2622 comments=None, # type: Optional[List[bytes]]
2623 ):
2624 # type: (...) -> None
2625 """
2626 Writes a single packet to the pcap file.
2628 :param packet: bytes for a single packet
2629 :type packet: bytes
2630 :param linktype: linktype value associated with the packet
2631 :type linktype: int
2632 :param sec: time the packet was captured, in seconds since epoch. If
2633 not supplied, defaults to now.
2634 :type sec: float
2635 :param caplen: The length of the packet in the capture file. If not
2636 specified, uses ``len(packet)``.
2637 :type caplen: int
2638 :param wirelen: The length of the packet on the wire. If not
2639 specified, uses ``caplen``.
2640 :type wirelen: int
2641 :param comment: UTF-8 string containing human-readable comment text
2642 that is associated to the current block. Line separators
2643 SHOULD be a carriage-return + linefeed ('\r\n') or
2644 just linefeed ('\n'); either form may appear and
2645 be considered a line separator. The string is not
2646 zero-terminated.
2647 :type bytes
2648 :param ifname: UTF-8 string containing the
2649 name of the device used to capture data.
2650 The string is not zero-terminated.
2651 :type bytes
2652 :param direction: 0 = information not available,
2653 1 = inbound,
2654 2 = outbound
2655 :type int
2656 :return: None
2657 :rtype: None
2658 """
2659 if caplen is None:
2660 caplen = len(packet)
2661 if wirelen is None:
2662 wirelen = caplen
2664 ifid = self.interfaces2id.get(ifname, None)
2665 if ifid is None:
2666 ifid = max(self.interfaces2id.values()) + 1
2667 self.interfaces2id[ifname] = ifid
2668 self._write_block_idb(linktype=linktype, ifname=ifname)
2670 # EPB flags (32 bits).
2671 # currently only direction is implemented (least 2 significant bits)
2672 if type(direction) == int:
2673 flags = direction & 0x3
2674 else:
2675 flags = None
2677 self._write_block_epb(packet, timestamp=sec, caplen=caplen,
2678 orglen=wirelen, comments=comments, ifid=ifid, flags=flags)
2679 if self.sync:
2680 self.f.flush()
2683class PcapWriter(RawPcapWriter):
2684 """A stream PCAP writer with more control than wrpcap()"""
2685 pass
2688class PcapNgWriter(RawPcapNgWriter):
2689 """A stream pcapng writer with more control than wrpcapng()"""
2691 def _get_time(self,
2692 packet, # type: Union[bytes, Packet]
2693 sec, # type: Optional[float]
2694 usec # type: Optional[int]
2695 ):
2696 # type: (...) -> Tuple[float, int]
2697 if hasattr(packet, "time"):
2698 if sec is None:
2699 sec = float(packet.time)
2701 if usec is None:
2702 usec = 0
2704 return sec, usec # type: ignore
2707@conf.commands.register
2708def rderf(filename, count=-1):
2709 # type: (Union[IO[bytes], str], int) -> PacketList
2710 """Read a ERF file and return a packet list
2712 :param count: read only <count> packets
2713 """
2714 with ERFEthernetReader(filename) as fdesc:
2715 return fdesc.read_all(count=count)
2718class ERFEthernetReader_metaclass(PcapReader_metaclass):
2719 def __call__(cls, filename):
2720 # type: (Union[IO[bytes], str]) -> Any
2721 i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__) # type: ignore
2722 filename, fdesc = cls.open(filename)
2723 try:
2724 i.__init__(filename, fdesc)
2725 return i
2726 except (Scapy_Exception, EOFError):
2727 pass
2729 if "alternative" in cls.__dict__:
2730 cls = cls.__dict__["alternative"]
2731 i = cls.__new__(
2732 cls,
2733 cls.__name__,
2734 cls.__bases__,
2735 cls.__dict__ # type: ignore
2736 )
2737 try:
2738 i.__init__(filename, fdesc)
2739 return i
2740 except (Scapy_Exception, EOFError):
2741 pass
2743 raise Scapy_Exception("Not a supported capture file")
2745 @staticmethod
2746 def open(fname # type: ignore
2747 ):
2748 # type: (...) -> Tuple[str, _ByteStream]
2749 """Open (if necessary) filename"""
2750 if isinstance(fname, str):
2751 filename = fname
2752 try:
2753 with gzip.open(filename, "rb") as tmp:
2754 tmp.read(1)
2755 fdesc = gzip.open(filename, "rb") # type: _ByteStream
2756 except IOError:
2757 fdesc = open(filename, "rb")
2759 else:
2760 fdesc = fname
2761 filename = getattr(fdesc, "name", "No name")
2762 return filename, fdesc
2765class ERFEthernetReader(PcapReader,
2766 metaclass=ERFEthernetReader_metaclass):
2768 def __init__(self, filename, fdesc=None): # type: ignore
2769 # type: (Union[IO[bytes], str], IO[bytes]) -> None
2770 self.filename = filename # type: ignore
2771 self.f = fdesc
2772 self.power = Decimal(10) ** Decimal(-9)
2774 # time is in 64-bits Endace's format which can be see here:
2775 # https://www.endace.com/erf-extensible-record-format-types.pdf
2776 def _convert_erf_timestamp(self, t):
2777 # type: (int) -> EDecimal
2778 sec = t >> 32
2779 frac_sec = t & 0xffffffff
2780 frac_sec *= 10**9
2781 frac_sec += (frac_sec & 0x80000000) << 1
2782 frac_sec >>= 32
2783 return EDecimal(sec + self.power * frac_sec)
2785 # The details of ERF Packet format can be see here:
2786 # https://www.endace.com/erf-extensible-record-format-types.pdf
2787 def read_packet(self, size=MTU, **kwargs):
2788 # type: (int, **Any) -> Packet
2790 # General ERF Header have exactly 16 bytes
2791 hdr = self.f.read(16)
2792 if len(hdr) < 16:
2793 raise EOFError
2795 # The timestamp is in little-endian byte-order.
2796 time = struct.unpack('<Q', hdr[:8])[0]
2797 # The rest is in big-endian byte-order.
2798 # Ignoring flags and lctr (loss counter) since they are ERF specific
2799 # header fields which Packet object does not support.
2800 type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
2801 # Check if the type != 0x02, type Ethernet
2802 if type & 0x02 == 0:
2803 raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")
2805 # If there are extended headers, ignore it because Packet object does
2806 # not support it. Extended headers size is 8 bytes before the payload.
2807 if type & 0x80:
2808 _ = self.f.read(8)
2809 s = self.f.read(rlen - 24)
2810 else:
2811 s = self.f.read(rlen - 16)
2813 # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
2814 # of the fields are disregarded by Endace.
2815 pb = s[2:size]
2816 from scapy.layers.l2 import Ether
2817 try:
2818 p = Ether(pb, **kwargs) # type: Packet
2819 except KeyboardInterrupt:
2820 raise
2821 except Exception:
2822 if conf.debug_dissector:
2823 from scapy.sendrecv import debug
2824 debug.crashed_on = (Ether, s)
2825 raise
2826 if conf.raw_layer is None:
2827 # conf.raw_layer is set on import
2828 import scapy.packet # noqa: F401
2829 p = conf.raw_layer(s)
2831 p.time = self._convert_erf_timestamp(time)
2832 p.wirelen = wlen
2834 return p
2837@conf.commands.register
2838def wrerf(filename, # type: Union[IO[bytes], str]
2839 pkt, # type: _PacketIterable
2840 *args, # type: Any
2841 **kargs # type: Any
2842 ):
2843 # type: (...) -> None
2844 """Write a list of packets to a ERF file
2846 :param filename: the name of the file to write packets to, or an open,
2847 writable file-like object. The file descriptor will be
2848 closed at the end of the call, so do not use an object you
2849 do not want to close (e.g., running wrerf(sys.stdout, [])
2850 in interactive mode will crash Scapy).
2851 :param gz: set to 1 to save a gzipped capture
2852 :param append: append packets to the capture file instead of
2853 truncating it
2854 :param sync: do not bufferize writes to the capture file
2855 """
2856 with ERFEthernetWriter(filename, *args, **kargs) as fdesc:
2857 fdesc.write(pkt)
2860class ERFEthernetWriter(PcapWriter):
2861 """A stream ERF Ethernet writer with more control than wrerf()"""
2863 def __init__(self,
2864 filename, # type: Union[IO[bytes], str]
2865 gz=False, # type: bool
2866 append=False, # type: bool
2867 sync=False, # type: bool
2868 ):
2869 # type: (...) -> None
2870 """
2871 :param filename: the name of the file to write packets to, or an open,
2872 writable file-like object.
2873 :param gz: compress the capture on the fly
2874 :param append: append packets to the capture file instead of
2875 truncating it
2876 :param sync: do not bufferize writes to the capture file
2877 """
2878 super(ERFEthernetWriter, self).__init__(filename,
2879 gz=gz,
2880 append=append,
2881 sync=sync)
2883 def write(self, pkt): # type: ignore
2884 # type: (_PacketIterable) -> None
2885 """
2886 Writes a Packet, a SndRcvList object, or bytes to a ERF file.
2888 :param pkt: Packet(s) to write (one record for each Packet)
2889 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
2890 """
2891 # Import here to avoid circular dependency
2892 from scapy.supersocket import IterSocket
2893 for p in IterSocket(pkt).iter:
2894 self.write_packet(p)
2896 def write_packet(self, pkt): # type: ignore
2897 # type: (Packet) -> None
2899 if hasattr(pkt, "time"):
2900 sec = int(pkt.time)
2901 usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9)
2902 t = (sec << 32) + usec
2903 else:
2904 t = int(time.time()) << 32
2906 # There are 16 bytes of headers + 2 bytes of padding before the packets
2907 # payload.
2908 rlen = len(pkt) + 18
2910 if hasattr(pkt, "wirelen"):
2911 wirelen = pkt.wirelen
2912 if wirelen is None:
2913 wirelen = rlen
2915 self.f.write(struct.pack("<Q", t))
2916 self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
2917 self.f.write(bytes(pkt))
2918 self.f.flush()
2920 def close(self):
2921 # type: () -> Optional[Any]
2922 return self.f.close()
2925@conf.commands.register
2926def import_hexcap(input_string=None):
2927 # type: (Optional[str]) -> bytes
2928 """Imports a tcpdump like hexadecimal view
2930 e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"
2932 :param input_string: String containing the hexdump input to parse. If None,
2933 read from standard input.
2934 """
2935 re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") # noqa: E501
2936 p = ""
2937 try:
2938 if input_string:
2939 input_function = StringIO(input_string).readline
2940 else:
2941 input_function = input
2942 while True:
2943 line = input_function().strip()
2944 if not line:
2945 break
2946 try:
2947 p += re_extract_hexcap.match(line).groups()[2] # type: ignore
2948 except Exception:
2949 warning("Parsing error during hexcap")
2950 continue
2951 except EOFError:
2952 pass
2954 p = p.replace(" ", "")
2955 return hex_bytes(p)
2958@conf.commands.register
2959def wireshark(pktlist, wait=False, **kwargs):
2960 # type: (List[Packet], bool, **Any) -> Optional[Any]
2961 """
2962 Runs Wireshark on a list of packets.
2964 See :func:`tcpdump` for more parameter description.
2966 Note: this defaults to wait=False, to run Wireshark in the background.
2967 """
2968 return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs)
2971@conf.commands.register
2972def tdecode(
2973 pktlist, # type: Union[IO[bytes], None, str, _PacketIterable]
2974 args=None, # type: Optional[List[str]]
2975 **kwargs # type: Any
2976):
2977 # type: (...) -> Any
2978 """
2979 Run tshark on a list of packets.
2981 :param args: If not specified, defaults to ``tshark -V``.
2983 See :func:`tcpdump` for more parameters.
2984 """
2985 if args is None:
2986 args = ["-V"]
2987 return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs)
2990def _guess_linktype_name(value):
2991 # type: (int) -> str
2992 """Guess the DLT name from its value."""
2993 from scapy.libs.winpcapy import pcap_datalink_val_to_name
2994 return cast(bytes, pcap_datalink_val_to_name(value)).decode()
2997def _guess_linktype_value(name):
2998 # type: (str) -> int
2999 """Guess the value of a DLT name."""
3000 from scapy.libs.winpcapy import pcap_datalink_name_to_val
3001 val = cast(int, pcap_datalink_name_to_val(name.encode()))
3002 if val == -1:
3003 warning("Unknown linktype: %s. Using EN10MB", name)
3004 return DLT_EN10MB
3005 return val
3008@conf.commands.register
3009def tcpdump(
3010 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable]
3011 dump=False, # type: bool
3012 getfd=False, # type: bool
3013 args=None, # type: Optional[List[str]]
3014 flt=None, # type: Optional[str]
3015 prog=None, # type: Optional[Any]
3016 getproc=False, # type: bool
3017 quiet=False, # type: bool
3018 use_tempfile=None, # type: Optional[Any]
3019 read_stdin_opts=None, # type: Optional[Any]
3020 linktype=None, # type: Optional[Any]
3021 wait=True, # type: bool
3022 _suppress=False # type: bool
3023):
3024 # type: (...) -> Any
3025 """Run tcpdump or tshark on a list of packets.
3027 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
3028 temporary file to store the packets. This works around a bug in Apple's
3029 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/
3031 Otherwise, the packets are passed in stdin.
3033 This function can be explicitly enabled or disabled with the
3034 ``use_tempfile`` parameter.
3036 When using ``wireshark``, it will be called with ``-ki -`` to start
3037 immediately capturing packets from stdin.
3039 Otherwise, the command will be run with ``-r -`` (which is correct for
3040 ``tcpdump`` and ``tshark``).
3042 This can be overridden with ``read_stdin_opts``. This has no effect when
3043 ``use_tempfile=True``, or otherwise reading packets from a regular file.
3045 :param pktlist: a Packet instance, a PacketList instance or a list of
3046 Packet instances. Can also be a filename (as a string), an open
3047 file-like object that must be a file format readable by
3048 tshark (Pcap, PcapNg, etc.) or None (to sniff)
3049 :param flt: a filter to use with tcpdump
3050 :param dump: when set to True, returns a string instead of displaying it.
3051 :param getfd: when set to True, returns a file-like object to read data
3052 from tcpdump or tshark from.
3053 :param getproc: when set to True, the subprocess.Popen object is returned
3054 :param args: arguments (as a list) to pass to tshark (example for tshark:
3055 args=["-T", "json"]).
3056 :param prog: program to use (defaults to tcpdump, will work with tshark)
3057 :param quiet: when set to True, the process stderr is discarded
3058 :param use_tempfile: When set to True, always use a temporary file to store
3059 packets.
3060 When set to False, pipe packets through stdin.
3061 When set to None (default), only use a temporary file with
3062 ``tcpdump`` on OSX.
3063 :param read_stdin_opts: When set, a list of arguments needed to capture
3064 from stdin. Otherwise, attempts to guess.
3065 :param linktype: A custom DLT value or name, to overwrite the default
3066 values.
3067 :param wait: If True (default), waits for the process to terminate before
3068 returning to Scapy. If False, the process will be detached to the
3069 background. If dump, getproc or getfd is True, these have the same
3070 effect as ``wait=False``.
3072 Examples::
3074 >>> tcpdump([IP()/TCP(), IP()/UDP()])
3075 reading from file -, link-type RAW (Raw IP)
3076 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501
3077 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]
3079 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
3080 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501
3081 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0
3083 To get a JSON representation of a tshark-parsed PacketList(), one can::
3085 >>> import json, pprint
3086 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
3087 ... dst="45.33.32.156"),
3088 ... prog=conf.prog.tshark,
3089 ... args=["-T", "json"],
3090 ... getfd=True))
3091 >>> pprint.pprint(json_data)
3092 [{u'_index': u'packets-2016-12-23',
3093 u'_score': None,
3094 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
3095 u'frame.encap_type': u'7',
3096 [...]
3097 },
3098 u'ip': {u'ip.addr': u'45.33.32.156',
3099 u'ip.checksum': u'0x0000a20d',
3100 [...]
3101 u'ip.ttl': u'64',
3102 u'ip.version': u'4'},
3103 u'raw': u'Raw packet data'}},
3104 u'_type': u'pcap_file'}]
3105 >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
3106 u'64'
3107 """
3108 getfd = getfd or getproc
3109 if prog is None:
3110 if not conf.prog.tcpdump:
3111 raise Scapy_Exception(
3112 "tcpdump is not available"
3113 )
3114 prog = [conf.prog.tcpdump]
3115 elif isinstance(prog, str):
3116 prog = [prog]
3117 else:
3118 raise ValueError("prog must be a string")
3120 if linktype is not None:
3121 if isinstance(linktype, int):
3122 # Guess name from value
3123 try:
3124 linktype_name = _guess_linktype_name(linktype)
3125 except StopIteration:
3126 linktype = -1
3127 else:
3128 # Guess value from name
3129 if linktype.startswith("DLT_"):
3130 linktype = linktype[4:]
3131 linktype_name = linktype
3132 try:
3133 linktype = _guess_linktype_value(linktype)
3134 except KeyError:
3135 linktype = -1
3136 if linktype == -1:
3137 raise ValueError(
3138 "Unknown linktype. Try passing its datalink name instead"
3139 )
3140 prog += ["-y", linktype_name]
3142 # Build Popen arguments
3143 if args is None:
3144 args = []
3145 else:
3146 # Make a copy of args
3147 args = list(args)
3149 if flt is not None:
3150 # Check the validity of the filter
3151 if linktype is None and isinstance(pktlist, str):
3152 # linktype is unknown but required. Read it from file
3153 with PcapReader(pktlist) as rd:
3154 if isinstance(rd, PcapNgReader):
3155 # Get the linktype from the first packet
3156 try:
3157 _, metadata = rd._read_packet()
3158 linktype = metadata.linktype
3159 if OPENBSD and linktype == 228:
3160 linktype = DLT_RAW
3161 except EOFError:
3162 raise ValueError(
3163 "Cannot get linktype from a PcapNg packet."
3164 )
3165 else:
3166 linktype = rd.linktype
3167 from scapy.arch.common import compile_filter
3168 compile_filter(flt, linktype=linktype)
3169 args.append(flt)
3171 stdout = subprocess.PIPE if dump or getfd else None
3172 stderr = open(os.devnull) if quiet else None
3173 proc = None
3175 if use_tempfile is None:
3176 # Apple's tcpdump cannot read from stdin, see:
3177 # http://apple.stackexchange.com/questions/152682/
3178 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump
3180 if read_stdin_opts is None:
3181 if prog[0] == conf.prog.wireshark:
3182 # Start capturing immediately (-k) from stdin (-i -)
3183 read_stdin_opts = ["-ki", "-"]
3184 elif prog[0] == conf.prog.tcpdump and not OPENBSD:
3185 # Capture in packet-buffered mode (-U) from stdin (-r -)
3186 read_stdin_opts = ["-U", "-r", "-"]
3187 else:
3188 read_stdin_opts = ["-r", "-"]
3189 else:
3190 # Make a copy of read_stdin_opts
3191 read_stdin_opts = list(read_stdin_opts)
3193 if pktlist is None:
3194 # sniff
3195 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3196 proc = subprocess.Popen(
3197 prog + args,
3198 stdout=stdout,
3199 stderr=stderr,
3200 )
3201 elif isinstance(pktlist, str):
3202 # file
3203 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3204 proc = subprocess.Popen(
3205 prog + ["-r", pktlist] + args,
3206 stdout=stdout,
3207 stderr=stderr,
3208 )
3209 elif use_tempfile:
3210 tmpfile = get_temp_file( # type: ignore
3211 autoext=".pcap",
3212 fd=True
3213 ) # type: IO[bytes]
3214 try:
3215 tmpfile.writelines(
3216 iter(lambda: pktlist.read(1048576), b"") # type: ignore
3217 )
3218 except AttributeError:
3219 pktlist = cast("_PacketIterable", pktlist)
3220 wrpcap(tmpfile, pktlist, linktype=linktype)
3221 else:
3222 tmpfile.close()
3223 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3224 proc = subprocess.Popen(
3225 prog + ["-r", tmpfile.name] + args,
3226 stdout=stdout,
3227 stderr=stderr,
3228 )
3229 else:
3230 try:
3231 pktlist.fileno() # type: ignore
3232 # pass the packet stream
3233 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3234 proc = subprocess.Popen(
3235 prog + read_stdin_opts + args,
3236 stdin=pktlist, # type: ignore
3237 stdout=stdout,
3238 stderr=stderr,
3239 )
3240 except (AttributeError, ValueError):
3241 # write the packet stream to stdin
3242 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3243 proc = subprocess.Popen(
3244 prog + read_stdin_opts + args,
3245 stdin=subprocess.PIPE,
3246 stdout=stdout,
3247 stderr=stderr,
3248 )
3249 if proc is None:
3250 # An error has occurred
3251 return
3252 try:
3253 proc.stdin.writelines( # type: ignore
3254 iter(lambda: pktlist.read(1048576), b"") # type: ignore
3255 )
3256 except AttributeError:
3257 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore
3258 except UnboundLocalError:
3259 # The error was handled by ContextManagerSubprocess
3260 pass
3261 else:
3262 proc.stdin.close() # type: ignore
3263 if proc is None:
3264 # An error has occurred
3265 return
3266 if dump:
3267 data = b"".join(
3268 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore
3269 )
3270 proc.terminate()
3271 return data
3272 if getproc:
3273 return proc
3274 if getfd:
3275 return proc.stdout
3276 if wait:
3277 proc.wait()
3280@conf.commands.register
3281def hexedit(pktlist):
3282 # type: (_PacketIterable) -> PacketList
3283 """Run hexedit on a list of packets, then return the edited packets."""
3284 f = get_temp_file()
3285 wrpcap(f, pktlist)
3286 with ContextManagerSubprocess(conf.prog.hexedit):
3287 subprocess.call([conf.prog.hexedit, f])
3288 rpktlist = rdpcap(f)
3289 os.unlink(f)
3290 return rpktlist
3293def get_terminal_width():
3294 # type: () -> Optional[int]
3295 """Get terminal width (number of characters) if in a window.
3297 Notice: this will try several methods in order to
3298 support as many terminals and OS as possible.
3299 """
3300 sizex = shutil.get_terminal_size(fallback=(0, 0))[0]
3301 if sizex != 0:
3302 return sizex
3303 # Backups
3304 if WINDOWS:
3305 from ctypes import windll, create_string_buffer
3306 # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
3307 h = windll.kernel32.GetStdHandle(-12)
3308 csbi = create_string_buffer(22)
3309 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
3310 if res:
3311 (bufx, bufy, curx, cury, wattr,
3312 left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) # noqa: E501
3313 sizex = right - left + 1
3314 # sizey = bottom - top + 1
3315 return sizex
3316 return sizex
3317 # We have various methods
3318 # COLUMNS is set on some terminals
3319 try:
3320 sizex = int(os.environ['COLUMNS'])
3321 except Exception:
3322 pass
3323 if sizex:
3324 return sizex
3325 # We can query TIOCGWINSZ
3326 try:
3327 import fcntl
3328 import termios
3329 s = struct.pack('HHHH', 0, 0, 0, 0)
3330 x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
3331 sizex = struct.unpack('HHHH', x)[1]
3332 except (IOError, ModuleNotFoundError):
3333 # If everything failed, return default terminal size
3334 sizex = 79
3335 return sizex
3338def pretty_list(rtlst, # type: List[Tuple[Union[str, List[str]], ...]]
3339 header, # type: List[Tuple[str, ...]]
3340 sortBy=0, # type: Optional[int]
3341 borders=False, # type: bool
3342 ):
3343 # type: (...) -> str
3344 """
3345 Pretty list to fit the terminal, and add header.
3347 :param rtlst: a list of tuples. each tuple contains a value which can
3348 be either a string or a list of string.
3349 :param sortBy: the column id (starting with 0) which will be used for
3350 ordering
3351 :param borders: whether to put borders on the table or not
3352 """
3353 if borders:
3354 _space = "|"
3355 else:
3356 _space = " "
3357 cols = len(header[0])
3358 # Windows has a fat terminal border
3359 _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
3360 _croped = False
3361 if sortBy is not None:
3362 # Sort correctly
3363 rtlst.sort(key=lambda x: x[sortBy])
3364 # Resolve multi-values
3365 for i, line in enumerate(rtlst):
3366 ids = [] # type: List[int]
3367 values = [] # type: List[Union[str, List[str]]]
3368 for j, val in enumerate(line):
3369 if isinstance(val, list):
3370 ids.append(j)
3371 values.append(val or " ")
3372 if values:
3373 del rtlst[i]
3374 k = 0
3375 for ex_vals in zip_longest(*values, fillvalue=" "):
3376 if k:
3377 extra_line = [" "] * cols
3378 else:
3379 extra_line = list(line) # type: ignore
3380 for j, h in enumerate(ids):
3381 extra_line[h] = ex_vals[j]
3382 rtlst.insert(i + k, tuple(extra_line))
3383 k += 1
3384 rtslst = cast(List[Tuple[str, ...]], rtlst)
3385 # Append tag
3386 rtslst = header + rtslst
3387 # Detect column's width
3388 colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
3389 # Make text fit in box (if required)
3390 width = get_terminal_width()
3391 if conf.auto_crop_tables and width:
3392 width = width - _spacelen
3393 while sum(colwidth) > width:
3394 _croped = True
3395 # Needs to be cropped
3396 # Get the longest row
3397 i = colwidth.index(max(colwidth))
3398 # Get all elements of this row
3399 row = [len(x[i]) for x in rtslst]
3400 # Get biggest element of this row: biggest of the array
3401 j = row.index(max(row))
3402 # Re-build column tuple with the edited element
3403 t = list(rtslst[j])
3404 t[i] = t[i][:-2] + "_"
3405 rtslst[j] = tuple(t)
3406 # Update max size
3407 row[j] = len(t[i])
3408 colwidth[i] = max(row)
3409 if _croped:
3410 log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)") # noqa: E501
3411 # Generate padding scheme
3412 fmt = _space.join(["%%-%ds" % x for x in colwidth])
3413 # Append separation line if needed
3414 if borders:
3415 rtslst.insert(1, tuple("-" * x for x in colwidth))
3416 # Compile
3417 return "\n".join(fmt % x for x in rtslst)
3420def human_size(x, fmt=".1f"):
3421 # type: (int, str) -> str
3422 """
3423 Convert a size in octets to a human string representation
3424 """
3425 units = ['K', 'M', 'G', 'T', 'P', 'E']
3426 if not x:
3427 return "0B"
3428 i = int(math.log(x, 2**10))
3429 if i and i < len(units):
3430 return format(x / 2**(10 * i), fmt) + units[i - 1]
3431 return str(x) + "B"
3434def __make_table(
3435 yfmtfunc, # type: Callable[[int], str]
3436 fmtfunc, # type: Callable[[int], str]
3437 endline, # type: str
3438 data, # type: List[Tuple[Packet, Packet]]
3439 fxyz, # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
3440 sortx=None, # type: Optional[Callable[[str], Tuple[Any, ...]]]
3441 sorty=None, # type: Optional[Callable[[str], Tuple[Any, ...]]]
3442 seplinefunc=None, # type: Optional[Callable[[int, List[int]], str]]
3443 dump=False # type: bool
3444):
3445 # type: (...) -> Optional[str]
3446 """Core function of the make_table suite, which generates the table"""
3447 vx = {} # type: Dict[str, int]
3448 vy = {} # type: Dict[str, Optional[int]]
3449 vz = {} # type: Dict[Tuple[str, str], str]
3450 vxf = {} # type: Dict[str, str]
3452 tmp_len = 0
3453 for e in data:
3454 xx, yy, zz = [str(s) for s in fxyz(*e)]
3455 tmp_len = max(len(yy), tmp_len)
3456 vx[xx] = max(vx.get(xx, 0), len(xx), len(zz))
3457 vy[yy] = None
3458 vz[(xx, yy)] = zz
3460 vxk = list(vx)
3461 vyk = list(vy)
3462 if sortx:
3463 vxk.sort(key=sortx)
3464 else:
3465 try:
3466 vxk.sort(key=int)
3467 except Exception:
3468 try:
3469 vxk.sort(key=atol)
3470 except Exception:
3471 vxk.sort()
3472 if sorty:
3473 vyk.sort(key=sorty)
3474 else:
3475 try:
3476 vyk.sort(key=int)
3477 except Exception:
3478 try:
3479 vyk.sort(key=atol)
3480 except Exception:
3481 vyk.sort()
3483 s = ""
3484 if seplinefunc:
3485 sepline = seplinefunc(tmp_len, [vx[x] for x in vxk])
3486 s += sepline + "\n"
3488 fmt = yfmtfunc(tmp_len)
3489 s += fmt % ""
3490 s += ' '
3491 for x in vxk:
3492 vxf[x] = fmtfunc(vx[x])
3493 s += vxf[x] % x
3494 s += ' '
3495 s += endline + "\n"
3496 if seplinefunc:
3497 s += sepline + "\n"
3498 for y in vyk:
3499 s += fmt % y
3500 s += ' '
3501 for x in vxk:
3502 s += vxf[x] % vz.get((x, y), "-")
3503 s += ' '
3504 s += endline + "\n"
3505 if seplinefunc:
3506 s += sepline + "\n"
3508 if dump:
3509 return s
3510 else:
3511 print(s, end="")
3512 return None
3515def make_table(*args, **kargs):
3516 # type: (*Any, **Any) -> Optional[Any]
3517 return __make_table(
3518 lambda l: "%%-%is" % l,
3519 lambda l: "%%-%is" % l,
3520 "",
3521 *args,
3522 **kargs
3523 )
3526def make_lined_table(*args, **kargs):
3527 # type: (*Any, **Any) -> Optional[str]
3528 return __make_table( # type: ignore
3529 lambda l: "%%-%is |" % l,
3530 lambda l: "%%-%is |" % l,
3531 "",
3532 *args,
3533 seplinefunc=lambda a, x: "+".join(
3534 '-' * (y + 2) for y in [a - 1] + x + [-2]
3535 ),
3536 **kargs
3537 )
3540def make_tex_table(*args, **kargs):
3541 # type: (*Any, **Any) -> Optional[str]
3542 return __make_table( # type: ignore
3543 lambda l: "%s",
3544 lambda l: "& %s",
3545 "\\\\",
3546 *args,
3547 seplinefunc=lambda a, x: "\\hline",
3548 **kargs
3549 )
3551####################
3552# WHOIS CLIENT #
3553####################
3556def whois(ip_address):
3557 # type: (str) -> bytes
3558 """Whois client for Python"""
3559 whois_ip = str(ip_address)
3560 try:
3561 query = socket.gethostbyname(whois_ip)
3562 except Exception:
3563 query = whois_ip
3564 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3565 s.connect(("whois.ripe.net", 43))
3566 s.send(query.encode("utf8") + b"\r\n")
3567 answer = b""
3568 while True:
3569 d = s.recv(4096)
3570 answer += d
3571 if not d:
3572 break
3573 s.close()
3574 ignore_tag = b"remarks:"
3575 # ignore all lines starting with the ignore_tag
3576 lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))] # noqa: E501
3577 # remove empty lines at the bottom
3578 for i in range(1, len(lines)):
3579 if not lines[-i].strip():
3580 del lines[-i]
3581 else:
3582 break
3583 return b"\n".join(lines[3:])
3585####################
3586# CLI utils #
3587####################
3590class _CLIUtilMetaclass(type):
3591 class TYPE(enum.Enum):
3592 COMMAND = 0
3593 OUTPUT = 1
3594 COMPLETE = 2
3596 def __new__(cls, # type: Type[_CLIUtilMetaclass]
3597 name, # type: str
3598 bases, # type: Tuple[type, ...]
3599 dct # type: Dict[str, Any]
3600 ):
3601 # type: (...) -> Type[CLIUtil]
3602 dct["commands"] = {
3603 x.__name__: x
3604 for x in dct.values()
3605 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMMAND
3606 }
3607 dct["commands_output"] = {
3608 x.cliutil_ref.__name__: x
3609 for x in dct.values()
3610 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.OUTPUT
3611 }
3612 dct["commands_complete"] = {
3613 x.cliutil_ref.__name__: x
3614 for x in dct.values()
3615 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMPLETE
3616 }
3617 newcls = cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct))
3618 return newcls
3621class CLIUtil(metaclass=_CLIUtilMetaclass):
3622 """
3623 Provides a Util class to easily create simple CLI tools in Scapy,
3624 that can still be used as an API.
3626 Doc:
3627 - override the ps1() function
3628 - register commands with the @CLIUtil.addcomment decorator
3629 - call the loop() function when ready
3630 """
3632 def _depcheck(self) -> None:
3633 """
3634 Check that all dependencies are installed
3635 """
3636 try:
3637 import prompt_toolkit # noqa: F401
3638 except ImportError:
3639 # okay we lie but prompt_toolkit is a dependency...
3640 raise ImportError("You need to have IPython installed to use the CLI")
3642 # Okay let's do nice code
3643 commands: Dict[str, Callable[..., Any]] = {}
3644 # print output of command
3645 commands_output: Dict[str, Callable[..., str]] = {}
3646 # provides completion to command
3647 commands_complete: Dict[str, Callable[..., List[str]]] = {}
3649 def __init__(self, cli: bool = True, debug: bool = False) -> None:
3650 """
3651 DEV: overwrite
3652 """
3653 if cli:
3654 self._depcheck()
3655 self.loop(debug=debug)
3657 @staticmethod
3658 def _inspectkwargs(func: DecoratorCallable) -> None:
3659 """
3660 Internal function to parse arguments from the kwargs of the functions
3661 """
3662 func._flagnames = [ # type: ignore
3663 x.name for x in
3664 inspect.signature(func).parameters.values()
3665 if x.kind == inspect.Parameter.KEYWORD_ONLY
3666 ]
3667 func._flags = [ # type: ignore
3668 ("-%s" % x) if len(x) == 1 else ("--%s" % x)
3669 for x in func._flagnames # type: ignore
3670 ]
3672 @staticmethod
3673 def _parsekwargs(
3674 func: DecoratorCallable,
3675 args: List[str]
3676 ) -> Tuple[List[str], Dict[str, Literal[True]]]:
3677 """
3678 Internal function to parse CLI arguments of a function.
3679 """
3680 kwargs: Dict[str, Literal[True]] = {}
3681 if func._flags: # type: ignore
3682 i = 0
3683 for arg in args:
3684 if arg in func._flags: # type: ignore
3685 i += 1
3686 kwargs[func._flagnames[func._flags.index(arg)]] = True # type: ignore # noqa: E501
3687 continue
3688 break
3689 args = args[i:]
3690 return args, kwargs
3692 @classmethod
3693 def _parseallargs(
3694 cls,
3695 func: DecoratorCallable,
3696 cmd: str, args: List[str]
3697 ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
3698 """
3699 Internal function to parse CLI arguments of both the function
3700 and its output function.
3701 """
3702 args, kwargs = cls._parsekwargs(func, args)
3703 outkwargs: Dict[str, Literal[True]] = {}
3704 if cmd in cls.commands_output:
3705 args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
3706 return args, kwargs, outkwargs
3708 @classmethod
3709 def addcommand(
3710 cls,
3711 spaces: bool = False,
3712 globsupport: bool = False,
3713 ) -> Callable[[DecoratorCallable], DecoratorCallable]:
3714 """
3715 Decorator to register a command
3716 """
3717 def func(cmd: DecoratorCallable) -> DecoratorCallable:
3718 cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND # type: ignore
3719 cmd._spaces = spaces # type: ignore
3720 cmd._globsupport = globsupport # type: ignore
3721 cls._inspectkwargs(cmd)
3722 if cmd._globsupport and not cmd._spaces: # type: ignore
3723 raise ValueError("Cannot use globsupport without spaces.")
3724 return cmd
3725 return func
3727 @classmethod
3728 def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501
3729 """
3730 Decorator to register a command output processor
3731 """
3732 def func(processor: DecoratorCallable) -> DecoratorCallable:
3733 processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT # type: ignore
3734 processor.cliutil_ref = cmd # type: ignore
3735 cls._inspectkwargs(processor)
3736 return processor
3737 return func
3739 @classmethod
3740 def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501
3741 """
3742 Decorator to register a command completor
3743 """
3744 def func(processor: DecoratorCallable) -> DecoratorCallable:
3745 processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE # type: ignore
3746 processor.cliutil_ref = cmd # type: ignore
3747 return processor
3748 return func
3750 def ps1(self) -> str:
3751 """
3752 Return the PS1 of the shell
3753 """
3754 return "> "
3756 def close(self) -> None:
3757 """
3758 Function called on exiting
3759 """
3760 print("Exited")
3762 def help(self, cmd: Optional[str] = None) -> None:
3763 """
3764 Return the help related to this CLI util
3765 """
3766 def _args(func: Any) -> str:
3767 flags = func._flags.copy()
3768 if func.__name__ in self.commands_output:
3769 flags += self.commands_output[func.__name__]._flags # type: ignore
3770 return " %s%s" % (
3771 (
3772 "%s " % " ".join("[%s]" % x for x in flags)
3773 if flags else ""
3774 ),
3775 " ".join(
3776 "<%s%s>" % (
3777 x.name,
3778 "?" if
3779 (x.default is None or x.default != inspect.Parameter.empty)
3780 else ""
3781 )
3782 for x in list(inspect.signature(func).parameters.values())[1:]
3783 if x.name not in func._flagnames and x.name[0] != "_"
3784 )
3785 )
3787 if cmd:
3788 if cmd not in self.commands:
3789 print("Unknown command '%s'" % cmd)
3790 return
3791 # help for one command
3792 func = self.commands[cmd]
3793 print("%s%s: %s" % (
3794 cmd,
3795 _args(func),
3796 func.__doc__ and func.__doc__.strip()
3797 ))
3798 else:
3799 header = "│ %s - Help │" % self.__class__.__name__
3800 print("┌" + "─" * (len(header) - 2) + "┐")
3801 print(header)
3802 print("└" + "─" * (len(header) - 2) + "┘")
3803 print(
3804 pretty_list(
3805 [
3806 (
3807 cmd,
3808 _args(func),
3809 func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
3810 )
3811 for cmd, func in self.commands.items()
3812 ],
3813 [("Command", "Arguments", "Description")]
3814 )
3815 )
3817 def _completer(self) -> 'prompt_toolkit.completion.Completer':
3818 """
3819 Returns a prompt_toolkit custom completer
3820 """
3821 from prompt_toolkit.completion import Completer, Completion
3823 class CLICompleter(Completer):
3824 def get_completions(cmpl, document, complete_event): # type: ignore
3825 if not complete_event.completion_requested:
3826 # Only activate when the user does <TAB>
3827 return
3828 parts = document.text.split(" ")
3829 cmd = parts[0].lower()
3830 if cmd not in self.commands:
3831 # We are trying to complete the command
3832 for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
3833 yield Completion(possible_cmd, start_position=-len(cmd))
3834 else:
3835 # We are trying to complete the command content
3836 if len(parts) == 1:
3837 return
3838 args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
3839 arg = " ".join(args)
3840 if cmd in self.commands_complete:
3841 for possible_arg in self.commands_complete[cmd](self, arg):
3842 yield Completion(possible_arg, start_position=-len(arg))
3843 return
3844 return CLICompleter()
3846 def loop(self, debug: int = 0) -> None:
3847 """
3848 Main command handling loop
3849 """
3850 from prompt_toolkit import PromptSession
3851 session = PromptSession(completer=self._completer())
3853 while True:
3854 try:
3855 cmd = session.prompt(self.ps1()).strip()
3856 except KeyboardInterrupt:
3857 continue
3858 except EOFError:
3859 self.close()
3860 break
3861 args = cmd.split(" ")[1:]
3862 cmd = cmd.split(" ")[0].strip().lower()
3863 if not cmd:
3864 continue
3865 if cmd in ["help", "h", "?"]:
3866 self.help(" ".join(args))
3867 continue
3868 if cmd in "exit":
3869 break
3870 if cmd not in self.commands:
3871 print("Unknown command. Type help or ?")
3872 else:
3873 # check the number of arguments
3874 func = self.commands[cmd]
3875 args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
3876 if func._spaces: # type: ignore
3877 args = [" ".join(args)]
3878 # if globsupport is set, we might need to do several calls
3879 if func._globsupport and "*" in args[0]: # type: ignore
3880 if args[0].count("*") > 1:
3881 print("More than 1 glob star (*) is currently unsupported.")
3882 continue
3883 before, after = args[0].split("*", 1)
3884 reg = re.compile(re.escape(before) + r".*" + after)
3885 calls = [
3886 [x] for x in
3887 self.commands_complete[cmd](self, before)
3888 if reg.match(x)
3889 ]
3890 else:
3891 calls = [args]
3892 else:
3893 calls = [args]
3894 # now iterate if required, call the function and print its output
3895 res = None
3896 for args in calls:
3897 try:
3898 res = func(self, *args, **kwargs)
3899 except TypeError:
3900 print("Bad number of arguments !")
3901 self.help(cmd=cmd)
3902 continue
3903 except Exception as ex:
3904 print("Command failed with error: %s" % ex)
3905 if debug:
3906 traceback.print_exception(ex)
3907 try:
3908 if res and cmd in self.commands_output:
3909 self.commands_output[cmd](self, res, **outkwargs)
3910 except Exception as ex:
3911 print("Output processor failed with error: %s" % ex)
3914def AutoArgparse(
3915 func: DecoratorCallable,
3916 _parseonly: bool = False,
3917) -> Optional[Tuple[List[str], List[str]]]:
3918 """
3919 Generate an Argparse call from a function, then call this function.
3921 Notes:
3923 - for the arguments to have a description, the sphinx docstring format
3924 must be used. See
3925 https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
3926 - the arguments must be typed in Python (we ignore Sphinx-specific types)
3927 untyped arguments are ignored.
3928 - only types that would be supported by argparse are supported. The others
3929 are omitted.
3930 """
3931 argsdoc = {}
3932 if func.__doc__:
3933 # Sphinx doc format parser
3934 m = re.match(
3935 r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
3936 func.__doc__.strip(),
3937 )
3938 if not m:
3939 desc = func.__doc__.strip()
3940 else:
3941 desc = m.group(1)
3942 sphinxargs = re.findall(
3943 r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
3944 m.group(2),
3945 )
3946 for argtype, argparam, argdesc in sphinxargs:
3947 argparam = argparam.strip()
3948 argdesc = argdesc.strip()
3949 if argtype == "param":
3950 if not argparam:
3951 raise ValueError(":param: without a name !")
3952 argsdoc[argparam] = argdesc
3953 else:
3954 desc = ""
3956 # Process the parameters
3957 positional = []
3958 noargument = []
3959 hexarguments = []
3960 parameters = {}
3961 for param in inspect.signature(func).parameters.values():
3962 if not param.annotation:
3963 continue
3964 noarg = False
3965 parname = param.name.replace("_", "-")
3966 paramkwargs: Dict[str, Any] = {}
3967 if param.annotation is bool:
3968 if param.default is True:
3969 parname = "no-" + parname
3970 paramkwargs["action"] = "store_false"
3971 else:
3972 paramkwargs["action"] = "store_true"
3973 noarg = True
3974 elif param.annotation is bytes:
3975 paramkwargs["type"] = str
3976 hexarguments.append(parname)
3977 elif param.annotation in [str, int, float]:
3978 paramkwargs["type"] = param.annotation
3979 else:
3980 continue
3981 if param.default != inspect.Parameter.empty:
3982 if param.kind == inspect.Parameter.POSITIONAL_ONLY:
3983 positional.append(param.name)
3984 paramkwargs["nargs"] = '?'
3985 else:
3986 parname = "--" + parname
3987 paramkwargs["default"] = param.default
3988 else:
3989 positional.append(param.name)
3990 if param.kind == inspect.Parameter.VAR_POSITIONAL:
3991 paramkwargs["action"] = "append"
3992 if param.name in argsdoc:
3993 paramkwargs["help"] = argsdoc[param.name]
3994 if param.annotation is bytes:
3995 paramkwargs["help"] = "(hex) " + paramkwargs["help"]
3996 elif param.annotation is bool:
3997 paramkwargs["help"] = "(flag) " + paramkwargs["help"]
3998 else:
3999 paramkwargs["help"] = (
4000 "(%s) " % param.annotation.__name__ + paramkwargs["help"]
4001 )
4002 # Add to the parameter list
4003 parameters[parname] = paramkwargs
4004 if noarg:
4005 noargument.append(parname)
4007 if _parseonly:
4008 # An internal mode used to generate bash autocompletion, do it then exit.
4009 return (
4010 [x for x in parameters if x not in positional] + ["--help"],
4011 [x for x in noargument if x not in positional] + ["--help"],
4012 )
4014 # Now build the argparse.ArgumentParser
4015 parser = argparse.ArgumentParser(
4016 prog=func.__name__,
4017 description=desc,
4018 formatter_class=argparse.ArgumentDefaultsHelpFormatter,
4019 )
4021 # Add parameters to parser
4022 for parname, paramkwargs in parameters.items():
4023 parser.add_argument(parname, **paramkwargs)
4025 # Now parse the sys.argv parameters
4026 params = vars(parser.parse_args())
4028 # Convert hex parameters if provided
4029 for p in hexarguments:
4030 if params[p] is not None:
4031 try:
4032 params[p] = bytes.fromhex(params[p])
4033 except ValueError:
4034 print(
4035 conf.color_theme.fail(
4036 "ERROR: the value of parameter %s "
4037 "'%s' is not valid hexadecimal !" % (p, params[p])
4038 )
4039 )
4040 return None
4042 # Act as in interactive mode
4043 conf.logLevel = 20
4044 from scapy.themes import DefaultTheme
4045 conf.color_theme = DefaultTheme()
4046 # And call the function
4047 try:
4048 func(
4049 *[params.pop(x) for x in positional],
4050 **{
4051 (k[3:] if k.startswith("no_") else k): v
4052 for k, v in params.items()
4053 }
4054 )
4055 except AssertionError as ex:
4056 print(conf.color_theme.fail("ERROR: " + str(ex)))
4057 parser.print_help()
4058 return None
4061#######################
4062# PERIODIC SENDER #
4063#######################
4066class PeriodicSenderThread(threading.Thread):
4067 def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
4068 # type: (Any, _PacketIterable, float, bool) -> None
4069 """ Thread to send packets periodically
4071 Args:
4072 sock: socket where packet is sent periodically
4073 pkt: packet or list of packets to send
4074 interval: interval between two packets
4075 """
4076 if not isinstance(pkt, list):
4077 self._pkts = [cast("Packet", pkt)] # type: _PacketIterable
4078 else:
4079 self._pkts = pkt
4080 self._socket = sock
4081 self._stopped = threading.Event()
4082 self._enabled = threading.Event()
4083 self._enabled.set()
4084 self._interval = interval
4085 self._ignore_exceptions = ignore_exceptions
4086 threading.Thread.__init__(self)
4088 def enable(self):
4089 # type: () -> None
4090 self._enabled.set()
4092 def disable(self):
4093 # type: () -> None
4094 self._enabled.clear()
4096 def run(self):
4097 # type: () -> None
4098 while not self._stopped.is_set() and not self._socket.closed:
4099 for p in self._pkts:
4100 try:
4101 if self._enabled.is_set():
4102 self._socket.send(p)
4103 except (OSError, TimeoutError) as e:
4104 if self._ignore_exceptions:
4105 return
4106 else:
4107 raise e
4108 self._stopped.wait(timeout=self._interval)
4109 if self._stopped.is_set() or self._socket.closed:
4110 break
4112 def stop(self):
4113 # type: () -> None
4114 self._stopped.set()
4115 self.join(self._interval * 2)
4118class SingleConversationSocket(object):
4119 def __init__(self, o):
4120 # type: (Any) -> None
4121 self._inner = o
4122 self._tx_mutex = threading.RLock()
4124 @property
4125 def __dict__(self): # type: ignore
4126 return self._inner.__dict__
4128 def __getattr__(self, name):
4129 # type: (str) -> Any
4130 return getattr(self._inner, name)
4132 def sr1(self, *args, **kargs):
4133 # type: (*Any, **Any) -> Any
4134 with self._tx_mutex:
4135 return self._inner.sr1(*args, **kargs)
4137 def sr(self, *args, **kargs):
4138 # type: (*Any, **Any) -> Any
4139 with self._tx_mutex:
4140 return self._inner.sr(*args, **kargs)
4142 def send(self, x):
4143 # type: (Packet) -> Any
4144 with self._tx_mutex:
4145 try:
4146 return self._inner.send(x)
4147 except (ConnectionError, OSError) as e:
4148 self._inner.close()
4149 raise e