Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7General utility functions.
8"""
11from decimal import Decimal
12from io import StringIO
13from itertools import zip_longest
14from uuid import UUID
16import argparse
17import array
18import base64
19import collections
20import decimal
21import difflib
22import enum
23import gzip
24import inspect
25import locale
26import math
27import os
28import pickle
29import random
30import re
31import shutil
32import socket
33import struct
34import subprocess
35import sys
36import tempfile
37import threading
38import time
39import traceback
40import warnings
42from scapy.config import conf
43from scapy.consts import DARWIN, OPENBSD, WINDOWS
44from scapy.data import MTU, DLT_EN10MB, DLT_RAW
45from scapy.compat import (
46 orb,
47 plain_str,
48 chb,
49 hex_bytes,
50 bytes_encode,
51)
52from scapy.error import (
53 log_interactive,
54 log_runtime,
55 Scapy_Exception,
56 warning,
57)
58from scapy.pton_ntop import inet_pton
60# Typing imports
61from typing import (
62 cast,
63 Any,
64 AnyStr,
65 Callable,
66 Dict,
67 IO,
68 Iterator,
69 List,
70 Optional,
71 TYPE_CHECKING,
72 Tuple,
73 Type,
74 Union,
75 overload,
76)
77from scapy.compat import (
78 DecoratorCallable,
79 Literal,
80)
82if TYPE_CHECKING:
83 from scapy.packet import Packet
84 from scapy.plist import _PacketIterable, PacketList
85 from scapy.supersocket import SuperSocket
86 import prompt_toolkit
# Type alias for a binary stream: either a generic IO[bytes] or a
# gzip.GzipFile.
_ByteStream = Union[IO[bytes], gzip.GzipFile]
90###########
91# Tools #
92###########
def issubtype(x,  # type: Any
              t,  # type: Union[type, str]
              ):
    # type: (...) -> bool
    """issubtype(C, B) -> bool

    Return whether C is a class and if it is a subclass of class B.
    When using a tuple as the second argument issubtype(X, (A, B, ...)),
    is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).

    When ``t`` is a string, only the names of the *direct* bases of ``x``
    are compared against it.

    :param x: the object to test (may be anything, not only a class)
    :param t: a class, a tuple of classes, or the name of a base class
    :return: True if ``x`` is a class matching ``t``, False otherwise
    """
    if isinstance(t, str):
        # Non-class objects have no __bases__: answer False instead of
        # letting an AttributeError escape.
        return t in (z.__name__ for z in getattr(x, "__bases__", ()))
    if isinstance(x, type) and issubclass(x, t):
        return True
    return False
_Decimal = Union[Decimal, int]


class EDecimal(Decimal):
    """Extended Decimal

    This implements arithmetic and comparison with float for
    backward compatibility

    Every arithmetic operator converts its operand with ``Decimal(other)``
    first (which accepts floats) and wraps the result back into an
    ``EDecimal``.
    """

    def __add__(self, other, context=None):
        # type: (_Decimal, Any) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __radd__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __sub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__sub__(self, Decimal(other)))

    def __rsub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rsub__(self, Decimal(other)))

    def __mul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __rmul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __truediv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__truediv__(self, Decimal(other)))

    def __floordiv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__floordiv__(self, Decimal(other)))

    def __divmod__(self, other):
        # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
        r = Decimal.__divmod__(self, Decimal(other))
        return EDecimal(r[0]), EDecimal(r[1])

    def __mod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mod__(self, Decimal(other)))

    def __rmod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rmod__(self, Decimal(other)))

    def __pow__(self, other, modulo=None):
        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
        return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))

    def __eq__(self, other):
        # type: (Any) -> bool
        # Decimals are compared exactly; anything else is compared against
        # the float approximation of this value.
        if isinstance(other, Decimal):
            return super(EDecimal, self).__eq__(other)
        else:
            return bool(float(self) == other)

    # Overriding __eq__ implicitly sets __hash__ to None, which would make
    # EDecimal unusable in sets and as dict keys. Reuse Decimal's hash so
    # that equal Decimal/EDecimal values hash identically.
    # NOTE: equality with floats goes through float(self), so a float that
    # compares equal may still hash differently when it is not exactly
    # representable as this decimal value.
    __hash__ = Decimal.__hash__

    def normalize(self, precision):  # type: ignore
        # type: (int) -> EDecimal
        """Return the normalized value computed with ``precision`` digits.

        :param precision: precision set on a temporary local context
        """
        with decimal.localcontext() as ctx:
            ctx.prec = precision
            return EDecimal(super(EDecimal, self).normalize(ctx))
@overload
def get_temp_file(keep, autoext, fd):
    # type: (bool, str, Literal[True]) -> IO[bytes]
    ...


@overload
def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, Literal[False]) -> str
    ...


def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, bool) -> Union[IO[bytes], str]
    """Make a new temporary file whose name starts with "scapy".

    :param keep: when False, the path is appended to conf.temp_files
    :param autoext: suffix appended to the generated file name
    :param fd: when True, hand back the still-open file object; when
        False (default), close it and hand back its path
    """
    tmp = tempfile.NamedTemporaryFile(
        prefix="scapy",
        suffix=autoext,
        delete=False,
    )
    if not keep:
        conf.temp_files.append(tmp.name)

    if not fd:
        # Release the handle so another consumer can open the path.
        tmp.close()
        return tmp.name
    return tmp
def get_temp_dir(keep=False):
    # type: (bool) -> str
    """Make a new temporary directory and return its path.

    :param keep: when False (default), the path is appended to
        conf.temp_files
    :return: the path of the directory created by tempfile.mkdtemp
    """
    path = tempfile.mkdtemp(prefix="scapy")
    if keep:
        return path
    conf.temp_files.append(path)
    return path
def _create_fifo() -> Tuple[str, Any]:
    """Make a temporary fifo.

    The returned server_fd must be passed to open_fifo() once the client
    is connected.

    :returns: (client_file, server_fd)
    """
    if not WINDOWS:
        # Reserve a unique temporary path, then replace the regular file
        # created there with a fifo of the same name.
        path = get_temp_file()
        os.unlink(path)
        os.mkfifo(path)
        return path, path
    from scapy.arch.windows.structures import _get_win_fifo
    return _get_win_fifo()
def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
    """Open the server side of a fifo (the server_fd from create_fifo).

    On Windows the ``mode`` argument is not used.
    """
    if not WINDOWS:
        return open(fd, mode)
    from scapy.arch.windows.structures import _win_fifo_open
    return _win_fifo_open(fd)
def sane(x, color=False):
    # type: (AnyStr, bool) -> str
    """Return ``x`` with every byte outside 32..126 replaced by a dot.

    :param x: the string/bytes to sanitize
    :param color: when True, each replacement dot is passed through
        conf.color_theme.not_printable
    """
    pieces = []
    for item in x:
        code = orb(item)
        if 32 <= code < 127:
            pieces.append(chr(code))
        elif color:
            pieces.append(conf.color_theme.not_printable("."))
        else:
            pieces.append(".")
    return "".join(pieces)
@conf.commands.register
def restart():
    # type: () -> None
    """Restarts scapy

    Re-runs ``sys.executable`` with the current ``sys.argv``.

    :raises OSError: if conf.interactive is false or sys.argv[0] is not
        an existing file
    """
    if not conf.interactive or not os.path.isfile(sys.argv[0]):
        raise OSError("Scapy was not started from console")
    if WINDOWS:
        # Run the new interpreter as a child process, then terminate this
        # process with the child's return code (1 if the call itself failed).
        res_code = 1
        try:
            res_code = subprocess.call([sys.executable] + sys.argv)
        finally:
            os._exit(res_code)
    # Elsewhere, replace the current process image.
    os.execv(sys.executable, [sys.executable] + sys.argv)
def lhex(x):
    # type: (Any) -> str
    """Render ``x`` with integers shown in hexadecimal.

    VolatileValue instances are shown through repr(), tuples and lists are
    rendered recursively, anything else falls back to str().
    """
    from scapy.volatile import VolatileValue
    if isinstance(x, VolatileValue):
        return repr(x)
    if isinstance(x, int):
        return hex(x)
    for seq_type, template in ((tuple, "(%s)"), (list, "[%s]")):
        if isinstance(x, seq_type):
            return template % ", ".join(lhex(item) for item in x)
    return str(x)
@conf.commands.register
def hexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Render a tcpdump-like hexadecimal view, 16 bytes per row.

    :param p: a Packet (or string/bytes)
    :param dump: return the view instead of printing it
    :return: the view as a string when dump=True, otherwise None
    """
    data = bytes_encode(p)
    total = len(data)
    rows = []
    for offset in range(0, total, 16):
        cells = "".join(
            "%02X " % orb(data[offset + col]) if offset + col < total else " "
            for col in range(16)
        )
        ascii_part = " %s" % sane(data[offset:offset + 16], color=True)
        rows.append("%04x " % offset + cells + ascii_part)
    # Rows are separated by newlines; there is no trailing newline.
    text = "\n".join(rows)
    if not dump:
        print(text)
        return None
    return text
@conf.commands.register
def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
    """Single-line counterpart of hexdump(), built with hexstr().

    Setting both onlyasc and onlyhex to 1 yields an empty output.

    :param p: a Packet
    :param onlyasc: 1 to show only the ascii view
    :param onlyhex: 1 to show only the hexadecimal view
    :param dump: return the view instead of printing it (coloring is
        only requested when printing)
    :return: the view as a string when dump=True, otherwise None
    """
    rendered = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
    if not dump:
        print(rendered)
        return None
    return rendered
@conf.commands.register
def chexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Render every byte as a comma-separated ``0x..`` literal.

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501

    :param p: a Packet
    :param dump: return the view instead of printing it
    :return: the view as a string when dump=True, otherwise None
    """
    raw = bytes_encode(p)
    literals = ["%#04x" % orb(byte) for byte in raw]
    text = ", ".join(literals)
    if not dump:
        print(text)
        return None
    return text
@conf.commands.register
def hexstr(p, onlyasc=0, onlyhex=0, color=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> str
    """Return the hex view and/or the sanitized ascii view on one line.

    :param p: a Packet (or string/bytes)
    :param onlyasc: when true, omit the hexadecimal part
    :param onlyhex: when true, omit the ascii part
    :param color: forwarded to sane() for the ascii part
    """
    raw = bytes_encode(p)
    parts = []
    if not onlyasc:
        hex_view = " ".join(["%02X" % orb(byte) for byte in raw])
        parts.append(hex_view)
    if not onlyhex:
        parts.append(sane(raw, color=color))
    return " ".join(parts)
def repr_hex(s):
    # type: (bytes) -> str
    """Return the lowercase two-digit hex of every byte of ``s``, joined."""
    digits = ["%02x" % orb(byte) for byte in s]
    return "".join(digits)
@conf.commands.register
def hexdiff(
    a: Union['Packet', AnyStr],
    b: Union['Packet', AnyStr],
    algo: Optional[str] = None,
    autojunk: bool = False,
) -> None:
    """
    Show differences between 2 binary strings, Packets...

    Available algorithms:
      - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
        Levenshtein distance between the strings then backtrack.
      - difflib: Use the difflib.SequenceMatcher implementation. This is
        based on a modified version of the Ratcliff and Obershelp algorithm.
        This is much faster, but far less accurate.
        https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher

    :param a:
    :param b: The binary strings, packets... to compare
    :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
        By default, this is chosen depending on the complexity, optimistically
        preferring wagnerfischer unless really necessary.
    :param autojunk: (difflib only) See difflib documentation.
    :raises ValueError: if ``algo`` is neither None, 'wagnerfischer' nor
        'difflib'
    """
    xb = bytes_encode(a)
    yb = bytes_encode(b)

    if algo is None:
        # Choose the best algorithm
        complexity = len(xb) * len(yb)
        if complexity < 1e7:
            # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
            # Anything much larger than this shouldn't be attempted by default.
            algo = "wagnerfischer"
            if complexity > 1e6:
                log_interactive.info(
                    "Complexity is a bit high. hexdiff will take a few seconds."
                )
        else:
            algo = "difflib"

    # Aligned sequences of byte chunks: position k of backtrackx lines up
    # with position k of backtracky; b'' marks a gap on that side.
    backtrackx = []
    backtracky = []

    if algo == "wagnerfischer":
        # Work on reversed inputs: the backtrack below walks from the end
        # towards (-1, -1), so it emits chunks in original order.
        xb = xb[::-1]
        yb = yb[::-1]

        # costs for the 3 operations
        INSERT = 1
        DELETE = 1
        SUBST = 1

        # Typically, d[i,j] will hold the distance between
        # the first i characters of xb and the first j characters of yb.
        # We change the Wagner Fischer to also store pointers to all
        # the intermediate steps taken while calculating the Levenshtein distance.
        d = {(-1, -1): (0, (-1, -1))}
        for j in range(len(yb)):
            d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
        for i in range(len(xb)):
            d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)

        # Compute the Levenshtein distance between the two strings, but
        # store all the steps to be able to backtrack at the end.
        for j in range(len(yb)):
            for i in range(len(xb)):
                d[i, j] = min(
                    (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
                    (d[i - 1, j][0] + DELETE, (i - 1, j)),
                    (d[i, j - 1][0] + INSERT, (i, j - 1)),
                )

        # Iterate through the steps backwards to create the diff
        i = len(xb) - 1
        j = len(yb) - 1
        while not (i == j == -1):
            i2, j2 = d[i, j][1]
            backtrackx.append(xb[i2 + 1:i + 1])
            backtracky.append(yb[j2 + 1:j + 1])
            i, j = i2, j2
    elif algo == "difflib":
        sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
        # One-byte slices, so each element is a bytes object like the
        # chunks produced by the other algorithm.
        xarr = [xb[i:i + 1] for i in range(len(xb))]
        yarr = [yb[i:i + 1] for i in range(len(yb))]
        # Iterate through opcodes to build the backtrack
        for opcode in sm.get_opcodes():
            typ, x0, x1, y0, y1 = opcode
            if typ == 'delete':
                backtrackx += xarr[x0:x1]
                backtracky += [b''] * (x1 - x0)
            elif typ == 'insert':
                backtrackx += [b''] * (y1 - y0)
                backtracky += yarr[y0:y1]
            elif typ in ['equal', 'replace']:
                backtrackx += xarr[x0:x1]
                backtracky += yarr[y0:y1]
        # Some lines may have been considered as junk. Check the sizes
        if autojunk:
            lbx = len(backtrackx)
            lby = len(backtracky)
            backtrackx += [b''] * (max(lbx, lby) - lbx)
            backtracky += [b''] * (max(lbx, lby) - lby)
    else:
        raise ValueError("Unknown algorithm '%s'" % algo)

    # Print the diff

    # x / y: number of bytes of each side printed so far; i: index of the
    # current 16-chunk window in the aligned sequences.
    x = y = i = 0
    # Indexed by (doy - dox): 0 = unchanged, -1 = left side, 1 = right side.
    colorize: Dict[int, Callable[[str], str]] = {
        0: lambda x: x,
        -1: conf.color_theme.left,
        1: conf.color_theme.right
    }

    # dox / doy select which side(s) the current output row shows.
    dox = 1
    doy = 0
    btx_len = len(backtrackx)
    while i < btx_len:
        linex = backtrackx[i:i + 16]
        liney = backtracky[i:i + 16]
        xx = sum(len(k) for k in linex)
        yy = sum(len(k) for k in liney)
        if dox and not xx:
            # Nothing on the x side in this window: show the y side only.
            dox = 0
            doy = 1
        if dox and linex == liney:
            # Identical windows: a single row stands for both sides.
            doy = 1

        if dox:
            xd = y
            j = 0
            while not linex[j]:
                j += 1
                xd -= 1
            print(colorize[doy - dox]("%04x" % xd), end=' ')
            x += xx
            line = linex
        else:
            print(" ", end=' ')
        if doy:
            yd = y
            j = 0
            while not liney[j]:
                j += 1
                yd -= 1
            print(colorize[doy - dox]("%04x" % yd), end=' ')
            y += yy
            line = liney
        else:
            print(" ", end=' ')

        print(" ", end=' ')

        # cl accumulates the ascii column of the row.
        cl = ""
        for j in range(16):
            if i + j < min(len(backtrackx), len(backtracky)):
                if line[j]:
                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
                    print(col("%02X" % orb(line[j])), end=' ')
                    if linex[j] == liney[j]:
                        cl += sane(line[j], color=True)
                    else:
                        cl += col(sane(line[j]))
                else:
                    print(" ", end=' ')
                    cl += " "
            else:
                print(" ", end=' ')
            if j == 7:
                print("", end=' ')

        print(" ", cl)

        # Advance to the next window once the y side has been shown (or is
        # empty); otherwise print the y row for the same window next.
        if doy or not yy:
            doy = 0
            dox = 1
            i += 16
        else:
            if yy:
                dox = 0
                doy = 1
            else:
                i += 16
# Pick, once at import time, how a sum of native-order 16-bit words is
# converted: unchanged on big endian hosts, byte-swapped otherwise.
if struct.pack("H", 1) != b"\x00\x01":
    checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8
else:  # big endian
    checksum_endian_transform = lambda chk: chk  # type: Callable[[int], int]


def checksum(pkt):
    # type: (bytes) -> int
    """Return the 16-bit one's complement checksum of ``pkt``.

    An odd-length buffer is padded with one zero byte before summing.
    """
    if len(pkt) % 2:
        pkt += b"\0"
    total = sum(array.array("H", pkt))
    # Fold the carries back into the low 16 bits.
    total = (total & 0xffff) + (total >> 16)
    total += total >> 16
    return checksum_endian_transform(~total) & 0xffff
606def _fletcher16(charbuf):
607 # type: (bytes) -> Tuple[int, int]
608 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501
609 c0 = c1 = 0
610 for char in charbuf:
611 c0 += char
612 c1 += c0
614 c0 %= 255
615 c1 %= 255
616 return (c0, c1)
@conf.commands.register
def fletcher16_checksum(binbuf):
    # type: (bytes) -> int
    """Compute the Fletcher-16 checksum of the given buffer.

    Note:
        A buffer that already embeds its two Fletcher-16 checkbytes must
        yield 0 here; any other result means the buffer is corrupted.
    """
    low, high = _fletcher16(binbuf)
    return low | (high << 8)
@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    # type: (bytes, int) -> bytes
    """Compute the two Fletcher-16 checkbytes as a 2-byte binary string.

    Once these bytes are written into the buffer at ``offset``, the global
    Fletcher-16 checksum of the buffer is 0, which makes integrity checks
    on the receiver side straightforward.

    For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501

    :param binbuf: the buffer to protect
    :param offset: position of the two checkbytes inside the buffer
    :raises Exception: if the buffer is shorter than ``offset``
    """

    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    if offset > len(binbuf):
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    # The checkbyte positions are zeroed before the sums are computed.
    zeroed = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    c0, c1 = _fletcher16(zeroed)

    first = ((len(zeroed) - offset - 1) * c0 - c1) % 255
    if first <= 0:
        first += 255

    second = 510 - c0 - first
    if second > 255:
        second -= 255
    return chb(first) + chb(second)
def mac2str(mac):
    # type: (str) -> bytes
    """Convert a colon-separated hexadecimal MAC string to raw bytes."""
    fields = plain_str(mac).split(':')
    return b"".join([chb(int(field, 16)) for field in fields])
def valid_mac(mac):
    # type: (str) -> bool
    """Tell whether ``mac`` converts, through mac2str(), to exactly 6 bytes."""
    try:
        raw = mac2str(mac)
    except ValueError:
        return False
    return len(raw) == 6
def str2mac(s):
    # type: (bytes) -> str
    """Format ``s`` as colon-separated two-digit lowercase hex octets.

    A ``str`` input is accepted too: each character contributes its
    ordinal.
    """
    octets = map(ord, s) if isinstance(s, str) else s
    return ":".join("%02x" % (octet,) for octet in octets)
def randstring(length):
    # type: (int) -> bytes
    """
    Build ``length`` random bytes (length >= 0), each drawn with
    random.randint(0, 255).
    """
    chunks = []
    for _ in range(length):
        chunks.append(struct.pack('B', random.randint(0, 255)))
    return b"".join(chunks)
def zerofree_randstring(length):
    # type: (int) -> bytes
    """
    Build ``length`` random bytes (length >= 0), none of which is zero:
    each one is drawn with random.randint(1, 255).
    """
    chunks = []
    for _ in range(length):
        chunks.append(struct.pack('B', random.randint(1, 255)))
    return b"".join(chunks)
def stror(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Byte-wise OR of s1 and s2, which are expected to have the same
    length.
    """
    return b"".join(
        struct.pack("!B", left | right) for left, right in zip(s1, s2)
    )
def strxor(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Byte-wise XOR of s1 and s2, which are expected to have the same
    length.
    """
    return b"".join(
        struct.pack("!B", left ^ right) for left, right in zip(s1, s2)
    )
def strand(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Byte-wise AND of s1 and s2, which are expected to have the same
    length.
    """
    return b"".join(
        struct.pack("!B", left & right) for left, right in zip(s1, s2)
    )
def strrot(s1, count, right=True):
    # type: (bytes, int, bool) -> bytes
    """
    Rotate the binary by 'count' bytes

    :param s1: the bytes to rotate
    :param count: number of positions, taken modulo len(s1)
    :param right: rotate to the right when True (default), to the left
        otherwise
    :return: the rotated bytes; an empty input is returned unchanged
    """
    if not s1:
        # Nothing to rotate, and len(s1) == 0 would make the modulo below
        # raise ZeroDivisionError.
        return s1
    off = count % len(s1)
    if right:
        return s1[-off:] + s1[:-off]
    else:
        return s1[off:] + s1[:off]
# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
# Probe socket.inet_aton once with the broadcast address; only if that call
# fails is a wrapper special-casing this address installed.
try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    def inet_aton(ip_string):
        # type: (str) -> bytes
        if ip_string != "255.255.255.255":
            return socket.inet_aton(ip_string)
        return b"\xff" * 4
else:
    inet_aton = socket.inet_aton  # type: ignore

inet_ntoa = socket.inet_ntoa
def atol(x):
    # type: (str) -> int
    """Convert an IPv4 address string to its 32-bit integer value.

    :raises ValueError: if inet_aton() rejects ``x``
    """
    try:
        packed = inet_aton(x)
    except socket.error:
        raise ValueError("Bad IP format: %s" % x)
    (value,) = struct.unpack("!I", packed)
    return cast(int, value)
def valid_ip(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is accepted by atol() as an IPv4 address."""
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        atol(text)
    except (OSError, ValueError, socket.error):
        return False
    else:
        return True
def valid_net(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is a valid IPv4 address or ``address/mask``.

    When a '/' is present, the part after the first '/' must be made of
    digits and lie within 0..32.
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' not in text:
        return valid_ip(text)
    ip, _, mask = text.partition('/')
    return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32
def valid_ip6(addr):
    # type: (str) -> bool
    """Tell whether inet_pton() accepts ``addr`` as an IPv6 address."""
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        inet_pton(socket.AF_INET6, text)
    except socket.error:
        return False
    else:
        return True
def valid_net6(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is a valid IPv6 address or ``address/mask``.

    When a '/' is present, the part after the first '/' must be made of
    digits and lie within 0..128.
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' not in text:
        return valid_ip6(text)
    ip, _, mask = text.partition('/')
    return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128
def ltoa(x):
    # type: (int) -> str
    """Format the low 32 bits of ``x`` as a dotted IPv4 address string."""
    packed = struct.pack("!I", x & 0xffffffff)
    return inet_ntoa(packed)
def itom(x):
    # type: (int) -> int
    """Return the 32-bit integer netmask that has its ``x`` top bits set."""
    # Shifting a 64-bit pattern (32 ones followed by 32 zeros) right by x
    # leaves x ones at the top of the low 32 bits.
    shifted = 0xffffffff00000000 >> x
    return shifted & 0xffffffff
def in4_cidr2mask(m):
    # type: (int) -> bytes
    """
    Build the 4-byte netmask for a prefix length. For instance, a call
    with 20 yields b'\xff\xff\xf0\x00'.

    :raises Scapy_Exception: if ``m`` is outside [0, 32]
    """
    if m < 0 or m > 32:
        raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m)  # noqa: E501

    # Host bits set to one, then inverted against an all-ones mask.
    host_bits = struct.pack(">I", 2**(32 - m) - 1)
    return strxor(b"\xff" * 4, host_bits)
def in4_isincluded(addr, prefix, mask):
    # type: (str, str, int) -> bool
    """
    Tell whether 'addr' belongs to prefix/mask: the address ANDed with the
    netmask must equal the packed prefix.
    """
    packed_addr = inet_pton(socket.AF_INET, addr)
    netmask = in4_cidr2mask(mask)
    packed_prefix = inet_pton(socket.AF_INET, prefix)
    return packed_prefix == strand(packed_addr, netmask)
def in4_ismaddr(str):
    # type: (str) -> bool
    """
    Tell whether the given printable address lies in the allocated
    Multicast address space (224.0.0.0/4).
    """
    in_multicast = in4_isincluded(str, "224.0.0.0", 4)
    return in_multicast
def in4_ismlladdr(str):
    # type: (str) -> bool
    """
    Tell whether the address lies in the link-local multicast address
    space (224.0.0.0/24)
    """
    in_link_local = in4_isincluded(str, "224.0.0.0", 24)
    return in_link_local
def in4_ismgladdr(str):
    # type: (str) -> bool
    """
    Tell whether the address lies in the global multicast address
    space (224.0.1.0-238.255.255.255): inside 224.0.0.0/4 but outside
    both 224.0.0.0/24 and 239.0.0.0/8.
    """
    if not in4_isincluded(str, "224.0.0.0", 4):
        return False
    if in4_isincluded(str, "224.0.0.0", 24):
        return False
    return not in4_isincluded(str, "239.0.0.0", 8)
def in4_ismlsaddr(str):
    # type: (str) -> bool
    """
    Tell whether the address lies in the limited scope multicast address
    space (239.0.0.0/8).
    """
    in_limited_scope = in4_isincluded(str, "239.0.0.0", 8)
    return in_limited_scope
def in4_isaddrllallnodes(str):
    # type: (str) -> bool
    """
    Tell whether the address is the link-local all-nodes multicast
    address (224.0.0.1).
    """
    all_nodes = inet_pton(socket.AF_INET, "224.0.0.1")
    candidate = inet_pton(socket.AF_INET, str)
    return all_nodes == candidate
def in4_getnsmac(a):
    # type: (bytes) -> str
    """
    Build the multicast mac address matching an IPv4 address given in
    network (packed) format: "01:00:5e" followed by the low 7 bits of the
    second byte and the last two bytes.
    """
    tail = (a[1] & 0x7f, a[2], a[3])
    return "01:00:5e:%.2x:%.2x:%.2x" % tail
def decode_locale_str(x):
    # type: (bytes) -> str
    """
    Decode bytes with the encoding reported by locale.getlocale(), falling
    back to utf-8 when none is reported. Undecodable bytes are replaced.
    Useful on Windows where it can be unusual (e.g. cp1252)
    """
    encoding = locale.getlocale()[1] or "utf-8"
    return x.decode(encoding=encoding, errors="replace")
class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command, without
    crashing.

    Example:
        >>> with ContextManagerSubprocess("tcpdump"):
        >>>     subprocess.Popen(["tcpdump", "--version"])
        ERROR: Could not execute tcpdump, is it installed?

    :param prog: name of the program, used in the error messages
    :param suppress: when True (default), an exception raised inside the
        block is logged and swallowed; when False, an exception of the same
        type carrying the descriptive message is raised instead
    """

    def __init__(self, prog, suppress=True):
        # type: (str, bool) -> None
        self.prog = prog
        self.suppress = suppress

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self,
                 exc_type,  # type: Optional[type]
                 exc_value,  # type: Optional[Exception]
                 traceback,  # type: Optional[Any]
                 ):
        # type: (...) -> Optional[bool]
        if exc_value is None or exc_type is None:
            return None
        # Errored
        if isinstance(exc_value, EnvironmentError):
            msg = "Could not execute %s, is it installed?" % self.prog
        else:
            # exc_type is already the exception class: use its own name
            # (exc_type.__class__ is the metaclass, i.e. always "type").
            msg = "%s: execution failed (%s)" % (
                self.prog,
                exc_type.__name__
            )
        if not self.suppress:
            raise exc_type(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception
class ContextManagerCaptureOutput(object):
    """
    Context manager that collects everything written to sys.stdout while
    it is active.

    Example:
        >>> with ContextManagerCaptureOutput() as cmco:
        ...     print("hey")
        ...     assert cmco.get_output() == "hey"
    """

    def __init__(self):
        # type: () -> None
        self.result_export_object = ""

    def __enter__(self):
        # type: () -> ContextManagerCaptureOutput
        from unittest import mock

        def _collect(s, decorator=self):
            # type: (str, ContextManagerCaptureOutput) -> None
            decorator.result_export_object += s

        # Any other attribute of the fake stdout is a no-op mock.
        fake_stdout = mock.Mock()
        fake_stdout.write = _collect
        self.bck_stdout, sys.stdout = sys.stdout, fake_stdout
        return self

    def __exit__(self, *exc):
        # type: (*Any) -> Literal[False]
        # Put the real stdout back; exceptions are never swallowed.
        sys.stdout = self.bck_stdout
        return False

    def get_output(self, eval_bytes=False):
        # type: (bool) -> str
        """Return the captured text.

        :param eval_bytes: when True and the capture starts with ``b'``,
            the capture is evaluated and converted with plain_str()
        """
        captured = self.result_export_object
        if eval_bytes and captured.startswith("b'"):
            # NOTE(review): eval() runs whatever was printed - only use
            # eval_bytes on output produced by trusted code.
            return plain_str(eval(captured))
        return captured
def do_graph(
    graph,  # type: str
    prog=None,  # type: Optional[str]
    format=None,  # type: Optional[str]
    target=None,  # type: Optional[Union[IO[bytes], str]]
    type=None,  # type: Optional[str]
    string=None,  # type: Optional[bool]
    options=None  # type: Optional[List[str]]
):
    # type: (...) -> Optional[str]
    """Processes graph description using an external software.
    This method is used to convert a graphviz format to an image.

    :param graph: GraphViz graph description
    :param prog: which graphviz program to use (defaults to conf.prog.dot)
    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
        option. Defaults to "svg".
    :param string: if not None, simply return the graph string
    :param target: filename or redirect. Defaults pipe to Imagemagick's
        display program. A string starting with '|' is run as a shell
        command fed through its stdin; one starting with '>' (or any other
        string) is opened as an output file.
    :param type: deprecated alias of ``format``
    :param options: options to be passed to prog, either as a list of
        strings or as a single pre-formatted string
    :return: the graph itself when ``string`` is set, None otherwise
    :raises OSError: if the GraphViz program exits with a non-zero status
    """

    if format is None:
        format = "svg"
    if string:
        return graph
    if type is not None:
        warnings.warn(
            "type is deprecated, and was renamed format",
            DeprecationWarning
        )
        format = type
    if prog is None:
        prog = conf.prog.dot
    start_viewer = False
    if target is None:
        if WINDOWS:
            target = get_temp_file(autoext="." + format)
            start_viewer = True
        else:
            with ContextManagerSubprocess(conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    if format is not None:
        format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    target = cast(IO[bytes], target)
    # A list of options must be joined: interpolating it directly would put
    # its Python repr ("['-x', ...]") on the command line.
    if options is None:
        opts = ""
    elif isinstance(options, str):
        opts = options
    else:
        opts = " ".join(options)
    proc = subprocess.Popen(
        "\"%s\" %s %s" % (prog, opts, format or ""),
        shell=True, stdin=subprocess.PIPE, stdout=target,
        stderr=subprocess.PIPE
    )
    _, stderr = proc.communicate(bytes_encode(graph))
    if proc.returncode != 0:
        raise OSError(
            "GraphViz call failed (is it installed?):\n" +
            plain_str(stderr)
        )
    try:
        target.close()
    except Exception:
        # Best effort: the target may be a pipe that is already gone.
        pass
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                # Report the file name (not the tempfile module).
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", target.name)  # noqa: E501
                break
        else:
            # Only reached when the loop ended without hitting the timeout.
            if WINDOWS and conf.prog.display == conf.prog._default:
                os.startfile(target.name)
            else:
                with ContextManagerSubprocess(conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
    return None
# Replacement text for each character that is special in TeX.
_TEX_TR = {
    "{": "{\\tt\\char123}",
    "}": "{\\tt\\char125}",
    "\\": "{\\tt\\char92}",
    "^": "\\^{}",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "&": "\\&",
    "%": "\\%",
    "|": "{\\tt\\char124}",
    "~": "{\\tt\\char126}",
    "<": "{\\tt\\char60}",
    ">": "{\\tt\\char62}",
}


def tex_escape(x):
    # type: (str) -> str
    """Return ``x`` with every character listed in _TEX_TR replaced."""
    return "".join([_TEX_TR.get(char, char) for char in x])
def colgen(*lstcol,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> Iterator[Any]
    """Endless generator mixing the provided quantities three by three.

    trans: a function turning the three picked values into a color.
    lambda x,y,z:(x,y,z) by default
    """
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
    size = len(lstcol)
    indexes = range(size)
    while True:
        for i in indexes:
            for j in indexes:
                for k in indexes:
                    # Skip only the triples where all three indexes match.
                    if i == j and j == k:
                        continue
                    yield trans(
                        lstcol[(i + j) % size],
                        lstcol[(j + k) % size],
                        lstcol[(k + i) % size],
                    )
def incremental_label(label="tag%05i", start=0):
    # type: (str, int) -> Iterator[str]
    """Endlessly yield ``label`` formatted with an increasing counter.

    :param label: %-format string receiving the counter
    :param start: first counter value
    """
    counter = start
    while True:
        yield label % counter
        counter = counter + 1
def binrepr(val):
    # type: (int) -> str
    """Return the binary digits of ``val`` without the ``0b`` prefix.

    Negative values keep their leading minus sign (e.g. ``-5`` gives
    ``"-101"``).
    """
    # bin(-5) is "-0b101": slicing off two characters would leave "b101",
    # so remove the "0b" marker itself instead.
    return bin(val).replace("0b", "", 1)
def long_converter(s):
    # type: (str) -> int
    """Parse a hexadecimal number that may contain spaces and newlines."""
    compact = s.replace(' ', '').replace('\n', '')
    return int(compact, 16)
1149#########################
1150# Enum management #
1151#########################
class EnumElement:
    """A named integer: one member of a class built by Enum_metaclass.

    It compares, hashes and converts like its integer value, prints as its
    key, and forwards any unknown attribute to the value.
    """

    def __init__(self, key, value):
        # type: (str, int) -> None
        self._key = key
        self._value = value

    def __repr__(self):
        # type: () -> str
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501

    def __getattr__(self, attr):
        # type: (str) -> Any
        # __getattr__ only runs when normal lookup failed. If our own
        # fields are missing (instance created without __init__, e.g. while
        # being copied), delegating would recurse forever on self._value.
        if attr in ("_key", "_value"):
            raise AttributeError(attr)
        return getattr(self._value, attr)

    def __str__(self):
        # type: () -> str
        return self._key

    def __bytes__(self):
        # type: () -> bytes
        return bytes_encode(self.__str__())

    def __hash__(self):
        # type: () -> int
        return self._value

    def __int__(self):
        # type: () -> int
        return int(self._value)

    def __eq__(self, other):
        # type: (Any) -> bool
        # Anything int() cannot convert is simply "not equal" instead of
        # making the comparison itself raise.
        try:
            return self._value == int(other)
        except (TypeError, ValueError):
            return False

    def __ne__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)

    # Historical (misspelled) name, kept for backward compatibility.
    __neq__ = __ne__
class Enum_metaclass(type):
    """Metaclass wrapping every integer class attribute in an element.

    Each wrapped element is also recorded in ``__rdict__``, which maps the
    element back to its attribute name.
    """
    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        reverse = {}
        for attr_name, attr_value in list(dct.items()):
            if not isinstance(attr_value, int):
                continue
            element = cls.element_class(attr_name, attr_value)
            dct[attr_name] = element
            reverse[element] = attr_name
        dct["__rdict__"] = reverse
        return super().__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        # type: (int) -> Any
        """Return the attribute name recorded for ``attr``."""
        reverse = self.__rdict__  # type: ignore
        return reverse[attr]

    def __contains__(self, val):
        # type: (int) -> bool
        """Tell whether ``val`` is one of the recorded elements."""
        reverse = self.__rdict__  # type: ignore
        return val in reverse

    def get(self, attr, val=None):
        # type: (str, Optional[Any]) -> Any
        """Like __getitem__, returning ``val`` when nothing is recorded."""
        reverse = self.__rdict__  # type: ignore
        return reverse.get(attr, val)

    def __repr__(self):
        # type: () -> str
        shown = self.__dict__.get("name", self.__name__)
        return "<%s>" % shown
1224###################
1225# Object saving #
1226###################
def export_object(obj):
    # type: (Any) -> None
    """Print ``obj`` as a base64 string.

    The object is pickled (protocol 2), compressed with zlib (level 9),
    base64-encoded, and written to standard output.
    """
    import zlib
    pickled = pickle.dumps(obj, 2)
    packed = zlib.compress(pickled, 9)
    encoded = base64.b64encode(packed)
    print(encoded.decode())
def import_object(obj=None):
    # type: (Optional[str]) -> Any
    """Rebuild an object from its base64 / zlib / pickle representation.

    :param obj: the encoded text. When None, it is read from standard
        input.
    :return: the unpickled object

    NOTE: the payload is unpickled, which can execute arbitrary code. Only
    use this on data from a trusted source.
    """
    import zlib
    text = sys.stdin.read() if obj is None else obj
    packed = base64.b64decode(text.strip())
    return pickle.loads(zlib.decompress(packed))
def save_object(fname, obj):
    # type: (str, Any) -> None
    """Pickle a Python object into a gzip-compressed file.

    :param fname: path of the file to write
    :param obj: the object to pickle
    """
    # The context manager closes the file even if pickling fails.
    with gzip.open(fname, "wb") as fd:
        pickle.dump(obj, fd)
def load_object(fname):
    # type: (str) -> Any
    """Unpickle a Python object from a gzip-compressed file.

    :param fname: path of the file to read
    :return: the unpickled object

    NOTE: unpickling can execute arbitrary code. Only load files from a
    trusted source.
    """
    # The context manager makes sure the file handle is released.
    with gzip.open(fname, "rb") as fd:
        return pickle.load(fd)
@conf.commands.register
def corrupt_bytes(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Corrupt a given percentage (at least one byte) or number of bytes
    from a string

    :param data: the data to corrupt
    :param p: the fraction of bytes to corrupt, used when ``n`` is None
    :param n: the exact number of bytes to corrupt. It is capped to the
        length of the data.
    :return: the corrupted data. Empty input is returned unchanged.
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s)
    if s_len == 0:
        # Nothing to corrupt: random.sample() would raise on an empty range
        return s.tobytes()
    if n is None:
        n = max(1, int(s_len * p))
    # random.sample() cannot pick more positions than available
    n = min(n, s_len)
    for i in random.sample(range(s_len), n):
        # Adding 1..255 modulo 256 always changes the byte
        s[i] = (s[i] + random.randint(1, 255)) % 256
    return s.tobytes()
@conf.commands.register
def corrupt_bits(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Flip a given percentage (at least one bit) or number of bits
    from a string

    :param data: the data to corrupt
    :param p: the fraction of bits to flip, used when ``n`` is None
    :param n: the exact number of bits to flip. It is capped to the number
        of bits in the data.
    :return: the corrupted data. Empty input is returned unchanged.
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s) * 8
    if s_len == 0:
        # Nothing to flip: random.sample() would raise on an empty range
        return s.tobytes()
    if n is None:
        n = max(1, int(s_len * p))
    # random.sample() cannot pick more positions than available
    n = min(n, s_len)
    for i in random.sample(range(s_len), n):
        s[i // 8] ^= 1 << (i % 8)
    return s.tobytes()
1290#############################
1291# pcap capture file stuff #
1292#############################
@conf.commands.register
def wrpcap(filename,  # type: Union[IO[bytes], str]
           pkt,  # type: _PacketIterable
           *args,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> None
    """Write packets to a pcap file.

    Extra positional and keyword arguments are forwarded to PcapWriter.

    :param filename: path of the capture file, or an already open, writable
        file-like object. The file is closed when the call returns, so
        never pass an object that must stay open (for instance,
        wrpcap(sys.stdout, []) crashes an interactive Scapy session).
    :param pkt: the packets to write
    :param gz: set to 1 to save a gzipped capture
    :param linktype: force linktype value
    :param endianness: "<" or ">", force endianness
    :param sync: do not bufferize writes to the capture file
    """
    with PcapWriter(filename, *args, **kargs) as writer:
        writer.write(pkt)
@conf.commands.register
def wrpcapng(filename,  # type: str
             pkt,  # type: _PacketIterable
             ):
    # type: (...) -> None
    """Write packets to a pcapng file.

    :param filename: path of the capture file, or an already open, writable
        file-like object. The file is closed when the call returns, so
        never pass an object that must stay open (for instance,
        wrpcapng(sys.stdout, []) crashes an interactive Scapy session).
    :param pkt: packets to write
    """
    with PcapNgWriter(filename) as writer:
        writer.write(pkt)
@conf.commands.register
def rdpcap(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Load the packets of a pcap or pcapng file into a packet list.

    :param filename: path of the capture file, or an open file-like object
    :param count: read only <count> packets
    """
    # MyPy cannot follow the metaclass-based __call__ used to build the
    # readers, hence the "type: ignore" below.
    with PcapReader(filename) as reader:  # type: ignore
        return reader.read_all(count=count)
# NOTE: Type hinting
# Mypy does not understand the following metaclass and believes that each
# constructor (PcapReader...) requires 3 arguments. To work around this,
# we give the last 2 arguments a fake default (=None), force their types
# to be non-None in the signature, and wrap the whole thing in an ignore.
# This avoids needing a "# type: ignore" every time those constructors
# are called.
class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers"""

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(
            cls, name, bases, dct
        )
        if 'alternative' in dct:
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Creates a cls instance, use the `alternative` if that
        fails.

        :param filename: a path, or an open binary file-like object
        :raises Scapy_Exception: if no data can be read, or if neither the
            class nor its alternative accepts the file. In that case, a
            file that was opened here from a path is closed before
            raising; a file object supplied by the caller is left open.
        """
        # Only a stream opened here (from a path) may be closed on failure.
        owns_stream = isinstance(filename, str)
        filename, fdesc, magic = cls.open(filename)
        try:
            if not magic:
                raise Scapy_Exception(
                    "No data could be read!"
                )
            candidates = [cls]
            if "alternative" in cls.__dict__:
                candidates.append(cls.__dict__["alternative"])
            for candidate in candidates:
                i = candidate.__new__(
                    candidate,
                    candidate.__name__,
                    candidate.__bases__,
                    candidate.__dict__  # type: ignore
                )
                try:
                    i.__init__(filename, fdesc, magic)
                    return i
                except (Scapy_Exception, EOFError):
                    # This reader rejected the file: try the next one.
                    pass
            raise Scapy_Exception("Not a supported capture file")
        except BaseException:
            if owns_stream:
                PcapReader_metaclass._close_stream(fdesc)
            raise

    @staticmethod
    def _close_stream(fdesc):
        # type: (Any) -> None
        """Close fdesc, and the file wrapped by it when it is a GzipFile."""
        raw = None
        if isinstance(fdesc, gzip.GzipFile):
            # GzipFile.close() does not close the underlying file object.
            raw = fdesc.fileobj
        try:
            fdesc.close()
        finally:
            if raw is not None:
                raw.close()

    @staticmethod
    def open(fname  # type: Union[IO[bytes], str]
             ):
        # type: (...) -> Tuple[str, _ByteStream, bytes]
        """Open (if necessary) filename, and read the magic.

        :param fname: a path, or an open binary file-like object
        :return: a (filename, file object, magic) tuple, where magic holds
            the first 4 bytes read. A file opened from a path that starts
            with the gzip signature is transparently wrapped in a GzipFile.
        """
        if isinstance(fname, str):
            filename = fname
            raw = open(filename, "rb")
            fdesc = raw  # type: _ByteStream
            try:
                magic = fdesc.read(2)
                if magic == b"\x1f\x8b":
                    # GZIP header detected.
                    fdesc.seek(0)
                    fdesc = gzip.GzipFile(fileobj=fdesc)
                    magic = fdesc.read(2)
                magic += fdesc.read(2)
            except BaseException:
                # Do not leak the file if the magic cannot be read.
                raw.close()
                raise
        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic
class RawPcapReader(metaclass=PcapReader_metaclass):
    """A stateful pcap reader. Each packet is returned as a string"""

    # TODO: use Generics to properly type the various readers.
    # As of right now, RawPcapReader is typed as if it returned packets
    # because all of its child do. Fix that

    nonblocking_socket = True
    PacketMetadata = collections.namedtuple(
        "PacketMetadata",
        ["sec", "usec", "wirelen", "caplen"],
    )

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, _ByteStream, bytes) -> None
        """Parse the pcap global header.

        :param filename: name of the capture file
        :param fdesc: file object positioned right after the magic
        :param magic: the 4 magic bytes already read from the file
        :raises Scapy_Exception: on unknown magic or truncated header
        """
        self.filename = filename
        self.f = fdesc
        # (magic, byte order, nanosecond precision)
        known_magics = (
            (b"\xa1\xb2\xc3\xd4", ">", False),  # big endian
            (b"\xd4\xc3\xb2\xa1", "<", False),  # little endian
            (b"\xa1\xb2\x3c\x4d", ">", True),  # big endian, nanoseconds
            (b"\x4d\x3c\xb2\xa1", "<", True),  # little endian, nanoseconds
        )
        for signature, byte_order, nanoseconds in known_magics:
            if magic == signature:
                self.endian = byte_order
                self.nano = nanoseconds
                break
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        header = self.f.read(20)
        if len(header) < 20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        # Fields: version major, version minor, tz, sig, snaplen, linktype
        fields = struct.unpack(self.endian + "HHIIII", header)
        self.linktype = fields[5]
        self.snaplen = fields[4]

    def __enter__(self):
        # type: () -> RawPcapReader
        return self

    def __iter__(self):
        # type: () -> RawPcapReader
        return self

    def __next__(self):
        # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """
        Iterator protocol: return the next record, stop at end of file.
        """
        try:
            record = self._read_packet()
        except EOFError:
            raise StopIteration
        return record

    def _read_packet(self, size=MTU):
        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """Read one record and return it as (pkt_data, pkt_metadata).

        At most ``size`` bytes of packet data are returned.

        raise EOFError when no more packets are available
        """
        record_header = self.f.read(16)
        if len(record_header) < 16:
            raise EOFError
        sec, usec, caplen, wirelen = struct.unpack(
            self.endian + "IIII", record_header
        )

        try:
            payload = self.f.read(caplen)
        except OverflowError as e:
            warning(f"Pcap: {e}")
            raise EOFError
        data = payload[:size]

        metadata = RawPcapReader.PacketMetadata(
            sec=sec, usec=usec, wirelen=wirelen, caplen=caplen
        )
        return (data, metadata)

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        """Not available on raw readers: always raises."""
        raise Exception(
            "Cannot call read_packet() in RawPcapReader. Use "
            "_read_packet()"
        )

    def dispatch(self,
                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
                 ):
        # type: (...) -> None
        """Invoke ``callback`` on every remaining packet.

        Convenience main loop, handy to run the packet processing in a
        thread.
        """
        for item in self:
            callback(item)

    def _read_all(self, count=-1):
        # type: (int) -> List[Packet]
        """Return a list of up to ``count`` packets read with read_packet().

        Reading stops at end of file. Any negative count reads everything.
        """
        packets = []  # type: List[Packet]
        remaining = count
        while remaining != 0:
            remaining -= 1
            try:
                pkt = self.read_packet()  # type: Packet
            except EOFError:
                break
            packets.append(pkt)
        return packets

    def recv(self, size=MTU):
        # type: (int) -> bytes
        """Emulate a socket: return the data of the next record."""
        data, _metadata = self._read_packet(size=size)
        return data

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def close(self):
        # type: () -> None
        """Close the capture file, including the file wrapped by gzip."""
        if isinstance(self.f, gzip.GzipFile):
            self.f.fileobj.close()  # type: ignore
        self.f.close()

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.close()

    # emulate SuperSocket
    @staticmethod
    def select(sockets,  # type: List[SuperSocket]
               remain=None,  # type: Optional[float]
               ):
        # type: (...) -> List[SuperSocket]
        """Return ``sockets`` unchanged."""
        return sockets
class PcapReader(RawPcapReader):
    """A pcap reader that dissects every record into a packet."""

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        """Parse the pcap header and select the link-layer class.

        When the linktype is unknown, a warning is emitted and packets are
        built with conf.raw_layer.
        """
        RawPcapReader.__init__(self, filename, fdesc, magic)
        l2_classes = conf.l2types.num2layer
        try:
            self.LLcls = l2_classes[self.linktype]  # type: Type[Packet]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype))  # noqa: E501
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            self.LLcls = conf.raw_layer

    def __enter__(self):
        # type: () -> PcapReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next record and dissect it.

        Extra keyword arguments are given to the packet constructor. When
        dissection fails, the data is wrapped in conf.raw_layer, unless
        conf.debug_dissector is set, in which case the error is re-raised.

        :raises EOFError: when no more packets are available
        """
        record = super()._read_packet(size=size)
        if record is None:
            raise EOFError
        data, info = record

        try:
            pkt = self.LLcls(data, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (self.LLcls, data)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(data)
        # The sub-second field counts nanoseconds or microseconds
        exponent = -9 if self.nano else -6
        power = Decimal(10) ** Decimal(exponent)
        pkt.time = EDecimal(info.sec + power * info.usec)
        pkt.wirelen = info.wirelen
        return pkt

    def recv(self, size=MTU, **kwargs):  # type: ignore
        # type: (int, **Any) -> Packet
        """Emulate a socket: same as read_packet()."""
        return self.read_packet(size=size, **kwargs)

    def __iter__(self):
        # type: () -> PcapReader
        return self

    def __next__(self):  # type: ignore
        # type: () -> Packet
        try:
            pkt = self.read_packet()
        except EOFError:
            raise StopIteration
        return pkt

    def read_all(self, count=-1):
        # type: (int) -> PacketList
        """Read up to ``count`` packets and return them as a PacketList
        named after the capture file."""
        packets = self._read_all(count)
        from scapy import plist
        list_name = os.path.basename(self.filename)
        return plist.PacketList(packets, name=list_name)
class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as
    bytes.

    """

    alternative = RawPcapReader  # type: Type[Any]

    PacketMetadata = collections.namedtuple("PacketMetadataNg",  # type: ignore
                                            ["linktype", "tsresol",
                                             "tshigh", "tslow", "wirelen",
                                             "comment", "ifname", "direction",
                                             "process_information"])

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        """Check the magic and parse the first Section Header Block.

        :param filename: name of the capture file
        :param fdesc: file object positioned right after the magic
        :param magic: the 4 magic bytes already read from the file
        :raises Scapy_Exception: on bad magic or malformed first SHB
        """
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, tsresol); will be populated by IDBs.
        self.interfaces = []  # type: List[Tuple[int, int, Dict[str, Any]]]
        self.default_options = {
            "tsresol": 1000000
        }
        # Block type -> handler taking (block body, size)
        self.blocktypes: Dict[
            int,
            Callable[
                [bytes, int],
                Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
            ]] = {
                1: self._read_block_idb,
                2: self._read_block_pkt,
                3: self._read_block_spb,
                6: self._read_block_epb,
                10: self._read_block_dsb,
                0x80000001: self._read_block_pib,
        }
        self.endian = "!"  # Will be overwritten by first SHB
        self.process_information = []  # type: List[Dict[str, Any]]

        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )

        try:
            self._read_block_shb()
        except EOFError:
            raise Scapy_Exception(
                "The first SHB of the pcapng file is malformed !"
            )

    def _read_block(self, size=MTU):
        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
        """Read one block and hand its body to the matching handler.

        :return: the handler result, or None for a Section Header Block
            and for unknown block types
        :raises EOFError: at end of file or on an invalid block length
        :raises Scapy_Exception: if the block body is truncated
        """
        try:
            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocktype == 0x0A0D0D0A:
            # This function updates the endianness based on the block content.
            self._read_block_shb()
            return None
        try:
            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            warning("PcapNg: Error reading blocklen before block body")
            raise EOFError
        if blocklen < 12:
            warning("PcapNg: Invalid block length !")
            raise EOFError

        # 12 = block type + leading and trailing block length fields
        _block_body_length = blocklen - 12
        block = self.f.read(_block_body_length)
        if len(block) != _block_body_length:
            raise Scapy_Exception("PcapNg: Invalid Block body length "
                                  "(too short)")
        self._read_block_tail(blocklen)
        if blocktype in self.blocktypes:
            return self.blocktypes[blocktype](block, size)
        return None

    def _read_block_tail(self, blocklen):
        # type: (int) -> None
        """Read the trailing block length and compare it with blocklen.

        :raises EOFError: if the trailing length is missing or different
        """
        if blocklen % 4:
            pad = self.f.read(-blocklen % 4)
            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                    "Ignored padding %r" % (blocklen, pad))
        try:
            if blocklen != struct.unpack(self.endian + 'I',
                                         self.f.read(4))[0]:
                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
        except struct.error:
            warning("PcapNg: Could not read blocklen after block body")
            raise EOFError

    def _read_block_shb(self):
        # type: () -> None
        """Section Header Block

        Sets self.endian from the byte-order magic.

        :raises EOFError: if the block is malformed or unsupported
        :raises Scapy_Exception: if the options are truncated
        """
        _blocklen = self.f.read(4)
        endian = self.f.read(4)
        if endian == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif endian == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            warning("PcapNg: Bad magic in Section Header Block"
                    " (not a pcapng file?)")
            raise EOFError

        # The block length can only be decoded once the endianness is known
        try:
            blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
        except struct.error:
            warning("PcapNg: Could not read blocklen")
            raise EOFError
        if blocklen < 28:
            warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!")  # noqa: E501
            raise EOFError

        # Major version must be 1
        _major = self.f.read(2)
        try:
            major = struct.unpack(self.endian + "H", _major)[0]
        except struct.error:
            warning("PcapNg: Could not read major value")
            raise EOFError
        if major != 1:
            warning(f"PcapNg: SHB Major version {major} unsupported !")
            raise EOFError

        # Skip minor version & section length
        skipped = self.f.read(10)
        if len(skipped) != 10:
            warning("PcapNg: Could not read minor value & section length")
            raise EOFError

        _options_len = blocklen - 28
        options = self.f.read(_options_len)
        if len(options) != _options_len:
            raise Scapy_Exception("PcapNg: Invalid Section Header Block "
                                  " options (too short)")
        self._read_block_tail(blocklen)
        self._read_options(options)

    def _read_packet(self, size=MTU):  # type: ignore
        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Read blocks until one of them yields a packet, and return it
        as (packet data, PacketMetadata).

        EOFError raised by _read_block() propagates to the caller.
        """
        while True:
            res = self._read_block(size=size)
            if res is not None:
                return res

    def _read_options(self, options):
        # type: (bytes) -> Dict[int, bytes]
        """Parse a raw options area into a {code: value} dict.

        Parsing stops at the end-of-options code (0) or when fewer than
        4 bytes are left.
        """
        opts = dict()
        while len(options) >= 4:
            try:
                code, length = struct.unpack(self.endian + "HH", options[:4])
            except struct.error:
                warning("PcapNg: options header is too small "
                        "%d !" % len(options))
                raise EOFError
            if code != 0 and 4 + length <= len(options):
                opts[code] = options[4:4 + length]
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option "
                            "length %d for end-of-option" % length)
                break
            # Option values are padded to a multiple of 4 bytes
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        return opts

    def _read_block_idb(self, block, _):
        # type: (bytes, int) -> None
        """Interface Description Block

        Appends a (linktype, snaplen, options) tuple to self.interfaces.
        """
        # 2 bytes LinkType + 2 bytes Reserved
        # 4 bytes Snaplen
        options_raw = self._read_options(block[8:])
        options = self.default_options.copy()  # type: Dict[str, Any]
        for c, v in options_raw.items():
            if c == 9:
                length = len(v)
                if length == 1:
                    tsresol = orb(v)
                    # High bit set: power of 2, otherwise power of 10
                    options["tsresol"] = (2 if tsresol & 128 else 10) ** (
                        tsresol & 127
                    )
                else:
                    warning("PcapNg: invalid options "
                            "length %d for IDB tsresol" % length)
            elif c == 2:
                options["name"] = v
            elif c == 1:
                options["comment"] = v
        try:
            interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
                self.endian + "HxxI",
                block[:8]
            ) + (options,)
        except struct.error:
            warning("PcapNg: IDB is too small %d/8 !" % len(block))
            raise EOFError
        self.interfaces.append(interface)

    def _check_interface_id(self, intid):
        # type: (int) -> None
        """Check the interface id value and raise EOFError if invalid."""
        tmp_len = len(self.interfaces)
        if intid >= tmp_len:
            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
            raise EOFError

    def _read_block_epb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Enhanced Packet Block"""
        try:
            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "5I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: EPB is too small %d/20 !" % len(block))
            raise EOFError

        # Compute the options offset taking padding into account
        # ((-caplen) % 4 is 0 when caplen is already a multiple of 4)
        opt_offset = 20 + caplen + (-caplen) % 4

        # Parse options
        options = self._read_options(block[opt_offset:])

        process_information = {}
        for code, value in options.items():
            # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX
            if code in [0x8001, 0x8003]:
                try:
                    proc_index = struct.unpack(self.endian + "I", value)[0]
                except struct.error:
                    warning("PcapNg: EPB invalid proc index"
                            "(expected 4 bytes, got %d) !" % len(value))
                    raise EOFError
                if proc_index < len(self.process_information):
                    key = "proc" if code == 0x8001 else "eproc"
                    process_information[key] = self.process_information[proc_index]  # noqa: E501
                else:
                    warning("PcapNg: EPB invalid process information index "
                            "(%d/%d) !" % (proc_index, len(self.process_information)))  # noqa: E501

        comment = options.get(1, None)
        epb_flags_raw = options.get(2, None)
        if epb_flags_raw:
            try:
                epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
            except struct.error:
                warning("PcapNg: EPB invalid flags size"
                        "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
                raise EOFError
            # The direction is stored in the two lowest bits
            direction = epb_flags & 3

        else:
            direction = None

        self._check_interface_id(intid)
        ifname = self.interfaces[intid][2].get('name', None)

        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=comment,
                                               ifname=ifname,
                                               direction=direction,
                                               process_information=process_information))

    def _read_block_spb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Simple Packet Block"""
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        self._check_interface_id(intid)

        try:
            wirelen, = struct.unpack(self.endian + "I", block[:4])
        except struct.error:
            warning("PcapNg: SPB is too small %d/4 !" % len(block))
            raise EOFError

        # An IDB snaplen of 0 means "no limit": only a non-zero snaplen
        # may truncate the packet.
        snaplen = self.interfaces[intid][1]
        caplen = min(wirelen, snaplen) if snaplen else wirelen
        return (block[4:4 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=None,
                                               tslow=None,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_pkt(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """(Obsolete) Packet Block"""
        try:
            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "HH4I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: PKT is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_dsb(self, block, size):
        # type: (bytes, int) -> None
        """Decryption Secrets Block"""

        # Parse the secrets type and length fields
        try:
            secrets_type, secrets_length = struct.unpack(
                self.endian + "II",
                block[:8],
            )
            block = block[8:]
        except struct.error:
            warning("PcapNg: DSB is too small %d!", len(block))
            raise EOFError

        # Compute the secrets length including the padding
        padded_secrets_length = secrets_length + (-secrets_length) % 4
        if len(block) < padded_secrets_length:
            warning("PcapNg: invalid DSB secrets length!")
            raise EOFError

        # Extract secrets data and options
        secrets_data = block[:padded_secrets_length][:secrets_length]
        if block[padded_secrets_length:]:
            warning("PcapNg: DSB options are not supported!")

        # TLS Key Log
        if secrets_type == 0x544c534b:
            if getattr(conf, "tls_sessions", False) is False:
                warning("PcapNg: TLS Key Log available, but "
                        "the TLS layer is not loaded! Scapy won't be able "
                        "to decrypt the packets.")
            else:
                from scapy.layers.tls.session import load_nss_keys

                # Write Key Log to a file and parse it
                filename = get_temp_file()
                # The file is closed when leaving the with block
                with open(filename, "wb") as fd:
                    fd.write(secrets_data)

                keys = load_nss_keys(filename)
                if not keys:
                    warning("PcapNg: invalid TLS Key Log in DSB!")
                else:
                    # Note: these attributes are only available when the TLS
                    # layer is loaded.
                    conf.tls_nss_keys = keys
                    conf.tls_session_enable = True
        else:
            warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)

    def _read_block_pib(self, block, _):
        # type: (bytes, int) -> None
        """Apple Process Information Block

        Appends a dict with the process id, and the name and uuid when
        present, to self.process_information.
        """

        # Get the Process ID
        try:
            dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0]
            process_information = {"id": dpeb_pid}
            block = block[4:]
        except struct.error:
            warning("PcapNg: DPEB is too small (%d). Cannot get PID!",
                    len(block))
            raise EOFError

        # Get Options
        options = self._read_options(block)
        for code, value in options.items():
            if code == 2:
                process_information["name"] = value.decode("ascii", "backslashreplace")  # noqa: E501
            elif code == 4:
                if len(value) == 16:
                    process_information["uuid"] = str(UUID(bytes=value))
                else:
                    warning("PcapNg: DPEB UUID length is invalid (%d)!",
                            len(value))

        # Store process information
        self.process_information.append(process_information)
class PcapNgReader(RawPcapNgReader, PcapReader):
    """A pcapng reader that dissects every record into a packet."""

    alternative = PcapReader

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def __enter__(self):
        # type: () -> PcapNgReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next packet block and dissect it.

        The link-layer class is looked up from the linktype of the
        record. When the lookup or the dissection fails, the data is
        wrapped in conf.raw_layer, unless conf.debug_dissector is set, in
        which case the error is re-raised. The metadata of the record is
        then copied onto the packet.
        """
        record = super()._read_packet(size=size)
        if record is None:
            raise EOFError
        data, meta = record
        try:
            cls = conf.l2types.num2layer[meta.linktype]  # type: Type[Packet]
            pkt = cls(data, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(data)
        if meta.tshigh is not None:
            # The timestamp is a 64-bit tick count split in two halves
            ticks = (meta.tshigh << 32) + meta.tslow
            pkt.time = EDecimal(ticks) / meta.tsresol
        pkt.wirelen = meta.wirelen
        pkt.comment = meta.comment
        pkt.direction = meta.direction
        pkt.process_information = meta.process_information.copy()
        if meta.ifname is not None:
            pkt.sniffed_on = meta.ifname.decode('utf-8', 'backslashreplace')
        return pkt

    def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
        """Emulate a socket: same as read_packet()."""
        return self.read_packet(size=size, **kwargs)
class GenericPcapWriter(object):
    """Format-independent base of the capture file writers.

    Subclasses implement _write_header() and _write_packet().
    """
    nano = False
    linktype: int

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        raise NotImplementedError

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        raise NotImplementedError

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return the (sec, usec) pair to store for ``packet``.

        When ``sec`` is None and the packet has a ``time`` attribute, both
        values are derived from it; the sub-second part counts nanoseconds
        if self.nano is set, microseconds otherwise. A missing ``usec``
        becomes 0 as soon as ``sec`` is known.
        """
        if hasattr(packet, "time"):
            if sec is None:
                packet_time = packet.time
                tmp = int(packet_time)
                usec = int(round((packet_time - tmp) *
                                 (1000000000 if self.nano else 1000000)))
                sec = float(packet_time)
        if sec is not None and usec is None:
            usec = 0
        return sec, usec  # type: ignore

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the file header.

        If no linktype is set yet, it is guessed from the class of
        ``pkt``, falling back to Ethernet with a warning.
        """
        if not hasattr(self, 'linktype'):
            try:
                if pkt is None or isinstance(pkt, bytes):
                    # Can't guess LL
                    raise KeyError
                self.linktype = conf.l2types.layer2num[
                    pkt.__class__
                ]
            except KeyError:
                msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
                warning(msg, self.__class__.__name__, pkt.__class__.__name__)
                self.linktype = DLT_EN10MB
        self._write_header(pkt)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[float]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: Packet, or bytes for a single packet
        :type packet: scapy.packet.Packet or bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured. If ``sec`` is not specified,
                     this value is ignored.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(raw(packet))``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, tries ``packet.wirelen``, otherwise uses
                        ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        f_sec, usec = self._get_time(packet, sec, usec)

        rawpkt = bytes_encode(packet)
        caplen = len(rawpkt) if caplen is None else caplen

        if wirelen is None:
            if hasattr(packet, "wirelen"):
                wirelen = packet.wirelen
        if wirelen is None:
            wirelen = caplen

        comment = getattr(packet, "comment", None)
        ifname = getattr(packet, "sniffed_on", None)
        direction = getattr(packet, "direction", None)
        if isinstance(packet, bytes):
            linktype = self.linktype  # type: int
        else:
            try:
                linktype = conf.l2types.layer2num[
                    packet.__class__
                ]
            except KeyError:
                # No linktype registered for this packet class: use the
                # linktype of the file (as chosen by write_header) rather
                # than failing with a KeyError.
                linktype = getattr(self, "linktype", DLT_EN10MB)
        if ifname is not None:
            ifname = str(ifname).encode('utf-8')
        self._write_packet(
            rawpkt,
            sec=f_sec, usec=usec,
            caplen=caplen, wirelen=wirelen,
            comment=comment,
            ifname=ifname,
            direction=direction,
            linktype=linktype
        )
class GenericRawPcapWriter(GenericPcapWriter):
    """Base of the writers that store packets in a file object (self.f)."""
    header_present = False
    nano = False
    sync = False
    f = None  # type: Union[IO[bytes], gzip.GzipFile]

    def fileno(self):
        # type: () -> int
        return -1 if WINDOWS else self.f.fileno()

    def flush(self):
        # type: () -> Optional[Any]
        return self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the file, writing the header first if still missing."""
        if not self.header_present:
            self.write_header(None)
        return self.f.close()

    def __enter__(self):
        # type: () -> GenericRawPcapWriter
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        # Close the file even when flushing fails.
        try:
            self.flush()
        finally:
            self.close()

    def write(self, pkt):
        # type: (Union[_PacketIterable, bytes]) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a pcap file.

        The header is written before the first record when it is not
        present yet.

        :param pkt: Packet(s) to write (one record for each Packet), or raw
                    bytes to write (as one record).
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
        """
        if isinstance(pkt, bytes):
            if not self.header_present:
                self.write_header(pkt)
            self.write_packet(pkt)
        else:
            # Import here to avoid circular dependency
            from scapy.supersocket import IterSocket
            for p in IterSocket(pkt).iter:
                if not self.header_present:
                    self.write_header(p)

                if not isinstance(p, bytes) and \
                        self.linktype != conf.l2types.get(type(p), None):
                    warning("Inconsistent linktypes detected!"
                            " The resulting file might contain"
                            " invalid packets."
                            )

                self.write_packet(p)
class RawPcapWriter(GenericRawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 linktype=None,  # type: Optional[int]
                 gz=False,  # type: bool
                 endianness="",  # type: str
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 nano=False,  # type: bool
                 snaplen=MTU,  # type: int
                 bufsz=4096,  # type: int
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
                         writable file-like object.
        :param linktype: force linktype to a given value. If None, linktype is
                         taken from the first written packet
        :param gz: compress the capture on the fly
        :param endianness: force an endianness (little:"<", big:">").
                           Default is native
        :param append: append packets to the capture file instead of
                       truncating it
        :param sync: do not bufferize writes to the capture file
        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
        :param snaplen: snapshot length stored in the file header
        :param bufsz: buffering argument given to ``open()`` for uncompressed
                      files (forced to 0 when ``sync`` is set)
        """

        # Test against None rather than truthiness: 0 is a valid linktype
        # value and must not be silently discarded.
        if linktype is not None:
            self.linktype = linktype
        self.snaplen = snaplen
        self.append = append
        self.gz = gz
        self.endian = endianness
        self.sync = sync
        self.nano = nano
        if sync:
            bufsz = 0

        if isinstance(filename, str):
            self.filename = filename
            mode = "ab" if append else "wb"
            if gz:
                self.f = cast(_ByteStream, gzip.open(filename, mode, 9))
            else:
                self.f = open(filename, mode, bufsz)
        else:
            # Already-open stream: use it as is
            self.f = filename
            self.filename = getattr(filename, "name", "No name")

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """
        Write the pcap global header, unless appending to a non-empty file.

        :raises ValueError: if no linktype is known for this writer
        """
        self.header_present = True

        if self.append:
            # Even if prone to race conditions, this seems to be
            # safest way to tell whether the header is already present
            # because we have to handle compressed streams that
            # are not as flexible as basic files
            opener = gzip.open if self.gz else open
            with opener(self.filename, "rb") as g:
                if g.read(16):
                    return

        if not hasattr(self, 'linktype'):
            raise ValueError(
                "linktype could not be guessed. "
                "Please pass a linktype while creating the writer"
            )

        # The magic number tells readers the timestamp resolution
        magic = 0xa1b23c4d if self.nano else 0xa1b2c3d4
        self.f.write(struct.pack(self.endian + "IHHIIII", magic,
                                 2, 4, 0, 0, self.snaplen, self.linktype))
        self.f.flush()

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        ``linktype``, ``comment``, ``ifname`` and ``direction`` are accepted
        for signature compatibility but are not stored in the record.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: sub-second part of the capture time, in microseconds
                     (nanoseconds when the writer uses ``nano``). Derived
                     from the current time when ``sec`` is not supplied,
                     0 when only ``sec`` is supplied.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen
        if sec is None or usec is None:
            t = time.time()
            it = int(t)
            if sec is None:
                # No timestamp given at all: use the current time
                sec = it
                usec = int(round((t - it) *
                                 (1000000000 if self.nano else 1000000)))
            elif usec is None:
                usec = 0

        self.f.write(struct.pack(self.endian + "IIII",
                                 int(sec), usec, caplen, wirelen))
        self.f.write(bytes(packet))
        if self.sync:
            self.f.flush()
class RawPcapNgWriter(GenericRawPcapWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def __init__(self,
                 filename,  # type: str
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to
        """
        self.header_present = False
        # Timestamp units per second
        self.tsresol = 1000000
        # A dict to keep if_name to IDB id mapping.
        # unknown if_name(None) id=0
        self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}

        # tcpdump only support little-endian in PCAPng files
        self.endian = "<"
        self.endian_magic = b"\x4d\x3c\x2b\x1a"

        self.filename = filename
        self.f = open(filename, "wb", 4096)

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Fill missing time values from ``packet.time`` (sec) and 0 (usec)."""
        has_time = hasattr(packet, "time")
        if has_time and sec is None:
            sec = float(packet.time)

        if usec is None:
            usec = 0

        return sec, usec  # type: ignore

    def _add_padding(self, raw_data):
        # type: (bytes) -> bytes
        """Return ``raw_data`` zero-padded to a multiple of 4 bytes."""
        padding = b"\x00" * (-len(raw_data) % 4)
        raw_data += padding
        return raw_data

    def build_block(self, block_type, block_body, options=None):
        # type: (bytes, bytes, Optional[bytes]) -> bytes
        """
        Assemble a block: type, total length, padded body followed by the
        options (if any), then the total length again.
        """
        # Pad Block Body to 32 bits
        body = self._add_padding(block_body)

        if options:
            body += options

        # An empty block is 12 bytes long
        total_length = struct.pack(self.endian + "I", 12 + len(body))

        # The Block Total Length field both precedes and follows the body
        return block_type + total_length + body + total_length

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the section header and the default interface block once."""
        if self.header_present:
            return
        self.header_present = True
        self._write_block_shb()
        self._write_block_idb(linktype=self.linktype)

    def _write_block_shb(self):
        # type: () -> None
        """Write a Section Header Block."""
        endian = self.endian
        shb_body = b"".join((
            # Byte-Order Magic
            self.endian_magic,
            # Major Version
            struct.pack(endian + "H", 1),
            # Minor Version
            struct.pack(endian + "H", 0),
            # Section Length
            struct.pack(endian + "q", -1),
        ))

        # Block Type
        self.f.write(self.build_block(b"\x0A\x0D\x0D\x0A", shb_body))

    def _write_block_idb(self,
                         linktype,  # type: int
                         ifname=None  # type: Optional[bytes]
                         ):
        # type: (...) -> None
        """Write an Interface Description Block, with an optional if_name."""
        endian = self.endian

        # Block Type
        idb_type = struct.pack(endian + "I", 1)
        # LinkType, Reserved
        idb_body = struct.pack(endian + "HH", linktype, 0)
        # SnapLen
        idb_body += struct.pack(endian + "I", 262144)

        # if_name option
        if ifname is None:
            idb_opts = None
        else:
            idb_opts = b"".join((
                struct.pack(endian + "HH", 2, len(ifname)),
                # Pad Option Value to 32 bits
                self._add_padding(ifname),
                # End of options
                struct.pack(endian + "HH", 0, 0),
            ))

        self.f.write(self.build_block(idb_type, idb_body, options=idb_opts))

    def _write_block_spb(self, raw_pkt):
        # type: (bytes) -> None
        """Write a Simple Packet Block."""
        # Block Type
        spb_type = struct.pack(self.endian + "I", 3)
        # Original Packet Length, then Packet Data
        spb_body = struct.pack(self.endian + "I", len(raw_pkt)) + raw_pkt

        self.f.write(self.build_block(spb_type, spb_body))

    def _write_block_epb(self,
                         raw_pkt,  # type: bytes
                         ifid,  # type: int
                         timestamp=None,  # type: Optional[Union[EDecimal, float]]  # noqa: E501
                         caplen=None,  # type: Optional[int]
                         orglen=None,  # type: Optional[int]
                         comment=None,  # type: Optional[bytes]
                         flags=None,  # type: Optional[int]
                         ):
        # type: (...) -> None
        """
        Write an Enhanced Packet Block.

        A falsy ``timestamp`` is written as 0; falsy ``caplen`` / ``orglen``
        default to ``len(raw_pkt)``.
        """
        endian = self.endian

        # The timestamp is split into two 32-bit halves
        ticks = int(timestamp * self.tsresol) if timestamp else 0
        ts_high, ts_low = ticks >> 32, ticks & 0xFFFFFFFF

        caplen = caplen or len(raw_pkt)
        orglen = orglen or len(raw_pkt)

        # Block Type
        epb_type = struct.pack(endian + "I", 6)
        # Interface ID, Timestamp (High), Timestamp (Low),
        # Captured Packet Length, Original Packet Length
        epb_body = struct.pack(endian + "IIIII",
                               ifid, ts_high, ts_low, caplen, orglen)
        # Packet Data
        epb_body += raw_pkt

        # Options
        option_parts = []  # type: List[bytes]
        if comment is not None:
            comment = bytes_encode(comment)
            option_parts.append(
                struct.pack(endian + "HH", 1, len(comment))
            )
            # Pad Option Value to 32 bits
            option_parts.append(self._add_padding(comment))
        if type(flags) is int:
            option_parts.append(struct.pack(endian + "HH", 2, 4))
            option_parts.append(struct.pack(endian + "I", flags))
        epb_opts = b"".join(option_parts)
        if epb_opts:
            # End of options
            epb_opts += struct.pack(endian + "HH", 0, 0)

        self.f.write(self.build_block(epb_type, epb_body,
                                      options=epb_opts))

    def _write_packet(self,  # type: ignore
                      packet,  # type: bytes
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcapng file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch.
        :type sec: float
        :param usec: accepted for compatibility, not used here
        :type usec: int
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: UTF-8 string containing human-readable comment text
                        that is associated to the current block. Line
                        separators SHOULD be CR+LF or just LF; either form
                        may appear and be considered a line separator.
                        The string is not zero-terminated.
        :type comment: bytes
        :param ifname: UTF-8 string containing the
                       name of the device used to capture data.
                       The string is not zero-terminated.
        :type ifname: bytes
        :param direction: 0 = information not available,
                          1 = inbound,
                          2 = outbound
        :type direction: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen

        # First packet seen on this interface: register it and describe it
        if ifname not in self.interfaces2id:
            self.interfaces2id[ifname] = \
                max(self.interfaces2id.values()) + 1
            self._write_block_idb(linktype=linktype, ifname=ifname)
        ifid = self.interfaces2id[ifname]

        # EPB flags (32 bits).
        # currently only direction is implemented (least 2 significant bits)
        flags = direction & 0x3 if type(direction) is int else None

        self._write_block_epb(packet, timestamp=sec, caplen=caplen,
                              orglen=wirelen, comment=comment, ifid=ifid,
                              flags=flags)
        if self.sync:
            self.f.flush()
class PcapWriter(RawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""
    # Behaves exactly like RawPcapWriter; nothing is overridden here.
class PcapNgWriter(RawPcapNgWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """
        Return the ``(sec, usec)`` pair to record for ``packet``.

        A missing ``sec`` is taken from ``packet.time`` when the packet has
        that attribute; a missing ``usec`` becomes 0.
        """
        has_time = hasattr(packet, "time")
        if sec is None and has_time:
            sec = float(packet.time)

        usec = 0 if usec is None else usec
        return sec, usec  # type: ignore
@conf.commands.register
def rderf(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read an ERF file and return a packet list

    :param filename: name of the ERF file, or an open file-like object
    :param count: read only <count> packets
    """
    with ERFEthernetReader(filename) as erf_reader:
        packets = erf_reader.read_all(count=count)
    return packets
class ERFEthernetReader_metaclass(PcapReader_metaclass):
    """
    Metaclass that opens the capture file before building the reader.

    Calling a class using this metaclass opens ``filename`` (see
    :meth:`open`), then initialises an instance with the resulting
    ``(filename, fdesc)`` pair.
    """
    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """
        Build a reader for ``filename``.

        :param filename: path of the capture file, or an open file object
        :raises Scapy_Exception: if neither the class nor its optional
            ``alternative`` could be initialised on the file
        """
        # The instance is created without going through type.__call__, so
        # __init__ is invoked by hand below with the opened descriptor.
        i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)  # type: ignore
        filename, fdesc = cls.open(filename)
        try:
            i.__init__(filename, fdesc)
            return i
        except (Scapy_Exception, EOFError):
            pass

        # Second chance: a class may name another reader class under the
        # "alternative" attribute, tried on the same descriptor.
        if "alternative" in cls.__dict__:
            cls = cls.__dict__["alternative"]
            i = cls.__new__(
                cls,
                cls.__name__,
                cls.__bases__,
                cls.__dict__  # type: ignore
            )
            try:
                i.__init__(filename, fdesc)
                return i
            except (Scapy_Exception, EOFError):
                pass

        # NOTE(review): fdesc is left open on this path, including when it
        # was opened here rather than supplied by the caller - confirm
        # whether it should be closed before raising.
        raise Scapy_Exception("Not a supported capture file")

    @staticmethod
    def open(fname  # type: ignore
             ):
        # type: (...) -> Tuple[str, _ByteStream]
        """
        Open (if necessary) filename.

        A string is treated as a path: it is first probed as a gzip file
        and opened as a plain binary file if that raises ``IOError``.
        Anything else is taken to be an already opened stream, whose
        ``name`` attribute (or "No name") is used as the file name.

        :return: the ``(filename, fdesc)`` pair
        """
        if isinstance(fname, str):
            filename = fname
            try:
                # Reading one byte is enough to find out whether the file
                # is gzip-compressed
                with gzip.open(filename, "rb") as tmp:
                    tmp.read(1)
                fdesc = gzip.open(filename, "rb")  # type: _ByteStream
            except IOError:
                fdesc = open(filename, "rb")

        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
        return filename, fdesc
class ERFEthernetReader(PcapReader,
                        metaclass=ERFEthernetReader_metaclass):
    """A stateful reader for ERF files containing Ethernet records"""

    def __init__(self, filename, fdesc=None):  # type: ignore
        # type: (Union[IO[bytes], str], IO[bytes]) -> None
        """
        :param filename: name of the capture file
        :param fdesc: open binary stream positioned on the first record
        """
        self.filename = filename  # type: ignore
        self.f = fdesc
        # Timestamps are exposed with nanosecond granularity
        self.power = Decimal(10) ** Decimal(-9)

    # time is in 64-bits Endace's format which can be seen here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def _convert_erf_timestamp(self, t):
        # type: (int) -> EDecimal
        """
        Convert a 64-bit ERF timestamp to seconds.

        The upper 32 bits are whole seconds, the lower 32 bits a binary
        fraction of a second, converted here to nanoseconds.
        """
        sec = t >> 32
        frac_sec = t & 0xffffffff
        frac_sec *= 10**9
        # Round to nearest when dropping the low 32 bits
        frac_sec += (frac_sec & 0x80000000) << 1
        frac_sec >>= 32
        return EDecimal(sec + self.power * frac_sec)

    # The details of ERF Packet format can be seen here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """
        Read the next ERF record and return it as a packet.

        :param size: upper bound applied when slicing the record payload
        :param kwargs: passed to the ``Ether`` constructor
        :raises EOFError: when no complete record header is left
        :raises Scapy_Exception: if the record is not an Ethernet record,
            or if its length is smaller than its own headers
        """
        # General ERF Header have exactly 16 bytes
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError

        # The timestamp is in little-endian byte-order.
        timestamp = struct.unpack('<Q', hdr[:8])[0]
        # The rest is in big-endian byte-order.
        # Ignoring flags and lctr (loss counter) since they are ERF specific
        # header fields which Packet object does not support.
        erf_type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
        # The top bit flags an extension header, the low 7 bits hold the
        # record type. Only type 0x02 (Ethernet) is supported: comparing the
        # whole type field avoids accepting other types that merely have
        # bit 1 set.
        if erf_type & 0x7f != 0x02:
            raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")

        # If there are extended headers, ignore it because Packet object does
        # not support it. Extended headers size is 8 bytes before the payload.
        hdr_len = 24 if erf_type & 0x80 else 16
        if rlen < hdr_len:
            # A negative size would make read() consume the rest of the file
            raise Scapy_Exception("Invalid ERF record length")
        if hdr_len == 24:
            self.f.read(8)
        s = self.f.read(rlen - hdr_len)

        # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
        # of the fields are disregarded by Endace.
        pb = s[2:size]
        from scapy.layers.l2 import Ether
        try:
            p = Ether(pb, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                # Record the exact bytes that were handed to the dissector
                debug.crashed_on = (Ether, pb)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            # Fall back to a raw packet holding the same bytes that failed
            # to dissect (without the ERF padding)
            p = conf.raw_layer(pb)

        p.time = self._convert_erf_timestamp(timestamp)
        p.wirelen = wlen

        return p
@conf.commands.register
def wrerf(filename,  # type: Union[IO[bytes], str]
          pkt,  # type: _PacketIterable
          *args,  # type: Any
          **kargs  # type: Any
          ):
    # type: (...) -> None
    """Write a list of packets to an ERF file

    Extra positional and keyword arguments are handed to
    ``ERFEthernetWriter``.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrerf(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: the packet(s) to write
    :param gz: set to 1 to save a gzipped capture
    :param append: append packets to the capture file instead of
        truncating it
    :param sync: do not bufferize writes to the capture file
    """
    erf_writer = ERFEthernetWriter(filename, *args, **kargs)
    with erf_writer as fdesc:
        fdesc.write(pkt)
class ERFEthernetWriter(PcapWriter):
    """A stream ERF Ethernet writer with more control than wrerf()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 gz=False,  # type: bool
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
                         writable file-like object.
        :param gz: compress the capture on the fly
        :param append: append packets to the capture file instead of
                       truncating it
        :param sync: do not bufferize writes to the capture file
        """
        super(ERFEthernetWriter, self).__init__(filename,
                                                gz=gz,
                                                append=append,
                                                sync=sync)

    def write(self, pkt):  # type: ignore
        # type: (_PacketIterable) -> None
        """
        Writes a Packet or a SndRcvList object to an ERF file.

        :param pkt: Packet(s) to write (one record for each Packet)
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
        """
        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for p in IterSocket(pkt).iter:
            self.write_packet(p)

    def write_packet(self, pkt):  # type: ignore
        # type: (Packet) -> None
        """
        Write one packet as an ERF Ethernet record and flush the stream.

        The timestamp comes from ``pkt.time`` when available, otherwise from
        the current time. The wire length comes from ``pkt.wirelen`` when it
        is set, otherwise the record length is used.

        :param pkt: the packet to write
        """
        if hasattr(pkt, "time"):
            sec = int(pkt.time)
            # Express the sub-second part as a 32-bit binary fraction
            usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9)
            t = (sec << 32) + usec
        else:
            t = int(time.time()) << 32

        # There are 16 bytes of headers + 2 bytes of padding before the packets
        # payload.
        rlen = len(pkt) + 18

        # getattr() with a default keeps `wirelen` bound for objects that do
        # not have the attribute at all, which used to raise
        # UnboundLocalError.
        wirelen = getattr(pkt, "wirelen", None)
        if wirelen is None:
            wirelen = rlen

        self.f.write(struct.pack("<Q", t))
        self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
        self.f.write(bytes(pkt))
        self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the stream without writing any pcap header."""
        return self.f.close()
@conf.commands.register
def import_hexcap(input_string=None):
    # type: (Optional[str]) -> bytes
    """Imports a tcpdump like hexadecimal view

    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"

    Parsing stops at the first empty line, or at end of input.

    :param input_string: String containing the hexdump input to parse. If None,
        read from standard input.
    :return: the bytes described by the hexadecimal view
    """
    hexcap_line = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501

    # Lines come either from the given string or from the user
    if input_string:
        next_line = StringIO(input_string).readline
    else:
        next_line = input

    hex_chunks = []  # type: List[str]
    try:
        while True:
            line = next_line().strip()
            if not line:
                break
            try:
                # The third group holds the hexadecimal byte columns
                hex_chunks.append(hexcap_line.match(line).groups()[2])  # type: ignore  # noqa: E501
            except Exception:
                warning("Parsing error during hexcap")
                continue
    except EOFError:
        pass

    hex_digits = "".join(hex_chunks).replace(" ", "")
    return hex_bytes(hex_digits)
@conf.commands.register
def wireshark(pktlist, wait=False, **kwargs):
    # type: (List[Packet], bool, **Any) -> Optional[Any]
    """
    Runs Wireshark on a list of packets.

    This is :func:`tcpdump` with ``prog`` set to the configured Wireshark
    program; see that function for the other parameters.

    Note: this defaults to wait=False, to run Wireshark in the background.
    """
    wireshark_prog = conf.prog.wireshark
    return tcpdump(pktlist, prog=wireshark_prog, wait=wait, **kwargs)
@conf.commands.register
def tdecode(
    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
    args=None,  # type: Optional[List[str]]
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """
    Run tshark on a list of packets.

    :param pktlist: the packets, passed to :func:`tcpdump` unchanged
    :param args: If not specified, defaults to ``tshark -V``.

    See :func:`tcpdump` for more parameters.
    """
    tshark_args = ["-V"] if args is None else args
    return tcpdump(pktlist, prog=conf.prog.tshark, args=tshark_args, **kwargs)
def _guess_linktype_name(value):
    # type: (int) -> str
    """Return the DLT name matching a DLT value."""
    from scapy.libs.winpcapy import pcap_datalink_val_to_name
    raw_name = cast(bytes, pcap_datalink_val_to_name(value))
    return raw_name.decode()
def _guess_linktype_value(name):
    # type: (str) -> int
    """Return the DLT value matching a DLT name (EN10MB if unknown)."""
    from scapy.libs.winpcapy import pcap_datalink_name_to_val
    val = cast(int, pcap_datalink_name_to_val(name.encode()))
    if val != -1:
        return val
    # -1 is what the lookup returns for a name it does not know
    warning("Unknown linktype: %s. Using EN10MB", name)
    return DLT_EN10MB
@conf.commands.register
def tcpdump(
    pktlist=None,  # type: Union[IO[bytes], None, str, _PacketIterable]
    dump=False,  # type: bool
    getfd=False,  # type: bool
    args=None,  # type: Optional[List[str]]
    flt=None,  # type: Optional[str]
    prog=None,  # type: Optional[Any]
    getproc=False,  # type: bool
    quiet=False,  # type: bool
    use_tempfile=None,  # type: Optional[Any]
    read_stdin_opts=None,  # type: Optional[Any]
    linktype=None,  # type: Optional[Any]
    wait=True,  # type: bool
    _suppress=False  # type: bool
):
    # type: (...) -> Any
    """Run tcpdump or tshark on a list of packets.

    When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
    temporary file to store the packets. This works around a bug in Apple's
    version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/

    Otherwise, the packets are passed in stdin.

    This function can be explicitly enabled or disabled with the
    ``use_tempfile`` parameter.

    When using ``wireshark``, it will be called with ``-ki -`` to start
    immediately capturing packets from stdin.

    Otherwise, the command will be run with ``-r -`` (which is correct for
    ``tcpdump`` and ``tshark``).

    This can be overridden with ``read_stdin_opts``. This has no effect when
    ``use_tempfile=True``, or otherwise reading packets from a regular file.

    :param pktlist: a Packet instance, a PacketList instance or a list of
        Packet instances. Can also be a filename (as a string), an open
        file-like object that must be a file format readable by
        tshark (Pcap, PcapNg, etc.) or None (to sniff)
    :param flt: a filter to use with tcpdump
    :param dump: when set to True, returns a string instead of displaying it.
    :param getfd: when set to True, returns a file-like object to read data
        from tcpdump or tshark from.
    :param getproc: when set to True, the subprocess.Popen object is returned
    :param args: arguments (as a list) to pass to tshark (example for tshark:
        args=["-T", "json"]).
    :param prog: program to use (defaults to tcpdump, will work with tshark)
    :param quiet: when set to True, the process stderr is discarded
    :param use_tempfile: When set to True, always use a temporary file to store
        packets.
        When set to False, pipe packets through stdin.
        When set to None (default), only use a temporary file with
        ``tcpdump`` on OSX.
    :param read_stdin_opts: When set, a list of arguments needed to capture
        from stdin. Otherwise, attempts to guess.
    :param linktype: A custom DLT value or name, to overwrite the default
        values.
    :param wait: If True (default), waits for the process to terminate before
        returning to Scapy. If False, the process will be detached to the
        background. If dump, getproc or getfd is True, these have the same
        effect as ``wait=False``.
    :raises Scapy_Exception: if ``prog`` is None and tcpdump is not available
    :raises ValueError: if ``prog`` is not a string, or if the linktype is
        unknown or cannot be determined

    Examples::

        >>> tcpdump([IP()/TCP(), IP()/UDP()])
        reading from file -, link-type RAW (Raw IP)
        16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0  # noqa: E501
        16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]

        >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
          1   0.000000    127.0.0.1 -> 127.0.0.1    TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0  # noqa: E501
          2   0.000459    127.0.0.1 -> 127.0.0.1    UDP 28 53->53 Len=0

    To get a JSON representation of a tshark-parsed PacketList(), one can::

        >>> import json, pprint
        >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
        ...                                  dst="45.33.32.156"),
        ...                               prog=conf.prog.tshark,
        ...                               args=["-T", "json"],
        ...                               getfd=True))
        >>> pprint.pprint(json_data)
        [{u'_index': u'packets-2016-12-23',
          u'_score': None,
          u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
                                              u'frame.encap_type': u'7',
        [...]
                                              },
                                   u'ip': {u'ip.addr': u'45.33.32.156',
                                           u'ip.checksum': u'0x0000a20d',
        [...]
                                           u'ip.ttl': u'64',
                                           u'ip.version': u'4'},
                                   u'raw': u'Raw packet data'}},
          u'_type': u'pcap_file'}]
        >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
        u'64'
    """
    # Returning the process requires its stdout to be piped as well
    getfd = getfd or getproc
    if prog is None:
        if not conf.prog.tcpdump:
            raise Scapy_Exception(
                "tcpdump is not available"
            )
        prog = [conf.prog.tcpdump]
    elif isinstance(prog, str):
        prog = [prog]
    else:
        raise ValueError("prog must be a string")

    if linktype is not None:
        if isinstance(linktype, int):
            # Guess name from value
            try:
                linktype_name = _guess_linktype_name(linktype)
            except StopIteration:
                linktype = -1
        else:
            # Guess value from name
            if linktype.startswith("DLT_"):
                linktype = linktype[4:]
            linktype_name = linktype
            try:
                linktype = _guess_linktype_value(linktype)
            except KeyError:
                linktype = -1
        if linktype == -1:
            raise ValueError(
                "Unknown linktype. Try passing its datalink name instead"
            )
        prog += ["-y", linktype_name]

    # Build Popen arguments
    if args is None:
        args = []
    else:
        # Make a copy of args
        args = list(args)

    if flt is not None:
        # Check the validity of the filter
        if linktype is None and isinstance(pktlist, str):
            # linktype is unknown but required. Read it from file
            with PcapReader(pktlist) as rd:
                if isinstance(rd, PcapNgReader):
                    # Get the linktype from the first packet
                    try:
                        _, metadata = rd._read_packet()
                        linktype = metadata.linktype
                        if OPENBSD and linktype == 228:
                            linktype = DLT_RAW
                    except EOFError:
                        raise ValueError(
                            "Cannot get linktype from a PcapNg packet."
                        )
                else:
                    linktype = rd.linktype
        from scapy.arch.common import compile_filter
        compile_filter(flt, linktype=linktype)
        args.append(flt)

    stdout = subprocess.PIPE if dump or getfd else None
    # subprocess.DEVNULL gives the child a writable null device and leaves
    # no file object to close, unlike a read-only open(os.devnull).
    stderr = subprocess.DEVNULL if quiet else None
    proc = None

    if use_tempfile is None:
        # Apple's tcpdump cannot read from stdin, see:
        # http://apple.stackexchange.com/questions/152682/
        use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump

    if read_stdin_opts is None:
        if prog[0] == conf.prog.wireshark:
            # Start capturing immediately (-k) from stdin (-i -)
            read_stdin_opts = ["-ki", "-"]
        elif prog[0] == conf.prog.tcpdump and not OPENBSD:
            # Capture in packet-buffered mode (-U) from stdin (-r -)
            read_stdin_opts = ["-U", "-r", "-"]
        else:
            read_stdin_opts = ["-r", "-"]
    else:
        # Make a copy of read_stdin_opts
        read_stdin_opts = list(read_stdin_opts)

    if pktlist is None:
        # sniff
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif isinstance(pktlist, str):
        # file
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", pktlist] + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif use_tempfile:
        tmpfile = get_temp_file(  # type: ignore
            autoext=".pcap",
            fd=True
        )  # type: IO[bytes]
        try:
            # File-like input: copy it to the temporary file by chunks
            tmpfile.writelines(
                iter(lambda: pktlist.read(1048576), b"")  # type: ignore
            )
        except AttributeError:
            # Not a file-like object: write the packets as a pcap
            pktlist = cast("_PacketIterable", pktlist)
            wrpcap(tmpfile, pktlist, linktype=linktype)
        else:
            tmpfile.close()
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", tmpfile.name] + args,
                stdout=stdout,
                stderr=stderr,
            )
    else:
        try:
            pktlist.fileno()  # type: ignore
            # pass the packet stream
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=pktlist,  # type: ignore
                    stdout=stdout,
                    stderr=stderr,
                )
        except (AttributeError, ValueError):
            # write the packet stream to stdin
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=subprocess.PIPE,
                    stdout=stdout,
                    stderr=stderr,
                )
            if proc is None:
                # An error has occurred
                return
            try:
                proc.stdin.writelines(  # type: ignore
                    iter(lambda: pktlist.read(1048576), b"")  # type: ignore
                )
            except AttributeError:
                wrpcap(proc.stdin, pktlist, linktype=linktype)  # type: ignore
            except UnboundLocalError:
                # The error was handled by ContextManagerSubprocess
                pass
            else:
                proc.stdin.close()  # type: ignore
    if proc is None:
        # An error has occurred
        return
    if dump:
        data = b"".join(
            iter(lambda: proc.stdout.read(1048576), b"")  # type: ignore
        )
        proc.terminate()
        return data
    if getproc:
        return proc
    if getfd:
        return proc.stdout
    if wait:
        proc.wait()
@conf.commands.register
def hexedit(pktlist):
    # type: (_PacketIterable) -> PacketList
    """Run hexedit on a list of packets, then return the edited packets.

    The packets are written to a temporary pcap file, which is handed to
    the configured hexedit program, read back and then removed.
    """
    pcap_path = get_temp_file()
    wrpcap(pcap_path, pktlist)
    with ContextManagerSubprocess(conf.prog.hexedit):
        editor_cmd = [conf.prog.hexedit, pcap_path]
        subprocess.call(editor_cmd)
    edited = rdpcap(pcap_path)
    os.unlink(pcap_path)
    return edited
def get_terminal_width():
    # type: () -> Optional[int]
    """Get terminal width (number of characters) if in a window.

    Notice: this will try several methods in order to
    support as many terminals and OS as possible.
    """
    width = shutil.get_terminal_size(fallback=(0, 0)).columns
    if width:
        return width

    # Backups
    if WINDOWS:
        from ctypes import windll, create_string_buffer
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        handle = windll.kernel32.GetStdHandle(-12)
        info = create_string_buffer(22)
        if not windll.kernel32.GetConsoleScreenBufferInfo(handle, info):
            return width
        fields = struct.unpack("hhhhHhhhhhh", info.raw)
        # Fields 5 and 7 are the left and right edges of the window
        left, right = fields[5], fields[7]
        return right - left + 1

    # We have various methods
    # COLUMNS is set on some terminals
    try:
        width = int(os.environ['COLUMNS'])
    except Exception:
        pass
    if width:
        return width

    # We can query TIOCGWINSZ
    try:
        import fcntl
        import termios
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(1, termios.TIOCGWINSZ, request)
        width = struct.unpack('HHHH', reply)[1]
    except (IOError, ModuleNotFoundError):
        # If everything failed, return default terminal size
        width = 79
    return width
def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
                header,  # type: List[Tuple[str, ...]]
                sortBy=0,  # type: Optional[int]
                borders=False,  # type: bool
                ):
    # type: (...) -> str
    """
    Pretty list to fit the terminal, and add header.

    Note that ``rtlst`` is modified in place: it is sorted, and rows that
    contain list values are replaced by one row per list item.

    :param rtlst: a list of tuples. each tuple contains a value which can
        be either a string or a list of string.
    :param header: a list of tuples of column titles, put before the rows.
        The number of columns is taken from its first tuple.
    :param sortBy: the column id (starting with 0) which will be used for
        ordering. None disables sorting.
    :param borders: whether to put borders on the table or not
    :return: the formatted table, one row per line
    """
    if borders:
        _space = "|"
    else:
        _space = " "
    cols = len(header[0])
    # Windows has a fat terminal border
    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
    cropped = False
    if sortBy is not None:
        # Sort correctly
        rtlst.sort(key=lambda x: x[sortBy])
    # Resolve multi-values: a row holding lists is expanded into several
    # rows, the extra rows being blank outside of the list columns
    for i, line in enumerate(rtlst):
        ids = []  # type: List[int]
        values = []  # type: List[Union[str, List[str]]]
        for j, val in enumerate(line):
            if isinstance(val, list):
                ids.append(j)
                values.append(val or " ")
        if values:
            del rtlst[i]
            k = 0
            for ex_vals in zip_longest(*values, fillvalue=" "):
                if k:
                    extra_line = [" "] * cols
                else:
                    extra_line = list(line)  # type: ignore
                for j, h in enumerate(ids):
                    extra_line[h] = ex_vals[j]
                rtlst.insert(i + k, tuple(extra_line))
                k += 1
    rtslst = cast(List[Tuple[str, ...]], rtlst)
    # Append tag
    rtslst = header + rtslst
    # Detect column's width
    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
    # Make text fit in box (if required)
    width = get_terminal_width()
    if conf.auto_crop_tables and width:
        width = width - _spacelen
        while sum(colwidth) > width:
            # Get the widest column
            i = colwidth.index(max(colwidth))
            if colwidth[i] <= 1:
                # A cell cannot get shorter than the "_" marker: when even
                # the widest column is down to one character there is
                # nothing left to crop. Stop instead of looping forever.
                break
            cropped = True
            # Get the length of all elements of this column
            row = [len(x[i]) for x in rtslst]
            # Get biggest element of this column: biggest of the array
            j = row.index(max(row))
            # Re-build the row tuple with the edited element
            t = list(rtslst[j])
            t[i] = t[i][:-2] + "_"
            rtslst[j] = tuple(t)
            # Update max size
            row[j] = len(t[i])
            colwidth[i] = max(row)
    if cropped:
        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
    # Generate padding scheme
    fmt = _space.join(["%%-%ds" % x for x in colwidth])
    # Append separation line if needed
    if borders:
        rtslst.insert(1, tuple("-" * x for x in colwidth))
    # Compile
    return "\n".join(fmt % x for x in rtslst)
def human_size(x, fmt=".1f"):
    # type: (int, str) -> str
    """
    Convert a size in octets to a human string representation

    The size is scaled by powers of 1024 and suffixed with K, M, G, T, P
    or E. Sizes below 1024, or too large for the biggest unit, are returned
    unscaled with a "B" suffix.

    :param x: the size, in octets
    :param fmt: the format spec applied to the scaled value
    :return: the human readable size, e.g. "1.5K"
    """
    units = ['K', 'M', 'G', 'T', 'P', 'E']
    if not x:
        return "0B"
    magnitude = abs(x)
    # Find the power of 1024 by comparison rather than with a floating point
    # logarithm, which can be off by one at exact powers and rejects
    # negative values.
    exponent = 0
    while exponent <= len(units) and magnitude >= 2**(10 * (exponent + 1)):
        exponent += 1
    # units[exponent - 1] is valid up to and including len(units)
    if 0 < exponent <= len(units):
        return format(x / 2**(10 * exponent), fmt) + units[exponent - 1]
    return str(x) + "B"
def __make_table(
    yfmtfunc,  # type: Callable[[int], str]
    fmtfunc,  # type: Callable[[int], str]
    endline,  # type: str
    data,  # type: List[Tuple[Packet, Packet]]
    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
    dump=False  # type: bool
):
    # type: (...) -> Optional[str]
    """Core function of the make_table suite, which generates the table

    :param yfmtfunc: called with the width of the widest row label, returns
        the %-format used for the first column
    :param fmtfunc: called with the width of a column, returns the %-format
        used for the cells of that column
    :param endline: text appended at the end of the header and of each row
    :param data: the entries; each one is unpacked as arguments of fxyz
    :param fxyz: returns the (column label, row label, cell value) of an
        entry. The three values are converted with str().
    :param sortx: sort key for the column labels
    :param sorty: sort key for the row labels
    :param seplinefunc: called with the row label width and the list of
        column widths, returns a separation line
    :param dump: return the table instead of printing it
    :return: the table if dump is set, None otherwise
    """
    widths = {}  # type: Dict[str, int]
    rows = {}  # type: Dict[str, Optional[int]]
    cells = {}  # type: Dict[Tuple[str, str], str]
    colfmt = {}  # type: Dict[str, str]

    def _ordered(labels, keyfunc):
        # type: (List[str], Optional[Callable[[str], Any]]) -> List[str]
        # Sort with the provided key, else as integers, else with atol,
        # else as plain strings
        if keyfunc:
            labels.sort(key=keyfunc)
            return labels
        try:
            labels.sort(key=int)
        except Exception:
            try:
                labels.sort(key=atol)
            except Exception:
                labels.sort()
        return labels

    label_width = 0
    for entry in data:
        xx, yy, zz = [str(s) for s in fxyz(*entry)]
        label_width = max(len(yy), label_width)
        # A column is as wide as its label or its widest cell
        widths[xx] = max(widths.get(xx, 0), len(xx), len(zz))
        rows[yy] = None
        cells[(xx, yy)] = zz

    xlabels = _ordered(list(widths), sortx)
    ylabels = _ordered(list(rows), sorty)

    out = []  # type: List[str]
    if seplinefunc:
        sepline = seplinefunc(label_width, [widths[x] for x in xlabels])
        out.append(sepline + "\n")

    # Header line: an empty row label followed by the column labels
    rowfmt = yfmtfunc(label_width)
    out.append(rowfmt % "")
    out.append(' ')
    for x in xlabels:
        colfmt[x] = fmtfunc(widths[x])
        out.append(colfmt[x] % x)
        out.append(' ')
    out.append(endline + "\n")
    if seplinefunc:
        out.append(sepline + "\n")
    # One line per row label, missing cells being shown as "-"
    for y in ylabels:
        out.append(rowfmt % y)
        out.append(' ')
        for x in xlabels:
            out.append(colfmt[x] % cells.get((x, y), "-"))
            out.append(' ')
        out.append(endline + "\n")
    if seplinefunc:
        out.append(sepline + "\n")

    table = "".join(out)
    if dump:
        return table
    print(table, end="")
    return None
def make_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[Any]
    """
    Build a table whose cells are left-aligned and padded to the width of
    their column, without any separation line.

    All the arguments are forwarded to __make_table.
    """
    def _padded(width):
        # type: (int) -> str
        return "%%-%is" % width

    return __make_table(_padded, _padded, "", *args, **kargs)
def make_lined_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """
    Build a table whose cells are separated by "|", with "+" and "-"
    separation lines.

    All the arguments are forwarded to __make_table.
    """
    def _cell(width):
        # type: (int) -> str
        return "%%-%is |" % width

    def _rule(label_width, col_widths):
        # type: (int, List[int]) -> str
        # One run of dashes per column, joined with "+"
        spans = [label_width - 1] + col_widths + [-2]
        return "+".join('-' * (span + 2) for span in spans)

    return __make_table(  # type: ignore
        _cell,
        _cell,
        "",
        *args,
        seplinefunc=_rule,
        **kargs
    )
def make_tex_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """
    Build a table using "&" before each cell, a double backslash at the
    end of each row and "\\hline" separation lines.

    All the arguments are forwarded to __make_table.
    """
    def _label(width):
        # type: (int) -> str
        return "%s"

    def _cell(width):
        # type: (int) -> str
        return "& %s"

    def _rule(label_width, col_widths):
        # type: (int, List[int]) -> str
        return "\\hline"

    return __make_table(  # type: ignore
        _label,
        _cell,
        "\\\\",
        *args,
        seplinefunc=_rule,
        **kargs
    )
3535####################
3536# WHOIS CLIENT #
3537####################
def whois(ip_address):
    # type: (str) -> bytes
    """Whois client for Python

    The name is resolved with gethostbyname() when possible, then sent to
    whois.ripe.net on port 43. The answer is returned without its lines
    starting with "remarks:", without its trailing blank lines and without
    its first 3 lines.

    :param ip_address: the IP address (or name) to look up
    :return: the filtered answer of the whois server
    """
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except Exception:
        # Best effort: send the value as provided
        query = whois_ip
    chunks = []
    # The context manager closes the socket even if the exchange fails
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(("whois.ripe.net", 43))
        # sendall(): send() is allowed to transmit only part of the data
        s.sendall(query.encode("utf8") + b"\r\n")
        while True:
            d = s.recv(4096)
            if not d:
                break
            chunks.append(d)
    answer = b"".join(chunks)
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [
        line for line in answer.split(b"\n")
        if not line.startswith(ignore_tag)
    ]
    # remove empty lines at the bottom. Always look at the last line:
    # indexing with a growing offset while deleting would skip entries.
    while lines and not lines[-1].strip():
        lines.pop()
    return b"\n".join(lines[3:])
3569####################
3570# CLI utils #
3571####################
class _CLIUtilMetaclass(type):
    """
    Metaclass that collects the members of a class that were tagged with a
    ``cliutil_type`` attribute into the ``commands``, ``commands_output``
    and ``commands_complete`` class attributes.
    """

    class TYPE(enum.Enum):
        COMMAND = 0
        OUTPUT = 1
        COMPLETE = 2

    def __new__(cls,  # type: Type[_CLIUtilMetaclass]
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[CLIUtil]
        kinds = _CLIUtilMetaclass.TYPE
        commands = {}  # type: Dict[str, Any]
        outputs = {}  # type: Dict[str, Any]
        completers = {}  # type: Dict[str, Any]
        for member in dct.values():
            kind = getattr(member, "cliutil_type", None)
            if kind == kinds.COMMAND:
                # Commands are indexed by their own name
                commands[member.__name__] = member
            elif kind == kinds.OUTPUT:
                # Processors are indexed by the name of their command
                outputs[member.cliutil_ref.__name__] = member
            elif kind == kinds.COMPLETE:
                completers[member.cliutil_ref.__name__] = member
        dct["commands"] = commands
        dct["commands_output"] = outputs
        dct["commands_complete"] = completers
        return cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct))
class CLIUtil(metaclass=_CLIUtilMetaclass):
    """
    Provides a Util class to easily create simple CLI tools in Scapy,
    that can still be used as an API.

    Doc:
        - override the ps1() function
        - register commands with the @CLIUtil.addcommand decorator
        - call the loop() function when ready
    """

    def _depcheck(self) -> None:
        """
        Check that all dependencies are installed
        """
        try:
            import prompt_toolkit  # noqa: F401
        except ImportError:
            # okay we lie but prompt_toolkit is a dependency...
            raise ImportError("You need to have IPython installed to use the CLI")

    # Okay let's do nice code
    commands: Dict[str, Callable[..., Any]] = {}
    # print output of command
    commands_output: Dict[str, Callable[..., str]] = {}
    # provides completion to command
    commands_complete: Dict[str, Callable[..., List[str]]] = {}

    def __init__(self, cli: bool = True, debug: bool = False) -> None:
        """
        DEV: overwrite

        :param cli: check the dependencies and start the command loop
        :param debug: passed to loop()
        """
        if cli:
            self._depcheck()
            self.loop(debug=debug)

    @staticmethod
    def _inspectkwargs(func: DecoratorCallable) -> None:
        """
        Internal function to parse arguments from the kwargs of the functions

        The names of the keyword-only parameters of func are stored in
        func._flagnames, and the matching CLI flags ("-x" for a one letter
        name, "--name" otherwise) in func._flags.
        """
        func._flagnames = [  # type: ignore
            x.name for x in
            inspect.signature(func).parameters.values()
            if x.kind == inspect.Parameter.KEYWORD_ONLY
        ]
        func._flags = [  # type: ignore
            ("-%s" % x) if len(x) == 1 else ("--%s" % x)
            for x in func._flagnames  # type: ignore
        ]

    @staticmethod
    def _parsekwargs(
        func: DecoratorCallable,
        args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of a function.

        Only the flags found at the beginning of args are consumed.

        :return: the remaining arguments, and the flags that were set
        """
        kwargs: Dict[str, Literal[True]] = {}
        if func._flags:  # type: ignore
            i = 0
            for arg in args:
                if arg in func._flags:  # type: ignore
                    i += 1
                    kwargs[func._flagnames[func._flags.index(arg)]] = True  # type: ignore  # noqa: E501
                    continue
                # Stop on the first argument that is not a flag
                break
            args = args[i:]
        return args, kwargs

    @classmethod
    def _parseallargs(
        cls,
        func: DecoratorCallable,
        cmd: str, args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of both the function
        and its output function.

        :return: the remaining arguments, the flags of the function and the
            flags of its output function
        """
        args, kwargs = cls._parsekwargs(func, args)
        outkwargs: Dict[str, Literal[True]] = {}
        if cmd in cls.commands_output:
            args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
        return args, kwargs, outkwargs

    @classmethod
    def addcommand(
        cls,
        spaces: bool = False,
        globsupport: bool = False,
    ) -> Callable[[DecoratorCallable], DecoratorCallable]:
        """
        Decorator to register a command

        :param spaces: the arguments are joined with spaces and passed to
            the command as a single argument
        :param globsupport: expand a glob star (*) in the argument using the
            completer of the command. Requires spaces.
        """
        def func(cmd: DecoratorCallable) -> DecoratorCallable:
            cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND  # type: ignore
            cmd._spaces = spaces  # type: ignore
            cmd._globsupport = globsupport  # type: ignore
            cls._inspectkwargs(cmd)
            if cmd._globsupport and not cmd._spaces:  # type: ignore
                raise ValueError("Cannot use globsupport without spaces.")
            return cmd
        return func

    @classmethod
    def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command output processor
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            cls._inspectkwargs(processor)
            return processor
        return func

    @classmethod
    def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command completor
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            return processor
        return func

    def ps1(self) -> str:
        """
        Return the PS1 of the shell
        """
        return "> "

    def close(self) -> None:
        """
        Function called on exiting
        """
        print("Exited")

    def help(self, cmd: Optional[str] = None) -> None:
        """
        Return the help related to this CLI util

        :param cmd: print the help of this command only. Otherwise, a table
            of all the commands is printed.
        """
        def _args(func: Any) -> str:
            # Describe the flags, then the arguments, of a command
            flags = func._flags.copy()
            if func.__name__ in self.commands_output:
                flags += self.commands_output[func.__name__]._flags  # type: ignore
            return " %s%s" % (
                (
                    "%s " % " ".join("[%s]" % x for x in flags)
                    if flags else ""
                ),
                " ".join(
                    "<%s%s>" % (
                        x.name,
                        # arguments that have a default are suffixed with ?
                        "?" if
                        (x.default is None or x.default != inspect.Parameter.empty)
                        else ""
                    )
                    # [1:] skips the first parameter (self)
                    for x in list(inspect.signature(func).parameters.values())[1:]
                    if x.name not in func._flagnames and x.name[0] != "_"
                )
            )

        if cmd:
            if cmd not in self.commands:
                print("Unknown command '%s'" % cmd)
                return
            # help for one command
            func = self.commands[cmd]
            print("%s%s: %s" % (
                cmd,
                _args(func),
                func.__doc__ and func.__doc__.strip()
            ))
        else:
            header = "│ %s - Help │" % self.__class__.__name__
            print("┌" + "─" * (len(header) - 2) + "┐")
            print(header)
            print("└" + "─" * (len(header) - 2) + "┘")
            print(
                pretty_list(
                    [
                        (
                            cmd,
                            _args(func),
                            func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
                        )
                        for cmd, func in self.commands.items()
                    ],
                    [("Command", "Arguments", "Description")]
                )
            )

    def _completer(self) -> 'prompt_toolkit.completion.Completer':
        """
        Returns a prompt_toolkit custom completer
        """
        from prompt_toolkit.completion import Completer, Completion

        class CLICompleter(Completer):
            def get_completions(cmpl, document, complete_event):  # type: ignore
                if not complete_event.completion_requested:
                    # Only activate when the user does <TAB>
                    return
                parts = document.text.split(" ")
                cmd = parts[0].lower()
                if cmd not in self.commands:
                    # We are trying to complete the command
                    for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
                        yield Completion(possible_cmd, start_position=-len(cmd))
                else:
                    # We are trying to complete the command content
                    if len(parts) == 1:
                        return
                    args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
                    arg = " ".join(args)
                    if cmd in self.commands_complete:
                        for possible_arg in self.commands_complete[cmd](self, arg):
                            yield Completion(possible_arg, start_position=-len(arg))
                return
        return CLICompleter()

    def loop(self, debug: int = 0) -> None:
        """
        Main command handling loop

        Commands are read until "exit" is entered or EOF is received.

        :param debug: print the traceback of the commands that fail
        """
        from prompt_toolkit import PromptSession
        session = PromptSession(completer=self._completer())

        while True:
            try:
                cmd = session.prompt(self.ps1()).strip()
            except KeyboardInterrupt:
                continue
            except EOFError:
                self.close()
                break
            args = cmd.split(" ")[1:]
            cmd = cmd.split(" ")[0].strip().lower()
            if not cmd:
                continue
            if cmd in ["help", "h", "?"]:
                self.help(" ".join(args))
                continue
            # Compare the whole word: a substring test against "exit" would
            # also quit on "e", "x", "it"...
            if cmd == "exit":
                break
            if cmd not in self.commands:
                print("Unknown command. Type help or ?")
            else:
                # check the number of arguments
                func = self.commands[cmd]
                args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
                if func._spaces:  # type: ignore
                    args = [" ".join(args)]
                    # if globsupport is set, we might need to do several calls
                    if func._globsupport and "*" in args[0]:  # type: ignore
                        if args[0].count("*") > 1:
                            print("More than 1 glob star (*) is currently unsupported.")
                            continue
                        before, after = args[0].split("*", 1)
                        # Both sides of the star are literal text
                        reg = re.compile(
                            re.escape(before) + r".*" + re.escape(after)
                        )
                        calls = [
                            [x] for x in
                            self.commands_complete[cmd](self, before)
                            if reg.match(x)
                        ]
                    else:
                        calls = [args]
                else:
                    calls = [args]
                # now iterate if required, call the function and print its output
                for args in calls:
                    # Reset for each call, so that a call that fails does
                    # not print the result of the previous one again
                    res = None
                    try:
                        res = func(self, *args, **kwargs)
                    except TypeError:
                        print("Bad number of arguments !")
                        self.help(cmd=cmd)
                        continue
                    except Exception as ex:
                        print("Command failed with error: %s" % ex)
                        if debug:
                            # 3-arguments form: passing only the exception
                            # is not supported before Python 3.10
                            traceback.print_exception(
                                type(ex), ex, ex.__traceback__
                            )
                    try:
                        if res and cmd in self.commands_output:
                            self.commands_output[cmd](self, res, **outkwargs)
                    except Exception as ex:
                        print("Output processor failed with error: %s" % ex)
def AutoArgparse(
    func: DecoratorCallable,
    _parseonly: bool = False,
) -> Optional[Tuple[List[str], List[str]]]:
    """
    Generate an Argparse call from a function, then call this function.

    Notes:

    - for the arguments to have a description, the sphinx docstring format
      must be used. See
      https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
    - the arguments must be typed in Python (we ignore Sphinx-specific types)
      untyped arguments are ignored.
    - only types that would be supported by argparse are supported. The others
      are omitted.

    :param func: the function to expose. bool, bytes (provided as
        hexadecimal), str, int and float parameters are supported.
    :param _parseonly: do not parse sys.argv nor call func: return the list
        of the options, and the list of the options that take no value.
    :return: None, unless _parseonly is set
    """
    argsdoc = {}
    if func.__doc__:
        # Sphinx doc format parser
        m = re.match(
            r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
            func.__doc__.strip(),
        )
        if not m:
            desc = func.__doc__.strip()
        else:
            desc = m.group(1)
            sphinxargs = re.findall(
                r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
                m.group(2),
            )
            for argtype, argparam, argdesc in sphinxargs:
                argparam = argparam.strip()
                argdesc = argdesc.strip()
                if argtype == "param":
                    if not argparam:
                        raise ValueError(":param: without a name !")
                    argsdoc[argparam] = argdesc
    else:
        desc = ""

    # Process the parameters
    positional = []  # names of the parameters passed positionally to func
    positional_args = set()  # the same parameters, by argparse name
    noargument = []
    hexarguments = []  # names of the parameters provided as hexadecimal
    parameters = {}
    argnames = {}  # argparse name -> name of the parameter of func
    for param in inspect.signature(func).parameters.values():
        if not param.annotation:
            continue
        noarg = False
        parname = param.name.replace("_", "-")
        paramkwargs: Dict[str, Any] = {}
        if param.annotation is bool:
            if param.default is True:
                parname = "no-" + parname
                paramkwargs["action"] = "store_false"
            else:
                paramkwargs["action"] = "store_true"
            noarg = True
        elif param.annotation is bytes:
            paramkwargs["type"] = str
            hexarguments.append(param.name)
        elif param.annotation in [str, int, float]:
            paramkwargs["type"] = param.annotation
        else:
            continue
        is_positional = True
        if param.default != inspect.Parameter.empty:
            if param.kind == inspect.Parameter.POSITIONAL_ONLY:
                paramkwargs["nargs"] = '?'
            else:
                parname = "--" + parname
                is_positional = False
            paramkwargs["default"] = param.default
        elif param.kind == inspect.Parameter.VAR_POSITIONAL:
            paramkwargs["action"] = "append"
        if is_positional:
            positional.append(param.name)
            positional_args.add(parname)
        if param.name in argsdoc:
            paramkwargs["help"] = argsdoc[param.name]
            if param.annotation is bytes:
                paramkwargs["help"] = "(hex) " + paramkwargs["help"]
            elif param.annotation is bool:
                paramkwargs["help"] = "(flag) " + paramkwargs["help"]
            else:
                paramkwargs["help"] = (
                    "(%s) " % param.annotation.__name__ + paramkwargs["help"]
                )
        # Add to the parameter list
        parameters[parname] = paramkwargs
        argnames[parname] = param.name
        if noarg:
            noargument.append(parname)

    if _parseonly:
        # An internal mode used to generate bash autocompletion, do it then exit.
        # Positional arguments are matched by argparse name, which differs
        # from the parameter name when the latter contains a "_".
        return (
            [x for x in parameters if x not in positional_args] + ["--help"],
            [x for x in noargument if x not in positional_args] + ["--help"],
        )

    # Now build the argparse.ArgumentParser
    parser = argparse.ArgumentParser(
        prog=func.__name__,
        description=desc,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Add parameters to parser, and remember under which key ("dest")
    # argparse stores each of them: it is not always the parameter name.
    dests = {}  # name of the parameter of func -> argparse dest
    for parname, paramkwargs in parameters.items():
        action = parser.add_argument(parname, **paramkwargs)
        dests[argnames[parname]] = action.dest
    names = {dest: name for name, dest in dests.items()}

    # Now parse the sys.argv parameters
    params = vars(parser.parse_args())

    # Convert hex parameters if provided
    for p in hexarguments:
        value = params[dests[p]]
        # Only values coming from the command line are str: None and bytes
        # defaults are passed as they are.
        if isinstance(value, str):
            try:
                params[dests[p]] = bytes.fromhex(value)
            except ValueError:
                print(
                    conf.color_theme.fail(
                        "ERROR: the value of parameter %s "
                        "'%s' is not valid hexadecimal !" % (
                            p.replace("_", "-"), value
                        )
                    )
                )
                return None

    # Act as in interactive mode
    conf.logLevel = 20
    from scapy.themes import DefaultTheme
    conf.color_theme = DefaultTheme()
    # And call the function
    try:
        func(
            *[params.pop(dests[x]) for x in positional],
            **{names[k]: v for k, v in params.items()}
        )
    except AssertionError as ex:
        print(conf.color_theme.fail("ERROR: " + str(ex)))
        parser.print_help()
    return None
4045#######################
4046# PERIODIC SENDER #
4047#######################
class PeriodicSenderThread(threading.Thread):
    def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
        # type: (Any, _PacketIterable, float, bool) -> None
        """ Thread to send packets periodically

        Args:
            sock: socket where packet is sent periodically
            pkt: packet or list of packets to send
            interval: interval between two packets
            ignore_exceptions: end the thread silently when sending fails
                with OSError or TimeoutError, instead of raising
        """
        # Anything that is not a list is handled as a single packet
        self._pkts = (
            pkt if isinstance(pkt, list) else [cast("Packet", pkt)]
        )  # type: _PacketIterable
        self._socket = sock
        self._stopped = threading.Event()
        self._enabled = threading.Event()
        self._enabled.set()
        self._interval = interval
        self._ignore_exceptions = ignore_exceptions
        threading.Thread.__init__(self)

    def enable(self):
        # type: () -> None
        """Resume the sending of the packets"""
        self._enabled.set()

    def disable(self):
        # type: () -> None
        """Pause the sending of the packets, the thread is kept running"""
        self._enabled.clear()

    def run(self):
        # type: () -> None
        """Send the packets in a loop until stopped or the socket is closed"""
        while not (self._stopped.is_set() or self._socket.closed):
            for p in self._pkts:
                try:
                    if self._enabled.is_set():
                        self._socket.send(p)
                except (OSError, TimeoutError):
                    if self._ignore_exceptions:
                        return
                    raise
                # Sleep for the interval, or less if stop() is called
                self._stopped.wait(timeout=self._interval)
                if self._stopped.is_set() or self._socket.closed:
                    break

    def stop(self):
        # type: () -> None
        """Ask the thread to stop, and wait up to twice the interval"""
        self._stopped.set()
        self.join(self._interval * 2)
class SingleConversationSocket(object):
    """
    Wrapper around a socket-like object that runs sr1(), sr() and send()
    while holding a re-entrant lock. Any other attribute is looked up on
    the wrapped object.
    """

    def __init__(self, o):
        # type: (Any) -> None
        self._inner = o
        self._tx_mutex = threading.RLock()

    @property
    def __dict__(self):  # type: ignore
        # Expose the attributes of the wrapped object
        return self._inner.__dict__

    def __getattr__(self, name):
        # type: (str) -> Any
        # Called for the attributes that are not found on the wrapper
        return getattr(self._inner, name)

    def sr1(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call sr1() of the wrapped object while holding the lock"""
        with self._tx_mutex:
            answer = self._inner.sr1(*args, **kargs)
        return answer

    def sr(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call sr() of the wrapped object while holding the lock"""
        with self._tx_mutex:
            answers = self._inner.sr(*args, **kargs)
        return answers

    def send(self, x):
        # type: (Packet) -> Any
        """
        Call send() of the wrapped object while holding the lock. On
        ConnectionError or OSError the wrapped object is closed and the
        exception is raised again.
        """
        with self._tx_mutex:
            try:
                return self._inner.send(x)
            except (ConnectionError, OSError):
                self._inner.close()
                raise
4132 self._inner.close()
4133 raise e