Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/utils.py: 34%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7General utility functions.
8"""
11from decimal import Decimal
12from io import StringIO
13from itertools import zip_longest
15import array
16import argparse
17import collections
18import decimal
19import difflib
20import gzip
21import inspect
22import locale
23import math
24import os
25import pickle
26import random
27import re
28import shutil
29import socket
30import struct
31import subprocess
32import sys
33import tempfile
34import threading
35import time
36import traceback
37import warnings
39from scapy.config import conf
40from scapy.consts import DARWIN, OPENBSD, WINDOWS
41from scapy.data import MTU, DLT_EN10MB, DLT_RAW
42from scapy.compat import (
43 orb,
44 plain_str,
45 chb,
46 bytes_base64,
47 base64_bytes,
48 hex_bytes,
49 bytes_encode,
50)
51from scapy.error import (
52 log_interactive,
53 log_runtime,
54 Scapy_Exception,
55 warning,
56)
57from scapy.pton_ntop import inet_pton
59# Typing imports
60from typing import (
61 cast,
62 Any,
63 AnyStr,
64 Callable,
65 Dict,
66 IO,
67 Iterator,
68 List,
69 Optional,
70 TYPE_CHECKING,
71 Tuple,
72 Type,
73 Union,
74 overload,
75)
76from scapy.compat import (
77 DecoratorCallable,
78 Literal,
79)
81if TYPE_CHECKING:
82 from scapy.packet import Packet
83 from scapy.plist import _PacketIterable, PacketList
84 from scapy.supersocket import SuperSocket
85 import prompt_toolkit
87_ByteStream = Union[IO[bytes], gzip.GzipFile]
89###########
90# Tools #
91###########
def issubtype(x,  # type: Any
              t,  # type: Union[type, str]
              ):
    # type: (...) -> bool
    """issubtype(C, B) -> bool

    Return whether C is a class and if it is a subclass of class B.
    When using a tuple as the second argument issubtype(X, (A, B, ...)),
    is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).

    When B is a string, it is compared with the names of the *direct*
    bases of C only.

    :param x: the object to test; anything that is not a class yields False
    :param t: a class, a tuple of classes, or a class name
    :return: True if x is a class deriving from t, False otherwise
    """
    if not isinstance(x, type):
        # Non-classes have no __bases__: answer False instead of raising.
        return False
    if isinstance(t, str):
        return any(base.__name__ == t for base in x.__bases__)
    return issubclass(x, t)
_Decimal = Union[Decimal, int]


class EDecimal(Decimal):
    """Extended Decimal

    This implements arithmetic and comparison with float for
    backward compatibility.

    Every arithmetic operator converts its operand with ``Decimal(other)``
    first and wraps the result back into an EDecimal.

    NOTE: because ``__eq__`` is overridden without ``__hash__``, instances
    are unhashable (standard Python data model behaviour).
    """

    def __add__(self, other, context=None):
        # type: (_Decimal, Any) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __radd__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__add__(self, Decimal(other)))

    def __sub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__sub__(self, Decimal(other)))

    def __rsub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rsub__(self, Decimal(other)))

    def __mul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __rmul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mul__(self, Decimal(other)))

    def __truediv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__truediv__(self, Decimal(other)))

    def __floordiv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__floordiv__(self, Decimal(other)))

    # The former Python 2 fallback (__div__/__rdiv__) could never be
    # selected on Python 3, which this module requires, so only the
    # Python 3 definition is kept.
    def __divmod__(self, other):
        # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
        r = Decimal.__divmod__(self, Decimal(other))
        return EDecimal(r[0]), EDecimal(r[1])

    def __mod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__mod__(self, Decimal(other)))

    def __rmod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(Decimal.__rmod__(self, Decimal(other)))

    def __pow__(self, other, modulo=None):
        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
        return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare exactly as a Decimal first, then loosely as a float."""
        res = super(EDecimal, self).__eq__(other)
        if res is NotImplemented:
            # Do not evaluate NotImplemented in a boolean context (deprecated
            # since 3.9, an error on recent interpreters): hand it back so
            # Python can try the reflected comparison.
            return res
        return res or float(self) == other

    def normalize(self, precision):  # type: ignore
        # type: (int) -> EDecimal
        """Normalize using a local context limited to `precision` digits."""
        with decimal.localcontext() as ctx:
            ctx.prec = precision
            return EDecimal(super(EDecimal, self).normalize(ctx))
@overload
def get_temp_file(keep, autoext, fd):
    # type: (bool, str, Literal[True]) -> IO[bytes]
    pass


@overload
def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, Literal[False]) -> str
    pass


def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, bool) -> Union[IO[bytes], str]
    """Create a temporary file whose name starts with "scapy".

    :param keep: If False, the file name is recorded in conf.temp_files
                 (the list of files to delete when Scapy exits).
    :param autoext: Suffix to add to the generated file name.
    :param fd: If True, return the opened file-like object. If False
               (default), close the file and return its path.
    """
    tmp = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
                                      delete=False)
    if not keep:
        conf.temp_files.append(tmp.name)

    if not fd:
        # Release the handle so that something else can open the path.
        tmp.close()
        return tmp.name
    return tmp
def get_temp_dir(keep=False):
    # type: (bool) -> str
    """Creates a temporary directory, and returns its name.

    :param keep: If False (default), the directory will be recursively
                 deleted when Scapy exits.
    :return: A full path to a temporary directory.
    """
    dname = tempfile.mkdtemp(prefix="scapy")

    if not keep:
        # Register the path for the clean-up done through conf.temp_files.
        conf.temp_files.append(dname)

    return dname
def _create_fifo() -> Tuple[str, Any]:
    """Create a temporary fifo.

    The server side must be opened with _open_fifo() once the client
    is connected.

    :returns: (client_file, server_fd)
    """
    if not WINDOWS:
        # Reserve a unique name, then replace the regular file by a fifo.
        path = get_temp_file()
        os.unlink(path)
        os.mkfifo(path)
        return path, path
    from scapy.arch.windows.structures import _get_win_fifo
    return _get_win_fifo()
def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
    """Open the server side of a fifo returned by _create_fifo().

    :param fd: the server_fd value returned by _create_fifo()
    :param mode: mode given to open(); not used on Windows
    """
    if not WINDOWS:
        return open(fd, mode)
    from scapy.arch.windows.structures import _win_fifo_open
    return _win_fifo_open(fd)
def sane(x, color=False):
    # type: (AnyStr, bool) -> str
    """Return x with every element outside 32..126 replaced by a dot.

    :param x: the string or bytes to clean up
    :param color: if True, the replacement dots are passed through
                  conf.color_theme.not_printable
    """
    pieces = []
    for item in x:
        code = orb(item)
        if 32 <= code < 127:
            pieces.append(chr(code))
        elif color:
            pieces.append(conf.color_theme.not_printable("."))
        else:
            pieces.append(".")
    return "".join(pieces)
@conf.commands.register
def restart():
    # type: () -> None
    """Restarts scapy

    :raises OSError: when Scapy is not interactive or sys.argv[0] is not
                     a file
    """
    started_from_console = conf.interactive and os.path.isfile(sys.argv[0])
    if not started_from_console:
        raise OSError("Scapy was not started from console")
    command = [sys.executable] + sys.argv
    if WINDOWS:
        # Run the new instance as a child, then leave with its exit code
        # (1 if the child could not be run).
        res_code = 1
        try:
            res_code = subprocess.call(command)
        finally:
            os._exit(res_code)
    os.execv(sys.executable, command)
def lhex(x):
    # type: (Any) -> str
    """Render x with its integers in hexadecimal.

    VolatileValue instances use repr(), integers use hex(), tuples and
    lists are rendered element by element (recursively), and anything
    else falls back to str().
    """
    from scapy.volatile import VolatileValue
    if isinstance(x, VolatileValue):
        return repr(x)
    if isinstance(x, int):
        return hex(x)
    for container, template in ((tuple, "(%s)"), (list, "[%s]")):
        if isinstance(x, container):
            return template % ", ".join([lhex(item) for item in x])
    return str(x)
# hexdump(): encode `p` with bytes_encode(), then build one text row per
# group of 16 bytes: the offset in hex, the hex value of each byte and the
# sane() rendering of the group. The text is returned when dump is true,
# printed otherwise.
# NOTE(review): this listing comes from a coverage-report export. Every line
# carries its report line number and runs of spaces look collapsed, so the
# padding inside the string literals below may not be the real one --
# confirm against the actual module before relying on the column layout.
313@conf.commands.register
314def hexdump(p, dump=False):
315 # type: (Union[Packet, AnyStr], bool) -> Optional[str]
316 """Build a tcpdump like hexadecimal view
318 :param p: a Packet
319 :param dump: define if the result must be printed or returned in a variable
320 :return: a String only when dump=True
321 """
322 s = ""
323 x = bytes_encode(p)
324 x_len = len(x)
325 i = 0
# i is the offset of the first byte of the current row.
326 while i < x_len:
327 s += "%04x " % i
328 for j in range(16):
329 if i + j < x_len:
330 s += "%02X " % orb(x[i + j])
331 else:
# Past the end of the data: emit filler instead of a hex value.
332 s += " "
333 s += " %s\n" % sane(x[i:i + 16], color=True)
334 i += 16
335 # remove trailing \n
336 s = s[:-1] if s.endswith("\n") else s
337 if dump:
338 return s
339 else:
340 print(s)
341 return None
@conf.commands.register
def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
    """Build an equivalent view of hexdump() on a single line

    Note that setting both onlyasc and onlyhex to 1 results in an empty
    output

    :param p: a Packet
    :param onlyasc: 1 to display only the ascii view
    :param onlyhex: 1 to display only the hexadecimal view
    :param dump: print the view if False
    :return: a String only when dump=True
    """
    # Colors are only requested when the view is going to be printed.
    rendered = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
    if not dump:
        print(rendered)
        return None
    return rendered
@conf.commands.register
def chexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a per byte hexadecimal representation

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501

    :param p: a Packet
    :param dump: print the view if False
    :return: a String only if dump=True
    """
    raw = bytes_encode(p)
    rendered = ", ".join(["%#04x" % orb(byte) for byte in raw])
    if not dump:
        print(rendered)
        return None
    return rendered
@conf.commands.register
def hexstr(p, onlyasc=0, onlyhex=0, color=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> str
    """Build a fancy tcpdump like hex from bytes.

    :param p: a Packet, or anything accepted by bytes_encode()
    :param onlyasc: if true, leave the hexadecimal view out
    :param onlyhex: if true, leave the sane() view out
    :param color: forwarded to sane()
    """
    raw = bytes_encode(p)
    hex_view = [] if onlyasc else [" ".join("%02X" % orb(b) for b in raw)]
    asc_view = [] if onlyhex else [sane(raw, color=color)]
    return " ".join(hex_view + asc_view)
def repr_hex(s):
    # type: (bytes) -> str
    """Return s as a string of two-digit lowercase hex values."""
    digits = ["%02x" % orb(item) for item in s]
    return "".join(digits)
# hexdiff(): print a side-by-side hexadecimal diff of two byte strings.
# Stages: (1) encode both inputs, (2) pick an algorithm when none is forced,
# (3) fill backtrackx/backtracky, two parallel lists of chunks (an empty
# chunk marks a gap on that side), (4) print them 16 chunks per row.
# NOTE(review): this listing comes from a coverage-report export. Every line
# carries its report line number and runs of spaces look collapsed, so the
# padding inside the print() literals below may not be the real one --
# confirm against the actual module before relying on the column layout.
407@conf.commands.register
408def hexdiff(
409 a: Union['Packet', AnyStr],
410 b: Union['Packet', AnyStr],
411 algo: Optional[str] = None,
412 autojunk: bool = False,
413) -> None:
414 """
415 Show differences between 2 binary strings, Packets...
417 Available algorithms:
418 - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
419 Levenstein distance between the strings then backtrack.
420 - difflib: Use the difflib.SequenceMatcher implementation. This based on a
421 modified version of the Ratcliff and Obershelp algorithm.
422 This is much faster, but far less accurate.
423 https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher
425 :param a:
426 :param b: The binary strings, packets... to compare
427 :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
428 By default, this is chosen depending on the complexity, optimistically
429 preferring wagnerfischer unless really necessary.
430 :param autojunk: (difflib only) See difflib documentation.
431 """
432 xb = bytes_encode(a)
433 yb = bytes_encode(b)
# Automatic choice: wagnerfischer while len(xb) * len(yb) stays below 1e7
# (with an informational log above 1e6), difflib otherwise.
435 if algo is None:
436 # Choose the best algorithm
437 complexity = len(xb) * len(yb)
438 if complexity < 1e7:
439 # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
440 # Anything much larger than this shouldn't be attempted by default.
441 algo = "wagnerfischer"
442 if complexity > 1e6:
443 log_interactive.info(
444 "Complexity is a bit high. hexdiff will take a few seconds."
445 )
446 else:
447 algo = "difflib"
449 backtrackx = []
450 backtracky = []
452 if algo == "wagnerfischer":
# Both inputs are reversed before the table is built; the backtrack below
# starts from the last cell and walks back to (-1, -1).
453 xb = xb[::-1]
454 yb = yb[::-1]
456 # costs for the 3 operations
457 INSERT = 1
458 DELETE = 1
459 SUBST = 1
461 # Typically, d[i,j] will hold the distance between
462 # the first i characters of xb and the first j characters of yb.
463 # We change the Wagner Fischer to also store pointers to all
464 # the intermediate steps taken while calculating the Levenstein distance.
# Each d entry is a (cost, predecessor cell) tuple; index -1 stands for the
# empty prefix.
465 d = {(-1, -1): (0, (-1, -1))}
466 for j in range(len(yb)):
467 d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
468 for i in range(len(xb)):
469 d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)
471 # Compute the Levenstein distance between the two strings, but
472 # store all the steps to be able to backtrack at the end.
473 for j in range(len(yb)):
474 for i in range(len(xb)):
# min() compares the (cost, cell) tuples, so ties on cost are broken by
# the predecessor coordinates.
475 d[i, j] = min(
476 (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
477 (d[i - 1, j][0] + DELETE, (i - 1, j)),
478 (d[i, j - 1][0] + INSERT, (i, j - 1)),
479 )
481 # Iterate through the steps backwards to create the diff
482 i = len(xb) - 1
483 j = len(yb) - 1
484 while not (i == j == -1):
485 i2, j2 = d[i, j][1]
# The slices are empty when the corresponding index did not move.
486 backtrackx.append(xb[i2 + 1:i + 1])
487 backtracky.append(yb[j2 + 1:j + 1])
488 i, j = i2, j2
489 elif algo == "difflib":
490 sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
# One single-byte bytes object per position, so that the opcode ranges can
# be sliced straight into the backtrack lists.
491 xarr = [xb[i:i + 1] for i in range(len(xb))]
492 yarr = [yb[i:i + 1] for i in range(len(yb))]
493 # Iterate through opcodes to build the backtrack
494 for opcode in sm.get_opcodes():
495 typ, x0, x1, y0, y1 = opcode
496 if typ == 'delete':
497 backtrackx += xarr[x0:x1]
498 backtracky += [b''] * (x1 - x0)
499 elif typ == 'insert':
500 backtrackx += [b''] * (y1 - y0)
501 backtracky += yarr[y0:y1]
502 elif typ in ['equal', 'replace']:
503 backtrackx += xarr[x0:x1]
504 backtracky += yarr[y0:y1]
505 # Some lines may have been considered as junk. Check the sizes
506 if autojunk:
507 lbx = len(backtrackx)
508 lby = len(backtracky)
509 backtrackx += [b''] * (max(lbx, lby) - lbx)
510 backtracky += [b''] * (max(lbx, lby) - lby)
511 else:
512 raise ValueError("Unknown algorithm '%s'" % algo)
514 # Print the diff
# x and y accumulate the number of bytes already shown for each input; the
# printed offsets are derived from y. i indexes the backtrack lists.
516 x = y = i = 0
# Keyed by doy - dox: -1 selects the "left" color, 1 the "right" color and
# 0 leaves the text unchanged.
517 colorize: Dict[int, Callable[[str], str]] = {
518 0: lambda x: x,
519 -1: conf.color_theme.left,
520 1: conf.color_theme.right
521 }
# dox / doy tell whether the row being printed shows the x side and/or the
# y side.
523 dox = 1
524 doy = 0
525 btx_len = len(backtrackx)
526 while i < btx_len:
527 linex = backtrackx[i:i + 16]
528 liney = backtracky[i:i + 16]
# xx / yy: number of actual bytes (non-empty chunks) on each side of the row.
529 xx = sum(len(k) for k in linex)
530 yy = sum(len(k) for k in liney)
531 if dox and not xx:
532 dox = 0
533 doy = 1
# Identical rows are printed once, with both sides flagged.
534 if dox and linex == liney:
535 doy = 1
537 if dox:
538 xd = y
539 j = 0
# Step the displayed offset back once per leading gap of the row.
540 while not linex[j]:
541 j += 1
542 xd -= 1
543 print(colorize[doy - dox]("%04x" % xd), end=' ')
544 x += xx
545 line = linex
546 else:
547 print(" ", end=' ')
548 if doy:
549 yd = y
550 j = 0
551 while not liney[j]:
552 j += 1
553 yd -= 1
554 print(colorize[doy - dox]("%04x" % yd), end=' ')
555 y += yy
556 line = liney
557 else:
558 print(" ", end=' ')
560 print(" ", end=' ')
# cl accumulates the printable view shown at the end of the row.
562 cl = ""
563 for j in range(16):
564 if i + j < min(len(backtrackx), len(backtracky)):
565 if line[j]:
# Only chunks that differ between the two sides get colored.
566 col = colorize[(linex[j] != liney[j]) * (doy - dox)]
567 print(col("%02X" % orb(line[j])), end=' ')
568 if linex[j] == liney[j]:
569 cl += sane(line[j], color=True)
570 else:
571 cl += col(sane(line[j]))
572 else:
573 print(" ", end=' ')
574 cl += " "
575 else:
576 print(" ", end=' ')
577 if j == 7:
578 print("", end=' ')
580 print(" ", cl)
# Move on to the next 16 chunks, or print the same chunks again for the y
# side when only the x side has been shown so far.
582 if doy or not yy:
583 doy = 0
584 dox = 1
585 i += 16
586 else:
587 if yy:
588 dox = 0
589 doy = 1
590 else:
591 i += 16
# array.array("H") reads 16-bit words in native byte order: on little
# endian hosts the two bytes of the result have to be swapped back.
if struct.pack("H", 1) == b"\x00\x01":  # big endian
    checksum_endian_transform = lambda chk: chk  # type: Callable[[int], int]
else:
    checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8


def checksum(pkt):
    # type: (bytes) -> int
    """Compute the 16-bit one's complement checksum of pkt.

    :param pkt: the data to sum; an odd-length input is summed as if it
                were followed by one zero byte
    :return: the checksum, as an integer between 0 and 0xffff
    """
    if len(pkt) % 2 == 1:
        # Build a new object: "pkt += ..." would grow a caller-owned
        # bytearray in place.
        pkt = pkt + b"\0"
    s = sum(array.array("H", pkt))
    # Fold the carries back into the low 16 bits.
    s = (s >> 16) + (s & 0xffff)
    s += s >> 16
    s = ~s
    return checksum_endian_transform(s) & 0xffff
611def _fletcher16(charbuf):
612 # type: (bytes) -> Tuple[int, int]
613 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501
614 c0 = c1 = 0
615 for char in charbuf:
616 c0 += char
617 c1 += c0
619 c0 %= 255
620 c1 %= 255
621 return (c0, c1)
@conf.commands.register
def fletcher16_checksum(binbuf):
    # type: (bytes) -> int
    """Calculates Fletcher-16 checksum of the given buffer.

    Note:
    If the buffer contains the two checkbytes derived from the Fletcher-16
    checksum the result of this function has to be 0. Otherwise the buffer
    has been corrupted.
    """
    low, high = _fletcher16(binbuf)
    return low | (high << 8)
@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    # type: (bytes, int) -> bytes
    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.

    Including the bytes into the buffer (at the position marked by offset)
    the global Fletcher-16 checksum of the buffer will be 0. Thus it is easy
    to verify the integrity of the buffer on the receiver side.

    For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905
    Annex B.

    :raises Exception: if binbuf is shorter than offset
    """
    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    if len(binbuf) < offset:
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    # The sums are computed with the two checkbyte positions zeroed.
    zeroed = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    c0, c1 = _fletcher16(zeroed)

    x = ((len(zeroed) - offset - 1) * c0 - c1) % 255
    x = x + 255 if x <= 0 else x

    y = 510 - c0 - x
    y = y - 255 if y > 255 else y
    return chb(x) + chb(y)
def mac2str(mac):
    # type: (str) -> bytes
    """Convert colon-separated hexadecimal fields into bytes."""
    fields = plain_str(mac).split(':')
    return b"".join([chb(int(field, 16)) for field in fields])
def valid_mac(mac):
    # type: (str) -> bool
    """Tell whether mac2str() turns mac into exactly 6 bytes."""
    try:
        raw = mac2str(mac)
    except ValueError:
        return False
    return len(raw) == 6
def str2mac(s):
    # type: (bytes) -> str
    """Format s as colon-separated two-digit lowercase hex values.

    A str input is accepted too: its characters are converted with ord().
    """
    values = tuple(map(ord, s)) if isinstance(s, str) else tuple(s)
    template = ("%02x:" * len(s))[:-1]
    return template % values
def randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0)
    """
    chunks = []
    for _ in range(length):
        chunks.append(struct.pack('B', random.randint(0, 255)))
    return b"".join(chunks)
def zerofree_randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0) without zero in it.
    """
    chunks = []
    for _ in range(length):
        chunks.append(struct.pack('B', random.randint(1, 255)))
    return b"".join(chunks)
def strxor(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return b"".join([chb(orb(left) ^ orb(right))
                     for left, right in zip(s1, s2)])
def strand(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return b"".join([chb(orb(left) & orb(right))
                     for left, right in zip(s1, s2)])
def strrot(s1, count, right=True):
    # type: (bytes, int, bool) -> bytes
    """
    Rotate the binary by 'count' bytes

    :param s1: the bytes to rotate; an empty value is returned unchanged
    :param count: number of positions, taken modulo len(s1)
    :param right: rotate to the right if True (default), to the left
                  otherwise
    """
    if not s1:
        # Nothing to rotate, and "count % 0" would raise ZeroDivisionError.
        return s1
    off = count % len(s1)
    if right:
        return s1[-off:] + s1[:-off]
    else:
        return s1[off:] + s1[:off]
# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
# Probe once at import time: the wrapper is only installed when the
# platform's inet_aton() rejects the broadcast address.
try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    def inet_aton(ip_string):
        # type: (str) -> bytes
        if ip_string != "255.255.255.255":
            return socket.inet_aton(ip_string)
        return b"\xff" * 4
else:
    inet_aton = socket.inet_aton  # type: ignore

inet_ntoa = socket.inet_ntoa
def atol(x):
    # type: (str) -> int
    """Convert a dotted IPv4 address into its 32-bit integer value.

    :raises ValueError: when inet_aton() rejects x
    """
    try:
        packed = inet_aton(x)
    except socket.error:
        raise ValueError("Bad IP format: %s" % x)
    (value,) = struct.unpack("!I", packed)
    return cast(int, value)
def valid_ip(addr):
    # type: (str) -> bool
    """Tell whether addr is accepted by atol() as an IPv4 address."""
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        atol(addr)
    except (OSError, ValueError, socket.error):
        return False
    else:
        return True
def valid_net(addr):
    # type: (str) -> bool
    """Tell whether addr is an IPv4 address, optionally followed by a
    "/mask" suffix between 0 and 32."""
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' not in addr:
        return valid_ip(addr)
    ip, _, mask = addr.partition('/')
    return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32
def valid_ip6(addr):
    # type: (str) -> bool
    """Tell whether addr is accepted by inet_pton() as an IPv6 address."""
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        inet_pton(socket.AF_INET6, addr)
    except socket.error:
        return False
    else:
        return True
def valid_net6(addr):
    # type: (str) -> bool
    """Tell whether addr is an IPv6 address, optionally followed by a
    "/mask" suffix between 0 and 128."""
    try:
        addr = plain_str(addr)
    except UnicodeDecodeError:
        return False
    if '/' not in addr:
        return valid_ip6(addr)
    ip, _, mask = addr.partition('/')
    return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128
def ltoa(x):
    # type: (int) -> str
    """Convert the low 32 bits of x into a dotted IPv4 address."""
    packed = struct.pack("!I", x & 0xffffffff)
    return inet_ntoa(packed)
def itom(x):
    # type: (int) -> int
    """Convert a prefix length into a 32-bit netmask value."""
    # Shifting 32 one-bits followed by 32 zero-bits to the right by x
    # leaves x one-bits at the top of the low 32 bits.
    shifted = 0xffffffff00000000 >> x
    return shifted & 0xffffffff
def decode_locale_str(x):
    # type: (bytes) -> str
    """
    Decode bytes into a string using the system locale.
    Useful on Windows where it can be unusual (e.g. cp1252)

    Falls back to utf-8 when the locale reports no encoding; undecodable
    bytes are replaced rather than raising.
    """
    encoding = locale.getlocale()[1] or "utf-8"
    return x.decode(encoding=encoding, errors="replace")
class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command, without
    crashing.

    Example:
        >>> with ContextManagerSubprocess("tcpdump"):
        >>>     subprocess.Popen(["tcpdump", "--version"])
        ERROR: Could not execute tcpdump, is it installed?

    :param prog: name of the program, used in the messages
    :param suppress: if True (default), log the failure and swallow the
                     exception; if False, re-raise it with the message
    """

    def __init__(self, prog, suppress=True):
        # type: (str, bool) -> None
        self.prog = prog
        self.suppress = suppress

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self,
                 exc_type,  # type: Optional[type]
                 exc_value,  # type: Optional[Exception]
                 traceback,  # type: Optional[Any]
                 ):
        # type: (...) -> Optional[bool]
        if exc_value is None or exc_type is None:
            return None
        # Errored
        if isinstance(exc_value, EnvironmentError):
            msg = "Could not execute %s, is it installed?" % self.prog
        else:
            # exc_type is already the exception class: read its own name
            # (going through __class__ would always report "type").
            msg = "%s: execution failed (%s)" % (
                self.prog,
                exc_type.__name__
            )
        if not self.suppress:
            raise exc_type(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception
class ContextManagerCaptureOutput(object):
    """
    Context manager that intercept the console's output.

    Everything written to sys.stdout inside the block is accumulated and
    can be read back with get_output(). Note that print() also writes its
    line terminator.

    Example:
        >>> with ContextManagerCaptureOutput() as cmco:
        ...     print("hey")
        ...     assert cmco.get_output() == "hey\\n"
    """

    def __init__(self):
        # type: () -> None
        # Text captured so far.
        self.result_export_object = ""

    def __enter__(self):
        # type: () -> ContextManagerCaptureOutput
        # The standard library mock is enough here: no need for the
        # third-party "mock" backport.
        from unittest import mock

        def write(s, decorator=self):
            # type: (str, ContextManagerCaptureOutput) -> None
            decorator.result_export_object += s
        mock_stdout = mock.Mock()
        mock_stdout.write = write
        self.bck_stdout = sys.stdout
        sys.stdout = mock_stdout
        return self

    def __exit__(self, *exc):
        # type: (*Any) -> Literal[False]
        # Always restore the real stdout; exceptions are not suppressed.
        sys.stdout = self.bck_stdout
        return False

    def get_output(self, eval_bytes=False):
        # type: (bool) -> str
        """Return the captured text.

        :param eval_bytes: if True and the capture starts with "b'",
                           evaluate it and return the result as a str
        """
        if self.result_export_object.startswith("b'") and eval_bytes:
            # SECURITY NOTE: eval() executes whatever was printed. Only
            # use eval_bytes on output produced by trusted code.
            return plain_str(eval(self.result_export_object))
        return self.result_export_object
def do_graph(
    graph,  # type: str
    prog=None,  # type: Optional[str]
    format=None,  # type: Optional[str]
    target=None,  # type: Optional[Union[IO[bytes], str]]
    type=None,  # type: Optional[str]
    string=None,  # type: Optional[bool]
    options=None  # type: Optional[List[str]]
):
    # type: (...) -> Optional[str]
    """Processes graph description using an external software.
    This method is used to convert a graphviz format to an image.

    :param graph: GraphViz graph description
    :param prog: which graphviz program to use (default: conf.prog.dot)
    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
        option (default: svg)
    :param string: if true, simply return the graph string
    :param target: filename or redirect ("|command" or ">file"). Defaults
        pipe to conf.prog.display (a temporary file on Windows)
    :param type: deprecated name of format
    :param options: options to be passed to prog, as a string or as a list
        of strings
    :raises OSError: if prog exits with a non-zero status
    """

    if format is None:
        format = "svg"
    if string:
        return graph
    if type is not None:
        warnings.warn(
            "type is deprecated, and was renamed format",
            DeprecationWarning
        )
        format = type
    if prog is None:
        prog = conf.prog.dot
    if isinstance(options, (list, tuple)):
        # Interpolating the list itself would put its Python repr
        # ("['-x', '-y']") on the command line.
        options = " ".join(options)
    start_viewer = False
    if target is None:
        if WINDOWS:
            target = get_temp_file(autoext="." + format)
            start_viewer = True
        else:
            with ContextManagerSubprocess(conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    # format is never None at this point.
    format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    target = cast(IO[bytes], target)
    proc = subprocess.Popen(
        "\"%s\" %s %s" % (prog, options or "", format or ""),
        shell=True, stdin=subprocess.PIPE, stdout=target,
        stderr=subprocess.PIPE
    )
    _, stderr = proc.communicate(bytes_encode(graph))
    if proc.returncode != 0:
        raise OSError(
            "GraphViz call failed (is it installed?):\n" +
            plain_str(stderr)
        )
    try:
        # Best effort: target may be a pipe that is already gone.
        target.close()
    except Exception:
        pass
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                # Report the file name (not the tempfile module).
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", target.name)  # noqa: E501
                break
        else:
            # Only reached when the file showed up before the timeout.
            if WINDOWS and conf.prog.display == conf.prog._default:
                os.startfile(target.name)
            else:
                with ContextManagerSubprocess(conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
    return None
# Replacement text for every character that tex_escape() rewrites.
_TEX_TR = {
    "{": "{\\tt\\char123}",
    "}": "{\\tt\\char125}",
    "\\": "{\\tt\\char92}",
    "^": "\\^{}",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "&": "\\&",
    "%": "\\%",
    "|": "{\\tt\\char124}",
    "~": "{\\tt\\char126}",
    "<": "{\\tt\\char60}",
    ">": "{\\tt\\char62}",
}


def tex_escape(x):
    # type: (str) -> str
    """Return x with the characters listed in _TEX_TR replaced."""
    return "".join([_TEX_TR.get(char, char) for char in x])
def colgen(*lstcol,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> Iterator[Any]
    """Returns a generator that mixes provided quantities forever

    trans: a function to convert the three arguments into a color.
    lambda x,y,z:(x,y,z) by default
    """
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
    size = len(lstcol)
    indexes = range(size)
    while True:
        for i in indexes:
            for j in indexes:
                for k in indexes:
                    if i == j == k:
                        # Skip the triples made of one single index.
                        continue
                    yield trans(lstcol[(i + j) % size],
                                lstcol[(j + k) % size],
                                lstcol[(k + i) % size])
def incremental_label(label="tag%05i", start=0):
    # type: (str, int) -> Iterator[str]
    """Yield label formatted with start, start + 1, ... forever."""
    counter = start
    while True:
        yield label % counter
        counter = counter + 1
def binrepr(val):
    # type: (int) -> str
    """Return the binary digits of val, without the "0b" prefix.

    Negative values keep their sign in front of the digits
    (binrepr(-5) == "-101").
    """
    text = bin(val)
    if text.startswith("-"):
        # bin() yields "-0b101": drop the prefix but keep the sign.
        return "-" + text[3:]
    return text[2:]
def long_converter(s):
    # type: (str) -> int
    """Parse a hexadecimal number, ignoring its newlines and spaces."""
    digits = "".join(part.replace(' ', '') for part in s.split('\n'))
    return int(digits, 16)
1062#########################
1063# Enum management #
1064#########################
class EnumElement:
    """A named integer: a (key, value) pair that behaves like its value.

    Unknown attributes are looked up on the value, str() gives the key,
    int() and hash() give the value.
    """

    def __init__(self, key, value):
        # type: (str, int) -> None
        self._key = key
        self._value = value

    def __repr__(self):
        # type: () -> str
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501

    def __getattr__(self, attr):
        # type: (str) -> Any
        if attr == "_value":
            # Only reached when _value is not set yet (e.g. an instance
            # built without __init__): delegating would recurse forever.
            raise AttributeError(attr)
        return getattr(self._value, attr)

    def __str__(self):
        # type: () -> str
        return self._key

    def __bytes__(self):
        # type: () -> bytes
        return bytes_encode(self.__str__())

    def __hash__(self):
        # type: () -> int
        return self._value

    def __int__(self):
        # type: () -> int
        return int(self._value)

    def __eq__(self, other):
        # type: (Any) -> bool
        try:
            return self._value == int(other)
        except (TypeError, ValueError):
            # Something that cannot be read as an integer is simply
            # different, not an error.
            return False

    def __ne__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)

    # Historical (misspelled) name, kept for callers that use it directly.
    __neq__ = __ne__
class Enum_metaclass(type):
    """Metaclass wrapping every integer class attribute into an element.

    The reverse mapping (element -> attribute name) is stored in the
    __rdict__ class attribute and backs the item access, membership test
    and get() below.
    """
    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        reverse = {}
        for attr_name, attr_value in dct.items():
            if not isinstance(attr_value, int):
                continue
            element = cls.element_class(attr_name, attr_value)
            # Existing key: the namespace keeps its size while iterating.
            dct[attr_name] = element
            reverse[element] = attr_name
        dct["__rdict__"] = reverse
        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        # type: (int) -> Any
        return self.__rdict__[attr]  # type: ignore

    def __contains__(self, val):
        # type: (int) -> bool
        return val in self.__rdict__  # type: ignore

    def get(self, attr, val=None):
        # type: (str, Optional[Any]) -> Any
        return self.__rdict__.get(attr, val)  # type: ignore

    def __repr__(self):
        # type: () -> str
        return "<%s>" % self.__dict__.get("name", self.__name__)
1137###################
1138# Object saving #
1139###################
def export_object(obj):
    # type: (Any) -> None
    """Print obj pickled (protocol 2), zlib-compressed (level 9) and
    encoded with bytes_base64()."""
    import zlib
    pickled = pickle.dumps(obj, 2)
    packed = zlib.compress(pickled, 9)
    print(bytes_base64(packed))
def import_object(obj=None):
    # type: (Optional[str]) -> Any
    """Rebuild an object from the text printed by export_object().

    :param obj: the encoded text; read from sys.stdin when None
    """
    import zlib
    if obj is None:
        obj = sys.stdin.read()
    packed = base64_bytes(obj.strip())
    # SECURITY NOTE: unpickling can run arbitrary code. Only feed this
    # function data coming from a trusted source.
    return pickle.loads(zlib.decompress(packed))  # noqa: E501
def save_object(fname, obj):
    # type: (str, Any) -> None
    """Pickle a Python object into a gzip-compressed file.

    :param fname: path of the file to write
    :param obj: the object to pickle
    """
    # The context manager closes the file even when pickling fails.
    with gzip.open(fname, "wb") as fd:
        pickle.dump(obj, fd)
def load_object(fname):
    # type: (str) -> Any
    """Unpickle a Python object from a gzip-compressed file.

    :param fname: path of a file written by save_object()
    :return: the unpickled object
    """
    # SECURITY NOTE: unpickling can run arbitrary code. Only load files
    # coming from a trusted source.
    # The context manager closes the file once the object is loaded.
    with gzip.open(fname, "rb") as fd:
        return pickle.load(fd)
@conf.commands.register
def corrupt_bytes(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Corrupt a given percentage (at least one byte) or number of bytes
    from a string

    :param data: the data to corrupt
    :param p: proportion of bytes to corrupt, used when n is None
    :param n: exact number of bytes to corrupt
    :return: the corrupted data; empty input is returned as empty bytes
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s)
    if not s_len:
        # Nothing to corrupt: sampling from an empty range would raise.
        return s.tobytes()
    if n is None:
        n = max(1, int(s_len * p))
    for i in random.sample(range(s_len), n):
        # Adding 1..255 modulo 256 always changes the byte.
        s[i] = (s[i] + random.randint(1, 255)) % 256
    return s.tobytes()
@conf.commands.register
def corrupt_bits(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Flip a given percentage (at least one bit) or number of bits
    from a string

    :param data: the data to corrupt
    :param p: proportion of bits to flip, used when n is None
    :param n: exact number of bits to flip
    :return: the corrupted data; empty input is returned as empty bytes
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s) * 8
    if not s_len:
        # Nothing to flip: sampling from an empty range would raise.
        return s.tobytes()
    if n is None:
        n = max(1, int(s_len * p))
    for i in random.sample(range(s_len), n):
        # i is a bit index: byte i // 8, bit i % 8.
        s[i // 8] ^= 1 << (i % 8)
    return s.tobytes()
1203#############################
1204# pcap capture file stuff #
1205#############################
@conf.commands.register
def wrpcap(filename,  # type: Union[IO[bytes], str]
           pkt,  # type: _PacketIterable
           *args,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> None
    """Write a list of packets to a pcap file

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcap(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param gz: set to 1 to save a gzipped capture
    :param linktype: force linktype value
    :param endianness: "<" or ">", force endianness
    :param sync: do not bufferize writes to the capture file
    """
    # Extra arguments are handed over to PcapWriter untouched.
    writer = PcapWriter(filename, *args, **kargs)
    with writer as fdesc:
        fdesc.write(pkt)
@conf.commands.register
def wrpcapng(filename,  # type: str
             pkt,  # type: _PacketIterable
             ):
    # type: (...) -> None
    """Write a list of packets to a pcapng file

    :param filename: the name of the file to write packets to.
        NOTE(review): the previous docstring also advertised open
        file-like objects, but the pcapng writer constructor visible in
        this file passes ``filename`` straight to ``open()`` -- confirm
        before relying on file-like objects here.
    :param pkt: packets to write
    """
    # Leaving the "with" block flushes and closes the writer.
    with PcapNgWriter(filename) as writer:
        writer.write(pkt)
@conf.commands.register
def rdpcap(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read a pcap or pcapng file and return a packet list

    :param filename: path of the capture, or an open binary file object
    :param count: read only <count> packets (-1 reads them all)
    """
    # Mypy cannot follow the metaclass __call__ used by the readers, hence
    # the "type: ignore" on the constructor call.
    with PcapReader(filename) as reader:  # type: ignore
        packets = reader.read_all(count=count)
    return packets
# NOTE: Type hinting
# Mypy doesn't understand the following metaclass, and thinks each
# constructor (PcapReader...) needs 3 arguments. To avoid this, we give
# the last 2 arguments a fake default (=None), then force their values
# to not be None in the signature and wrap the whole thing in an ignore.
# This avoids needing a "# type: ignore" every time we call those
# constructors.
class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers

    Calling a reader class opens the capture, reads its 4-byte magic and
    initialises the class; if that fails, its ``alternative`` class (when
    declared) is tried with the same descriptor.
    """

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(
            cls, name, bases, dct
        )
        if 'alternative' in dct:
            # Link back: the class named as alternative gets the new class
            # as its own alternative.
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Creates a cls instance, use the `alternative` if that
        fails.

        :param filename: path of the capture, or an open binary file object
        :raises Scapy_Exception: if nothing could be read, or if neither
            the class nor its alternative accepts the file
        """
        # A descriptor opened here from a path is ours to close on failure;
        # a caller-supplied file object is left untouched.
        owns_fdesc = isinstance(filename, str)
        filename, fdesc, magic = cls.open(filename)
        try:
            if not magic:
                raise Scapy_Exception(
                    "No data could be read!"
                )
            candidates = [cls]
            if "alternative" in cls.__dict__:
                candidates.append(cls.__dict__["alternative"])
            for candidate in candidates:
                i = candidate.__new__(
                    candidate,
                    candidate.__name__,
                    candidate.__bases__,
                    candidate.__dict__  # type: ignore
                )
                try:
                    i.__init__(filename, fdesc, magic)
                    return i
                except (Scapy_Exception, EOFError):
                    pass
            raise Scapy_Exception("Not a supported capture file")
        except BaseException:
            if owns_fdesc:
                fdesc.close()
            raise

    @staticmethod
    def open(fname  # type: Union[IO[bytes], str]
             ):
        # type: (...) -> Tuple[str, _ByteStream, bytes]
        """Open (if necessary) filename, and read the magic.

        A path is first tried as a gzip file, then as a plain file.

        :return: a (filename, file object, 4-byte magic) tuple
        """
        if isinstance(fname, str):
            filename = fname
            fdesc = None  # type: Optional[_ByteStream]
            try:
                fdesc = gzip.open(filename, "rb")
                magic = fdesc.read(4)
            except IOError:
                # Not gzip data: release the gzip handle before reopening
                # the same path as a plain file.
                if fdesc is not None:
                    fdesc.close()
                fdesc = open(filename, "rb")
                magic = fdesc.read(4)
        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic
class RawPcapReader(metaclass=PcapReader_metaclass):
    """A stateful pcap reader. Each packet is returned as bytes,
    together with its record metadata."""

    # TODO: use Generics to properly type the various readers.
    # As of right now, RawPcapReader is typed as if it returned packets
    # because all of its child do. Fix that

    nonblocking_socket = True
    PacketMetadata = collections.namedtuple("PacketMetadata",
                                            ["sec", "usec", "wirelen", "caplen"])  # noqa: E501

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, _ByteStream, bytes) -> None
        """Parse the pcap global header.

        :param filename: name recorded for the capture
        :param fdesc: open file object, positioned right after the magic
        :param magic: the 4 magic bytes already read from ``fdesc``
        :raises Scapy_Exception: on unknown magic or truncated header
        """
        self.filename = filename
        self.f = fdesc
        big_endian_magics = (b"\xa1\xb2\xc3\xd4", b"\xa1\xb2\x3c\x4d")
        little_endian_magics = (b"\xd4\xc3\xb2\xa1", b"\x4d\x3c\xb2\xa1")
        # The second magic of each pair marks nanosecond precision.
        nanosecond_magics = (b"\xa1\xb2\x3c\x4d", b"\x4d\x3c\xb2\xa1")
        if magic in big_endian_magics:
            self.endian = ">"
        elif magic in little_endian_magics:
            self.endian = "<"
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        self.nano = magic in nanosecond_magics
        header = self.f.read(20)
        if len(header) < 20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        # Only the last two of the six header fields are kept.
        fields = struct.unpack(self.endian + "HHIIII", header)
        self.snaplen = fields[4]
        self.linktype = fields[5]

    def __enter__(self):
        # type: () -> RawPcapReader
        return self

    def __iter__(self):
        # type: () -> RawPcapReader
        return self

    def __next__(self):
        # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """
        implement the iterator protocol on a set of packets in a pcap file
        """
        try:
            record = self._read_packet()
        except EOFError:
            raise StopIteration
        return record

    def _read_packet(self, size=MTU):
        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """return a single packet read from the file as a tuple containing
        (pkt_data, pkt_metadata)

        The returned data is cut to at most ``size`` bytes.

        raise EOFError when no more packets are available
        """
        record_header = self.f.read(16)
        if len(record_header) < 16:
            raise EOFError
        sec, usec, caplen, wirelen = struct.unpack(
            self.endian + "IIII", record_header
        )
        data = self.f.read(caplen)
        metadata = RawPcapReader.PacketMetadata(sec=sec, usec=usec,
                                                wirelen=wirelen,
                                                caplen=caplen)
        return (data[:size], metadata)

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        """Not available on the raw reader: always raises."""
        raise Exception(
            "Cannot call read_packet() in RawPcapReader. Use "
            "_read_packet()"
        )

    def dispatch(self,
                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
                 ):
        # type: (...) -> None
        """call the specified callback routine for each packet read

        This is just a convenience function for the main loop
        that allows for easy launching of packet processing in a
        thread.
        """
        for record in self:
            callback(record)

    def _read_all(self, count=-1):
        # type: (int) -> List[Packet]
        """return a list of all packets in the pcap file

        Stops after ``count`` packets; a negative count reads until EOF.
        """
        packets = []  # type: List[Packet]
        remaining = count
        while remaining != 0:
            remaining -= 1
            try:
                pkt = self.read_packet()  # type: Packet
            except EOFError:
                break
            packets.append(pkt)
        return packets

    def recv(self, size=MTU):
        # type: (int) -> bytes
        """ Emulate a socket
        """
        data, _metadata = self._read_packet(size=size)
        return data

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def close(self):
        # type: () -> Optional[Any]
        return self.f.close()

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.close()

    # emulate SuperSocket
    @staticmethod
    def select(sockets,  # type: List[SuperSocket]
               remain=None,  # type: Optional[float]
               ):
        # type: (...) -> List[SuperSocket]
        """Return the given sockets unchanged."""
        return sockets
class PcapReader(RawPcapReader):
    """A pcap reader that dissects each record into a Packet, using the
    layer registered for the capture's linktype."""

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapReader.__init__(self, filename, fdesc, magic)
        linktype = self.linktype
        try:
            self.LLcls = conf.l2types.num2layer[
                linktype
            ]  # type: Type[Packet]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (linktype, linktype))  # noqa: E501
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            self.LLcls = conf.raw_layer

    def __enter__(self):
        # type: () -> PcapReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read one record and dissect it.

        Extra keyword arguments are passed to the layer constructor. If
        dissection fails and ``conf.debug_dissector`` is not set, the data
        is wrapped in ``conf.raw_layer`` instead.

        :raises EOFError: when no more packets are available
        """
        record = super()._read_packet(size=size)
        if record is None:
            raise EOFError
        data, info = record

        # KeyboardInterrupt is not an Exception subclass, so it propagates
        # without needing a clause of its own.
        try:
            pkt = self.LLcls(data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (self.LLcls, data)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(data)
        # The sub-second field counts nanoseconds or microseconds,
        # depending on the capture's magic.
        exponent = -9 if self.nano else -6
        unit = Decimal(10) ** Decimal(exponent)
        pkt.time = EDecimal(info.sec + unit * info.usec)
        pkt.wirelen = info.wirelen
        return pkt

    def recv(self, size=MTU, **kwargs):  # type: ignore
        # type: (int, **Any) -> Packet
        return self.read_packet(size=size, **kwargs)

    def __next__(self):  # type: ignore
        # type: () -> Packet
        try:
            pkt = self.read_packet()
        except EOFError:
            raise StopIteration
        return pkt

    def read_all(self, count=-1):
        # type: (int) -> PacketList
        """Read up to ``count`` packets and return them as a PacketList
        named after the capture file."""
        packets = self._read_all(count)
        from scapy import plist
        list_name = os.path.basename(self.filename)
        return plist.PacketList(packets, name=list_name)
class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as
    bytes.

    """

    alternative = RawPcapReader  # type: Type[Any]

    PacketMetadata = collections.namedtuple("PacketMetadataNg",  # type: ignore
                                            ["linktype", "tsresol",
                                             "tshigh", "tslow", "wirelen",
                                             "comment", "ifname", "direction"])

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        """Check the magic and parse the first Section Header Block.

        :param filename: name recorded for the capture
        :param fdesc: open file object, positioned right after the magic
        :param magic: the 4 magic bytes already read from ``fdesc``
        :raises Scapy_Exception: on bad magic or malformed first SHB
        """
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, options); will be populated by IDBs.
        self.interfaces = []  # type: List[Tuple[int, int, Dict[str, Any]]]
        self.default_options = {
            "tsresol": 1000000
        }
        # Block type -> handler. Handlers return a packet tuple, or None
        # for blocks that carry no packet.
        self.blocktypes: Dict[
            int,
            Callable[
                [bytes, int],
                Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
            ]] = {
                1: self._read_block_idb,
                2: self._read_block_pkt,
                3: self._read_block_spb,
                6: self._read_block_epb,
                10: self._read_block_dsb,
        }
        self.endian = "!"  # Will be overwritten by first SHB

        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )

        try:
            self._read_block_shb()
        except EOFError:
            raise Scapy_Exception(
                "The first SHB of the pcapng file is malformed !"
            )

    def _read_block(self, size=MTU):
        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
        """Read one block and hand its body to the matching handler.

        :param size: maximum number of packet bytes handlers may return
        :return: the handler result, or None for a Section Header Block
            or an unknown block type
        :raises EOFError: at end of file or on a malformed block
        """
        try:
            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocktype == 0x0A0D0D0A:
            # This function updates the endianness based on the block content.
            self._read_block_shb()
            return None
        try:
            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            warning("PcapNg: Error reading blocklen before block body")
            raise EOFError
        if blocklen < 12:
            warning("PcapNg: Invalid block length !")
            raise EOFError

        # 12 = block type + the two copies of the block total length.
        _block_body_length = blocklen - 12
        block = self.f.read(_block_body_length)
        if len(block) != _block_body_length:
            raise Scapy_Exception("PcapNg: Invalid Block body length "
                                  "(too short)")
        self._read_block_tail(blocklen)
        if blocktype in self.blocktypes:
            return self.blocktypes[blocktype](block, size)
        return None

    def _read_block_tail(self, blocklen):
        # type: (int) -> None
        """Read the trailing block length and check it equals ``blocklen``.

        :raises EOFError: if the trailer is missing or does not match
        """
        if blocklen % 4:
            pad = self.f.read(-blocklen % 4)
            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                    "Ignored padding %r" % (blocklen, pad))
        try:
            if blocklen != struct.unpack(self.endian + 'I',
                                         self.f.read(4))[0]:
                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
        except struct.error:
            warning("PcapNg: Could not read blocklen after block body")
            raise EOFError

    def _read_block_shb(self):
        # type: () -> None
        """Section Header Block

        Called once the block type has been consumed. Sets ``self.endian``
        from the byte-order magic.

        :raises EOFError: on any malformed or truncated field
        """
        # The length cannot be decoded until the byte order is known.
        _blocklen = self.f.read(4)
        endian = self.f.read(4)
        if endian == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif endian == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            warning("PcapNg: Bad magic in Section Header Block"
                    " (not a pcapng file?)")
            raise EOFError

        try:
            blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
        except struct.error:
            warning("PcapNg: Could not read blocklen")
            raise EOFError
        if blocklen < 28:
            warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!")  # noqa: E501
            raise EOFError

        # Major version must be 1
        _major = self.f.read(2)
        try:
            major = struct.unpack(self.endian + "H", _major)[0]
        except struct.error:
            warning("PcapNg: Could not read major value")
            raise EOFError
        if major != 1:
            warning(f"PcapNg: SHB Major version {major} unsupported !")
            raise EOFError

        # Skip minor version & section length
        skipped = self.f.read(10)
        if len(skipped) != 10:
            warning("PcapNg: Could not read minor value & section length")
            raise EOFError

        _options_len = blocklen - 28
        options = self.f.read(_options_len)
        if len(options) != _options_len:
            raise Scapy_Exception("PcapNg: Invalid Section Header Block "
                                  " options (too short)")
        self._read_block_tail(blocklen)
        self._read_options(options)

    def _read_packet(self, size=MTU):  # type: ignore
        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Read blocks until one of them yields a packet, and return it
        as a (packet bytes, PacketMetadata) tuple.

        :param size: maximum number of packet bytes to return
        :raises EOFError: when no more packets are available
        """
        while True:
            # Forward size so the limit is honoured, as in RawPcapReader.
            res = self._read_block(size=size)
            if res is not None:
                return res

    def _read_options(self, options):
        # type: (bytes) -> Dict[int, bytes]
        """Parse an options area into a {code: value} dict.

        Parsing stops at the end-of-options marker (code 0) or when fewer
        than 4 bytes remain.
        """
        opts = dict()
        while len(options) >= 4:
            try:
                code, length = struct.unpack(self.endian + "HH", options[:4])
            except struct.error:
                warning("PcapNg: options header is too small "
                        "%d !" % len(options))
                raise EOFError
            # "<=": the end-of-options marker is not mandatory, so the
            # last option may end exactly where the buffer ends.
            if code != 0 and 4 + length <= len(options):
                opts[code] = options[4:4 + length]
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option "
                            "length %d for end-of-option" % length)
                break
            # Option values are padded to a multiple of 4 bytes.
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        return opts

    def _read_block_idb(self, block, _):
        # type: (bytes, int) -> None
        """Interface Description Block

        Appends a (linktype, snaplen, options) entry to ``self.interfaces``.
        """
        # 2 bytes LinkType + 2 bytes Reserved
        # 4 bytes Snaplen
        options_raw = self._read_options(block[8:])
        options = self.default_options.copy()  # type: Dict[str, Any]
        for c, v in options_raw.items():
            if c == 9:
                length = len(v)
                if length == 1:
                    tsresol = orb(v)
                    # High bit set: power of 2, otherwise power of 10.
                    options["tsresol"] = (2 if tsresol & 128 else 10) ** (
                        tsresol & 127
                    )
                else:
                    warning("PcapNg: invalid options "
                            "length %d for IDB tsresol" % length)
            elif c == 2:
                options["name"] = v
            elif c == 1:
                options["comment"] = v
        try:
            interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
                self.endian + "HxxI",
                block[:8]
            ) + (options,)
        except struct.error:
            warning("PcapNg: IDB is too small %d/8 !" % len(block))
            raise EOFError
        self.interfaces.append(interface)

    def _check_interface_id(self, intid):
        # type: (int) -> None
        """Check the interface id value and raise EOFError if invalid."""
        tmp_len = len(self.interfaces)
        if intid >= tmp_len:
            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
            raise EOFError

    def _read_block_epb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Enhanced Packet Block"""
        try:
            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "5I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: EPB is too small %d/20 !" % len(block))
            raise EOFError

        # Compute the options offset taking padding into account
        opt_offset = 20 + caplen + (-caplen) % 4

        # Parse options
        options = self._read_options(block[opt_offset:])
        comment = options.get(1, None)
        epb_flags_raw = options.get(2, None)
        if epb_flags_raw:
            try:
                epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
            except struct.error:
                warning("PcapNg: EPB invalid flags size"
                        "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
                raise EOFError
            # Only the two low bits of the flags word are kept.
            direction = epb_flags & 3

        else:
            direction = None

        self._check_interface_id(intid)
        ifname = self.interfaces[intid][2].get('name', None)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=comment,
                                               ifname=ifname,
                                               direction=direction))

    def _read_block_spb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Simple Packet Block"""
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        self._check_interface_id(intid)

        try:
            wirelen, = struct.unpack(self.endian + "I", block[:4])
        except struct.error:
            warning("PcapNg: SPB is too small %d/4 !" % len(block))
            raise EOFError

        # An IDB snaplen of 0 means "no limit" in the pcapng format, so it
        # must not be used to truncate the packet to zero bytes.
        snaplen = self.interfaces[intid][1]
        caplen = min(wirelen, snaplen) if snaplen else wirelen
        return (block[4:4 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=None,
                                               tslow=None,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None))

    def _read_block_pkt(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """(Obsolete) Packet Block"""
        try:
            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "HH4I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: PKT is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None))

    def _read_block_dsb(self, block, size):
        # type: (bytes, int) -> None
        """Decryption Secrets Block

        For a TLS Key Log, and when the TLS layer is loaded, the keys are
        parsed and stored in ``conf.tls_nss_keys``.
        """

        # Parse the secrets type and length fields
        try:
            secrets_type, secrets_length = struct.unpack(
                self.endian + "II",
                block[:8],
            )
            block = block[8:]
        except struct.error:
            warning("PcapNg: DSB is too small %d!", len(block))
            raise EOFError

        # Compute the secrets length including the padding
        padded_secrets_length = secrets_length + (-secrets_length) % 4
        if len(block) < padded_secrets_length:
            warning("PcapNg: invalid DSB secrets length!")
            raise EOFError

        # Extract secrets data and options
        secrets_data = block[:padded_secrets_length][:secrets_length]
        if block[padded_secrets_length:]:
            warning("PcapNg: DSB options are not supported!")

        # TLS Key Log
        if secrets_type == 0x544c534b:
            if getattr(conf, "tls_sessions", False) is False:
                warning("PcapNg: TLS Key Log available, but "
                        "the TLS layer is not loaded! Scapy won't be able "
                        "to decrypt the packets.")
            else:
                from scapy.layers.tls.session import load_nss_keys

                # Write Key Log to a file and parse it
                filename = get_temp_file()
                with open(filename, "wb") as fd:
                    fd.write(secrets_data)

                keys = load_nss_keys(filename)
                if not keys:
                    warning("PcapNg: invalid TLS Key Log in DSB!")
                else:
                    # Note: these attributes are only available when the TLS
                    #       layer is loaded.
                    conf.tls_nss_keys = keys
                    conf.tls_session_enable = True
        else:
            warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)
class PcapNgReader(RawPcapNgReader, PcapReader):
    """A pcapng reader that dissects each packet block into a Packet,
    using the layer registered for the block's linktype."""

    alternative = PcapReader

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def __enter__(self):
        # type: () -> PcapNgReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next packet block and dissect it.

        Extra keyword arguments are passed to the layer constructor. If
        the linktype is unknown or dissection fails, and
        ``conf.debug_dissector`` is not set, the data is wrapped in
        ``conf.raw_layer`` instead.

        :raises EOFError: when no more packets are available
        """
        record = super()._read_packet(size=size)
        if record is None:
            raise EOFError
        data, meta = record
        (linktype, tsresol, tshigh, tslow,
         wirelen, comment, ifname, direction) = meta
        # KeyboardInterrupt is not an Exception subclass, so it propagates
        # without needing a clause of its own.
        try:
            layer = conf.l2types.num2layer[linktype]  # type: Type[Packet]
            pkt = layer(data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(data)
        if tshigh is not None:
            # The timestamp is a 64-bit tick count split in two halves.
            ticks = (tshigh << 32) + tslow
            pkt.time = EDecimal(ticks) / tsresol
        pkt.wirelen = wirelen
        pkt.comment = comment
        pkt.direction = direction
        if ifname is not None:
            pkt.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
        return pkt

    def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
        return self.read_packet(size=size, **kwargs)
class GenericPcapWriter(object):
    """Format-independent base of the capture writers.

    Subclasses implement ``_write_header`` and ``_write_packet``.
    """
    nano = False
    linktype: int

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        raise NotImplementedError

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        raise NotImplementedError

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return the (sec, usec) pair to record for ``packet``.

        When ``sec`` is None and the packet has a ``time`` attribute, both
        values come from it; ``usec`` counts nanoseconds if ``self.nano``
        is set, microseconds otherwise. When ``sec`` is None and there is
        no ``time`` attribute, both values are returned unchanged.
        """
        if hasattr(packet, "time"):
            if sec is None:
                packet_time = packet.time
                tmp = int(packet_time)
                scale = 1000000000 if self.nano else 1000000
                usec = int(round((packet_time - tmp) * scale))
                sec = float(packet_time)
                if usec >= scale:
                    # The fraction rounded up to a whole second: carry it
                    # so the sub-second field stays below one second.
                    usec -= scale
                    sec = float(tmp + 1)
        if sec is not None and usec is None:
            usec = 0
        return sec, usec  # type: ignore

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Pick the linktype if it is not set yet, then write the header.

        The linktype is looked up from the packet class; for None, bytes
        or an unknown class, a warning is emitted and Ethernet is used.
        """
        if not hasattr(self, 'linktype'):
            try:
                if pkt is None or isinstance(pkt, bytes):
                    # Can't guess LL
                    raise KeyError
                self.linktype = conf.l2types.layer2num[
                    pkt.__class__
                ]
            except KeyError:
                msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
                warning(msg, self.__class__.__name__, pkt.__class__.__name__)
                self.linktype = DLT_EN10MB
        self._write_header(pkt)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[float]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: Packet, or bytes for a single packet
        :type packet: scapy.packet.Packet or bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured. If ``sec`` is not specified,
                     this value is ignored.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(raw(packet))``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, tries ``packet.wirelen``, otherwise uses
                        ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        f_sec, usec = self._get_time(packet, sec, usec)

        rawpkt = bytes_encode(packet)
        caplen = len(rawpkt) if caplen is None else caplen

        if wirelen is None:
            if hasattr(packet, "wirelen"):
                wirelen = packet.wirelen
        if wirelen is None:
            wirelen = caplen

        comment = getattr(packet, "comment", None)
        ifname = getattr(packet, "sniffed_on", None)
        direction = getattr(packet, "direction", None)
        if not isinstance(packet, bytes):
            try:
                linktype: int = conf.l2types.layer2num[
                    packet.__class__
                ]
            except KeyError:
                # write_header() already warned and fell back for classes
                # with no registered linktype; do the same here instead of
                # failing on the packet itself.
                linktype = getattr(self, "linktype", DLT_EN10MB)
        else:
            linktype = self.linktype
        if ifname is not None:
            ifname = str(ifname).encode('utf-8')
        self._write_packet(
            rawpkt,
            sec=f_sec, usec=usec,
            caplen=caplen, wirelen=wirelen,
            comment=comment,
            ifname=ifname,
            direction=direction,
            linktype=linktype
        )
class GenericRawPcapWriter(GenericPcapWriter):
    """File-backed writer base: owns the output file object ``f`` and
    tracks whether the capture header has been written."""
    header_present = False
    nano = False
    sync = False
    f = None  # type: Union[IO[bytes], gzip.GzipFile]

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def flush(self):
        # type: () -> Optional[Any]
        return self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the file, writing the header first if nothing was
        written yet."""
        if not self.header_present:
            self.write_header(None)
        return self.f.close()

    def __enter__(self):
        # type: () -> GenericRawPcapWriter
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.flush()
        self.close()

    def write(self, pkt):
        # type: (Union[_PacketIterable, bytes]) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a pcap file.

        :param pkt: Packet(s) to write (one record for each Packet), or raw
                    bytes to write (as one record).
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
        """
        if isinstance(pkt, bytes):
            # Raw bytes are written as a single record.
            if not self.header_present:
                self.write_header(pkt)
            self.write_packet(pkt)
            return

        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for item in IterSocket(pkt).iter:
            if not self.header_present:
                self.write_header(item)

            is_packet = not isinstance(item, bytes)
            if is_packet and \
                    self.linktype != conf.l2types.get(type(item), None):
                warning("Inconsistent linktypes detected!"
                        " The resulting file might contain"
                        " invalid packets."
                        )

            self.write_packet(item)
class RawPcapWriter(GenericRawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 linktype=None,  # type: Optional[int]
                 gz=False,  # type: bool
                 endianness="",  # type: str
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 nano=False,  # type: bool
                 snaplen=MTU,  # type: int
                 bufsz=4096,  # type: int
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param linktype: force linktype to a given value. If None, linktype is
            taken from the first written packet
        :param gz: compress the capture on the fly
        :param endianness: force an endianness (little:"<", big:">").
            Default is native
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
        :param snaplen: snapshot length written in the file header
        :param bufsz: buffer size used when opening a non-gzipped file;
            forced to 0 when ``sync`` is set

        """

        # Compare against None: 0 is a valid linktype value (it is
        # LINKTYPE_NULL) and must not be mistaken for "not provided".
        if linktype is not None:
            self.linktype = linktype
        self.snaplen = snaplen
        self.append = append
        self.gz = gz
        self.endian = endianness
        self.sync = sync
        self.nano = nano
        if sync:
            bufsz = 0

        if isinstance(filename, str):
            self.filename = filename
            mode = "ab" if append else "wb"
            if gz:
                self.f = cast(_ByteStream, gzip.open(
                    filename, mode, 9
                ))
            else:
                self.f = open(filename, mode, bufsz)
        else:
            self.f = filename
            self.filename = getattr(filename, "name", "No name")

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the pcap global header.

        In append mode nothing is written when the file already has data.

        :raises ValueError: if no linktype is set
        """
        self.header_present = True

        if self.append:
            # Even if prone to race conditions, this seems to be
            # safest way to tell whether the header is already present
            # because we have to handle compressed streams that
            # are not as flexible as basic files
            if self.gz:
                g = gzip.open(self.filename, "rb")  # type: _ByteStream
            else:
                g = open(self.filename, "rb")
            with g:
                if g.read(16):
                    return

        if not hasattr(self, 'linktype'):
            raise ValueError(
                "linktype could not be guessed. "
                "Please pass a linktype while creating the writer"
            )

        # The magic number tells readers the timestamp precision.
        self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4,  # noqa: E501
                                 2, 4, 0, 0, self.snaplen, self.linktype))
        self.f.flush()

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        ``linktype``, ``comment``, ``ifname`` and ``direction`` are
        accepted but not used by this method.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: number of nanoseconds (``nano=True``) or microseconds
                     (``nano=False``) after the second that the packet was
                     captured. Taken from the current time if ``sec`` is
                     not supplied either, otherwise defaults to 0.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen
        if sec is None or usec is None:
            t = time.time()
            it = int(t)
            if sec is None:
                sec = it
                usec = int(round((t - it) *
                                 (1000000000 if self.nano else 1000000)))
            elif usec is None:
                usec = 0

        self.f.write(struct.pack(self.endian + "IIII",
                                 int(sec), usec, caplen, wirelen))
        self.f.write(bytes(packet))
        if self.sync:
            self.f.flush()
class RawPcapNgWriter(GenericRawPcapWriter):
    """A stream pcapng writer with more control than wrpcapng()

    Blocks are written little-endian. An Interface Description Block is
    emitted the first time a packet refers to an interface name that has
    not been seen before.
    """

    def __init__(self,
                 filename,  # type: str
                 ):
        # type: (...) -> None
        """
        :param filename: path of the capture file, opened for binary
            writing (any existing content is truncated).
        """

        self.header_present = False
        # Number of timestamp units per second used for EPB timestamps
        self.tsresol = 1000000
        # A dict to keep if_name to IDB id mapping.
        # unknown if_name(None) id=0
        self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}

        # tcpdump only support little-endian in PCAPng files
        self.endian = "<"
        self.endian_magic = b"\x4d\x3c\x2b\x1a"

        self.filename = filename
        self.f = open(filename, "wb", 4096)

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """
        Return the ``(sec, usec)`` pair to use for ``packet``.

        ``sec`` falls back to ``float(packet.time)`` when it is None and the
        packet has a ``time`` attribute (otherwise it stays None); ``usec``
        falls back to 0.
        """
        if hasattr(packet, "time"):
            if sec is None:
                sec = float(packet.time)

        if usec is None:
            usec = 0

        return sec, usec  # type: ignore

    def _add_padding(self, raw_data):
        # type: (bytes) -> bytes
        """Return ``raw_data`` zero-padded to a multiple of 4 bytes."""
        raw_data += ((-len(raw_data)) % 4) * b"\x00"
        return raw_data

    def build_block(self, block_type, block_body, options=None):
        # type: (bytes, bytes, Optional[bytes]) -> bytes
        """
        Assemble a complete block.

        :param block_type: the 4 already-packed bytes of the block type
        :param block_body: block body; padded to 32 bits here
        :param options: already-encoded options appended after the padded
            body (the caller is responsible for their padding)
        :return: type + total length + body (+ options) + total length
        """

        # Pad Block Body to 32 bits
        block_body = self._add_padding(block_body)

        if options:
            block_body += options

        # An empty block is 12 bytes long
        block_total_length = 12 + len(block_body)
        packed_length = struct.pack(self.endian + "I", block_total_length)

        # Block Type, Block Total Length, Block Body, Block Total Length
        return block_type + packed_length + block_body + packed_length

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """
        Write the Section Header Block and a first Interface Description
        Block (using ``self.linktype``) if they were not written yet.
        ``pkt`` is not used.
        """
        if not self.header_present:
            self.header_present = True
            self._write_block_shb()
            self._write_block_idb(linktype=self.linktype)

    def _write_block_shb(self):
        # type: () -> None
        """Write a Section Header Block (version 1.0, section length -1)."""

        # Block Type
        block_type = b"\x0A\x0D\x0D\x0A"
        # Byte-Order Magic
        block_shb = self.endian_magic
        # Major Version
        block_shb += struct.pack(self.endian + "H", 1)
        # Minor Version
        block_shb += struct.pack(self.endian + "H", 0)
        # Section Length
        block_shb += struct.pack(self.endian + "q", -1)

        self.f.write(self.build_block(block_type, block_shb))

    def _write_block_idb(self,
                         linktype,  # type: int
                         ifname=None  # type: Optional[bytes]
                         ):
        # type: (...) -> None
        """
        Write an Interface Description Block.

        :param linktype: linktype value of the interface
        :param ifname: interface name, stored as option code 2 when given
        """

        # Block Type
        block_type = struct.pack(self.endian + "I", 1)
        # LinkType
        block_idb = struct.pack(self.endian + "H", linktype)
        # Reserved
        block_idb += struct.pack(self.endian + "H", 0)
        # SnapLen
        block_idb += struct.pack(self.endian + "I", 262144)

        # if_name option
        opts = None
        if ifname is not None:
            opts = struct.pack(self.endian + "HH", 2, len(ifname))
            # Pad Option Value to 32 bits
            opts += self._add_padding(ifname)
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_idb, options=opts))

    def _write_block_spb(self, raw_pkt):
        # type: (bytes) -> None
        """Write ``raw_pkt`` as a Simple Packet Block."""

        # Block Type
        block_type = struct.pack(self.endian + "I", 3)
        # Original Packet Length
        block_spb = struct.pack(self.endian + "I", len(raw_pkt))
        # Packet Data
        block_spb += raw_pkt

        self.f.write(self.build_block(block_type, block_spb))

    def _write_block_epb(self,
                         raw_pkt,  # type: bytes
                         ifid,  # type: int
                         timestamp=None,  # type: Optional[Union[EDecimal, float]]  # noqa: E501
                         caplen=None,  # type: Optional[int]
                         orglen=None,  # type: Optional[int]
                         comment=None,  # type: Optional[bytes]
                         flags=None,  # type: Optional[int]
                         ):
        # type: (...) -> None
        """
        Write ``raw_pkt`` as an Enhanced Packet Block.

        :param raw_pkt: packet bytes
        :param ifid: id of the interface the packet belongs to
        :param timestamp: capture time in seconds; scaled by
            ``self.tsresol``. A missing (or zero) value is written as 0.
        :param caplen: captured length; ``len(raw_pkt)`` when missing or 0
        :param orglen: original length; ``len(raw_pkt)`` when missing or 0
        :param comment: stored as option code 1 when not None
        :param flags: stored as option code 2 (32 bits) when it is an int
        """

        if timestamp:
            tmp_ts = int(timestamp * self.tsresol)
            ts_high = tmp_ts >> 32
            ts_low = tmp_ts & 0xFFFFFFFF
        else:
            ts_high = ts_low = 0

        if not caplen:
            caplen = len(raw_pkt)

        if not orglen:
            orglen = len(raw_pkt)

        # Block Type
        block_type = struct.pack(self.endian + "I", 6)
        # Interface ID
        block_epb = struct.pack(self.endian + "I", ifid)
        # Timestamp (High)
        block_epb += struct.pack(self.endian + "I", ts_high)
        # Timestamp (Low)
        block_epb += struct.pack(self.endian + "I", ts_low)
        # Captured Packet Length
        block_epb += struct.pack(self.endian + "I", caplen)
        # Original Packet Length
        block_epb += struct.pack(self.endian + "I", orglen)
        # Packet Data
        block_epb += raw_pkt

        # Options
        opts = b''
        if comment is not None:
            comment = bytes_encode(comment)
            opts += struct.pack(self.endian + "HH", 1, len(comment))
            # Pad Option Value to 32 bits
            opts += self._add_padding(comment)
        if isinstance(flags, int):
            opts += struct.pack(self.endian + "HH", 2, 4)
            opts += struct.pack(self.endian + "I", flags)
        if opts:
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_epb,
                                      options=opts))

    def _write_packet(self,  # type: ignore
                      packet,  # type: bytes
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcapng file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet. Only
                         used when a new interface block has to be written.
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch
                    (the fractional part is kept). If not supplied, the
                    timestamp is written as 0.
        :type sec: float
        :param usec: not used with pcapng
        :type usec: int
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: UTF-8 string containing human-readable comment text
                        that is associated to the current block. Line
                        separators SHOULD be a carriage-return + linefeed
                        ('\\r\\n') or just linefeed ('\\n'); either form may
                        appear and be considered a line separator. The
                        string is not zero-terminated.
        :type comment: bytes
        :param ifname: UTF-8 string containing the
                       name of the device used to capture data.
                       The string is not zero-terminated.
        :type ifname: bytes
        :param direction: 0 = information not available,
                          1 = inbound,
                          2 = outbound
        :type direction: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen

        ifid = self.interfaces2id.get(ifname, None)
        if ifid is None:
            # First packet seen on this interface: describe it first
            ifid = max(self.interfaces2id.values()) + 1
            self.interfaces2id[ifname] = ifid
            self._write_block_idb(linktype=linktype, ifname=ifname)

        # EPB flags (32 bits).
        # currently only direction is implemented (least 2 significant bits)
        if isinstance(direction, int):
            flags = direction & 0x3
        else:
            flags = None

        self._write_block_epb(packet, timestamp=sec, caplen=caplen,
                              orglen=wirelen, comment=comment, ifid=ifid,
                              flags=flags)
        if self.sync:
            self.f.flush()
class PcapWriter(RawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""
class PcapNgWriter(RawPcapNgWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """
        Pick the timestamp of ``packet``: a missing ``sec`` is taken from
        ``packet.time`` when the packet has one, a missing ``usec`` is 0.
        """
        if hasattr(packet, "time") and sec is None:
            sec = float(packet.time)
        micro = 0 if usec is None else usec
        return sec, micro  # type: ignore
@conf.commands.register
def rderf(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read a ERF file and return a packet list

    :param filename: the capture to read, handed to ERFEthernetReader
    :param count: read only <count> packets
    """
    with ERFEthernetReader(filename) as reader:
        packets = reader.read_all(count=count)
    return packets
class ERFEthernetReader_metaclass(PcapReader_metaclass):
    """Metaclass that opens the capture before building the reader."""

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """
        Build a reader for ``filename``; when its initialisation fails,
        retry with the class stored under ``alternative`` (if any).

        :raises Scapy_Exception: when no reader could be initialised
        """
        reader = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)  # type: ignore
        filename, fdesc = cls.open(filename)
        try:
            reader.__init__(filename, fdesc)
        except (Scapy_Exception, EOFError):
            pass
        else:
            return reader

        if "alternative" in cls.__dict__:
            fallback = cls.__dict__["alternative"]
            reader = fallback.__new__(
                fallback,
                fallback.__name__,
                fallback.__bases__,
                fallback.__dict__  # type: ignore
            )
            try:
                reader.__init__(filename, fdesc)
            except (Scapy_Exception, EOFError):
                pass
            else:
                return reader

        raise Scapy_Exception("Not a supported capture file")

    @staticmethod
    def open(fname  # type: ignore
             ):
        # type: (...) -> Tuple[str, _ByteStream]
        """Open (if necessary) filename

        A string is treated as a path: it is opened through gzip when one
        byte can be read that way, as a plain binary file otherwise. Any
        other object is used as the file descriptor itself.
        """
        if not isinstance(fname, str):
            return getattr(fname, "name", "No name"), fname

        try:
            # Probe: reading fails with IOError when this is not gzip data
            with gzip.open(fname, "rb") as probe:
                probe.read(1)
            stream = gzip.open(fname, "rb")  # type: _ByteStream
        except IOError:
            stream = open(fname, "rb")
        return fname, stream
class ERFEthernetReader(PcapReader,
                        metaclass=ERFEthernetReader_metaclass):
    """Reader for ERF captures made of Ethernet records."""

    def __init__(self, filename, fdesc=None):  # type: ignore
        # type: (Union[IO[bytes], str], IO[bytes]) -> None
        """
        :param filename: name of the capture
        :param fdesc: open binary stream the records are read from
        """
        self.filename = filename  # type: ignore
        self.f = fdesc
        # One nanosecond, used to scale the fractional part of timestamps
        self.power = Decimal(10) ** Decimal(-9)

    # time is in 64-bits Endace's format which can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def _convert_erf_timestamp(self, t):
        # type: (int) -> EDecimal
        """
        Convert a 64-bit ERF timestamp (seconds in the upper 32 bits,
        binary fraction of a second in the lower 32 bits) to seconds.
        """
        sec = t >> 32
        frac_sec = t & 0xffffffff
        # Scale the binary fraction to nanoseconds
        frac_sec *= 10**9
        frac_sec += (frac_sec & 0x80000000) << 1
        frac_sec >>= 32
        return EDecimal(sec + self.power * frac_sec)

    # The details of ERF Packet format can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """
        Read the next ERF record and dissect its payload as Ethernet.

        :param size: upper bound used when slicing the record payload
        :param kwargs: passed to the Ether constructor
        :raises EOFError: when fewer than 16 header bytes are left
        :raises Scapy_Exception: when the record type does not have the
            Ethernet bit set, or when the record length is smaller than
            the record headers
        """

        # General ERF Header have exactly 16 bytes
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError

        # The timestamp is in little-endian byte-order.
        erf_time = struct.unpack('<Q', hdr[:8])[0]
        # The rest is in big-endian byte-order.
        # Ignoring flags and lctr (loss counter) since they are ERF specific
        # header fields which Packet object does not support.
        erf_type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
        # Check if the type != 0x02, type Ethernet
        # NOTE(review): this is a bit test, so other types with bit 1 set
        # are accepted as well - confirm whether that is intended.
        if erf_type & 0x02 == 0:
            raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")

        # If there are extended headers, ignore it because Packet object does
        # not support it. Extended headers size is 8 bytes before the payload.
        has_ext_hdr = bool(erf_type & 0x80)
        payload_len = rlen - (24 if has_ext_hdr else 16)
        if payload_len < 0:
            # A negative length would make read() consume the rest of the
            # file as a single record.
            raise Scapy_Exception("Invalid ERF record length")
        if has_ext_hdr:
            _ = self.f.read(8)
        s = self.f.read(payload_len)

        # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
        # of the fields are disregarded by Endace.
        pb = s[2:size]
        from scapy.layers.l2 import Ether
        try:
            p = Ether(pb, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (Ether, s)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            # NOTE(review): the raw fallback is built from `s`, which still
            # holds the 2 padding bytes - confirm this is intended.
            p = conf.raw_layer(s)

        p.time = self._convert_erf_timestamp(erf_time)
        p.wirelen = wlen

        return p
@conf.commands.register
def wrerf(filename,  # type: Union[IO[bytes], str]
          pkt,  # type: _PacketIterable
          *args,  # type: Any
          **kargs  # type: Any
          ):
    # type: (...) -> None
    """Write a list of packets to a ERF file

    Extra positional and keyword arguments are handed to ERFEthernetWriter.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrerf(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: the packet(s) to write
    :param gz: set to 1 to save a gzipped capture
    :param append: append packets to the capture file instead of
        truncating it
    :param sync: do not bufferize writes to the capture file
    """
    with ERFEthernetWriter(filename, *args, **kargs) as writer:
        writer.write(pkt)
class ERFEthernetWriter(PcapWriter):
    """A stream ERF Ethernet writer with more control than wrerf()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 gz=False,  # type: bool
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param gz: compress the capture on the fly
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        """
        super(ERFEthernetWriter, self).__init__(filename,
                                                gz=gz,
                                                append=append,
                                                sync=sync)

    def write(self, pkt):  # type: ignore
        # type: (_PacketIterable) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a ERF file.

        :param pkt: Packet(s) to write (one record for each Packet)
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
        """
        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for p in IterSocket(pkt).iter:
            self.write_packet(p)

    def write_packet(self, pkt):  # type: ignore
        # type: (Packet) -> None
        """
        Write one ERF Ethernet record (16-byte header, 2 bytes of padding,
        packet bytes) and flush the file.

        The timestamp comes from ``pkt.time`` when the packet has one and
        from the current time (whole seconds) otherwise. The wire length
        comes from ``pkt.wirelen`` when it is set and defaults to the
        record length otherwise.
        """

        if hasattr(pkt, "time"):
            sec = int(pkt.time)
            # Fraction of a second expressed in 1/2**32 units
            usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9)
            t = (sec << 32) + usec
        else:
            t = int(time.time()) << 32

        # There are 16 bytes of headers + 2 bytes of padding before the packets
        # payload.
        rlen = len(pkt) + 18

        # Objects without a `wirelen` attribute (e.g. bytes) are treated like
        # packets whose wirelen is unset.
        wirelen = getattr(pkt, "wirelen", None)
        if wirelen is None:
            wirelen = rlen

        self.f.write(struct.pack("<Q", t))
        self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
        self.f.write(bytes(pkt))
        self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the underlying file and return the result of its close()."""
        return self.f.close()
@conf.commands.register
def import_hexcap(input_string=None):
    # type: (Optional[str]) -> bytes
    """Imports a tcpdump like hexadecimal view

    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"

    Lines are consumed until the first empty one (or the end of input);
    a line that cannot be parsed is reported with a warning and skipped.

    :param input_string: String containing the hexdump input to parse. If None,
        read from standard input.
    """
    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501
    hex_chunks = []
    try:
        if input_string:
            next_line = StringIO(input_string).readline
        else:
            next_line = input
        while True:
            current = next_line().strip()
            if not current:
                break
            found = re_extract_hexcap.match(current)
            if found is None:
                warning("Parsing error during hexcap")
                continue
            # Third group: the hexadecimal byte columns of the line
            hex_chunks.append(found.group(3))
    except EOFError:
        pass

    return hex_bytes("".join(hex_chunks).replace(" ", ""))
@conf.commands.register
def wireshark(pktlist, wait=False, **kwargs):
    # type: (List[Packet], bool, **Any) -> Optional[Any]
    """
    Runs Wireshark on a list of packets.

    This is :func:`tcpdump` with ``prog`` set to ``conf.prog.wireshark``;
    see that function for the description of the other parameters.

    Note: this defaults to wait=False, to run Wireshark in the background.
    """
    wireshark_prog = conf.prog.wireshark
    return tcpdump(pktlist, prog=wireshark_prog, wait=wait, **kwargs)
@conf.commands.register
def tdecode(
    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
    args=None,  # type: Optional[List[str]]
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """
    Run tshark on a list of packets.

    :param args: If not specified, defaults to ``tshark -V``.

    See :func:`tcpdump` for more parameters.
    """
    tshark_args = ["-V"] if args is None else args
    return tcpdump(pktlist, prog=conf.prog.tshark, args=tshark_args, **kwargs)
def _guess_linktype_name(value):
    # type: (int) -> str
    """Guess the DLT name from its value."""
    from scapy.libs.winpcapy import pcap_datalink_val_to_name
    raw_name = cast(bytes, pcap_datalink_val_to_name(value))
    return raw_name.decode()
def _guess_linktype_value(name):
    # type: (str) -> int
    """Guess the value of a DLT name.

    A name the lookup reports as -1 is logged with a warning and mapped
    to DLT_EN10MB.
    """
    from scapy.libs.winpcapy import pcap_datalink_name_to_val
    val = cast(int, pcap_datalink_name_to_val(name.encode()))
    if val != -1:
        return val
    warning("Unknown linktype: %s. Using EN10MB", name)
    return DLT_EN10MB
@conf.commands.register
def tcpdump(
    pktlist=None,  # type: Union[IO[bytes], None, str, _PacketIterable]
    dump=False,  # type: bool
    getfd=False,  # type: bool
    args=None,  # type: Optional[List[str]]
    flt=None,  # type: Optional[str]
    prog=None,  # type: Optional[Any]
    getproc=False,  # type: bool
    quiet=False,  # type: bool
    use_tempfile=None,  # type: Optional[Any]
    read_stdin_opts=None,  # type: Optional[Any]
    linktype=None,  # type: Optional[Any]
    wait=True,  # type: bool
    _suppress=False  # type: bool
):
    # type: (...) -> Any
    """Run tcpdump or tshark on a list of packets.

    When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
    temporary file to store the packets. This works around a bug in Apple's
    version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/

    Otherwise, the packets are passed in stdin.

    This function can be explicitly enabled or disabled with the
    ``use_tempfile`` parameter.

    When using ``wireshark``, it will be called with ``-ki -`` to start
    immediately capturing packets from stdin.

    Otherwise, the command will be run with ``-r -`` (which is correct for
    ``tcpdump`` and ``tshark``).

    This can be overridden with ``read_stdin_opts``. This has no effect when
    ``use_tempfile=True``, or otherwise reading packets from a regular file.

    :param pktlist: a Packet instance, a PacketList instance or a list of
        Packet instances. Can also be a filename (as a string), an open
        file-like object that must be a file format readable by
        tshark (Pcap, PcapNg, etc.) or None (to sniff)
    :param flt: a filter to use with tcpdump
    :param dump: when set to True, returns a string instead of displaying it.
    :param getfd: when set to True, returns a file-like object to read data
        from tcpdump or tshark from.
    :param getproc: when set to True, the subprocess.Popen object is returned
    :param args: arguments (as a list) to pass to tshark (example for tshark:
        args=["-T", "json"]).
    :param prog: program to use (defaults to tcpdump, will work with tshark)
    :param quiet: when set to True, the process stderr is discarded
    :param use_tempfile: When set to True, always use a temporary file to store
        packets.
        When set to False, pipe packets through stdin.
        When set to None (default), only use a temporary file with
        ``tcpdump`` on OSX.
    :param read_stdin_opts: When set, a list of arguments needed to capture
        from stdin. Otherwise, attempts to guess.
    :param linktype: A custom DLT value or name, to overwrite the default
        values.
    :param wait: If True (default), waits for the process to terminate before
        returning to Scapy. If False, the process will be detached to the
        background. If dump, getproc or getfd is True, these have the same
        effect as ``wait=False``.
    :raises Scapy_Exception: when ``prog`` is None and tcpdump is not
        available
    :raises ValueError: when ``prog`` is not a string, when the linktype is
        unknown, or when it cannot be read from a PcapNg file

    Examples::

        >>> tcpdump([IP()/TCP(), IP()/UDP()])
        reading from file -, link-type RAW (Raw IP)
        16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501
        16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]

        >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
        1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501
        2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0

    To get a JSON representation of a tshark-parsed PacketList(), one can::

        >>> import json, pprint
        >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
        ...                                  dst="45.33.32.156"),
        ...                               prog=conf.prog.tshark,
        ...                               args=["-T", "json"],
        ...                               getfd=True))
        >>> pprint.pprint(json_data)
        [{u'_index': u'packets-2016-12-23',
          u'_score': None,
          u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
                                              u'frame.encap_type': u'7',
        [...]
                                              },
                                   u'ip': {u'ip.addr': u'45.33.32.156',
                                           u'ip.checksum': u'0x0000a20d',
        [...]
                                           u'ip.ttl': u'64',
                                           u'ip.version': u'4'},
                                   u'raw': u'Raw packet data'}},
          u'_type': u'pcap_file'}]
        >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
        u'64'
    """
    # Returning the process implies that its stdout is piped
    getfd = getfd or getproc
    if prog is None:
        if not conf.prog.tcpdump:
            raise Scapy_Exception(
                "tcpdump is not available"
            )
        prog = [conf.prog.tcpdump]
    elif isinstance(prog, str):
        prog = [prog]
    else:
        raise ValueError("prog must be a string")

    if linktype is not None:
        if isinstance(linktype, int):
            # Guess name from value
            try:
                linktype_name = _guess_linktype_name(linktype)
            except StopIteration:
                linktype = -1
        else:
            # Guess value from name
            if linktype.startswith("DLT_"):
                linktype = linktype[4:]
            linktype_name = linktype
            try:
                linktype = _guess_linktype_value(linktype)
            except KeyError:
                linktype = -1
        if linktype == -1:
            raise ValueError(
                "Unknown linktype. Try passing its datalink name instead"
            )
        prog += ["-y", linktype_name]

    # Build Popen arguments
    if args is None:
        args = []
    else:
        # Make a copy of args
        args = list(args)

    if flt is not None:
        # Check the validity of the filter
        if linktype is None and isinstance(pktlist, str):
            # linktype is unknown but required. Read it from file
            with PcapReader(pktlist) as rd:
                if isinstance(rd, PcapNgReader):
                    # Get the linktype from the first packet
                    try:
                        _, metadata = rd._read_packet()
                        linktype = metadata.linktype
                        if OPENBSD and linktype == 228:
                            linktype = DLT_RAW
                    except EOFError:
                        raise ValueError(
                            "Cannot get linktype from a PcapNg packet."
                        )
                else:
                    linktype = rd.linktype
        from scapy.arch.common import compile_filter
        compile_filter(flt, linktype=linktype)
        args.append(flt)

    stdout = subprocess.PIPE if dump or getfd else None
    # subprocess.DEVNULL discards the output without leaving a file object
    # open in this process.
    stderr = subprocess.DEVNULL if quiet else None
    proc = None

    if use_tempfile is None:
        # Apple's tcpdump cannot read from stdin, see:
        # http://apple.stackexchange.com/questions/152682/
        use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump

    if read_stdin_opts is None:
        if prog[0] == conf.prog.wireshark:
            # Start capturing immediately (-k) from stdin (-i -)
            read_stdin_opts = ["-ki", "-"]
        elif prog[0] == conf.prog.tcpdump and not OPENBSD:
            # Capture in packet-buffered mode (-U) from stdin (-r -)
            read_stdin_opts = ["-U", "-r", "-"]
        else:
            read_stdin_opts = ["-r", "-"]
    else:
        # Make a copy of read_stdin_opts
        read_stdin_opts = list(read_stdin_opts)

    if pktlist is None:
        # sniff
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif isinstance(pktlist, str):
        # file
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", pktlist] + args,
                stdout=stdout,
                stderr=stderr,
            )
    elif use_tempfile:
        tmpfile = get_temp_file(  # type: ignore
            autoext=".pcap",
            fd=True
        )  # type: IO[bytes]
        try:
            # File-like input: copy it to the temporary file by chunks
            tmpfile.writelines(
                iter(lambda: pktlist.read(1048576), b"")  # type: ignore
            )
        except AttributeError:
            # Not a file-like object: write the packets as a pcap
            pktlist = cast("_PacketIterable", pktlist)
            wrpcap(tmpfile, pktlist, linktype=linktype)
        else:
            tmpfile.close()
        with ContextManagerSubprocess(prog[0], suppress=_suppress):
            proc = subprocess.Popen(
                prog + ["-r", tmpfile.name] + args,
                stdout=stdout,
                stderr=stderr,
            )
    else:
        try:
            pktlist.fileno()  # type: ignore
            # pass the packet stream
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=pktlist,  # type: ignore
                    stdout=stdout,
                    stderr=stderr,
                )
        except (AttributeError, ValueError):
            # write the packet stream to stdin
            with ContextManagerSubprocess(prog[0], suppress=_suppress):
                proc = subprocess.Popen(
                    prog + read_stdin_opts + args,
                    stdin=subprocess.PIPE,
                    stdout=stdout,
                    stderr=stderr,
                )
            if proc is None:
                # An error has occurred
                return
            try:
                proc.stdin.writelines(  # type: ignore
                    iter(lambda: pktlist.read(1048576), b"")  # type: ignore
                )
            except AttributeError:
                wrpcap(proc.stdin, pktlist, linktype=linktype)  # type: ignore
            except UnboundLocalError:
                # The error was handled by ContextManagerSubprocess
                pass
            else:
                proc.stdin.close()  # type: ignore
    if proc is None:
        # An error has occurred
        return
    if dump:
        data = b"".join(
            iter(lambda: proc.stdout.read(1048576), b"")  # type: ignore
        )
        proc.terminate()
        return data
    if getproc:
        return proc
    if getfd:
        return proc.stdout
    if wait:
        proc.wait()
@conf.commands.register
def hexedit(pktlist):
    # type: (_PacketIterable) -> PacketList
    """Run hexedit on a list of packets, then return the edited packets.

    The packets are written to a temporary pcap file, ``conf.prog.hexedit``
    is run on that file, and the file is read back. The temporary file is
    removed even when one of these steps fails.
    """
    f = get_temp_file()
    try:
        wrpcap(f, pktlist)
        with ContextManagerSubprocess(conf.prog.hexedit):
            subprocess.call([conf.prog.hexedit, f])
        return rdpcap(f)
    finally:
        os.unlink(f)
def get_terminal_width():
    # type: () -> Optional[int]
    """Get terminal width (number of characters) if in a window.

    Notice: this will try several methods in order to
    support as many terminals and OS as possible.
    """
    columns = shutil.get_terminal_size(fallback=(0, 0))[0]
    if columns != 0:
        return columns

    # Backups
    if WINDOWS:
        from ctypes import windll, create_string_buffer
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        handle = windll.kernel32.GetStdHandle(-12)
        csbi = create_string_buffer(22)
        if windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi):
            fields = struct.unpack("hhhhHhhhhhh", csbi.raw)
            # Unpacked layout: bufx, bufy, curx, cury, wattr,
            # left, top, right, bottom, maxx, maxy
            left, right = fields[5], fields[7]
            return right - left + 1
        return columns

    # We have various methods
    # COLUMNS is set on some terminals
    try:
        columns = int(os.environ['COLUMNS'])
    except Exception:
        pass
    if columns:
        return columns

    # We can query TIOCGWINSZ
    try:
        import fcntl
        import termios
        request = struct.pack('HHHH', 0, 0, 0, 0)
        answer = fcntl.ioctl(1, termios.TIOCGWINSZ, request)
        columns = struct.unpack('HHHH', answer)[1]
    except (IOError, ModuleNotFoundError):
        # If everything failed, return default terminal size
        columns = 79
    return columns
def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
                header,  # type: List[Tuple[str, ...]]
                sortBy=0,  # type: Optional[int]
                borders=False,  # type: bool
                ):
    # type: (...) -> str
    """
    Pretty list to fit the terminal, and add header.

    ``rtlst`` is modified in place: it is sorted, and every line holding
    list values is replaced by one line per list element.

    :param rtlst: a list of tuples. each tuple contains a value which can
        be either a string or a list of string.
    :param header: the header line(s), prepended to the table
    :param sortBy: the column id (starting with 0) which will be used for
        ordering
    :param borders: whether to put borders on the table or not
    :return: the formatted table, one line per row
    """
    if borders:
        _space = "|"
    else:
        _space = " "
    cols = len(header[0])
    # Windows has a fat terminal border
    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
    _croped = False
    if sortBy is not None:
        # Sort correctly
        rtlst.sort(key=lambda x: x[sortBy])
    # Resolve multi-values
    for i, line in enumerate(rtlst):
        ids = []  # type: List[int]
        values = []  # type: List[Union[str, List[str]]]
        for j, val in enumerate(line):
            if isinstance(val, list):
                ids.append(j)
                values.append(val or " ")
        if values:
            # Replace the line by one line per element of its lists; only
            # the first of them keeps the single-valued columns.
            del rtlst[i]
            k = 0
            for ex_vals in zip_longest(*values, fillvalue=" "):
                if k:
                    extra_line = [" "] * cols
                else:
                    extra_line = list(line)  # type: ignore
                for j, h in enumerate(ids):
                    extra_line[h] = ex_vals[j]
                rtlst.insert(i + k, tuple(extra_line))
                k += 1
    rtslst = cast(List[Tuple[str, ...]], rtlst)
    # Append tag
    rtslst = header + rtslst
    # Detect column's width
    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
    # Make text fit in box (if required)
    width = get_terminal_width()
    if conf.auto_crop_tables and width:
        width = width - _spacelen
        while sum(colwidth) > width:
            # Needs to be cropped
            # Get the longest row
            i = colwidth.index(max(colwidth))
            if colwidth[i] <= 1:
                # Cropping cannot shorten a cell below one character: give
                # up instead of looping forever on a too narrow terminal.
                break
            _croped = True
            # Get all elements of this row
            row = [len(x[i]) for x in rtslst]
            # Get biggest element of this row: biggest of the array
            j = row.index(max(row))
            # Re-build column tuple with the edited element
            t = list(rtslst[j])
            t[i] = t[i][:-2] + "_"
            rtslst[j] = tuple(t)
            # Update max size
            row[j] = len(t[i])
            colwidth[i] = max(row)
    if _croped:
        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
    # Generate padding scheme
    fmt = _space.join(["%%-%ds" % x for x in colwidth])
    # Append separation line if needed
    if borders:
        rtslst.insert(1, tuple("-" * x for x in colwidth))
    # Compile
    return "\n".join(fmt % x for x in rtslst)
def human_size(x, fmt=".1f"):
    # type: (int, str) -> str
    """
    Convert a size in octets to a human string representation

    :param x: the size, in octets
    :param fmt: format spec applied to the scaled value
    :return: the size scaled by the largest power of 1024 that fits,
        followed by its unit letter (K to E); sizes below 1024 are
        returned unscaled with a "B" suffix
    :raises ValueError: when x is negative
    """
    units = ['K', 'M', 'G', 'T', 'P', 'E']
    if not x:
        return "0B"
    if x < 0:
        raise ValueError("human_size() expects a non-negative size")
    # Exact floor(log1024(x)): unlike a floating point logarithm, the bit
    # length cannot misplace an exact power of 1024. Sizes beyond the last
    # unit are expressed in that unit.
    power = (int(x).bit_length() - 1) // 10 if x >= 1 else 0
    power = min(power, len(units))
    if power:
        return format(x / 2**(10 * power), fmt) + units[power - 1]
    return str(x) + "B"
def __make_table(
    yfmtfunc,  # type: Callable[[int], str]
    fmtfunc,  # type: Callable[[int], str]
    endline,  # type: str
    data,  # type: List[Tuple[Packet, Packet]]
    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
    dump=False  # type: bool
):
    # type: (...) -> Optional[str]
    """Core function of the make_table suite, which generates the table

    ``fxyz`` maps each element of ``data`` to a (column, row, cell) triple.
    The table is returned as a string when ``dump`` is set and printed
    otherwise.
    """
    col_widths = {}  # type: Dict[str, int]
    row_labels = {}  # type: Dict[str, Optional[int]]
    cells = {}  # type: Dict[Tuple[str, str], str]
    col_formats = {}  # type: Dict[str, str]

    def _sort_keys(keys, keyfunc):
        # type: (List[str], Optional[Callable[[str], Any]]) -> None
        # Sort with the caller's key when given, else numerically, else
        # through atol, else as plain strings.
        if keyfunc:
            keys.sort(key=keyfunc)
            return
        try:
            keys.sort(key=int)
        except Exception:
            try:
                keys.sort(key=atol)
            except Exception:
                keys.sort()

    label_width = 0
    for entry in data:
        xx, yy, zz = [str(s) for s in fxyz(*entry)]
        label_width = max(len(yy), label_width)
        col_widths[xx] = max(col_widths.get(xx, 0), len(xx), len(zz))
        row_labels[yy] = None
        cells[(xx, yy)] = zz

    columns = list(col_widths)
    rows = list(row_labels)
    _sort_keys(columns, sortx)
    _sort_keys(rows, sorty)

    pieces = []  # type: List[str]
    if seplinefunc:
        sepline = seplinefunc(label_width, [col_widths[x] for x in columns])
        pieces.append(sepline + "\n")

    # Header line: empty row label followed by the column names
    row_format = yfmtfunc(label_width)
    pieces.append(row_format % "")
    pieces.append(' ')
    for x in columns:
        col_formats[x] = fmtfunc(col_widths[x])
        pieces.append(col_formats[x] % x)
        pieces.append(' ')
    pieces.append(endline + "\n")
    if seplinefunc:
        pieces.append(sepline + "\n")

    # One line per row, "-" standing for a missing cell
    for y in rows:
        pieces.append(row_format % y)
        pieces.append(' ')
        for x in columns:
            pieces.append(col_formats[x] % cells.get((x, y), "-"))
            pieces.append(' ')
        pieces.append(endline + "\n")
    if seplinefunc:
        pieces.append(sepline + "\n")

    table = "".join(pieces)
    if dump:
        return table
    print(table, end="")
    return None
def make_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[Any]
    """
    Build a plain text table: every column is left-aligned and padded
    with spaces. Arguments are forwarded to the core table generator.
    """
    def _padded(width):
        # type: (int) -> str
        return "%%-%is" % width

    return __make_table(_padded, _padded, "", *args, **kargs)
def make_lined_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """
    Build a text table whose columns are separated by "|" and whose
    header and footer are delimited by "+---+" lines. Arguments are
    forwarded to the core table generator.
    """
    def _cell(width):
        # type: (int) -> str
        return "%%-%is |" % width

    def _rule(label_width, col_widths):
        # type: (int, List[int]) -> str
        # The trailing -2 produces an empty segment, i.e. a closing "+"
        segments = [label_width - 1] + col_widths + [-2]
        return "+".join('-' * (size + 2) for size in segments)

    return __make_table(  # type: ignore
        _cell,
        _cell,
        "",
        *args,
        seplinefunc=_rule,
        **kargs
    )
def make_tex_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """
    Build the rows of a LaTeX tabular: cells are separated by "&", lines
    end with "\\\\" and are delimited by "\\hline". Arguments are forwarded
    to the core table generator.
    """
    def _label(_width):
        # type: (int) -> str
        return "%s"

    def _cell(_width):
        # type: (int) -> str
        return "& %s"

    def _hline(_label_width, _col_widths):
        # type: (int, List[int]) -> str
        return "\\hline"

    return __make_table(  # type: ignore
        _label,
        _cell,
        "\\\\",
        *args,
        seplinefunc=_hline,
        **kargs
    )
3379####################
3380# WHOIS CLIENT #
3381####################
def _whois_cleanup(answer):
    # type: (bytes) -> bytes
    """
    Post-process a raw whois answer: drop the "remarks:" lines, the
    trailing blank lines and the first three lines of the answer.
    """
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [
        line for line in answer.split(b"\n")
        if not line.startswith(ignore_tag)
    ]
    # remove empty lines at the bottom. Always look at the current last
    # line: indexing from the end while deleting skips entries.
    while lines and not lines[-1].strip():
        lines.pop()
    return b"\n".join(lines[3:])


def whois(ip_address):
    # type: (str) -> bytes
    """
    Whois client for Python

    Queries whois.ripe.net (TCP port 43) about an address.

    :param ip_address: the IP address or the hostname to look up. Hostnames
        are resolved first; if the resolution fails, the value is sent
        unchanged.
    :returns: the answer of the server, without its "remarks:" lines,
        its first three lines and its trailing blank lines.
    """
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except Exception:
        query = whois_ip
    chunks = []
    # The context manager closes the socket even if the exchange fails
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(("whois.ripe.net", 43))
        # sendall: send() is allowed to transmit only part of the buffer
        s.sendall(query.encode("utf8") + b"\r\n")
        while True:
            d = s.recv(4096)
            if not d:
                break
            chunks.append(d)
    return _whois_cleanup(b"".join(chunks))
3413####################
3414# CLI utils #
3415####################
class CLIUtil:
    """
    Provides a Util class to easily create simple CLI tools in Scapy,
    that can still be used as an API.

    Doc:
        - override the ps1() function
        - register commands with the @CLIUtil.addcommand decorator
        - call the loop() function when ready
    """

    def _depcheck(self) -> None:
        """
        Check that all dependencies are installed
        """
        try:
            import prompt_toolkit  # noqa: F401
        except ImportError:
            # okay we lie but prompt_toolkit is a dependency...
            raise ImportError("You need to have IPython installed to use the CLI")

    # Okay let's do nice code
    # NOTE: the three registries below are class attributes that are mutated
    # in place by the decorators.
    commands: Dict[str, Callable[..., Any]] = {}
    # print output of command
    commands_output: Dict[str, Callable[..., str]] = {}
    # provides completion to command
    commands_complete: Dict[str, Callable[..., List[str]]] = {}

    @staticmethod
    def _inspectkwargs(func: DecoratorCallable) -> None:
        """
        Internal function to parse arguments from the kwargs of the functions

        Keyword-only parameters are exposed as CLI flags: "-x" for a single
        letter name, "--name" otherwise.
        """
        func._flagnames = [  # type: ignore
            x.name for x in
            inspect.signature(func).parameters.values()
            if x.kind == inspect.Parameter.KEYWORD_ONLY
        ]
        func._flags = [  # type: ignore
            ("-%s" % x) if len(x) == 1 else ("--%s" % x)
            for x in func._flagnames  # type: ignore
        ]

    @staticmethod
    def _parsekwargs(
        func: DecoratorCallable,
        args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of a function.

        Only the leading flags are consumed: parsing stops at the first
        argument that is not a flag of 'func'.

        :returns: the remaining arguments, and the flags that were set.
        """
        kwargs: Dict[str, Literal[True]] = {}
        if func._flags:  # type: ignore
            i = 0
            for arg in args:
                if arg in func._flags:  # type: ignore
                    i += 1
                    kwargs[func._flagnames[func._flags.index(arg)]] = True  # type: ignore  # noqa: E501
                    continue
                break
            args = args[i:]
        return args, kwargs

    @classmethod
    def _parseallargs(
        cls,
        func: DecoratorCallable,
        cmd: str, args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of both the function
        and its output function.

        :returns: the remaining arguments, the flags of the command and the
            flags of its output processor.
        """
        args, kwargs = cls._parsekwargs(func, args)
        outkwargs: Dict[str, Literal[True]] = {}
        if cmd in cls.commands_output:
            args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
        return args, kwargs, outkwargs

    @classmethod
    def addcommand(
        cls,
        spaces: bool = False,
        globsupport: bool = False,
    ) -> Callable[[DecoratorCallable], DecoratorCallable]:
        """
        Decorator to register a command

        :param spaces: the command receives all its arguments joined as a
            single string.
        :param globsupport: a single "*" in the argument is expanded using
            the completer of the command. Requires 'spaces'.
        """
        def func(cmd: DecoratorCallable) -> DecoratorCallable:
            cls.commands[cmd.__name__] = cmd
            cmd._spaces = spaces  # type: ignore
            cmd._globsupport = globsupport  # type: ignore
            cls._inspectkwargs(cmd)
            if cmd._globsupport and not cmd._spaces:  # type: ignore
                raise ValueError("Cannot use globsupport without spaces.")
            return cmd
        return func

    @classmethod
    def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command output processor
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            cls.commands_output[cmd.__name__] = processor
            cls._inspectkwargs(processor)
            return processor
        return func

    @classmethod
    def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command completor
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            cls.commands_complete[cmd.__name__] = processor
            return processor
        return func

    def ps1(self) -> str:
        """
        Return the PS1 of the shell
        """
        return "> "

    def close(self) -> None:
        """
        Function called on exiting
        """
        print("Exited")

    def help(self, cmd: Optional[str] = None) -> None:
        """
        Return the help related to this CLI util

        :param cmd: print the help of this command only. If empty, print
            the table of all the commands.
        """
        def _args(func: Any) -> str:
            # Flags of the command and of its output processor, followed by
            # the parameters ("?" marks the ones that have a default)
            flags = func._flags.copy()
            if func.__name__ in self.commands_output:
                flags += self.commands_output[func.__name__]._flags  # type: ignore
            return " %s%s" % (
                (
                    "%s " % " ".join("[%s]" % x for x in flags)
                    if flags else ""
                ),
                " ".join(
                    "<%s%s>" % (
                        x.name,
                        "?" if
                        (x.default is None or x.default != inspect.Parameter.empty)
                        else ""
                    )
                    for x in list(inspect.signature(func).parameters.values())[1:]
                    if x.name not in func._flagnames and x.name[0] != "_"
                )
            )

        if cmd:
            if cmd not in self.commands:
                print("Unknown command '%s'" % cmd)
                return
            # help for one command
            func = self.commands[cmd]
            print("%s%s: %s" % (
                cmd,
                _args(func),
                func.__doc__ and func.__doc__.strip()
            ))
        else:
            header = "│ %s - Help │" % self.__class__.__name__
            print("┌" + "─" * (len(header) - 2) + "┐")
            print(header)
            print("└" + "─" * (len(header) - 2) + "┘")
            print(
                pretty_list(
                    [
                        (
                            cmd,
                            _args(func),
                            func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
                        )
                        for cmd, func in self.commands.items()
                    ],
                    [("Command", "Arguments", "Description")]
                )
            )

    def _completer(self) -> 'prompt_toolkit.completion.Completer':
        """
        Returns a prompt_toolkit custom completer
        """
        from prompt_toolkit.completion import Completer, Completion

        class CLICompleter(Completer):
            def get_completions(cmpl, document, complete_event):  # type: ignore
                if not complete_event.completion_requested:
                    # Only activate when the user does <TAB>
                    return
                parts = document.text.split(" ")
                cmd = parts[0].lower()
                if cmd not in self.commands:
                    # We are trying to complete the command
                    for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
                        yield Completion(possible_cmd, start_position=-len(cmd))
                else:
                    # We are trying to complete the command content
                    if len(parts) == 1:
                        return
                    args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
                    arg = " ".join(args)
                    if cmd in self.commands_complete:
                        for possible_arg in self.commands_complete[cmd](self, arg):
                            yield Completion(possible_arg, start_position=-len(arg))
                    return
        return CLICompleter()

    def _process_line(self, line: str, debug: int = 0) -> bool:
        """
        Parse and execute a single line typed in the shell.

        :param line: the raw line.
        :param debug: if set, print the traceback of failing commands.
        :returns: False if the shell must exit, True otherwise.
        """
        line = line.strip()
        args = line.split(" ")[1:]
        cmd = line.split(" ")[0].strip().lower()
        if not cmd:
            return True
        if cmd in ["help", "h", "?"]:
            self.help(" ".join(args))
            return True
        # Exact match: a substring test would also exit on "e", "x", "it"...
        if cmd == "exit":
            return False
        if cmd not in self.commands:
            print("Unknown command. Type help or ?")
            return True
        # check the number of arguments
        func = self.commands[cmd]
        args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
        if func._spaces:  # type: ignore
            args = [" ".join(args)]
            # if globsupport is set, we might need to do several calls
            if func._globsupport and "*" in args[0]:  # type: ignore
                if args[0].count("*") > 1:
                    print("More than 1 glob star (*) is currently unsupported.")
                    return True
                before, after = args[0].split("*", 1)
                # Both sides of the star are literal text
                reg = re.compile(re.escape(before) + r".*" + re.escape(after))
                calls = [
                    [x] for x in
                    self.commands_complete[cmd](self, before)
                    if reg.match(x)
                ]
            else:
                calls = [args]
        else:
            calls = [args]
        # now iterate if required, call the function and print its output
        for args in calls:
            # Reset on each call, so that a failure never re-prints the
            # result of the previous call
            res = None
            try:
                res = func(self, *args, **kwargs)
            except TypeError:
                print("Bad number of arguments !")
                self.help(cmd=cmd)
                continue
            except Exception as ex:
                print("Command failed with error: %s" % ex)
                if debug:
                    # 3 arguments form: the single argument form requires
                    # Python 3.10+
                    traceback.print_exception(type(ex), ex, ex.__traceback__)
                continue
            try:
                if res and cmd in self.commands_output:
                    self.commands_output[cmd](self, res, **outkwargs)
            except Exception as ex:
                print("Output processor failed with error: %s" % ex)
        return True

    def loop(self, debug: int = 0) -> None:
        """
        Main command handling loop

        Ctrl-C discards the current line, Ctrl-D (EOF) calls close() then
        leaves the loop, and the "exit" command leaves the loop.

        :param debug: if set, print the traceback of failing commands.
        """
        from prompt_toolkit import PromptSession
        session = PromptSession(completer=self._completer())

        while True:
            try:
                line = session.prompt(self.ps1()).strip()
            except KeyboardInterrupt:
                continue
            except EOFError:
                self.close()
                break
            if not self._process_line(line, debug=debug):
                break
def _autoargparse_parser(
    func: Callable[..., Any]
) -> Tuple[argparse.ArgumentParser, List[str]]:
    """
    Build the argparse.ArgumentParser matching the signature of a function.

    :param func: the function to inspect.
    :returns: the parser, and the names of the parameters that must be
        passed positionally to the function.
    """
    argsdoc: Dict[str, str] = {}
    desc = ""
    if func.__doc__:
        doc = func.__doc__.strip()
        # Sphinx doc format parser
        m = re.match(
            r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
            doc,
        )
        if not m:
            desc = doc
        else:
            desc = m.group(1)
            sphinxargs = re.findall(
                r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
                m.group(2),
            )
            for argtype, argparam, argdesc in sphinxargs:
                argparam = argparam.strip()
                argdesc = argdesc.strip()
                if argtype == "param":
                    if not argparam:
                        raise ValueError(":param: without a name !")
                    argsdoc[argparam] = argdesc
    # Now build the argparse.ArgumentParser
    parser = argparse.ArgumentParser(
        prog=func.__name__,
        description=desc,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Process the parameters
    positional: List[str] = []
    for param in inspect.signature(func).parameters.values():
        # Untyped arguments are ignored
        if param.annotation is inspect.Parameter.empty:
            continue
        parname = param.name
        paramkwargs: Dict[str, Any] = {}
        negated = False
        if param.annotation is bool:
            if param.default is True:
                # A flag that is True by default is exposed as "--no-<name>"
                negated = True
                parname = "no-" + parname
                paramkwargs["action"] = "store_false"
            else:
                paramkwargs["action"] = "store_true"
        elif param.annotation in [str, int, float]:
            paramkwargs["type"] = param.annotation
        else:
            # Types that argparse cannot handle are omitted
            continue
        if param.default is not inspect.Parameter.empty:
            if param.kind == inspect.Parameter.POSITIONAL_ONLY:
                positional.append(param.name)
                paramkwargs["nargs"] = '?'
            else:
                parname = "--" + parname
                if negated:
                    # Store the value under the real parameter name, rather
                    # than stripping a "no_" prefix afterwards: that would
                    # also rename parameters really called "no_<something>".
                    paramkwargs["dest"] = param.name
            paramkwargs["default"] = param.default
        else:
            positional.append(param.name)
            if param.kind == inspect.Parameter.VAR_POSITIONAL:
                paramkwargs["action"] = "append"
        if param.name in argsdoc:
            paramkwargs["help"] = argsdoc[param.name]
        parser.add_argument(parname, **paramkwargs)  # type: ignore
    return parser, positional


def AutoArgparse(func: DecoratorCallable) -> None:
    """
    Generate an Argparse call from a function, then call this function.

    Notes:

    - for the arguments to have a description, the sphinx docstring format
      must be used. See
      https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
    - the arguments must be typed in Python (we ignore Sphinx-specific types)
      untyped arguments are ignored.
    - only types that would be supported by argparse are supported. The others
      are omitted.
    - boolean arguments that default to True are exposed as '--no-<name>'.
    - an AssertionError raised by the function is printed as an error,
      followed by the help.
    """
    parser, positional = _autoargparse_parser(func)
    # Now parse the sys.argv parameters
    params = vars(parser.parse_args())
    # Act as in interactive mode
    conf.logLevel = 20
    from scapy.themes import DefaultTheme
    conf.color_theme = DefaultTheme()
    # And call the function
    try:
        func(
            *[params.pop(x) for x in positional],
            **params
        )
    except AssertionError as ex:
        print("ERROR: " + str(ex))
        parser.print_help()
3796#######################
3797# PERIODIC SENDER #
3798#######################
class PeriodicSenderThread(threading.Thread):
    """
    Thread that sends one or several packets on a socket, over and over,
    waiting 'interval' seconds between two rounds.
    """

    def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
        # type: (Any, _PacketIterable, float, bool) -> None
        """ Thread to send packets periodically

        Args:
            sock: socket where packet is sent periodically
            pkt: packet or list of packets to send
            interval: interval between two packets
            ignore_exceptions: if True, an OSError or a TimeoutError raised
                               while sending silently ends the thread.
                               Otherwise, the exception is re-raised.
        """
        if not isinstance(pkt, list):
            self._pkts = [cast("Packet", pkt)]  # type: _PacketIterable
        else:
            self._pkts = pkt
        self._socket = sock
        self._stopped = threading.Event()
        self._enabled = threading.Event()
        self._enabled.set()
        self._interval = interval
        self._ignore_exceptions = ignore_exceptions
        threading.Thread.__init__(self)

    def enable(self):
        # type: () -> None
        """Resume sending (sending is enabled by default)"""
        self._enabled.set()

    def disable(self):
        # type: () -> None
        """Pause sending. The thread keeps running."""
        self._enabled.clear()

    def run(self):
        # type: () -> None
        """Send the packets until stop() is called or the socket is closed"""
        while not self._stopped.is_set() and not self._socket.closed:
            for p in self._pkts:
                try:
                    if self._enabled.is_set():
                        self._socket.send(p)
                except (OSError, TimeoutError):
                    if self._ignore_exceptions:
                        return
                    raise
            # Doubles as the inter-round sleep: returns as soon as stop()
            # is called
            self._stopped.wait(timeout=self._interval)
            if self._stopped.is_set() or self._socket.closed:
                break

    def stop(self):
        # type: () -> None
        """
        Ask the thread to stop, and wait up to twice the interval for it
        to end.
        """
        self._stopped.set()
        # join() raises RuntimeError on a thread that was never started, or
        # when called from the thread itself
        if self.is_alive() and threading.current_thread() is not self:
            self.join(self._interval * 2)
class SingleConversationSocket(object):
    """
    Wrapper around a socket-like object that serializes the calls to
    sr1(), sr() and send() with a reentrant lock. Any other attribute is
    looked up on the wrapped object.
    """

    def __init__(self, o):
        # type: (Any) -> None
        self._inner = o
        self._tx_mutex = threading.RLock()

    @property
    def __dict__(self):  # type: ignore
        # Expose the attributes of the wrapped object
        return self._inner.__dict__

    def __getattr__(self, name):
        # type: (str) -> Any
        # Only called when the regular lookup fails. If '_inner' itself is
        # missing (instance built without __init__), delegating to it would
        # recurse forever.
        if name == "_inner":
            raise AttributeError(name)
        return getattr(self._inner, name)

    def sr1(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call sr1() of the wrapped object while holding the lock"""
        with self._tx_mutex:
            return self._inner.sr1(*args, **kargs)

    def sr(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call sr() of the wrapped object while holding the lock"""
        with self._tx_mutex:
            return self._inner.sr(*args, **kargs)

    def send(self, x):
        # type: (Packet) -> Any
        """
        Call send() of the wrapped object while holding the lock. On a
        ConnectionError or an OSError, the wrapped object is closed and
        the exception is re-raised.
        """
        with self._tx_mutex:
            try:
                return self._inner.send(x)
            except (ConnectionError, OSError):
                self._inner.close()
                raise
3884 raise e