Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7General utility functions.
8"""
11from decimal import Decimal
12from io import StringIO
13from itertools import zip_longest
14from uuid import UUID
16import argparse
17import array
18import collections
19import decimal
20import difflib
21import enum
22import gzip
23import inspect
24import locale
25import math
26import os
27import random
28import re
29import shutil
30import socket
31import struct
32import subprocess
33import sys
34import tempfile
35import threading
36import time
37import traceback
38import warnings
40from scapy.config import conf
41from scapy.consts import DARWIN, OPENBSD, WINDOWS
42from scapy.data import MTU, DLT_EN10MB, DLT_RAW
43from scapy.compat import (
44 orb,
45 plain_str,
46 chb,
47 hex_bytes,
48 bytes_encode,
49)
50from scapy.error import (
51 log_interactive,
52 log_runtime,
53 Scapy_Exception,
54 warning,
55)
56from scapy.pton_ntop import inet_pton
58# Typing imports
59from typing import (
60 cast,
61 Any,
62 AnyStr,
63 Callable,
64 Dict,
65 IO,
66 Iterator,
67 List,
68 Optional,
69 TYPE_CHECKING,
70 Tuple,
71 Type,
72 Union,
73 overload,
74)
75from scapy.compat import (
76 DecoratorCallable,
77 Literal,
78)
80if TYPE_CHECKING:
81 from scapy.packet import Packet
82 from scapy.plist import _PacketIterable, PacketList
83 from scapy.supersocket import SuperSocket
84 import prompt_toolkit
86_ByteStream = Union[IO[bytes], gzip.GzipFile]
88###########
89# Tools #
90###########
93def issubtype(x, # type: Any
94 t, # type: Union[type, str]
95 ):
96 # type: (...) -> bool
97 """issubtype(C, B) -> bool
99 Return whether C is a class and if it is a subclass of class B.
100 When using a tuple as the second argument issubtype(X, (A, B, ...)),
101 is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).
102 """
103 if isinstance(t, str):
104 return t in (z.__name__ for z in x.__bases__)
105 if isinstance(x, type) and issubclass(x, t):
106 return True
107 return False
110_Decimal = Union[Decimal, int]
113class EDecimal(Decimal):
114 """Extended Decimal
116 This implements arithmetic and comparison with float for
117 backward compatibility
118 """
120 def __add__(self, other, context=None):
121 # type: (_Decimal, Any) -> EDecimal
122 return EDecimal(Decimal.__add__(self, Decimal(other)))
124 def __radd__(self, other):
125 # type: (_Decimal) -> EDecimal
126 return EDecimal(Decimal.__add__(self, Decimal(other)))
128 def __sub__(self, other):
129 # type: (_Decimal) -> EDecimal
130 return EDecimal(Decimal.__sub__(self, Decimal(other)))
132 def __rsub__(self, other):
133 # type: (_Decimal) -> EDecimal
134 return EDecimal(Decimal.__rsub__(self, Decimal(other)))
136 def __mul__(self, other):
137 # type: (_Decimal) -> EDecimal
138 return EDecimal(Decimal.__mul__(self, Decimal(other)))
140 def __rmul__(self, other):
141 # type: (_Decimal) -> EDecimal
142 return EDecimal(Decimal.__mul__(self, Decimal(other)))
144 def __truediv__(self, other):
145 # type: (_Decimal) -> EDecimal
146 return EDecimal(Decimal.__truediv__(self, Decimal(other)))
148 def __floordiv__(self, other):
149 # type: (_Decimal) -> EDecimal
150 return EDecimal(Decimal.__floordiv__(self, Decimal(other)))
152 def __divmod__(self, other):
153 # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
154 r = Decimal.__divmod__(self, Decimal(other))
155 return EDecimal(r[0]), EDecimal(r[1])
157 def __mod__(self, other):
158 # type: (_Decimal) -> EDecimal
159 return EDecimal(Decimal.__mod__(self, Decimal(other)))
161 def __rmod__(self, other):
162 # type: (_Decimal) -> EDecimal
163 return EDecimal(Decimal.__rmod__(self, Decimal(other)))
165 def __pow__(self, other, modulo=None):
166 # type: (_Decimal, Optional[_Decimal]) -> EDecimal
167 return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))
169 def __eq__(self, other):
170 # type: (Any) -> bool
171 if isinstance(other, Decimal):
172 return super(EDecimal, self).__eq__(other)
173 else:
174 return bool(float(self) == other)
176 def normalize(self, precision): # type: ignore
177 # type: (int) -> EDecimal
178 with decimal.localcontext() as ctx:
179 ctx.prec = precision
180 return EDecimal(super(EDecimal, self).normalize(ctx))
183@overload
184def get_temp_file(keep, autoext, fd):
185 # type: (bool, str, Literal[True]) -> IO[bytes]
186 pass
189@overload
190def get_temp_file(keep=False, autoext="", fd=False):
191 # type: (bool, str, Literal[False]) -> str
192 pass
195def get_temp_file(keep=False, autoext="", fd=False):
196 # type: (bool, str, bool) -> Union[IO[bytes], str]
197 """Creates a temporary file.
199 :param keep: If False, automatically delete the file when Scapy exits.
200 :param autoext: Suffix to add to the generated file name.
201 :param fd: If True, this returns a file-like object with the temporary
202 file opened. If False (default), this returns a file path.
203 """
204 f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
205 delete=False)
206 if not keep:
207 conf.temp_files.append(f.name)
209 if fd:
210 return f
211 else:
212 # Close the file so something else can take it.
213 f.close()
214 return f.name
217def get_temp_dir(keep=False):
218 # type: (bool) -> str
219 """Creates a temporary file, and returns its name.
221 :param keep: If False (default), the directory will be recursively
222 deleted when Scapy exits.
223 :return: A full path to a temporary directory.
224 """
226 dname = tempfile.mkdtemp(prefix="scapy")
228 if not keep:
229 conf.temp_files.append(dname)
231 return dname
234def _create_fifo() -> Tuple[str, Any]:
235 """Creates a temporary fifo.
237 You must then use open_fifo() on the server_fd once
238 the client is connected to use it.
240 :returns: (client_file, server_fd)
241 """
242 if WINDOWS:
243 from scapy.arch.windows.structures import _get_win_fifo
244 return _get_win_fifo()
245 else:
246 f = get_temp_file()
247 os.unlink(f)
248 os.mkfifo(f)
249 return f, f
252def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
253 """Open the server_fd (see create_fifo)
254 """
255 if WINDOWS:
256 from scapy.arch.windows.structures import _win_fifo_open
257 return _win_fifo_open(fd)
258 else:
259 return open(fd, mode)
262def sane(x, color=False):
263 # type: (AnyStr, bool) -> str
264 r = ""
265 for i in x:
266 j = orb(i)
267 if (j < 32) or (j >= 127):
268 if color:
269 r += conf.color_theme.not_printable(".")
270 else:
271 r += "."
272 else:
273 r += chr(j)
274 return r
277@conf.commands.register
278def restart():
279 # type: () -> None
280 """Restarts scapy"""
281 if not conf.interactive or not os.path.isfile(sys.argv[0]):
282 raise OSError("Scapy was not started from console")
283 if WINDOWS:
284 res_code = 1
285 try:
286 res_code = subprocess.call([sys.executable] + sys.argv)
287 finally:
288 os._exit(res_code)
289 os.execv(sys.executable, [sys.executable] + sys.argv)
292def lhex(x):
293 # type: (Any) -> str
294 from scapy.volatile import VolatileValue
295 if isinstance(x, VolatileValue):
296 return repr(x)
297 if isinstance(x, int):
298 return hex(x)
299 if isinstance(x, tuple):
300 return "(%s)" % ", ".join(lhex(v) for v in x)
301 if isinstance(x, list):
302 return "[%s]" % ", ".join(lhex(v) for v in x)
303 return str(x)
306@conf.commands.register
307def hexdump(p, dump=False):
308 # type: (Union[Packet, AnyStr], bool) -> Optional[str]
309 """Build a tcpdump like hexadecimal view
311 :param p: a Packet
312 :param dump: define if the result must be printed or returned in a variable
313 :return: a String only when dump=True
314 """
315 s = ""
316 x = bytes_encode(p)
317 x_len = len(x)
318 i = 0
319 while i < x_len:
320 s += "%04x " % i
321 for j in range(16):
322 if i + j < x_len:
323 s += "%02X " % orb(x[i + j])
324 else:
325 s += " "
326 s += " %s\n" % sane(x[i:i + 16], color=True)
327 i += 16
328 # remove trailing \n
329 s = s[:-1] if s.endswith("\n") else s
330 if dump:
331 return s
332 else:
333 print(s)
334 return None
337@conf.commands.register
338def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
339 # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
340 """Build an equivalent view of hexdump() on a single line
342 Note that setting both onlyasc and onlyhex to 1 results in a empty output
344 :param p: a Packet
345 :param onlyasc: 1 to display only the ascii view
346 :param onlyhex: 1 to display only the hexadecimal view
347 :param dump: print the view if False
348 :return: a String only when dump=True
349 """
350 s = ""
351 s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
352 if dump:
353 return s
354 else:
355 print(s)
356 return None
359@conf.commands.register
360def chexdump(p, dump=False):
361 # type: (Union[Packet, AnyStr], bool) -> Optional[str]
362 """Build a per byte hexadecimal representation
364 Example:
365 >>> chexdump(IP())
366 0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01 # noqa: E501
368 :param p: a Packet
369 :param dump: print the view if False
370 :return: a String only if dump=True
371 """
372 x = bytes_encode(p)
373 s = ", ".join("%#04x" % orb(x) for x in x)
374 if dump:
375 return s
376 else:
377 print(s)
378 return None
381@conf.commands.register
382def hexstr(p, onlyasc=0, onlyhex=0, color=False):
383 # type: (Union[Packet, AnyStr], int, int, bool) -> str
384 """Build a fancy tcpdump like hex from bytes."""
385 x = bytes_encode(p)
386 s = []
387 if not onlyasc:
388 s.append(" ".join("%02X" % orb(b) for b in x))
389 if not onlyhex:
390 s.append(sane(x, color=color))
391 return " ".join(s)
394def repr_hex(s):
395 # type: (bytes) -> str
396 """ Convert provided bitstring to a simple string of hex digits """
397 return "".join("%02x" % orb(x) for x in s)
400@conf.commands.register
401def hexdiff(
402 a: Union['Packet', AnyStr],
403 b: Union['Packet', AnyStr],
404 algo: Optional[str] = None,
405 autojunk: bool = False,
406) -> None:
407 """
408 Show differences between 2 binary strings, Packets...
410 Available algorithms:
411 - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
412 Levenstein distance between the strings then backtrack.
413 - difflib: Use the difflib.SequenceMatcher implementation. This based on a
414 modified version of the Ratcliff and Obershelp algorithm.
415 This is much faster, but far less accurate.
416 https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher
418 :param a:
419 :param b: The binary strings, packets... to compare
420 :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
421 By default, this is chosen depending on the complexity, optimistically
422 preferring wagnerfischer unless really necessary.
423 :param autojunk: (difflib only) See difflib documentation.
424 """
425 xb = bytes_encode(a)
426 yb = bytes_encode(b)
428 if algo is None:
429 # Choose the best algorithm
430 complexity = len(xb) * len(yb)
431 if complexity < 1e7:
432 # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
433 # Anything much larger than this shouldn't be attempted by default.
434 algo = "wagnerfischer"
435 if complexity > 1e6:
436 log_interactive.info(
437 "Complexity is a bit high. hexdiff will take a few seconds."
438 )
439 else:
440 algo = "difflib"
442 backtrackx = []
443 backtracky = []
445 if algo == "wagnerfischer":
446 xb = xb[::-1]
447 yb = yb[::-1]
449 # costs for the 3 operations
450 INSERT = 1
451 DELETE = 1
452 SUBST = 1
454 # Typically, d[i,j] will hold the distance between
455 # the first i characters of xb and the first j characters of yb.
456 # We change the Wagner Fischer to also store pointers to all
457 # the intermediate steps taken while calculating the Levenstein distance.
458 d = {(-1, -1): (0, (-1, -1))}
459 for j in range(len(yb)):
460 d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
461 for i in range(len(xb)):
462 d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)
464 # Compute the Levenstein distance between the two strings, but
465 # store all the steps to be able to backtrack at the end.
466 for j in range(len(yb)):
467 for i in range(len(xb)):
468 d[i, j] = min(
469 (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
470 (d[i - 1, j][0] + DELETE, (i - 1, j)),
471 (d[i, j - 1][0] + INSERT, (i, j - 1)),
472 )
474 # Iterate through the steps backwards to create the diff
475 i = len(xb) - 1
476 j = len(yb) - 1
477 while not (i == j == -1):
478 i2, j2 = d[i, j][1]
479 backtrackx.append(xb[i2 + 1:i + 1])
480 backtracky.append(yb[j2 + 1:j + 1])
481 i, j = i2, j2
482 elif algo == "difflib":
483 sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
484 xarr = [xb[i:i + 1] for i in range(len(xb))]
485 yarr = [yb[i:i + 1] for i in range(len(yb))]
486 # Iterate through opcodes to build the backtrack
487 for opcode in sm.get_opcodes():
488 typ, x0, x1, y0, y1 = opcode
489 if typ == 'delete':
490 backtrackx += xarr[x0:x1]
491 backtracky += [b''] * (x1 - x0)
492 elif typ == 'insert':
493 backtrackx += [b''] * (y1 - y0)
494 backtracky += yarr[y0:y1]
495 elif typ in ['equal', 'replace']:
496 backtrackx += xarr[x0:x1]
497 backtracky += yarr[y0:y1]
498 # Some lines may have been considered as junk. Check the sizes
499 if autojunk:
500 lbx = len(backtrackx)
501 lby = len(backtracky)
502 backtrackx += [b''] * (max(lbx, lby) - lbx)
503 backtracky += [b''] * (max(lbx, lby) - lby)
504 else:
505 raise ValueError("Unknown algorithm '%s'" % algo)
507 # Print the diff
509 x = y = i = 0
510 colorize: Dict[int, Callable[[str], str]] = {
511 0: lambda x: x,
512 -1: conf.color_theme.left,
513 1: conf.color_theme.right
514 }
516 dox = 1
517 doy = 0
518 btx_len = len(backtrackx)
519 while i < btx_len:
520 linex = backtrackx[i:i + 16]
521 liney = backtracky[i:i + 16]
522 xx = sum(len(k) for k in linex)
523 yy = sum(len(k) for k in liney)
524 if dox and not xx:
525 dox = 0
526 doy = 1
527 if dox and linex == liney:
528 doy = 1
530 if dox:
531 xd = y
532 j = 0
533 while not linex[j]:
534 j += 1
535 xd -= 1
536 print(colorize[doy - dox]("%04x" % xd), end=' ')
537 x += xx
538 line = linex
539 else:
540 print(" ", end=' ')
541 if doy:
542 yd = y
543 j = 0
544 while not liney[j]:
545 j += 1
546 yd -= 1
547 print(colorize[doy - dox]("%04x" % yd), end=' ')
548 y += yy
549 line = liney
550 else:
551 print(" ", end=' ')
553 print(" ", end=' ')
555 cl = ""
556 for j in range(16):
557 if i + j < min(len(backtrackx), len(backtracky)):
558 if line[j]:
559 col = colorize[(linex[j] != liney[j]) * (doy - dox)]
560 print(col("%02X" % orb(line[j])), end=' ')
561 if linex[j] == liney[j]:
562 cl += sane(line[j], color=True)
563 else:
564 cl += col(sane(line[j]))
565 else:
566 print(" ", end=' ')
567 cl += " "
568 else:
569 print(" ", end=' ')
570 if j == 7:
571 print("", end=' ')
573 print(" ", cl)
575 if doy or not yy:
576 doy = 0
577 dox = 1
578 i += 16
579 else:
580 if yy:
581 dox = 0
582 doy = 1
583 else:
584 i += 16
587if struct.pack("H", 1) == b"\x00\x01": # big endian
588 checksum_endian_transform = lambda chk: chk # type: Callable[[int], int]
589else:
590 checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8
593def checksum(pkt):
594 # type: (bytes) -> int
595 if len(pkt) % 2 == 1:
596 pkt += b"\0"
597 s = sum(array.array("H", pkt))
598 s = (s >> 16) + (s & 0xffff)
599 s += s >> 16
600 s = ~s
601 return checksum_endian_transform(s) & 0xffff
604def _fletcher16(charbuf):
605 # type: (bytes) -> Tuple[int, int]
606 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501
607 c0 = c1 = 0
608 for char in charbuf:
609 c0 += char
610 c1 += c0
612 c0 %= 255
613 c1 %= 255
614 return (c0, c1)
617@conf.commands.register
618def fletcher16_checksum(binbuf):
619 # type: (bytes) -> int
620 """Calculates Fletcher-16 checksum of the given buffer.
622 Note:
623 If the buffer contains the two checkbytes derived from the Fletcher-16 checksum # noqa: E501
624 the result of this function has to be 0. Otherwise the buffer has been corrupted. # noqa: E501
625 """
626 (c0, c1) = _fletcher16(binbuf)
627 return (c1 << 8) | c0
630@conf.commands.register
631def fletcher16_checkbytes(binbuf, offset):
632 # type: (bytes, int) -> bytes
633 """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.
635 Including the bytes into the buffer (at the position marked by offset) the # noqa: E501
636 global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify # noqa: E501
637 the integrity of the buffer on the receiver side.
639 For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B. # noqa: E501
640 """
642 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501
643 if len(binbuf) < offset:
644 raise Exception("Packet too short for checkbytes %d" % len(binbuf))
646 binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
647 (c0, c1) = _fletcher16(binbuf)
649 x = ((len(binbuf) - offset - 1) * c0 - c1) % 255
651 if (x <= 0):
652 x += 255
654 y = 510 - c0 - x
656 if (y > 255):
657 y -= 255
658 return chb(x) + chb(y)
661def mac2str(mac):
662 # type: (str) -> bytes
663 return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':'))
666def valid_mac(mac):
667 # type: (str) -> bool
668 try:
669 return len(mac2str(mac)) == 6
670 except ValueError:
671 pass
672 return False
675def str2mac(s):
676 # type: (bytes) -> str
677 if isinstance(s, str):
678 return ("%02x:" * len(s))[:-1] % tuple(map(ord, s))
679 return ("%02x:" * len(s))[:-1] % tuple(s)
682def randstring(length):
683 # type: (int) -> bytes
684 """
685 Returns a random string of length (length >= 0)
686 """
687 return b"".join(struct.pack('B', random.randint(0, 255))
688 for _ in range(length))
691def zerofree_randstring(length):
692 # type: (int) -> bytes
693 """
694 Returns a random string of length (length >= 0) without zero in it.
695 """
696 return b"".join(struct.pack('B', random.randint(1, 255))
697 for _ in range(length))
700def stror(s1, s2):
701 # type: (bytes, bytes) -> bytes
702 """
703 Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2
704 must be of same length.
705 """
706 return b"".join(map(lambda x, y: struct.pack("!B", x | y), s1, s2))
709def strxor(s1, s2):
710 # type: (bytes, bytes) -> bytes
711 """
712 Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
713 must be of same length.
714 """
715 return b"".join(map(lambda x, y: struct.pack("!B", x ^ y), s1, s2))
718def strand(s1, s2):
719 # type: (bytes, bytes) -> bytes
720 """
721 Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
722 must be of same length.
723 """
724 return b"".join(map(lambda x, y: struct.pack("!B", x & y), s1, s2))
727def strrot(s1, count, right=True):
728 # type: (bytes, int, bool) -> bytes
729 """
730 Rotate the binary by 'count' bytes
731 """
732 off = count % len(s1)
733 if right:
734 return s1[-off:] + s1[:-off]
735 else:
736 return s1[off:] + s1[:off]
739# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 # noqa: E501
740try:
741 socket.inet_aton("255.255.255.255")
742except socket.error:
743 def inet_aton(ip_string):
744 # type: (str) -> bytes
745 if ip_string == "255.255.255.255":
746 return b"\xff" * 4
747 else:
748 return socket.inet_aton(ip_string)
749else:
750 inet_aton = socket.inet_aton # type: ignore
752inet_ntoa = socket.inet_ntoa
755def atol(x):
756 # type: (str) -> int
757 try:
758 ip = inet_aton(x)
759 except socket.error:
760 raise ValueError("Bad IP format: %s" % x)
761 return cast(int, struct.unpack("!I", ip)[0])
764def valid_ip(addr):
765 # type: (str) -> bool
766 try:
767 addr = plain_str(addr)
768 except UnicodeDecodeError:
769 return False
770 try:
771 atol(addr)
772 except (OSError, ValueError, socket.error):
773 return False
774 return True
777def valid_net(addr):
778 # type: (str) -> bool
779 try:
780 addr = plain_str(addr)
781 except UnicodeDecodeError:
782 return False
783 if '/' in addr:
784 ip, mask = addr.split('/', 1)
785 return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32
786 return valid_ip(addr)
789def valid_ip6(addr):
790 # type: (str) -> bool
791 try:
792 addr = plain_str(addr)
793 except UnicodeDecodeError:
794 return False
795 try:
796 inet_pton(socket.AF_INET6, addr)
797 except socket.error:
798 return False
799 return True
802def valid_net6(addr):
803 # type: (str) -> bool
804 try:
805 addr = plain_str(addr)
806 except UnicodeDecodeError:
807 return False
808 if '/' in addr:
809 ip, mask = addr.split('/', 1)
810 return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128
811 return valid_ip6(addr)
814def ltoa(x):
815 # type: (int) -> str
816 return inet_ntoa(struct.pack("!I", x & 0xffffffff))
819def itom(x):
820 # type: (int) -> int
821 return (0xffffffff00000000 >> x) & 0xffffffff
824def in4_cidr2mask(m):
825 # type: (int) -> bytes
826 """
827 Return the mask (bitstring) associated with provided length
828 value. For instance if function is called on 20, return value is
829 b'\xff\xff\xf0\x00'.
830 """
831 if m > 32 or m < 0:
832 raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m) # noqa: E501
834 return strxor(
835 b"\xff" * 4,
836 struct.pack(">I", 2**(32 - m) - 1)
837 )
840def in4_isincluded(addr, prefix, mask):
841 # type: (str, str, int) -> bool
842 """
843 Returns True when 'addr' belongs to prefix/mask. False otherwise.
844 """
845 temp = inet_pton(socket.AF_INET, addr)
846 pref = in4_cidr2mask(mask)
847 zero = inet_pton(socket.AF_INET, prefix)
848 return zero == strand(temp, pref)
851def in4_ismaddr(str):
852 # type: (str) -> bool
853 """
854 Returns True if provided address in printable format belongs to
855 allocated Multicast address space (224.0.0.0/4).
856 """
857 return in4_isincluded(str, "224.0.0.0", 4)
860def in4_ismlladdr(str):
861 # type: (str) -> bool
862 """
863 Returns True if address belongs to link-local multicast address
864 space (224.0.0.0/24)
865 """
866 return in4_isincluded(str, "224.0.0.0", 24)
869def in4_ismgladdr(str):
870 # type: (str) -> bool
871 """
872 Returns True if address belongs to global multicast address
873 space (224.0.1.0-238.255.255.255).
874 """
875 return (
876 in4_isincluded(str, "224.0.0.0", 4) and
877 not in4_isincluded(str, "224.0.0.0", 24) and
878 not in4_isincluded(str, "239.0.0.0", 8)
879 )
882def in4_ismlsaddr(str):
883 # type: (str) -> bool
884 """
885 Returns True if address belongs to limited scope multicast address
886 space (239.0.0.0/8).
887 """
888 return in4_isincluded(str, "239.0.0.0", 8)
891def in4_isaddrllallnodes(str):
892 # type: (str) -> bool
893 """
894 Returns True if address is the link-local all-nodes multicast
895 address (224.0.0.1).
896 """
897 return (inet_pton(socket.AF_INET, "224.0.0.1") ==
898 inet_pton(socket.AF_INET, str))
901def in4_getnsmac(a):
902 # type: (bytes) -> str
903 """
904 Return the multicast mac address associated with provided
905 IPv4 address. Passed address must be in network format.
906 """
908 return "01:00:5e:%.2x:%.2x:%.2x" % (a[1] & 0x7f, a[2], a[3])
911def decode_locale_str(x):
912 # type: (bytes) -> str
913 """
914 Decode bytes into a string using the system locale.
915 Useful on Windows where it can be unusual (e.g. cp1252)
916 """
917 return x.decode(encoding=locale.getlocale()[1] or "utf-8", errors="replace")
920class ContextManagerSubprocess(object):
921 """
922 Context manager that eases checking for unknown command, without
923 crashing.
925 Example:
926 >>> with ContextManagerSubprocess("tcpdump"):
927 >>> subprocess.Popen(["tcpdump", "--version"])
928 ERROR: Could not execute tcpdump, is it installed?
930 """
932 def __init__(self, prog, suppress=True):
933 # type: (str, bool) -> None
934 self.prog = prog
935 self.suppress = suppress
937 def __enter__(self):
938 # type: () -> None
939 pass
941 def __exit__(self,
942 exc_type, # type: Optional[type]
943 exc_value, # type: Optional[Exception]
944 traceback, # type: Optional[Any]
945 ):
946 # type: (...) -> Optional[bool]
947 if exc_value is None or exc_type is None:
948 return None
949 # Errored
950 if isinstance(exc_value, EnvironmentError):
951 msg = "Could not execute %s, is it installed?" % self.prog
952 else:
953 msg = "%s: execution failed (%s)" % (
954 self.prog,
955 exc_type.__class__.__name__
956 )
957 if not self.suppress:
958 raise exc_type(msg)
959 log_runtime.error(msg, exc_info=True)
960 return True # Suppress the exception
963class ContextManagerCaptureOutput(object):
964 """
965 Context manager that intercept the console's output.
967 Example:
968 >>> with ContextManagerCaptureOutput() as cmco:
969 ... print("hey")
970 ... assert cmco.get_output() == "hey"
971 """
973 def __init__(self):
974 # type: () -> None
975 self.result_export_object = ""
977 def __enter__(self):
978 # type: () -> ContextManagerCaptureOutput
979 from unittest import mock
981 def write(s, decorator=self):
982 # type: (str, ContextManagerCaptureOutput) -> None
983 decorator.result_export_object += s
984 mock_stdout = mock.Mock()
985 mock_stdout.write = write
986 self.bck_stdout = sys.stdout
987 sys.stdout = mock_stdout
988 return self
990 def __exit__(self, *exc):
991 # type: (*Any) -> Literal[False]
992 sys.stdout = self.bck_stdout
993 return False
995 def get_output(self, eval_bytes=False):
996 # type: (bool) -> str
997 if self.result_export_object.startswith("b'") and eval_bytes:
998 return plain_str(eval(self.result_export_object))
999 return self.result_export_object
1002def do_graph(
1003 graph, # type: str
1004 prog=None, # type: Optional[str]
1005 format=None, # type: Optional[str]
1006 target=None, # type: Optional[Union[IO[bytes], str]]
1007 type=None, # type: Optional[str]
1008 string=None, # type: Optional[bool]
1009 options=None # type: Optional[List[str]]
1010):
1011 # type: (...) -> Optional[str]
1012 """Processes graph description using an external software.
1013 This method is used to convert a graphviz format to an image.
1015 :param graph: GraphViz graph description
1016 :param prog: which graphviz program to use
1017 :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
1018 option
1019 :param string: if not None, simply return the graph string
1020 :param target: filename or redirect. Defaults pipe to Imagemagick's
1021 display program
1022 :param options: options to be passed to prog
1023 """
1025 if format is None:
1026 format = "svg"
1027 if string:
1028 return graph
1029 if type is not None:
1030 warnings.warn(
1031 "type is deprecated, and was renamed format",
1032 DeprecationWarning
1033 )
1034 format = type
1035 if prog is None:
1036 prog = conf.prog.dot
1037 start_viewer = False
1038 if target is None:
1039 if WINDOWS:
1040 target = get_temp_file(autoext="." + format)
1041 start_viewer = True
1042 else:
1043 with ContextManagerSubprocess(conf.prog.display):
1044 target = subprocess.Popen([conf.prog.display],
1045 stdin=subprocess.PIPE).stdin
1046 if format is not None:
1047 format = "-T%s" % format
1048 if isinstance(target, str):
1049 if target.startswith('|'):
1050 target = subprocess.Popen(target[1:].lstrip(), shell=True,
1051 stdin=subprocess.PIPE).stdin
1052 elif target.startswith('>'):
1053 target = open(target[1:].lstrip(), "wb")
1054 else:
1055 target = open(os.path.abspath(target), "wb")
1056 target = cast(IO[bytes], target)
1057 proc = subprocess.Popen(
1058 "\"%s\" %s %s" % (prog, options or "", format or ""),
1059 shell=True, stdin=subprocess.PIPE, stdout=target,
1060 stderr=subprocess.PIPE
1061 )
1062 _, stderr = proc.communicate(bytes_encode(graph))
1063 if proc.returncode != 0:
1064 raise OSError(
1065 "GraphViz call failed (is it installed?):\n" +
1066 plain_str(stderr)
1067 )
1068 try:
1069 target.close()
1070 except Exception:
1071 pass
1072 if start_viewer:
1073 # Workaround for file not found error: We wait until tempfile is written. # noqa: E501
1074 waiting_start = time.time()
1075 while not os.path.exists(target.name):
1076 time.sleep(0.1)
1077 if time.time() - waiting_start > 3:
1078 warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile) # noqa: E501
1079 break
1080 else:
1081 if WINDOWS and conf.prog.display == conf.prog._default:
1082 os.startfile(target.name)
1083 else:
1084 with ContextManagerSubprocess(conf.prog.display):
1085 subprocess.Popen([conf.prog.display, target.name])
1086 return None
1089_TEX_TR = {
1090 "{": "{\\tt\\char123}",
1091 "}": "{\\tt\\char125}",
1092 "\\": "{\\tt\\char92}",
1093 "^": "\\^{}",
1094 "$": "\\$",
1095 "#": "\\#",
1096 "_": "\\_",
1097 "&": "\\&",
1098 "%": "\\%",
1099 "|": "{\\tt\\char124}",
1100 "~": "{\\tt\\char126}",
1101 "<": "{\\tt\\char60}",
1102 ">": "{\\tt\\char62}",
1103}
1106def tex_escape(x):
1107 # type: (str) -> str
1108 s = ""
1109 for c in x:
1110 s += _TEX_TR.get(c, c)
1111 return s
1114def colgen(*lstcol, # type: Any
1115 **kargs # type: Any
1116 ):
1117 # type: (...) -> Iterator[Any]
1118 """Returns a generator that mixes provided quantities forever
1119 trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" # noqa: E501
1120 if len(lstcol) < 2:
1121 lstcol *= 2
1122 trans = kargs.get("trans", lambda x, y, z: (x, y, z))
1123 while True:
1124 for i in range(len(lstcol)):
1125 for j in range(len(lstcol)):
1126 for k in range(len(lstcol)):
1127 if i != j or j != k or k != i:
1128 yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)]) # noqa: E501
1131def incremental_label(label="tag%05i", start=0):
1132 # type: (str, int) -> Iterator[str]
1133 while True:
1134 yield label % start
1135 start += 1
1138def binrepr(val):
1139 # type: (int) -> str
1140 return bin(val)[2:]
1143def long_converter(s):
1144 # type: (str) -> int
1145 return int(s.replace('\n', '').replace(' ', ''), 16)
1147#########################
1148# Enum management #
1149#########################
1152class EnumElement:
1153 def __init__(self, key, value):
1154 # type: (str, int) -> None
1155 self._key = key
1156 self._value = value
1158 def __repr__(self):
1159 # type: () -> str
1160 return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) # noqa: E501
1162 def __getattr__(self, attr):
1163 # type: (str) -> Any
1164 return getattr(self._value, attr)
1166 def __str__(self):
1167 # type: () -> str
1168 return self._key
1170 def __bytes__(self):
1171 # type: () -> bytes
1172 return bytes_encode(self.__str__())
1174 def __hash__(self):
1175 # type: () -> int
1176 return self._value
1178 def __int__(self):
1179 # type: () -> int
1180 return int(self._value)
1182 def __eq__(self, other):
1183 # type: (Any) -> bool
1184 return self._value == int(other)
1186 def __neq__(self, other):
1187 # type: (Any) -> bool
1188 return not self.__eq__(other)
1191class Enum_metaclass(type):
1192 element_class = EnumElement
1194 def __new__(cls, name, bases, dct):
1195 # type: (Any, str, Any, Dict[str, Any]) -> Any
1196 rdict = {}
1197 for k, v in dct.items():
1198 if isinstance(v, int):
1199 v = cls.element_class(k, v)
1200 dct[k] = v
1201 rdict[v] = k
1202 dct["__rdict__"] = rdict
1203 return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)
1205 def __getitem__(self, attr):
1206 # type: (int) -> Any
1207 return self.__rdict__[attr] # type: ignore
1209 def __contains__(self, val):
1210 # type: (int) -> bool
1211 return val in self.__rdict__ # type: ignore
1213 def get(self, attr, val=None):
1214 # type: (str, Optional[Any]) -> Any
1215 return self.__rdict__.get(attr, val) # type: ignore
1217 def __repr__(self):
1218 # type: () -> str
1219 return "<%s>" % self.__dict__.get("name", self.__name__)
1222##################
1223# Corrupt data #
1224##################
1226@conf.commands.register
1227def corrupt_bytes(data, p=0.01, n=None):
1228 # type: (str, float, Optional[int]) -> bytes
1229 """
1230 Corrupt a given percentage (at least one byte) or number of bytes
1231 from a string
1232 """
1233 s = array.array("B", bytes_encode(data))
1234 s_len = len(s)
1235 if n is None:
1236 n = max(1, int(s_len * p))
1237 for i in random.sample(range(s_len), n):
1238 s[i] = (s[i] + random.randint(1, 255)) % 256
1239 return s.tobytes()
1242@conf.commands.register
1243def corrupt_bits(data, p=0.01, n=None):
1244 # type: (str, float, Optional[int]) -> bytes
1245 """
1246 Flip a given percentage (at least one bit) or number of bits
1247 from a string
1248 """
1249 s = array.array("B", bytes_encode(data))
1250 s_len = len(s) * 8
1251 if n is None:
1252 n = max(1, int(s_len * p))
1253 for i in random.sample(range(s_len), n):
1254 s[i // 8] ^= 1 << (i % 8)
1255 return s.tobytes()
1258#############################
1259# pcap capture file stuff #
1260#############################
1262@conf.commands.register
1263def wrpcap(filename, # type: Union[IO[bytes], str]
1264 pkt, # type: _PacketIterable
1265 *args, # type: Any
1266 **kargs # type: Any
1267 ):
1268 # type: (...) -> None
1269 """Write a list of packets to a pcap file
1271 :param filename: the name of the file to write packets to, or an open,
1272 writable file-like object. The file descriptor will be
1273 closed at the end of the call, so do not use an object you
1274 do not want to close (e.g., running wrpcap(sys.stdout, [])
1275 in interactive mode will crash Scapy).
1276 :param gz: set to 1 to save a gzipped capture
1277 :param linktype: force linktype value
1278 :param endianness: "<" or ">", force endianness
1279 :param sync: do not bufferize writes to the capture file
1280 """
1281 with PcapWriter(filename, *args, **kargs) as fdesc:
1282 fdesc.write(pkt)
1285@conf.commands.register
1286def wrpcapng(filename, # type: str
1287 pkt, # type: _PacketIterable
1288 ):
1289 # type: (...) -> None
1290 """Write a list of packets to a pcapng file
1292 :param filename: the name of the file to write packets to, or an open,
1293 writable file-like object. The file descriptor will be
1294 closed at the end of the call, so do not use an object you
1295 do not want to close (e.g., running wrpcapng(sys.stdout, [])
1296 in interactive mode will crash Scapy).
1297 :param pkt: packets to write
1298 """
1299 with PcapNgWriter(filename) as fdesc:
1300 fdesc.write(pkt)
1303@conf.commands.register
1304def rdpcap(filename, count=-1):
1305 # type: (Union[IO[bytes], str], int) -> PacketList
1306 """Read a pcap or pcapng file and return a packet list
1308 :param count: read only <count> packets
1309 """
1310 # Rant: Our complicated use of metaclasses and especially the
1311 # __call__ function is, of course, not supported by MyPy.
1312 # One day we should simplify this mess and use a much simpler
1313 # layout that will actually be supported and properly dissected.
1314 with PcapReader(filename) as fdesc: # type: ignore
1315 return fdesc.read_all(count=count)
1318# NOTE: Type hinting
1319# Mypy doesn't understand the following metaclass, and thinks each
1320# constructor (PcapReader...) needs 3 arguments each. To avoid this,
1321# we add a fake (=None) to the last 2 arguments then force the value
1322# to not be None in the signature and pack the whole thing in an ignore.
1323# This allows to not have # type: ignore every time we call those
1324# constructors.
1326class PcapReader_metaclass(type):
1327 """Metaclass for (Raw)Pcap(Ng)Readers"""
1329 def __new__(cls, name, bases, dct):
1330 # type: (Any, str, Any, Dict[str, Any]) -> Any
1331 """The `alternative` class attribute is declared in the PcapNg
1332 variant, and set here to the Pcap variant.
1334 """
1335 newcls = super(PcapReader_metaclass, cls).__new__(
1336 cls, name, bases, dct
1337 )
1338 if 'alternative' in dct:
1339 dct['alternative'].alternative = newcls
1340 return newcls
1342 def __call__(cls, filename):
1343 # type: (Union[IO[bytes], str]) -> Any
1344 """Creates a cls instance, use the `alternative` if that
1345 fails.
1347 """
1348 i = cls.__new__(
1349 cls,
1350 cls.__name__,
1351 cls.__bases__,
1352 cls.__dict__ # type: ignore
1353 )
1354 filename, fdesc, magic = cls.open(filename)
1355 if not magic:
1356 raise Scapy_Exception(
1357 "No data could be read!"
1358 )
1359 try:
1360 i.__init__(filename, fdesc, magic)
1361 return i
1362 except (Scapy_Exception, EOFError):
1363 pass
1365 if "alternative" in cls.__dict__:
1366 cls = cls.__dict__["alternative"]
1367 i = cls.__new__(
1368 cls,
1369 cls.__name__,
1370 cls.__bases__,
1371 cls.__dict__ # type: ignore
1372 )
1373 try:
1374 i.__init__(filename, fdesc, magic)
1375 return i
1376 except (Scapy_Exception, EOFError):
1377 pass
1379 raise Scapy_Exception("Not a supported capture file")
1381 @staticmethod
1382 def open(fname # type: Union[IO[bytes], str]
1383 ):
1384 # type: (...) -> Tuple[str, _ByteStream, bytes]
1385 """Open (if necessary) filename, and read the magic."""
1386 if isinstance(fname, str):
1387 filename = fname
1388 fdesc = open(filename, "rb") # type: _ByteStream
1389 magic = fdesc.read(2)
1390 if magic == b"\x1f\x8b":
1391 # GZIP header detected.
1392 fdesc.seek(0)
1393 fdesc = gzip.GzipFile(fileobj=fdesc)
1394 magic = fdesc.read(2)
1395 magic += fdesc.read(2)
1396 else:
1397 fdesc = fname
1398 filename = getattr(fdesc, "name", "No name")
1399 magic = fdesc.read(4)
1400 return filename, fdesc, magic
1403class RawPcapReader(metaclass=PcapReader_metaclass):
1404 """A stateful pcap reader. Each packet is returned as a string"""
1406 # TODO: use Generics to properly type the various readers.
1407 # As of right now, RawPcapReader is typed as if it returned packets
1408 # because all of its child do. Fix that
1410 nonblocking_socket = True
1411 PacketMetadata = collections.namedtuple("PacketMetadata",
1412 ["sec", "usec", "wirelen", "caplen"]) # noqa: E501
1414 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
1415 # type: (str, _ByteStream, bytes) -> None
1416 self.filename = filename
1417 self.f = fdesc
1418 if magic == b"\xa1\xb2\xc3\xd4": # big endian
1419 self.endian = ">"
1420 self.nano = False
1421 elif magic == b"\xd4\xc3\xb2\xa1": # little endian
1422 self.endian = "<"
1423 self.nano = False
1424 elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision
1425 self.endian = ">"
1426 self.nano = True
1427 elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision # noqa: E501
1428 self.endian = "<"
1429 self.nano = True
1430 else:
1431 raise Scapy_Exception(
1432 "Not a pcap capture file (bad magic: %r)" % magic
1433 )
1434 hdr = self.f.read(20)
1435 if len(hdr) < 20:
1436 raise Scapy_Exception("Invalid pcap file (too short)")
1437 vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
1438 self.endian + "HHIIII", hdr
1439 )
1440 self.linktype = linktype
1441 self.snaplen = snaplen
1443 def __enter__(self):
1444 # type: () -> RawPcapReader
1445 return self
1447 def __iter__(self):
1448 # type: () -> RawPcapReader
1449 return self
1451 def __next__(self):
1452 # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
1453 """
1454 implement the iterator protocol on a set of packets in a pcap file
1455 """
1456 try:
1457 return self._read_packet()
1458 except EOFError:
1459 raise StopIteration
1461 def _read_packet(self, size=MTU):
1462 # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
1463 """return a single packet read from the file as a tuple containing
1464 (pkt_data, pkt_metadata)
1466 raise EOFError when no more packets are available
1467 """
1468 hdr = self.f.read(16)
1469 if len(hdr) < 16:
1470 raise EOFError
1471 sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr)
1473 try:
1474 data = self.f.read(caplen)[:size]
1475 except OverflowError as e:
1476 warning(f"Pcap: {e}")
1477 raise EOFError
1479 return (data,
1480 RawPcapReader.PacketMetadata(sec=sec, usec=usec,
1481 wirelen=wirelen, caplen=caplen))
1483 def read_packet(self, size=MTU):
1484 # type: (int) -> Packet
1485 raise Exception(
1486 "Cannot call read_packet() in RawPcapReader. Use "
1487 "_read_packet()"
1488 )
1490 def dispatch(self,
1491 callback # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any] # noqa: E501
1492 ):
1493 # type: (...) -> None
1494 """call the specified callback routine for each packet read
1496 This is just a convenience function for the main loop
1497 that allows for easy launching of packet processing in a
1498 thread.
1499 """
1500 for p in self:
1501 callback(p)
1503 def _read_all(self, count=-1):
1504 # type: (int) -> List[Packet]
1505 """return a list of all packets in the pcap file
1506 """
1507 res = [] # type: List[Packet]
1508 while count != 0:
1509 count -= 1
1510 try:
1511 p = self.read_packet() # type: Packet
1512 except EOFError:
1513 break
1514 res.append(p)
1515 return res
1517 def recv(self, size=MTU):
1518 # type: (int) -> bytes
1519 """ Emulate a socket
1520 """
1521 return self._read_packet(size=size)[0]
1523 def fileno(self):
1524 # type: () -> int
1525 return -1 if WINDOWS else self.f.fileno()
1527 def close(self):
1528 # type: () -> None
1529 if isinstance(self.f, gzip.GzipFile):
1530 self.f.fileobj.close() # type: ignore
1531 self.f.close()
1533 def __exit__(self, exc_type, exc_value, tracback):
1534 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
1535 self.close()
1537 # emulate SuperSocket
1538 @staticmethod
1539 def select(sockets, # type: List[SuperSocket]
1540 remain=None, # type: Optional[float]
1541 ):
1542 # type: (...) -> List[SuperSocket]
1543 return sockets
1546class PcapReader(RawPcapReader):
1547 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
1548 # type: (str, IO[bytes], bytes) -> None
1549 RawPcapReader.__init__(self, filename, fdesc, magic)
1550 try:
1551 self.LLcls = conf.l2types.num2layer[
1552 self.linktype
1553 ] # type: Type[Packet]
1554 except KeyError:
1555 warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype)) # noqa: E501
1556 if conf.raw_layer is None:
1557 # conf.raw_layer is set on import
1558 import scapy.packet # noqa: F401
1559 self.LLcls = conf.raw_layer
1561 def __enter__(self):
1562 # type: () -> PcapReader
1563 return self
1565 def read_packet(self, size=MTU, **kwargs):
1566 # type: (int, **Any) -> Packet
1567 rp = super(PcapReader, self)._read_packet(size=size)
1568 if rp is None:
1569 raise EOFError
1570 s, pkt_info = rp
1572 try:
1573 p = self.LLcls(s, **kwargs) # type: Packet
1574 except KeyboardInterrupt:
1575 raise
1576 except Exception:
1577 if conf.debug_dissector:
1578 from scapy.sendrecv import debug
1579 debug.crashed_on = (self.LLcls, s)
1580 raise
1581 if conf.raw_layer is None:
1582 # conf.raw_layer is set on import
1583 import scapy.packet # noqa: F401
1584 p = conf.raw_layer(s)
1585 power = Decimal(10) ** Decimal(-9 if self.nano else -6)
1586 p.time = EDecimal(pkt_info.sec + power * pkt_info.usec)
1587 p.wirelen = pkt_info.wirelen
1588 return p
1590 def recv(self, size=MTU, **kwargs): # type: ignore
1591 # type: (int, **Any) -> Packet
1592 return self.read_packet(size=size, **kwargs)
1594 def __iter__(self):
1595 # type: () -> PcapReader
1596 return self
1598 def __next__(self): # type: ignore
1599 # type: () -> Packet
1600 try:
1601 return self.read_packet()
1602 except EOFError:
1603 raise StopIteration
1605 def read_all(self, count=-1):
1606 # type: (int) -> PacketList
1607 res = self._read_all(count)
1608 from scapy import plist
1609 return plist.PacketList(res, name=os.path.basename(self.filename))
1612class RawPcapNgReader(RawPcapReader):
1613 """A stateful pcapng reader. Each packet is returned as
1614 bytes.
1616 """
1618 alternative = RawPcapReader # type: Type[Any]
1620 PacketMetadata = collections.namedtuple("PacketMetadataNg", # type: ignore
1621 ["linktype", "tsresol",
1622 "tshigh", "tslow", "wirelen",
1623 "comments", "ifname", "direction",
1624 "process_information"])
1626 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
1627 # type: (str, IO[bytes], bytes) -> None
1628 self.filename = filename
1629 self.f = fdesc
1630 # A list of (linktype, snaplen, tsresol); will be populated by IDBs.
1631 self.interfaces = [] # type: List[Tuple[int, int, Dict[str, Any]]]
1632 self.default_options = {
1633 "tsresol": 1000000
1634 }
1635 self.blocktypes: Dict[
1636 int,
1637 Callable[
1638 [bytes, int],
1639 Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
1640 ]] = {
1641 1: self._read_block_idb,
1642 2: self._read_block_pkt,
1643 3: self._read_block_spb,
1644 6: self._read_block_epb,
1645 10: self._read_block_dsb,
1646 0x80000001: self._read_block_pib,
1647 }
1648 self.endian = "!" # Will be overwritten by first SHB
1649 self.process_information = [] # type: List[Dict[str, Any]]
1651 if magic != b"\x0a\x0d\x0d\x0a": # PcapNg:
1652 raise Scapy_Exception(
1653 "Not a pcapng capture file (bad magic: %r)" % magic
1654 )
1656 try:
1657 self._read_block_shb()
1658 except EOFError:
1659 raise Scapy_Exception(
1660 "The first SHB of the pcapng file is malformed !"
1661 )
1663 def _read_block(self, size=MTU):
1664 # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] # noqa: E501
1665 try:
1666 blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
1667 except struct.error:
1668 raise EOFError
1669 if blocktype == 0x0A0D0D0A:
1670 # This function updates the endianness based on the block content.
1671 self._read_block_shb()
1672 return None
1673 try:
1674 blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
1675 except struct.error:
1676 warning("PcapNg: Error reading blocklen before block body")
1677 raise EOFError
1678 if blocklen < 12:
1679 warning("PcapNg: Invalid block length !")
1680 raise EOFError
1682 _block_body_length = blocklen - 12
1683 block = self.f.read(_block_body_length)
1684 if len(block) != _block_body_length:
1685 raise Scapy_Exception("PcapNg: Invalid Block body length "
1686 "(too short)")
1687 self._read_block_tail(blocklen)
1688 if blocktype in self.blocktypes:
1689 return self.blocktypes[blocktype](block, size)
1690 return None
1692 def _read_block_tail(self, blocklen):
1693 # type: (int) -> None
1694 if blocklen % 4:
1695 pad = self.f.read(-blocklen % 4)
1696 warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
1697 "Ignored padding %r" % (blocklen, pad))
1698 try:
1699 if blocklen != struct.unpack(self.endian + 'I',
1700 self.f.read(4))[0]:
1701 raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
1702 except struct.error:
1703 warning("PcapNg: Could not read blocklen after block body")
1704 raise EOFError
1706 def _read_block_shb(self):
1707 # type: () -> None
1708 """Section Header Block"""
1709 _blocklen = self.f.read(4)
1710 endian = self.f.read(4)
1711 if endian == b"\x1a\x2b\x3c\x4d":
1712 self.endian = ">"
1713 elif endian == b"\x4d\x3c\x2b\x1a":
1714 self.endian = "<"
1715 else:
1716 warning("PcapNg: Bad magic in Section Header Block"
1717 " (not a pcapng file?)")
1718 raise EOFError
1720 try:
1721 blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
1722 except struct.error:
1723 warning("PcapNg: Could not read blocklen")
1724 raise EOFError
1725 if blocklen < 28:
1726 warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!") # noqa: E501
1727 raise EOFError
1729 # Major version must be 1
1730 _major = self.f.read(2)
1731 try:
1732 major = struct.unpack(self.endian + "H", _major)[0]
1733 except struct.error:
1734 warning("PcapNg: Could not read major value")
1735 raise EOFError
1736 if major != 1:
1737 warning(f"PcapNg: SHB Major version {major} unsupported !")
1738 raise EOFError
1740 # Skip minor version & section length
1741 skipped = self.f.read(10)
1742 if len(skipped) != 10:
1743 warning("PcapNg: Could not read minor value & section length")
1744 raise EOFError
1746 _options_len = blocklen - 28
1747 options = self.f.read(_options_len)
1748 if len(options) != _options_len:
1749 raise Scapy_Exception("PcapNg: Invalid Section Header Block "
1750 " options (too short)")
1751 self._read_block_tail(blocklen)
1752 self._read_options(options)
1754 def _read_packet(self, size=MTU): # type: ignore
1755 # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1756 """Read blocks until it reaches either EOF or a packet, and
1757 returns None or (packet, (linktype, sec, usec, wirelen)),
1758 where packet is a string.
1760 """
1761 while True:
1762 res = self._read_block(size=size)
1763 if res is not None:
1764 return res
1766 def _read_options(self, options):
1767 # type: (bytes) -> Dict[int, Union[bytes, List[bytes]]]
1768 opts = dict() # type: Dict[int, Union[bytes, List[bytes]]]
1769 while len(options) >= 4:
1770 try:
1771 code, length = struct.unpack(self.endian + "HH", options[:4])
1772 except struct.error:
1773 warning("PcapNg: options header is too small "
1774 "%d !" % len(options))
1775 raise EOFError
1776 if code != 0 and 4 + length <= len(options):
1777 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#name-options-format
1778 if code in [1, 2988, 2989, 19372, 19373]:
1779 if code not in opts:
1780 opts[code] = []
1781 opts[code].append(options[4:4 + length]) # type: ignore
1782 else:
1783 opts[code] = options[4:4 + length]
1784 if code == 0:
1785 if length != 0:
1786 warning("PcapNg: invalid option "
1787 "length %d for end-of-option" % length)
1788 break
1789 if length % 4:
1790 length += (4 - (length % 4))
1791 options = options[4 + length:]
1792 return opts
1794 def _read_block_idb(self, block, _):
1795 # type: (bytes, int) -> None
1796 """Interface Description Block"""
1797 # 2 bytes LinkType + 2 bytes Reserved
1798 # 4 bytes Snaplen
1799 options_raw = self._read_options(block[8:])
1800 options = self.default_options.copy() # type: Dict[str, Any]
1801 for c, v in options_raw.items():
1802 if isinstance(v, list):
1803 # Spec allows multiple occurrences (see
1804 # https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-05.html#section-4.2-8.6)
1805 # but does not define which to use. We take the first for
1806 # backward compatibility.
1807 v = v[0]
1808 if c == 9:
1809 length = len(v)
1810 if length == 1:
1811 tsresol = orb(v)
1812 options["tsresol"] = (2 if tsresol & 128 else 10) ** (
1813 tsresol & 127
1814 )
1815 else:
1816 warning("PcapNg: invalid options "
1817 "length %d for IDB tsresol" % length)
1818 elif c == 2:
1819 options["name"] = v
1820 elif c == 1:
1821 options["comment"] = v
1822 try:
1823 interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
1824 self.endian + "HxxI",
1825 block[:8]
1826 ) + (options,)
1827 except struct.error:
1828 warning("PcapNg: IDB is too small %d/8 !" % len(block))
1829 raise EOFError
1830 self.interfaces.append(interface)
1832 def _check_interface_id(self, intid):
1833 # type: (int) -> None
1834 """Check the interface id value and raise EOFError if invalid."""
1835 tmp_len = len(self.interfaces)
1836 if intid >= tmp_len:
1837 warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
1838 raise EOFError
1840 def _read_block_epb(self, block, size):
1841 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1842 """Enhanced Packet Block"""
1843 try:
1844 intid, tshigh, tslow, caplen, wirelen = struct.unpack(
1845 self.endian + "5I",
1846 block[:20],
1847 )
1848 except struct.error:
1849 warning("PcapNg: EPB is too small %d/20 !" % len(block))
1850 raise EOFError
1852 # Compute the options offset taking padding into account
1853 if caplen % 4:
1854 opt_offset = 20 + caplen + (-caplen) % 4
1855 else:
1856 opt_offset = 20 + caplen
1858 # Parse options
1859 options = self._read_options(block[opt_offset:])
1861 process_information = {}
1862 for code, value in options.items():
1863 # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX
1864 if code in [0x8001, 0x8003]:
1865 try:
1866 proc_index = struct.unpack(
1867 self.endian + "I", value)[0] # type: ignore
1868 except struct.error:
1869 warning("PcapNg: EPB invalid proc index "
1870 "(expected 4 bytes, got %d) !" % len(value))
1871 raise EOFError
1872 if proc_index < len(self.process_information):
1873 key = "proc" if code == 0x8001 else "eproc"
1874 process_information[key] = self.process_information[proc_index]
1875 else:
1876 warning("PcapNg: EPB invalid process information index "
1877 "(%d/%d) !" % (proc_index, len(self.process_information)))
1879 comments = options.get(1, None)
1880 epb_flags_raw = options.get(2, None)
1881 if epb_flags_raw and isinstance(epb_flags_raw, bytes):
1882 try:
1883 epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
1884 except struct.error:
1885 warning("PcapNg: EPB invalid flags size"
1886 "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
1887 raise EOFError
1888 direction = epb_flags & 3
1890 else:
1891 direction = None
1893 self._check_interface_id(intid)
1894 ifname = self.interfaces[intid][2].get('name', None)
1896 return (block[20:20 + caplen][:size],
1897 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501
1898 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501
1899 tshigh=tshigh,
1900 tslow=tslow,
1901 wirelen=wirelen,
1902 ifname=ifname,
1903 direction=direction,
1904 process_information=process_information,
1905 comments=comments))
1907 def _read_block_spb(self, block, size):
1908 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1909 """Simple Packet Block"""
1910 # "it MUST be assumed that all the Simple Packet Blocks have
1911 # been captured on the interface previously specified in the
1912 # first Interface Description Block."
1913 intid = 0
1914 self._check_interface_id(intid)
1916 try:
1917 wirelen, = struct.unpack(self.endian + "I", block[:4])
1918 except struct.error:
1919 warning("PcapNg: SPB is too small %d/4 !" % len(block))
1920 raise EOFError
1922 caplen = min(wirelen, self.interfaces[intid][1])
1923 return (block[4:4 + caplen][:size],
1924 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501
1925 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501
1926 tshigh=None,
1927 tslow=None,
1928 wirelen=wirelen,
1929 ifname=None,
1930 direction=None,
1931 process_information={},
1932 comments=None))
1934 def _read_block_pkt(self, block, size):
1935 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1936 """(Obsolete) Packet Block"""
1937 try:
1938 intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
1939 self.endian + "HH4I",
1940 block[:20],
1941 )
1942 except struct.error:
1943 warning("PcapNg: PKT is too small %d/20 !" % len(block))
1944 raise EOFError
1946 self._check_interface_id(intid)
1947 return (block[20:20 + caplen][:size],
1948 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501
1949 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501
1950 tshigh=tshigh,
1951 tslow=tslow,
1952 wirelen=wirelen,
1953 ifname=None,
1954 direction=None,
1955 process_information={},
1956 comments=None))
1958 def _read_block_dsb(self, block, size):
1959 # type: (bytes, int) -> None
1960 """Decryption Secrets Block"""
1962 # Parse the secrets type and length fields
1963 try:
1964 secrets_type, secrets_length = struct.unpack(
1965 self.endian + "II",
1966 block[:8],
1967 )
1968 block = block[8:]
1969 except struct.error:
1970 warning("PcapNg: DSB is too small %d!", len(block))
1971 raise EOFError
1973 # Compute the secrets length including the padding
1974 padded_secrets_length = secrets_length + (-secrets_length) % 4
1975 if len(block) < padded_secrets_length:
1976 warning("PcapNg: invalid DSB secrets length!")
1977 raise EOFError
1979 # Extract secrets data and options
1980 secrets_data = block[:padded_secrets_length][:secrets_length]
1981 if block[padded_secrets_length:]:
1982 warning("PcapNg: DSB options are not supported!")
1984 # TLS Key Log
1985 if secrets_type == 0x544c534b:
1986 if getattr(conf, "tls_sessions", False) is False:
1987 warning("PcapNg: TLS Key Log available, but "
1988 "the TLS layer is not loaded! Scapy won't be able "
1989 "to decrypt the packets.")
1990 else:
1991 from scapy.layers.tls.session import load_nss_keys
1993 # Write Key Log to a file and parse it
1994 filename = get_temp_file()
1995 with open(filename, "wb") as fd:
1996 fd.write(secrets_data)
1997 fd.close()
1999 keys = load_nss_keys(filename)
2000 if not keys:
2001 warning("PcapNg: invalid TLS Key Log in DSB!")
2002 else:
2003 # Note: these attributes are only available when the TLS
2004 # layer is loaded.
2005 conf.tls_nss_keys = keys
2006 conf.tls_session_enable = True
2007 else:
2008 warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)
2010 def _read_block_pib(self, block, _):
2011 # type: (bytes, int) -> None
2012 """Apple Process Information Block"""
2014 # Get the Process ID
2015 try:
2016 dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0]
2017 process_information = {"id": dpeb_pid}
2018 block = block[4:]
2019 except struct.error:
2020 warning("PcapNg: DPEB is too small (%d). Cannot get PID!",
2021 len(block))
2022 raise EOFError
2024 # Get Options
2025 options = self._read_options(block)
2026 for code, value in options.items():
2027 if code == 2:
2028 process_information["name"] = value.decode( # type: ignore
2029 "ascii", "backslashreplace")
2030 elif code == 4:
2031 if len(value) == 16:
2032 process_information["uuid"] = str(UUID(bytes=value)) # type: ignore
2033 else:
2034 warning("PcapNg: DPEB UUID length is invalid (%d)!",
2035 len(value))
2037 # Store process information
2038 self.process_information.append(process_information)
2041class PcapNgReader(RawPcapNgReader, PcapReader):
2043 alternative = PcapReader
2045 def __init__(self, filename, fdesc=None, magic=None): # type: ignore
2046 # type: (str, IO[bytes], bytes) -> None
2047 RawPcapNgReader.__init__(self, filename, fdesc, magic)
2049 def __enter__(self):
2050 # type: () -> PcapNgReader
2051 return self
2053 def read_packet(self, size=MTU, **kwargs):
2054 # type: (int, **Any) -> Packet
2055 rp = super(PcapNgReader, self)._read_packet(size=size)
2056 if rp is None:
2057 raise EOFError
2058 s, (linktype, tsresol, tshigh, tslow, wirelen, comments, ifname, direction, process_information) = rp # noqa: E501
2059 try:
2060 cls = conf.l2types.num2layer[linktype] # type: Type[Packet]
2061 p = cls(s, **kwargs) # type: Packet
2062 except KeyboardInterrupt:
2063 raise
2064 except Exception:
2065 if conf.debug_dissector:
2066 raise
2067 if conf.raw_layer is None:
2068 # conf.raw_layer is set on import
2069 import scapy.packet # noqa: F401
2070 p = conf.raw_layer(s)
2071 if tshigh is not None:
2072 p.time = EDecimal((tshigh << 32) + tslow) / tsresol
2073 p.wirelen = wirelen
2074 p.comments = comments
2075 p.direction = direction
2076 p.process_information = process_information.copy()
2077 if ifname is not None:
2078 p.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
2079 return p
2081 def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet': # type: ignore
2082 return self.read_packet(size=size, **kwargs)
2085class GenericPcapWriter(object):
2086 nano = False
2087 linktype: int
2089 def _write_header(self, pkt):
2090 # type: (Optional[Union[Packet, bytes]]) -> None
2091 raise NotImplementedError
2093 def _write_packet(self,
2094 packet, # type: Union[bytes, Packet]
2095 linktype, # type: int
2096 sec=None, # type: Optional[float]
2097 usec=None, # type: Optional[int]
2098 caplen=None, # type: Optional[int]
2099 wirelen=None, # type: Optional[int]
2100 ifname=None, # type: Optional[bytes]
2101 direction=None, # type: Optional[int]
2102 comments=None, # type: Optional[List[bytes]]
2103 ):
2104 # type: (...) -> None
2105 raise NotImplementedError
2107 def _get_time(self,
2108 packet, # type: Union[bytes, Packet]
2109 sec, # type: Optional[float]
2110 usec # type: Optional[int]
2111 ):
2112 # type: (...) -> Tuple[float, int]
2113 if hasattr(packet, "time"):
2114 if sec is None:
2115 packet_time = packet.time
2116 tmp = int(packet_time)
2117 usec = int(round((packet_time - tmp) *
2118 (1000000000 if self.nano else 1000000)))
2119 sec = float(packet_time)
2120 if sec is not None and usec is None:
2121 usec = 0
2122 return sec, usec # type: ignore
2124 def write_header(self, pkt):
2125 # type: (Optional[Union[Packet, bytes]]) -> None
2126 if not hasattr(self, 'linktype'):
2127 try:
2128 if pkt is None or isinstance(pkt, bytes):
2129 # Can't guess LL
2130 raise KeyError
2131 self.linktype = conf.l2types.layer2num[
2132 pkt.__class__
2133 ]
2134 except KeyError:
2135 msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
2136 warning(msg, self.__class__.__name__, pkt.__class__.__name__)
2137 self.linktype = DLT_EN10MB
2138 self._write_header(pkt)
2140 def write_packet(self,
2141 packet, # type: Union[bytes, Packet]
2142 sec=None, # type: Optional[float]
2143 usec=None, # type: Optional[int]
2144 caplen=None, # type: Optional[int]
2145 wirelen=None, # type: Optional[int]
2146 ):
2147 # type: (...) -> None
2148 """
2149 Writes a single packet to the pcap file.
2151 :param packet: Packet, or bytes for a single packet
2152 :type packet: scapy.packet.Packet or bytes
2153 :param sec: time the packet was captured, in seconds since epoch. If
2154 not supplied, defaults to now.
2155 :type sec: float
2156 :param usec: If ``nano=True``, then number of nanoseconds after the
2157 second that the packet was captured. If ``nano=False``,
2158 then the number of microseconds after the second the
2159 packet was captured. If ``sec`` is not specified,
2160 this value is ignored.
2161 :type usec: int or long
2162 :param caplen: The length of the packet in the capture file. If not
2163 specified, uses ``len(raw(packet))``.
2164 :type caplen: int
2165 :param wirelen: The length of the packet on the wire. If not
2166 specified, tries ``packet.wirelen``, otherwise uses
2167 ``caplen``.
2168 :type wirelen: int
2169 :return: None
2170 :rtype: None
2171 """
2172 f_sec, usec = self._get_time(packet, sec, usec)
2174 rawpkt = bytes_encode(packet)
2175 caplen = len(rawpkt) if caplen is None else caplen
2177 if wirelen is None:
2178 if hasattr(packet, "wirelen"):
2179 wirelen = packet.wirelen
2180 if wirelen is None:
2181 wirelen = caplen
2183 comments = getattr(packet, "comments", None)
2184 ifname = getattr(packet, "sniffed_on", None)
2185 direction = getattr(packet, "direction", None)
2186 if not isinstance(packet, bytes):
2187 linktype: int = conf.l2types.layer2num[
2188 packet.__class__
2189 ]
2190 else:
2191 linktype = self.linktype
2192 if ifname is not None:
2193 ifname = str(ifname).encode('utf-8')
2194 self._write_packet(
2195 rawpkt,
2196 sec=f_sec, usec=usec,
2197 caplen=caplen, wirelen=wirelen,
2198 ifname=ifname,
2199 direction=direction,
2200 linktype=linktype,
2201 comments=comments,
2202 )
2205class GenericRawPcapWriter(GenericPcapWriter):
2206 header_present = False
2207 nano = False
2208 sync = False
2209 f = None # type: Union[IO[bytes], gzip.GzipFile]
2211 def fileno(self):
2212 # type: () -> int
2213 return -1 if WINDOWS else self.f.fileno()
2215 def flush(self):
2216 # type: () -> Optional[Any]
2217 return self.f.flush()
2219 def close(self):
2220 # type: () -> Optional[Any]
2221 if not self.header_present:
2222 self.write_header(None)
2223 return self.f.close()
2225 def __enter__(self):
2226 # type: () -> GenericRawPcapWriter
2227 return self
2229 def __exit__(self, exc_type, exc_value, tracback):
2230 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
2231 self.flush()
2232 self.close()
2234 def write(self, pkt):
2235 # type: (Union[_PacketIterable, bytes]) -> None
2236 """
2237 Writes a Packet, a SndRcvList object, or bytes to a pcap file.
2239 :param pkt: Packet(s) to write (one record for each Packet), or raw
2240 bytes to write (as one record).
2241 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
2242 """
2243 if isinstance(pkt, bytes):
2244 if not self.header_present:
2245 self.write_header(pkt)
2246 self.write_packet(pkt)
2247 else:
2248 # Import here to avoid circular dependency
2249 from scapy.supersocket import IterSocket
2250 for p in IterSocket(pkt).iter:
2251 if not self.header_present:
2252 self.write_header(p)
2254 if not isinstance(p, bytes) and \
2255 self.linktype != conf.l2types.get(type(p), None):
2256 warning("Inconsistent linktypes detected!"
2257 " The resulting file might contain"
2258 " invalid packets."
2259 )
2261 self.write_packet(p)
2264class RawPcapWriter(GenericRawPcapWriter):
2265 """A stream PCAP writer with more control than wrpcap()"""
2267 def __init__(self,
2268 filename, # type: Union[IO[bytes], str]
2269 linktype=None, # type: Optional[int]
2270 gz=False, # type: bool
2271 endianness="", # type: str
2272 append=False, # type: bool
2273 sync=False, # type: bool
2274 nano=False, # type: bool
2275 snaplen=MTU, # type: int
2276 bufsz=4096, # type: int
2277 ):
2278 # type: (...) -> None
2279 """
2280 :param filename: the name of the file to write packets to, or an open,
2281 writable file-like object.
2282 :param linktype: force linktype to a given value. If None, linktype is
2283 taken from the first writer packet
2284 :param gz: compress the capture on the fly
2285 :param endianness: force an endianness (little:"<", big:">").
2286 Default is native
2287 :param append: append packets to the capture file instead of
2288 truncating it
2289 :param sync: do not bufferize writes to the capture file
2290 :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
2292 """
2294 if linktype:
2295 self.linktype = linktype
2296 self.snaplen = snaplen
2297 self.append = append
2298 self.gz = gz
2299 self.endian = endianness
2300 self.sync = sync
2301 self.nano = nano
2302 if sync:
2303 bufsz = 0
2305 if isinstance(filename, str):
2306 self.filename = filename
2307 if gz:
2308 self.f = cast(_ByteStream, gzip.open(
2309 filename, append and "ab" or "wb", 9
2310 ))
2311 else:
2312 self.f = open(filename, append and "ab" or "wb", bufsz)
2313 else:
2314 self.f = filename
2315 self.filename = getattr(filename, "name", "No name")
2317 def _write_header(self, pkt):
2318 # type: (Optional[Union[Packet, bytes]]) -> None
2319 self.header_present = True
2321 if self.append:
2322 # Even if prone to race conditions, this seems to be
2323 # safest way to tell whether the header is already present
2324 # because we have to handle compressed streams that
2325 # are not as flexible as basic files
2326 if self.gz:
2327 g = gzip.open(self.filename, "rb") # type: _ByteStream
2328 else:
2329 g = open(self.filename, "rb")
2330 try:
2331 if g.read(16):
2332 return
2333 finally:
2334 g.close()
2336 if not hasattr(self, 'linktype'):
2337 raise ValueError(
2338 "linktype could not be guessed. "
2339 "Please pass a linktype while creating the writer"
2340 )
2342 self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, # noqa: E501
2343 2, 4, 0, 0, self.snaplen, self.linktype))
2344 self.f.flush()
2346 def _write_packet(self,
2347 packet, # type: Union[bytes, Packet]
2348 linktype, # type: int
2349 sec=None, # type: Optional[float]
2350 usec=None, # type: Optional[int]
2351 caplen=None, # type: Optional[int]
2352 wirelen=None, # type: Optional[int]
2353 ifname=None, # type: Optional[bytes]
2354 direction=None, # type: Optional[int]
2355 comments=None, # type: Optional[List[bytes]]
2356 ):
2357 # type: (...) -> None
2358 """
2359 Writes a single packet to the pcap file.
2361 :param packet: bytes for a single packet
2362 :type packet: bytes
2363 :param linktype: linktype value associated with the packet
2364 :type linktype: int
2365 :param sec: time the packet was captured, in seconds since epoch. If
2366 not supplied, defaults to now.
2367 :type sec: float
2368 :param usec: not used with pcapng
2369 packet was captured
2370 :type usec: int or long
2371 :param caplen: The length of the packet in the capture file. If not
2372 specified, uses ``len(packet)``.
2373 :type caplen: int
2374 :param wirelen: The length of the packet on the wire. If not
2375 specified, uses ``caplen``.
2376 :type wirelen: int
2377 :return: None
2378 :rtype: None
2379 """
2380 if caplen is None:
2381 caplen = len(packet)
2382 if wirelen is None:
2383 wirelen = caplen
2384 if sec is None or usec is None:
2385 t = time.time()
2386 it = int(t)
2387 if sec is None:
2388 sec = it
2389 usec = int(round((t - it) *
2390 (1000000000 if self.nano else 1000000)))
2391 elif usec is None:
2392 usec = 0
2394 self.f.write(struct.pack(self.endian + "IIII",
2395 int(sec), usec, caplen, wirelen))
2396 self.f.write(bytes(packet))
2397 if self.sync:
2398 self.f.flush()
2401class RawPcapNgWriter(GenericRawPcapWriter):
2402 """A stream pcapng writer with more control than wrpcapng()"""
2404 def __init__(self,
2405 filename, # type: str
2406 ):
2407 # type: (...) -> None
2409 self.header_present = False
2410 self.tsresol = 1000000
2411 # A dict to keep if_name to IDB id mapping.
2412 # unknown if_name(None) id=0
2413 self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}
2415 # tcpdump only support little-endian in PCAPng files
2416 self.endian = "<"
2417 self.endian_magic = b"\x4d\x3c\x2b\x1a"
2419 self.filename = filename
2420 self.f = open(filename, "wb", 4096)
2422 def _get_time(self,
2423 packet, # type: Union[bytes, Packet]
2424 sec, # type: Optional[float]
2425 usec # type: Optional[int]
2426 ):
2427 # type: (...) -> Tuple[float, int]
2428 if hasattr(packet, "time"):
2429 if sec is None:
2430 sec = float(packet.time)
2432 if usec is None:
2433 usec = 0
2435 return sec, usec # type: ignore
2437 def _add_padding(self, raw_data):
2438 # type: (bytes) -> bytes
2439 raw_data += ((-len(raw_data)) % 4) * b"\x00"
2440 return raw_data
2442 def build_block(self, block_type, block_body, options=None):
2443 # type: (bytes, bytes, Optional[bytes]) -> bytes
2445 # Pad Block Body to 32 bits
2446 block_body = self._add_padding(block_body)
2448 if options:
2449 block_body += options
2451 # An empty block is 12 bytes long
2452 block_total_length = 12 + len(block_body)
2454 # Block Type
2455 block = block_type
2456 # Block Total Length$
2457 block += struct.pack(self.endian + "I", block_total_length)
2458 # Block Body
2459 block += block_body
2460 # Block Total Length$
2461 block += struct.pack(self.endian + "I", block_total_length)
2463 return block
2465 def _write_header(self, pkt):
2466 # type: (Optional[Union[Packet, bytes]]) -> None
2467 if not self.header_present:
2468 self.header_present = True
2469 self._write_block_shb()
2470 self._write_block_idb(linktype=self.linktype)
2472 def _write_block_shb(self):
2473 # type: () -> None
2475 # Block Type
2476 block_type = b"\x0A\x0D\x0D\x0A"
2477 # Byte-Order Magic
2478 block_shb = self.endian_magic
2479 # Major Version
2480 block_shb += struct.pack(self.endian + "H", 1)
2481 # Minor Version
2482 block_shb += struct.pack(self.endian + "H", 0)
2483 # Section Length
2484 block_shb += struct.pack(self.endian + "q", -1)
2486 self.f.write(self.build_block(block_type, block_shb))
2488 def _write_block_idb(self,
2489 linktype, # type: int
2490 ifname=None # type: Optional[bytes]
2491 ):
2492 # type: (...) -> None
2494 # Block Type
2495 block_type = struct.pack(self.endian + "I", 1)
2496 # LinkType
2497 block_idb = struct.pack(self.endian + "H", linktype)
2498 # Reserved
2499 block_idb += struct.pack(self.endian + "H", 0)
2500 # SnapLen
2501 block_idb += struct.pack(self.endian + "I", 262144)
2503 # if_name option
2504 opts = None
2505 if ifname is not None:
2506 opts = struct.pack(self.endian + "HH", 2, len(ifname))
2507 # Pad Option Value to 32 bits
2508 opts += self._add_padding(ifname)
2509 opts += struct.pack(self.endian + "HH", 0, 0)
2511 self.f.write(self.build_block(block_type, block_idb, options=opts))
2513 def _write_block_spb(self, raw_pkt):
2514 # type: (bytes) -> None
2516 # Block Type
2517 block_type = struct.pack(self.endian + "I", 3)
2518 # Original Packet Length
2519 block_spb = struct.pack(self.endian + "I", len(raw_pkt))
2520 # Packet Data
2521 block_spb += raw_pkt
2523 self.f.write(self.build_block(block_type, block_spb))
2525 def _write_block_epb(self,
2526 raw_pkt, # type: bytes
2527 ifid, # type: int
2528 timestamp=None, # type: Optional[Union[EDecimal, float]] # noqa: E501
2529 caplen=None, # type: Optional[int]
2530 orglen=None, # type: Optional[int]
2531 comments=None, # type: Optional[List[bytes]]
2532 flags=None, # type: Optional[int]
2533 ):
2534 # type: (...) -> None
2536 if timestamp:
2537 tmp_ts = int(timestamp * self.tsresol)
2538 ts_high = tmp_ts >> 32
2539 ts_low = tmp_ts & 0xFFFFFFFF
2540 else:
2541 ts_high = ts_low = 0
2543 if not caplen:
2544 caplen = len(raw_pkt)
2546 if not orglen:
2547 orglen = len(raw_pkt)
2549 # Block Type
2550 block_type = struct.pack(self.endian + "I", 6)
2551 # Interface ID
2552 block_epb = struct.pack(self.endian + "I", ifid)
2553 # Timestamp (High)
2554 block_epb += struct.pack(self.endian + "I", ts_high)
2555 # Timestamp (Low)
2556 block_epb += struct.pack(self.endian + "I", ts_low)
2557 # Captured Packet Length
2558 block_epb += struct.pack(self.endian + "I", caplen)
2559 # Original Packet Length
2560 block_epb += struct.pack(self.endian + "I", orglen)
2561 # Packet Data
2562 block_epb += raw_pkt
2564 # Options
2565 opts = b''
2566 if comments and len(comments):
2567 for c in comments:
2568 comment = bytes_encode(c)
2569 opts += struct.pack(self.endian + "HH", 1, len(comment))
2570 # Pad Option Value to 32 bits
2571 opts += self._add_padding(comment)
2572 if type(flags) == int:
2573 opts += struct.pack(self.endian + "HH", 2, 4)
2574 opts += struct.pack(self.endian + "I", flags)
2575 if opts:
2576 opts += struct.pack(self.endian + "HH", 0, 0)
2578 self.f.write(self.build_block(block_type, block_epb,
2579 options=opts))
2581 def _write_packet(self, # type: ignore
2582 packet, # type: bytes
2583 linktype, # type: int
2584 sec=None, # type: Optional[float]
2585 usec=None, # type: Optional[int]
2586 caplen=None, # type: Optional[int]
2587 wirelen=None, # type: Optional[int]
2588 ifname=None, # type: Optional[bytes]
2589 direction=None, # type: Optional[int]
2590 comments=None, # type: Optional[List[bytes]]
2591 ):
2592 # type: (...) -> None
2593 """
2594 Writes a single packet to the pcap file.
2596 :param packet: bytes for a single packet
2597 :type packet: bytes
2598 :param linktype: linktype value associated with the packet
2599 :type linktype: int
2600 :param sec: time the packet was captured, in seconds since epoch. If
2601 not supplied, defaults to now.
2602 :type sec: float
2603 :param caplen: The length of the packet in the capture file. If not
2604 specified, uses ``len(packet)``.
2605 :type caplen: int
2606 :param wirelen: The length of the packet on the wire. If not
2607 specified, uses ``caplen``.
2608 :type wirelen: int
2609 :param comment: UTF-8 string containing human-readable comment text
2610 that is associated to the current block. Line separators
2611 SHOULD be a carriage-return + linefeed ('\r\n') or
2612 just linefeed ('\n'); either form may appear and
2613 be considered a line separator. The string is not
2614 zero-terminated.
2615 :type bytes
2616 :param ifname: UTF-8 string containing the
2617 name of the device used to capture data.
2618 The string is not zero-terminated.
2619 :type bytes
2620 :param direction: 0 = information not available,
2621 1 = inbound,
2622 2 = outbound
2623 :type int
2624 :return: None
2625 :rtype: None
2626 """
2627 if caplen is None:
2628 caplen = len(packet)
2629 if wirelen is None:
2630 wirelen = caplen
2632 ifid = self.interfaces2id.get(ifname, None)
2633 if ifid is None:
2634 ifid = max(self.interfaces2id.values()) + 1
2635 self.interfaces2id[ifname] = ifid
2636 self._write_block_idb(linktype=linktype, ifname=ifname)
2638 # EPB flags (32 bits).
2639 # currently only direction is implemented (least 2 significant bits)
2640 if type(direction) == int:
2641 flags = direction & 0x3
2642 else:
2643 flags = None
2645 self._write_block_epb(packet, timestamp=sec, caplen=caplen,
2646 orglen=wirelen, comments=comments, ifid=ifid, flags=flags)
2647 if self.sync:
2648 self.f.flush()
2651class PcapWriter(RawPcapWriter):
2652 """A stream PCAP writer with more control than wrpcap()"""
2653 pass
2656class PcapNgWriter(RawPcapNgWriter):
2657 """A stream pcapng writer with more control than wrpcapng()"""
2659 def _get_time(self,
2660 packet, # type: Union[bytes, Packet]
2661 sec, # type: Optional[float]
2662 usec # type: Optional[int]
2663 ):
2664 # type: (...) -> Tuple[float, int]
2665 if hasattr(packet, "time"):
2666 if sec is None:
2667 sec = float(packet.time)
2669 if usec is None:
2670 usec = 0
2672 return sec, usec # type: ignore
2675@conf.commands.register
2676def rderf(filename, count=-1):
2677 # type: (Union[IO[bytes], str], int) -> PacketList
2678 """Read a ERF file and return a packet list
2680 :param count: read only <count> packets
2681 """
2682 with ERFEthernetReader(filename) as fdesc:
2683 return fdesc.read_all(count=count)
2686class ERFEthernetReader_metaclass(PcapReader_metaclass):
2687 def __call__(cls, filename):
2688 # type: (Union[IO[bytes], str]) -> Any
2689 i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__) # type: ignore
2690 filename, fdesc = cls.open(filename)
2691 try:
2692 i.__init__(filename, fdesc)
2693 return i
2694 except (Scapy_Exception, EOFError):
2695 pass
2697 if "alternative" in cls.__dict__:
2698 cls = cls.__dict__["alternative"]
2699 i = cls.__new__(
2700 cls,
2701 cls.__name__,
2702 cls.__bases__,
2703 cls.__dict__ # type: ignore
2704 )
2705 try:
2706 i.__init__(filename, fdesc)
2707 return i
2708 except (Scapy_Exception, EOFError):
2709 pass
2711 raise Scapy_Exception("Not a supported capture file")
2713 @staticmethod
2714 def open(fname # type: ignore
2715 ):
2716 # type: (...) -> Tuple[str, _ByteStream]
2717 """Open (if necessary) filename"""
2718 if isinstance(fname, str):
2719 filename = fname
2720 try:
2721 with gzip.open(filename, "rb") as tmp:
2722 tmp.read(1)
2723 fdesc = gzip.open(filename, "rb") # type: _ByteStream
2724 except IOError:
2725 fdesc = open(filename, "rb")
2727 else:
2728 fdesc = fname
2729 filename = getattr(fdesc, "name", "No name")
2730 return filename, fdesc
2733class ERFEthernetReader(PcapReader,
2734 metaclass=ERFEthernetReader_metaclass):
2736 def __init__(self, filename, fdesc=None): # type: ignore
2737 # type: (Union[IO[bytes], str], IO[bytes]) -> None
2738 self.filename = filename # type: ignore
2739 self.f = fdesc
2740 self.power = Decimal(10) ** Decimal(-9)
2742 # time is in 64-bits Endace's format which can be see here:
2743 # https://www.endace.com/erf-extensible-record-format-types.pdf
2744 def _convert_erf_timestamp(self, t):
2745 # type: (int) -> EDecimal
2746 sec = t >> 32
2747 frac_sec = t & 0xffffffff
2748 frac_sec *= 10**9
2749 frac_sec += (frac_sec & 0x80000000) << 1
2750 frac_sec >>= 32
2751 return EDecimal(sec + self.power * frac_sec)
2753 # The details of ERF Packet format can be see here:
2754 # https://www.endace.com/erf-extensible-record-format-types.pdf
2755 def read_packet(self, size=MTU, **kwargs):
2756 # type: (int, **Any) -> Packet
2758 # General ERF Header have exactly 16 bytes
2759 hdr = self.f.read(16)
2760 if len(hdr) < 16:
2761 raise EOFError
2763 # The timestamp is in little-endian byte-order.
2764 time = struct.unpack('<Q', hdr[:8])[0]
2765 # The rest is in big-endian byte-order.
2766 # Ignoring flags and lctr (loss counter) since they are ERF specific
2767 # header fields which Packet object does not support.
2768 type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
2769 # Check if the type != 0x02, type Ethernet
2770 if type & 0x02 == 0:
2771 raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")
2773 # If there are extended headers, ignore it because Packet object does
2774 # not support it. Extended headers size is 8 bytes before the payload.
2775 if type & 0x80:
2776 _ = self.f.read(8)
2777 s = self.f.read(rlen - 24)
2778 else:
2779 s = self.f.read(rlen - 16)
2781 # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
2782 # of the fields are disregarded by Endace.
2783 pb = s[2:size]
2784 from scapy.layers.l2 import Ether
2785 try:
2786 p = Ether(pb, **kwargs) # type: Packet
2787 except KeyboardInterrupt:
2788 raise
2789 except Exception:
2790 if conf.debug_dissector:
2791 from scapy.sendrecv import debug
2792 debug.crashed_on = (Ether, s)
2793 raise
2794 if conf.raw_layer is None:
2795 # conf.raw_layer is set on import
2796 import scapy.packet # noqa: F401
2797 p = conf.raw_layer(s)
2799 p.time = self._convert_erf_timestamp(time)
2800 p.wirelen = wlen
2802 return p
2805@conf.commands.register
2806def wrerf(filename, # type: Union[IO[bytes], str]
2807 pkt, # type: _PacketIterable
2808 *args, # type: Any
2809 **kargs # type: Any
2810 ):
2811 # type: (...) -> None
2812 """Write a list of packets to a ERF file
2814 :param filename: the name of the file to write packets to, or an open,
2815 writable file-like object. The file descriptor will be
2816 closed at the end of the call, so do not use an object you
2817 do not want to close (e.g., running wrerf(sys.stdout, [])
2818 in interactive mode will crash Scapy).
2819 :param gz: set to 1 to save a gzipped capture
2820 :param append: append packets to the capture file instead of
2821 truncating it
2822 :param sync: do not bufferize writes to the capture file
2823 """
2824 with ERFEthernetWriter(filename, *args, **kargs) as fdesc:
2825 fdesc.write(pkt)
2828class ERFEthernetWriter(PcapWriter):
2829 """A stream ERF Ethernet writer with more control than wrerf()"""
2831 def __init__(self,
2832 filename, # type: Union[IO[bytes], str]
2833 gz=False, # type: bool
2834 append=False, # type: bool
2835 sync=False, # type: bool
2836 ):
2837 # type: (...) -> None
2838 """
2839 :param filename: the name of the file to write packets to, or an open,
2840 writable file-like object.
2841 :param gz: compress the capture on the fly
2842 :param append: append packets to the capture file instead of
2843 truncating it
2844 :param sync: do not bufferize writes to the capture file
2845 """
2846 super(ERFEthernetWriter, self).__init__(filename,
2847 gz=gz,
2848 append=append,
2849 sync=sync)
2851 def write(self, pkt): # type: ignore
2852 # type: (_PacketIterable) -> None
2853 """
2854 Writes a Packet, a SndRcvList object, or bytes to a ERF file.
2856 :param pkt: Packet(s) to write (one record for each Packet)
2857 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
2858 """
2859 # Import here to avoid circular dependency
2860 from scapy.supersocket import IterSocket
2861 for p in IterSocket(pkt).iter:
2862 self.write_packet(p)
2864 def write_packet(self, pkt): # type: ignore
2865 # type: (Packet) -> None
2867 if hasattr(pkt, "time"):
2868 sec = int(pkt.time)
2869 usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9)
2870 t = (sec << 32) + usec
2871 else:
2872 t = int(time.time()) << 32
2874 # There are 16 bytes of headers + 2 bytes of padding before the packets
2875 # payload.
2876 rlen = len(pkt) + 18
2878 if hasattr(pkt, "wirelen"):
2879 wirelen = pkt.wirelen
2880 if wirelen is None:
2881 wirelen = rlen
2883 self.f.write(struct.pack("<Q", t))
2884 self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
2885 self.f.write(bytes(pkt))
2886 self.f.flush()
2888 def close(self):
2889 # type: () -> Optional[Any]
2890 return self.f.close()
2893@conf.commands.register
2894def import_hexcap(input_string=None):
2895 # type: (Optional[str]) -> bytes
2896 """Imports a tcpdump like hexadecimal view
2898 e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"
2900 :param input_string: String containing the hexdump input to parse. If None,
2901 read from standard input.
2902 """
2903 re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") # noqa: E501
2904 p = ""
2905 try:
2906 if input_string:
2907 input_function = StringIO(input_string).readline
2908 else:
2909 input_function = input
2910 while True:
2911 line = input_function().strip()
2912 if not line:
2913 break
2914 try:
2915 p += re_extract_hexcap.match(line).groups()[2] # type: ignore
2916 except Exception:
2917 warning("Parsing error during hexcap")
2918 continue
2919 except EOFError:
2920 pass
2922 p = p.replace(" ", "")
2923 return hex_bytes(p)
2926@conf.commands.register
2927def wireshark(pktlist, wait=False, **kwargs):
2928 # type: (List[Packet], bool, **Any) -> Optional[Any]
2929 """
2930 Runs Wireshark on a list of packets.
2932 See :func:`tcpdump` for more parameter description.
2934 Note: this defaults to wait=False, to run Wireshark in the background.
2935 """
2936 return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs)
2939@conf.commands.register
2940def tdecode(
2941 pktlist, # type: Union[IO[bytes], None, str, _PacketIterable]
2942 args=None, # type: Optional[List[str]]
2943 **kwargs # type: Any
2944):
2945 # type: (...) -> Any
2946 """
2947 Run tshark on a list of packets.
2949 :param args: If not specified, defaults to ``tshark -V``.
2951 See :func:`tcpdump` for more parameters.
2952 """
2953 if args is None:
2954 args = ["-V"]
2955 return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs)
2958def _guess_linktype_name(value):
2959 # type: (int) -> str
2960 """Guess the DLT name from its value."""
2961 from scapy.libs.winpcapy import pcap_datalink_val_to_name
2962 return cast(bytes, pcap_datalink_val_to_name(value)).decode()
2965def _guess_linktype_value(name):
2966 # type: (str) -> int
2967 """Guess the value of a DLT name."""
2968 from scapy.libs.winpcapy import pcap_datalink_name_to_val
2969 val = cast(int, pcap_datalink_name_to_val(name.encode()))
2970 if val == -1:
2971 warning("Unknown linktype: %s. Using EN10MB", name)
2972 return DLT_EN10MB
2973 return val
2976@conf.commands.register
2977def tcpdump(
2978 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable]
2979 dump=False, # type: bool
2980 getfd=False, # type: bool
2981 args=None, # type: Optional[List[str]]
2982 flt=None, # type: Optional[str]
2983 prog=None, # type: Optional[Any]
2984 getproc=False, # type: bool
2985 quiet=False, # type: bool
2986 use_tempfile=None, # type: Optional[Any]
2987 read_stdin_opts=None, # type: Optional[Any]
2988 linktype=None, # type: Optional[Any]
2989 wait=True, # type: bool
2990 _suppress=False # type: bool
2991):
2992 # type: (...) -> Any
2993 """Run tcpdump or tshark on a list of packets.
2995 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
2996 temporary file to store the packets. This works around a bug in Apple's
2997 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/
2999 Otherwise, the packets are passed in stdin.
3001 This function can be explicitly enabled or disabled with the
3002 ``use_tempfile`` parameter.
3004 When using ``wireshark``, it will be called with ``-ki -`` to start
3005 immediately capturing packets from stdin.
3007 Otherwise, the command will be run with ``-r -`` (which is correct for
3008 ``tcpdump`` and ``tshark``).
3010 This can be overridden with ``read_stdin_opts``. This has no effect when
3011 ``use_tempfile=True``, or otherwise reading packets from a regular file.
3013 :param pktlist: a Packet instance, a PacketList instance or a list of
3014 Packet instances. Can also be a filename (as a string), an open
3015 file-like object that must be a file format readable by
3016 tshark (Pcap, PcapNg, etc.) or None (to sniff)
3017 :param flt: a filter to use with tcpdump
3018 :param dump: when set to True, returns a string instead of displaying it.
3019 :param getfd: when set to True, returns a file-like object to read data
3020 from tcpdump or tshark from.
3021 :param getproc: when set to True, the subprocess.Popen object is returned
3022 :param args: arguments (as a list) to pass to tshark (example for tshark:
3023 args=["-T", "json"]).
3024 :param prog: program to use (defaults to tcpdump, will work with tshark)
3025 :param quiet: when set to True, the process stderr is discarded
3026 :param use_tempfile: When set to True, always use a temporary file to store
3027 packets.
3028 When set to False, pipe packets through stdin.
3029 When set to None (default), only use a temporary file with
3030 ``tcpdump`` on OSX.
3031 :param read_stdin_opts: When set, a list of arguments needed to capture
3032 from stdin. Otherwise, attempts to guess.
3033 :param linktype: A custom DLT value or name, to overwrite the default
3034 values.
3035 :param wait: If True (default), waits for the process to terminate before
3036 returning to Scapy. If False, the process will be detached to the
3037 background. If dump, getproc or getfd is True, these have the same
3038 effect as ``wait=False``.
3040 Examples::
3042 >>> tcpdump([IP()/TCP(), IP()/UDP()])
3043 reading from file -, link-type RAW (Raw IP)
3044 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501
3045 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]
3047 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
3048 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501
3049 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0
3051 To get a JSON representation of a tshark-parsed PacketList(), one can::
3053 >>> import json, pprint
3054 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
3055 ... dst="45.33.32.156"),
3056 ... prog=conf.prog.tshark,
3057 ... args=["-T", "json"],
3058 ... getfd=True))
3059 >>> pprint.pprint(json_data)
3060 [{u'_index': u'packets-2016-12-23',
3061 u'_score': None,
3062 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
3063 u'frame.encap_type': u'7',
3064 [...]
3065 },
3066 u'ip': {u'ip.addr': u'45.33.32.156',
3067 u'ip.checksum': u'0x0000a20d',
3068 [...]
3069 u'ip.ttl': u'64',
3070 u'ip.version': u'4'},
3071 u'raw': u'Raw packet data'}},
3072 u'_type': u'pcap_file'}]
3073 >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
3074 u'64'
3075 """
3076 getfd = getfd or getproc
3077 if prog is None:
3078 if not conf.prog.tcpdump:
3079 raise Scapy_Exception(
3080 "tcpdump is not available"
3081 )
3082 prog = [conf.prog.tcpdump]
3083 elif isinstance(prog, str):
3084 prog = [prog]
3085 else:
3086 raise ValueError("prog must be a string")
3088 if linktype is not None:
3089 if isinstance(linktype, int):
3090 # Guess name from value
3091 try:
3092 linktype_name = _guess_linktype_name(linktype)
3093 except StopIteration:
3094 linktype = -1
3095 else:
3096 # Guess value from name
3097 if linktype.startswith("DLT_"):
3098 linktype = linktype[4:]
3099 linktype_name = linktype
3100 try:
3101 linktype = _guess_linktype_value(linktype)
3102 except KeyError:
3103 linktype = -1
3104 if linktype == -1:
3105 raise ValueError(
3106 "Unknown linktype. Try passing its datalink name instead"
3107 )
3108 prog += ["-y", linktype_name]
3110 # Build Popen arguments
3111 if args is None:
3112 args = []
3113 else:
3114 # Make a copy of args
3115 args = list(args)
3117 if flt is not None:
3118 # Check the validity of the filter
3119 if linktype is None and isinstance(pktlist, str):
3120 # linktype is unknown but required. Read it from file
3121 with PcapReader(pktlist) as rd:
3122 if isinstance(rd, PcapNgReader):
3123 # Get the linktype from the first packet
3124 try:
3125 _, metadata = rd._read_packet()
3126 linktype = metadata.linktype
3127 if OPENBSD and linktype == 228:
3128 linktype = DLT_RAW
3129 except EOFError:
3130 raise ValueError(
3131 "Cannot get linktype from a PcapNg packet."
3132 )
3133 else:
3134 linktype = rd.linktype
3135 from scapy.arch.common import compile_filter
3136 compile_filter(flt, linktype=linktype)
3137 args.append(flt)
3139 stdout = subprocess.PIPE if dump or getfd else None
3140 stderr = open(os.devnull) if quiet else None
3141 proc = None
3143 if use_tempfile is None:
3144 # Apple's tcpdump cannot read from stdin, see:
3145 # http://apple.stackexchange.com/questions/152682/
3146 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump
3148 if read_stdin_opts is None:
3149 if prog[0] == conf.prog.wireshark:
3150 # Start capturing immediately (-k) from stdin (-i -)
3151 read_stdin_opts = ["-ki", "-"]
3152 elif prog[0] == conf.prog.tcpdump and not OPENBSD:
3153 # Capture in packet-buffered mode (-U) from stdin (-r -)
3154 read_stdin_opts = ["-U", "-r", "-"]
3155 else:
3156 read_stdin_opts = ["-r", "-"]
3157 else:
3158 # Make a copy of read_stdin_opts
3159 read_stdin_opts = list(read_stdin_opts)
3161 if pktlist is None:
3162 # sniff
3163 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3164 proc = subprocess.Popen(
3165 prog + args,
3166 stdout=stdout,
3167 stderr=stderr,
3168 )
3169 elif isinstance(pktlist, str):
3170 # file
3171 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3172 proc = subprocess.Popen(
3173 prog + ["-r", pktlist] + args,
3174 stdout=stdout,
3175 stderr=stderr,
3176 )
3177 elif use_tempfile:
3178 tmpfile = get_temp_file( # type: ignore
3179 autoext=".pcap",
3180 fd=True
3181 ) # type: IO[bytes]
3182 try:
3183 tmpfile.writelines(
3184 iter(lambda: pktlist.read(1048576), b"") # type: ignore
3185 )
3186 except AttributeError:
3187 pktlist = cast("_PacketIterable", pktlist)
3188 wrpcap(tmpfile, pktlist, linktype=linktype)
3189 else:
3190 tmpfile.close()
3191 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3192 proc = subprocess.Popen(
3193 prog + ["-r", tmpfile.name] + args,
3194 stdout=stdout,
3195 stderr=stderr,
3196 )
3197 else:
3198 try:
3199 pktlist.fileno() # type: ignore
3200 # pass the packet stream
3201 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3202 proc = subprocess.Popen(
3203 prog + read_stdin_opts + args,
3204 stdin=pktlist, # type: ignore
3205 stdout=stdout,
3206 stderr=stderr,
3207 )
3208 except (AttributeError, ValueError):
3209 # write the packet stream to stdin
3210 with ContextManagerSubprocess(prog[0], suppress=_suppress):
3211 proc = subprocess.Popen(
3212 prog + read_stdin_opts + args,
3213 stdin=subprocess.PIPE,
3214 stdout=stdout,
3215 stderr=stderr,
3216 )
3217 if proc is None:
3218 # An error has occurred
3219 return
3220 try:
3221 proc.stdin.writelines( # type: ignore
3222 iter(lambda: pktlist.read(1048576), b"") # type: ignore
3223 )
3224 except AttributeError:
3225 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore
3226 except UnboundLocalError:
3227 # The error was handled by ContextManagerSubprocess
3228 pass
3229 else:
3230 proc.stdin.close() # type: ignore
3231 if proc is None:
3232 # An error has occurred
3233 return
3234 if dump:
3235 data = b"".join(
3236 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore
3237 )
3238 proc.terminate()
3239 return data
3240 if getproc:
3241 return proc
3242 if getfd:
3243 return proc.stdout
3244 if wait:
3245 proc.wait()
3248@conf.commands.register
3249def hexedit(pktlist):
3250 # type: (_PacketIterable) -> PacketList
3251 """Run hexedit on a list of packets, then return the edited packets."""
3252 f = get_temp_file()
3253 wrpcap(f, pktlist)
3254 with ContextManagerSubprocess(conf.prog.hexedit):
3255 subprocess.call([conf.prog.hexedit, f])
3256 rpktlist = rdpcap(f)
3257 os.unlink(f)
3258 return rpktlist
3261def get_terminal_width():
3262 # type: () -> Optional[int]
3263 """Get terminal width (number of characters) if in a window.
3265 Notice: this will try several methods in order to
3266 support as many terminals and OS as possible.
3267 """
3268 sizex = shutil.get_terminal_size(fallback=(0, 0))[0]
3269 if sizex != 0:
3270 return sizex
3271 # Backups
3272 if WINDOWS:
3273 from ctypes import windll, create_string_buffer
3274 # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
3275 h = windll.kernel32.GetStdHandle(-12)
3276 csbi = create_string_buffer(22)
3277 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
3278 if res:
3279 (bufx, bufy, curx, cury, wattr,
3280 left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) # noqa: E501
3281 sizex = right - left + 1
3282 # sizey = bottom - top + 1
3283 return sizex
3284 return sizex
3285 # We have various methods
3286 # COLUMNS is set on some terminals
3287 try:
3288 sizex = int(os.environ['COLUMNS'])
3289 except Exception:
3290 pass
3291 if sizex:
3292 return sizex
3293 # We can query TIOCGWINSZ
3294 try:
3295 import fcntl
3296 import termios
3297 s = struct.pack('HHHH', 0, 0, 0, 0)
3298 x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
3299 sizex = struct.unpack('HHHH', x)[1]
3300 except (IOError, ModuleNotFoundError):
3301 # If everything failed, return default terminal size
3302 sizex = 79
3303 return sizex
3306def pretty_list(rtlst, # type: List[Tuple[Union[str, List[str]], ...]]
3307 header, # type: List[Tuple[str, ...]]
3308 sortBy=0, # type: Optional[int]
3309 borders=False, # type: bool
3310 ):
3311 # type: (...) -> str
3312 """
3313 Pretty list to fit the terminal, and add header.
3315 :param rtlst: a list of tuples. each tuple contains a value which can
3316 be either a string or a list of string.
3317 :param sortBy: the column id (starting with 0) which will be used for
3318 ordering
3319 :param borders: whether to put borders on the table or not
3320 """
3321 if borders:
3322 _space = "|"
3323 else:
3324 _space = " "
3325 cols = len(header[0])
3326 # Windows has a fat terminal border
3327 _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
3328 _croped = False
3329 if sortBy is not None:
3330 # Sort correctly
3331 rtlst.sort(key=lambda x: x[sortBy])
3332 # Resolve multi-values
3333 for i, line in enumerate(rtlst):
3334 ids = [] # type: List[int]
3335 values = [] # type: List[Union[str, List[str]]]
3336 for j, val in enumerate(line):
3337 if isinstance(val, list):
3338 ids.append(j)
3339 values.append(val or " ")
3340 if values:
3341 del rtlst[i]
3342 k = 0
3343 for ex_vals in zip_longest(*values, fillvalue=" "):
3344 if k:
3345 extra_line = [" "] * cols
3346 else:
3347 extra_line = list(line) # type: ignore
3348 for j, h in enumerate(ids):
3349 extra_line[h] = ex_vals[j]
3350 rtlst.insert(i + k, tuple(extra_line))
3351 k += 1
3352 rtslst = cast(List[Tuple[str, ...]], rtlst)
3353 # Append tag
3354 rtslst = header + rtslst
3355 # Detect column's width
3356 colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
3357 # Make text fit in box (if required)
3358 width = get_terminal_width()
3359 if conf.auto_crop_tables and width:
3360 width = width - _spacelen
3361 while sum(colwidth) > width:
3362 _croped = True
3363 # Needs to be cropped
3364 # Get the longest row
3365 i = colwidth.index(max(colwidth))
3366 # Get all elements of this row
3367 row = [len(x[i]) for x in rtslst]
3368 # Get biggest element of this row: biggest of the array
3369 j = row.index(max(row))
3370 # Re-build column tuple with the edited element
3371 t = list(rtslst[j])
3372 t[i] = t[i][:-2] + "_"
3373 rtslst[j] = tuple(t)
3374 # Update max size
3375 row[j] = len(t[i])
3376 colwidth[i] = max(row)
3377 if _croped:
3378 log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)") # noqa: E501
3379 # Generate padding scheme
3380 fmt = _space.join(["%%-%ds" % x for x in colwidth])
3381 # Append separation line if needed
3382 if borders:
3383 rtslst.insert(1, tuple("-" * x for x in colwidth))
3384 # Compile
3385 return "\n".join(fmt % x for x in rtslst)
3388def human_size(x, fmt=".1f"):
3389 # type: (int, str) -> str
3390 """
3391 Convert a size in octets to a human string representation
3392 """
3393 units = ['K', 'M', 'G', 'T', 'P', 'E']
3394 if not x:
3395 return "0B"
3396 i = int(math.log(x, 2**10))
3397 if i and i < len(units):
3398 return format(x / 2**(10 * i), fmt) + units[i - 1]
3399 return str(x) + "B"
3402def __make_table(
3403 yfmtfunc, # type: Callable[[int], str]
3404 fmtfunc, # type: Callable[[int], str]
3405 endline, # type: str
3406 data, # type: List[Tuple[Packet, Packet]]
3407 fxyz, # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
3408 sortx=None, # type: Optional[Callable[[str], Tuple[Any, ...]]]
3409 sorty=None, # type: Optional[Callable[[str], Tuple[Any, ...]]]
3410 seplinefunc=None, # type: Optional[Callable[[int, List[int]], str]]
3411 dump=False # type: bool
3412):
3413 # type: (...) -> Optional[str]
3414 """Core function of the make_table suite, which generates the table"""
3415 vx = {} # type: Dict[str, int]
3416 vy = {} # type: Dict[str, Optional[int]]
3417 vz = {} # type: Dict[Tuple[str, str], str]
3418 vxf = {} # type: Dict[str, str]
3420 tmp_len = 0
3421 for e in data:
3422 xx, yy, zz = [str(s) for s in fxyz(*e)]
3423 tmp_len = max(len(yy), tmp_len)
3424 vx[xx] = max(vx.get(xx, 0), len(xx), len(zz))
3425 vy[yy] = None
3426 vz[(xx, yy)] = zz
3428 vxk = list(vx)
3429 vyk = list(vy)
3430 if sortx:
3431 vxk.sort(key=sortx)
3432 else:
3433 try:
3434 vxk.sort(key=int)
3435 except Exception:
3436 try:
3437 vxk.sort(key=atol)
3438 except Exception:
3439 vxk.sort()
3440 if sorty:
3441 vyk.sort(key=sorty)
3442 else:
3443 try:
3444 vyk.sort(key=int)
3445 except Exception:
3446 try:
3447 vyk.sort(key=atol)
3448 except Exception:
3449 vyk.sort()
3451 s = ""
3452 if seplinefunc:
3453 sepline = seplinefunc(tmp_len, [vx[x] for x in vxk])
3454 s += sepline + "\n"
3456 fmt = yfmtfunc(tmp_len)
3457 s += fmt % ""
3458 s += ' '
3459 for x in vxk:
3460 vxf[x] = fmtfunc(vx[x])
3461 s += vxf[x] % x
3462 s += ' '
3463 s += endline + "\n"
3464 if seplinefunc:
3465 s += sepline + "\n"
3466 for y in vyk:
3467 s += fmt % y
3468 s += ' '
3469 for x in vxk:
3470 s += vxf[x] % vz.get((x, y), "-")
3471 s += ' '
3472 s += endline + "\n"
3473 if seplinefunc:
3474 s += sepline + "\n"
3476 if dump:
3477 return s
3478 else:
3479 print(s, end="")
3480 return None
3483def make_table(*args, **kargs):
3484 # type: (*Any, **Any) -> Optional[Any]
3485 return __make_table(
3486 lambda l: "%%-%is" % l,
3487 lambda l: "%%-%is" % l,
3488 "",
3489 *args,
3490 **kargs
3491 )
3494def make_lined_table(*args, **kargs):
3495 # type: (*Any, **Any) -> Optional[str]
3496 return __make_table( # type: ignore
3497 lambda l: "%%-%is |" % l,
3498 lambda l: "%%-%is |" % l,
3499 "",
3500 *args,
3501 seplinefunc=lambda a, x: "+".join(
3502 '-' * (y + 2) for y in [a - 1] + x + [-2]
3503 ),
3504 **kargs
3505 )
3508def make_tex_table(*args, **kargs):
3509 # type: (*Any, **Any) -> Optional[str]
3510 return __make_table( # type: ignore
3511 lambda l: "%s",
3512 lambda l: "& %s",
3513 "\\\\",
3514 *args,
3515 seplinefunc=lambda a, x: "\\hline",
3516 **kargs
3517 )
3519####################
3520# WHOIS CLIENT #
3521####################
3524def whois(ip_address):
3525 # type: (str) -> bytes
3526 """Whois client for Python"""
3527 whois_ip = str(ip_address)
3528 try:
3529 query = socket.gethostbyname(whois_ip)
3530 except Exception:
3531 query = whois_ip
3532 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3533 s.connect(("whois.ripe.net", 43))
3534 s.send(query.encode("utf8") + b"\r\n")
3535 answer = b""
3536 while True:
3537 d = s.recv(4096)
3538 answer += d
3539 if not d:
3540 break
3541 s.close()
3542 ignore_tag = b"remarks:"
3543 # ignore all lines starting with the ignore_tag
3544 lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))] # noqa: E501
3545 # remove empty lines at the bottom
3546 for i in range(1, len(lines)):
3547 if not lines[-i].strip():
3548 del lines[-i]
3549 else:
3550 break
3551 return b"\n".join(lines[3:])
3553####################
3554# CLI utils #
3555####################
3558class _CLIUtilMetaclass(type):
3559 class TYPE(enum.Enum):
3560 COMMAND = 0
3561 OUTPUT = 1
3562 COMPLETE = 2
3564 def __new__(cls, # type: Type[_CLIUtilMetaclass]
3565 name, # type: str
3566 bases, # type: Tuple[type, ...]
3567 dct # type: Dict[str, Any]
3568 ):
3569 # type: (...) -> Type[CLIUtil]
3570 dct["commands"] = {
3571 x.__name__: x
3572 for x in dct.values()
3573 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMMAND
3574 }
3575 dct["commands_output"] = {
3576 x.cliutil_ref.__name__: x
3577 for x in dct.values()
3578 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.OUTPUT
3579 }
3580 dct["commands_complete"] = {
3581 x.cliutil_ref.__name__: x
3582 for x in dct.values()
3583 if getattr(x, "cliutil_type", None) == _CLIUtilMetaclass.TYPE.COMPLETE
3584 }
3585 newcls = cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct))
3586 return newcls
3589class CLIUtil(metaclass=_CLIUtilMetaclass):
3590 """
3591 Provides a Util class to easily create simple CLI tools in Scapy,
3592 that can still be used as an API.
3594 Doc:
3595 - override the ps1() function
3596 - register commands with the @CLIUtil.addcomment decorator
3597 - call the loop() function when ready
3598 """
3600 def _depcheck(self) -> None:
3601 """
3602 Check that all dependencies are installed
3603 """
3604 try:
3605 import prompt_toolkit # noqa: F401
3606 except ImportError:
3607 # okay we lie but prompt_toolkit is a dependency...
3608 raise ImportError("You need to have IPython installed to use the CLI")
3610 # Okay let's do nice code
3611 commands: Dict[str, Callable[..., Any]] = {}
3612 # print output of command
3613 commands_output: Dict[str, Callable[..., str]] = {}
3614 # provides completion to command
3615 commands_complete: Dict[str, Callable[..., List[str]]] = {}
3617 def __init__(self, cli: bool = True, debug: bool = False) -> None:
3618 """
3619 DEV: overwrite
3620 """
3621 if cli:
3622 self._depcheck()
3623 self.loop(debug=debug)
3625 @staticmethod
3626 def _inspectkwargs(func: DecoratorCallable) -> None:
3627 """
3628 Internal function to parse arguments from the kwargs of the functions
3629 """
3630 func._flagnames = [ # type: ignore
3631 x.name for x in
3632 inspect.signature(func).parameters.values()
3633 if x.kind == inspect.Parameter.KEYWORD_ONLY
3634 ]
3635 func._flags = [ # type: ignore
3636 ("-%s" % x) if len(x) == 1 else ("--%s" % x)
3637 for x in func._flagnames # type: ignore
3638 ]
3640 @staticmethod
3641 def _parsekwargs(
3642 func: DecoratorCallable,
3643 args: List[str]
3644 ) -> Tuple[List[str], Dict[str, Literal[True]]]:
3645 """
3646 Internal function to parse CLI arguments of a function.
3647 """
3648 kwargs: Dict[str, Literal[True]] = {}
3649 if func._flags: # type: ignore
3650 i = 0
3651 for arg in args:
3652 if arg in func._flags: # type: ignore
3653 i += 1
3654 kwargs[func._flagnames[func._flags.index(arg)]] = True # type: ignore # noqa: E501
3655 continue
3656 break
3657 args = args[i:]
3658 return args, kwargs
3660 @classmethod
3661 def _parseallargs(
3662 cls,
3663 func: DecoratorCallable,
3664 cmd: str, args: List[str]
3665 ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
3666 """
3667 Internal function to parse CLI arguments of both the function
3668 and its output function.
3669 """
3670 args, kwargs = cls._parsekwargs(func, args)
3671 outkwargs: Dict[str, Literal[True]] = {}
3672 if cmd in cls.commands_output:
3673 args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
3674 return args, kwargs, outkwargs
3676 @classmethod
3677 def addcommand(
3678 cls,
3679 spaces: bool = False,
3680 globsupport: bool = False,
3681 ) -> Callable[[DecoratorCallable], DecoratorCallable]:
3682 """
3683 Decorator to register a command
3684 """
3685 def func(cmd: DecoratorCallable) -> DecoratorCallable:
3686 cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND # type: ignore
3687 cmd._spaces = spaces # type: ignore
3688 cmd._globsupport = globsupport # type: ignore
3689 cls._inspectkwargs(cmd)
3690 if cmd._globsupport and not cmd._spaces: # type: ignore
3691 raise ValueError("Cannot use globsupport without spaces.")
3692 return cmd
3693 return func
3695 @classmethod
3696 def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501
3697 """
3698 Decorator to register a command output processor
3699 """
3700 def func(processor: DecoratorCallable) -> DecoratorCallable:
3701 processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT # type: ignore
3702 processor.cliutil_ref = cmd # type: ignore
3703 cls._inspectkwargs(processor)
3704 return processor
3705 return func
3707 @classmethod
3708 def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501
3709 """
3710 Decorator to register a command completor
3711 """
3712 def func(processor: DecoratorCallable) -> DecoratorCallable:
3713 processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE # type: ignore
3714 processor.cliutil_ref = cmd # type: ignore
3715 return processor
3716 return func
3718 def ps1(self) -> str:
3719 """
3720 Return the PS1 of the shell
3721 """
3722 return "> "
3724 def close(self) -> None:
3725 """
3726 Function called on exiting
3727 """
3728 print("Exited")
3730 def help(self, cmd: Optional[str] = None) -> None:
3731 """
3732 Return the help related to this CLI util
3733 """
3734 def _args(func: Any) -> str:
3735 flags = func._flags.copy()
3736 if func.__name__ in self.commands_output:
3737 flags += self.commands_output[func.__name__]._flags # type: ignore
3738 return " %s%s" % (
3739 (
3740 "%s " % " ".join("[%s]" % x for x in flags)
3741 if flags else ""
3742 ),
3743 " ".join(
3744 "<%s%s>" % (
3745 x.name,
3746 "?" if
3747 (x.default is None or x.default != inspect.Parameter.empty)
3748 else ""
3749 )
3750 for x in list(inspect.signature(func).parameters.values())[1:]
3751 if x.name not in func._flagnames and x.name[0] != "_"
3752 )
3753 )
3755 if cmd:
3756 if cmd not in self.commands:
3757 print("Unknown command '%s'" % cmd)
3758 return
3759 # help for one command
3760 func = self.commands[cmd]
3761 print("%s%s: %s" % (
3762 cmd,
3763 _args(func),
3764 func.__doc__ and func.__doc__.strip()
3765 ))
3766 else:
3767 header = "│ %s - Help │" % self.__class__.__name__
3768 print("┌" + "─" * (len(header) - 2) + "┐")
3769 print(header)
3770 print("└" + "─" * (len(header) - 2) + "┘")
3771 print(
3772 pretty_list(
3773 [
3774 (
3775 cmd,
3776 _args(func),
3777 func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
3778 )
3779 for cmd, func in self.commands.items()
3780 ],
3781 [("Command", "Arguments", "Description")]
3782 )
3783 )
3785 def _completer(self) -> 'prompt_toolkit.completion.Completer':
3786 """
3787 Returns a prompt_toolkit custom completer
3788 """
3789 from prompt_toolkit.completion import Completer, Completion
3791 class CLICompleter(Completer):
3792 def get_completions(cmpl, document, complete_event): # type: ignore
3793 if not complete_event.completion_requested:
3794 # Only activate when the user does <TAB>
3795 return
3796 parts = document.text.split(" ")
3797 cmd = parts[0].lower()
3798 if cmd not in self.commands:
3799 # We are trying to complete the command
3800 for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
3801 yield Completion(possible_cmd, start_position=-len(cmd))
3802 else:
3803 # We are trying to complete the command content
3804 if len(parts) == 1:
3805 return
3806 args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
3807 arg = " ".join(args)
3808 if cmd in self.commands_complete:
3809 for possible_arg in self.commands_complete[cmd](self, arg):
3810 yield Completion(possible_arg, start_position=-len(arg))
3811 return
3812 return CLICompleter()
3814 def loop(self, debug: int = 0) -> None:
3815 """
3816 Main command handling loop
3817 """
3818 from prompt_toolkit import PromptSession
3819 session = PromptSession(completer=self._completer())
3821 while True:
3822 try:
3823 cmd = session.prompt(self.ps1()).strip()
3824 except KeyboardInterrupt:
3825 continue
3826 except EOFError:
3827 self.close()
3828 break
3829 args = cmd.split(" ")[1:]
3830 cmd = cmd.split(" ")[0].strip().lower()
3831 if not cmd:
3832 continue
3833 if cmd in ["help", "h", "?"]:
3834 self.help(" ".join(args))
3835 continue
3836 if cmd in "exit":
3837 break
3838 if cmd not in self.commands:
3839 print("Unknown command. Type help or ?")
3840 else:
3841 # check the number of arguments
3842 func = self.commands[cmd]
3843 args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
3844 if func._spaces: # type: ignore
3845 args = [" ".join(args)]
3846 # if globsupport is set, we might need to do several calls
3847 if func._globsupport and "*" in args[0]: # type: ignore
3848 if args[0].count("*") > 1:
3849 print("More than 1 glob star (*) is currently unsupported.")
3850 continue
3851 before, after = args[0].split("*", 1)
3852 reg = re.compile(re.escape(before) + r".*" + after)
3853 calls = [
3854 [x] for x in
3855 self.commands_complete[cmd](self, before)
3856 if reg.match(x)
3857 ]
3858 else:
3859 calls = [args]
3860 else:
3861 calls = [args]
3862 # now iterate if required, call the function and print its output
3863 res = None
3864 for args in calls:
3865 try:
3866 res = func(self, *args, **kwargs)
3867 except TypeError:
3868 print("Bad number of arguments !")
3869 self.help(cmd=cmd)
3870 continue
3871 except Exception as ex:
3872 print("Command failed with error: %s" % ex)
3873 if debug:
3874 traceback.print_exception(ex)
3875 try:
3876 if res and cmd in self.commands_output:
3877 self.commands_output[cmd](self, res, **outkwargs)
3878 except Exception as ex:
3879 print("Output processor failed with error: %s" % ex)
3882def AutoArgparse(
3883 func: DecoratorCallable,
3884 _parseonly: bool = False,
3885) -> Optional[Tuple[List[str], List[str]]]:
3886 """
3887 Generate an Argparse call from a function, then call this function.
3889 Notes:
3891 - for the arguments to have a description, the sphinx docstring format
3892 must be used. See
3893 https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
3894 - the arguments must be typed in Python (we ignore Sphinx-specific types)
3895 untyped arguments are ignored.
3896 - only types that would be supported by argparse are supported. The others
3897 are omitted.
3898 """
3899 argsdoc = {}
3900 if func.__doc__:
3901 # Sphinx doc format parser
3902 m = re.match(
3903 r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
3904 func.__doc__.strip(),
3905 )
3906 if not m:
3907 desc = func.__doc__.strip()
3908 else:
3909 desc = m.group(1)
3910 sphinxargs = re.findall(
3911 r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
3912 m.group(2),
3913 )
3914 for argtype, argparam, argdesc in sphinxargs:
3915 argparam = argparam.strip()
3916 argdesc = argdesc.strip()
3917 if argtype == "param":
3918 if not argparam:
3919 raise ValueError(":param: without a name !")
3920 argsdoc[argparam] = argdesc
3921 else:
3922 desc = ""
3924 # Process the parameters
3925 positional = []
3926 noargument = []
3927 hexarguments = []
3928 parameters = {}
3929 for param in inspect.signature(func).parameters.values():
3930 if not param.annotation:
3931 continue
3932 noarg = False
3933 parname = param.name.replace("_", "-")
3934 paramkwargs: Dict[str, Any] = {}
3935 if param.annotation is bool:
3936 if param.default is True:
3937 parname = "no-" + parname
3938 paramkwargs["action"] = "store_false"
3939 else:
3940 paramkwargs["action"] = "store_true"
3941 noarg = True
3942 elif param.annotation is bytes:
3943 paramkwargs["type"] = str
3944 hexarguments.append(parname)
3945 elif param.annotation in [str, int, float]:
3946 paramkwargs["type"] = param.annotation
3947 else:
3948 continue
3949 if param.default != inspect.Parameter.empty:
3950 if param.kind == inspect.Parameter.POSITIONAL_ONLY:
3951 positional.append(param.name)
3952 paramkwargs["nargs"] = '?'
3953 else:
3954 parname = "--" + parname
3955 paramkwargs["default"] = param.default
3956 else:
3957 positional.append(param.name)
3958 if param.kind == inspect.Parameter.VAR_POSITIONAL:
3959 paramkwargs["action"] = "append"
3960 if param.name in argsdoc:
3961 paramkwargs["help"] = argsdoc[param.name]
3962 if param.annotation is bytes:
3963 paramkwargs["help"] = "(hex) " + paramkwargs["help"]
3964 elif param.annotation is bool:
3965 paramkwargs["help"] = "(flag) " + paramkwargs["help"]
3966 else:
3967 paramkwargs["help"] = (
3968 "(%s) " % param.annotation.__name__ + paramkwargs["help"]
3969 )
3970 # Add to the parameter list
3971 parameters[parname] = paramkwargs
3972 if noarg:
3973 noargument.append(parname)
3975 if _parseonly:
3976 # An internal mode used to generate bash autocompletion, do it then exit.
3977 return (
3978 [x for x in parameters if x not in positional] + ["--help"],
3979 [x for x in noargument if x not in positional] + ["--help"],
3980 )
3982 # Now build the argparse.ArgumentParser
3983 parser = argparse.ArgumentParser(
3984 prog=func.__name__,
3985 description=desc,
3986 formatter_class=argparse.ArgumentDefaultsHelpFormatter,
3987 )
3989 # Add parameters to parser
3990 for parname, paramkwargs in parameters.items():
3991 parser.add_argument(parname, **paramkwargs)
3993 # Now parse the sys.argv parameters
3994 params = vars(parser.parse_args())
3996 # Convert hex parameters if provided
3997 for p in hexarguments:
3998 if params[p] is not None:
3999 try:
4000 params[p] = bytes.fromhex(params[p])
4001 except ValueError:
4002 print(
4003 conf.color_theme.fail(
4004 "ERROR: the value of parameter %s "
4005 "'%s' is not valid hexadecimal !" % (p, params[p])
4006 )
4007 )
4008 return None
4010 # Act as in interactive mode
4011 conf.logLevel = 20
4012 from scapy.themes import DefaultTheme
4013 conf.color_theme = DefaultTheme()
4014 # And call the function
4015 try:
4016 func(
4017 *[params.pop(x) for x in positional],
4018 **{
4019 (k[3:] if k.startswith("no_") else k): v
4020 for k, v in params.items()
4021 }
4022 )
4023 except AssertionError as ex:
4024 print(conf.color_theme.fail("ERROR: " + str(ex)))
4025 parser.print_help()
4026 return None
4029#######################
4030# PERIODIC SENDER #
4031#######################
4034class PeriodicSenderThread(threading.Thread):
4035 def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
4036 # type: (Any, _PacketIterable, float, bool) -> None
4037 """ Thread to send packets periodically
4039 Args:
4040 sock: socket where packet is sent periodically
4041 pkt: packet or list of packets to send
4042 interval: interval between two packets
4043 """
4044 if not isinstance(pkt, list):
4045 self._pkts = [cast("Packet", pkt)] # type: _PacketIterable
4046 else:
4047 self._pkts = pkt
4048 self._socket = sock
4049 self._stopped = threading.Event()
4050 self._enabled = threading.Event()
4051 self._enabled.set()
4052 self._interval = interval
4053 self._ignore_exceptions = ignore_exceptions
4054 threading.Thread.__init__(self)
4056 def enable(self):
4057 # type: () -> None
4058 self._enabled.set()
4060 def disable(self):
4061 # type: () -> None
4062 self._enabled.clear()
4064 def run(self):
4065 # type: () -> None
4066 while not self._stopped.is_set() and not self._socket.closed:
4067 for p in self._pkts:
4068 try:
4069 if self._enabled.is_set():
4070 self._socket.send(p)
4071 except (OSError, TimeoutError) as e:
4072 if self._ignore_exceptions:
4073 return
4074 else:
4075 raise e
4076 self._stopped.wait(timeout=self._interval)
4077 if self._stopped.is_set() or self._socket.closed:
4078 break
4080 def stop(self):
4081 # type: () -> None
4082 self._stopped.set()
4083 self.join(self._interval * 2)
4086class SingleConversationSocket(object):
4087 def __init__(self, o):
4088 # type: (Any) -> None
4089 self._inner = o
4090 self._tx_mutex = threading.RLock()
4092 @property
4093 def __dict__(self): # type: ignore
4094 return self._inner.__dict__
4096 def __getattr__(self, name):
4097 # type: (str) -> Any
4098 return getattr(self._inner, name)
4100 def sr1(self, *args, **kargs):
4101 # type: (*Any, **Any) -> Any
4102 with self._tx_mutex:
4103 return self._inner.sr1(*args, **kargs)
4105 def sr(self, *args, **kargs):
4106 # type: (*Any, **Any) -> Any
4107 with self._tx_mutex:
4108 return self._inner.sr(*args, **kargs)
4110 def send(self, x):
4111 # type: (Packet) -> Any
4112 with self._tx_mutex:
4113 try:
4114 return self._inner.send(x)
4115 except (ConnectionError, OSError) as e:
4116 self._inner.close()
4117 raise e