Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/packet.py: 47%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Packet class
9Provides:
10 - the default Packet classes
11 - binding mechanisms
12 - fuzz() method
13 - exploration methods: explore() / ls()
14"""
16from collections import defaultdict
18import json
19import re
20import time
21import itertools
22import copy
23import types
24import warnings
26from scapy.fields import (
27 AnyField,
28 BitField,
29 ConditionalField,
30 Emph,
31 EnumField,
32 Field,
33 FlagsField,
34 FlagValue,
35 MayEnd,
36 MultiEnumField,
37 MultipleTypeField,
38 PadField,
39 PacketListField,
40 RawVal,
41 StrField,
42)
43from scapy.config import conf, _version_checker
44from scapy.compat import raw, orb, bytes_encode
45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \
46 _CanvasDumpExtended
47from scapy.interfaces import _GlobInterfaceType
48from scapy.volatile import RandField, VolatileValue
49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \
50 pretty_list, EDecimal
51from scapy.error import Scapy_Exception, log_runtime, warning
52from scapy.libs.test_pyx import PYX
54# Typing imports
55from typing import (
56 Any,
57 Callable,
58 Dict,
59 Iterator,
60 List,
61 NoReturn,
62 Optional,
63 Set,
64 Tuple,
65 Type,
66 TypeVar,
67 Union,
68 Sequence,
69 cast,
70)
71from scapy.compat import Self
73try:
74 import pyx
75except ImportError:
76 pass
79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]])
82class Packet(
83 BasePacket,
84 _CanvasDumpExtended,
85 metaclass=Packet_metaclass
86):
# Per-instance attribute slots. Each name is listed exactly once
# ("name" used to appear twice in this list).
__slots__ = [
    "time", "sent_time", "name",
    "default_fields", "fields", "fieldtype",
    "overload_fields", "overloaded_fields",
    "packetfields",
    "original", "explicit", "raw_packet_cache",
    "raw_packet_cache_fields", "_pkt", "post_transforms",
    "stop_dissection_after",
    # then payload, underlayer and parent
    "payload", "underlayer", "parent",
    # used for sr()
    "_answered",
    # used when sniffing
    "direction", "sniffed_on",
    # handle snaplen Vs real length
    "wirelen",
    "comment",
    "process_information"
]
name = None
# Ordered field descriptors; iterated when initialising, building,
# dissecting and printing the layer.
fields_desc = []  # type: List[AnyField]
# Maps a deprecated field name to (replacement name, version string).
deprecated_fields = {}  # type: Dict[str, Tuple[str, str]]
overload_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
# (field values, upper class) pairs consulted by guess_payload_class().
payload_guess = []  # type: List[Tuple[Dict[str, Any], Type[Packet]]]
show_indent = 1
show_summary = True
match_subclass = False
# Per-class caches filled by prepare_cached_fields() and read back by
# init_fields() / do_init_cached_fields().
class_dont_cache = {}  # type: Dict[Type[Packet], bool]
class_packetfields = {}  # type: Dict[Type[Packet], Any]
class_default_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
class_default_fields_ref = {}  # type: Dict[Type[Packet], List[str]]
class_fieldtype = {}  # type: Dict[Type[Packet], Dict[str, AnyField]]  # noqa: E501
@classmethod
def from_hexcap(cls):
    # type: (Type[Packet]) -> Packet
    """Instantiate this class from the value returned by import_hexcap()."""
    imported = import_hexcap()
    return cls(imported)
@classmethod
def upper_bonds(self):
    # type: () -> None
    """Print one line per payload_guess entry.

    Each line shows the upper class name followed by the
    ``field=value`` pairs attached to that entry.
    """
    for conditions, upper_cls in self.payload_guess:
        rendered = [
            "%-12s" % ("%s=%r" % item) for item in conditions.items()
        ]
        print("%-20s %s" % (upper_cls.__name__, ", ".join(rendered)))
@classmethod
def lower_bonds(self):
    # type: () -> None
    """Print one line per entry of ``_overload_fields``.

    Each line shows the lower class name followed by the
    ``field=value`` pairs stored for it.
    """
    for lower_cls, overloads in self._overload_fields.items():
        rendered = [
            "%-12s" % ("%s=%r" % item) for item in overloads.items()
        ]
        print("%-20s %s" % (lower_cls.__name__, ", ".join(rendered)))
def __init__(self,
             _pkt=b"",  # type: Union[bytes, bytearray]
             post_transform=None,  # type: Any
             _internal=0,  # type: int
             _underlayer=None,  # type: Optional[Packet]
             _parent=None,  # type: Optional[Packet]
             stop_dissection_after=None,  # type: Optional[Type[Packet]]
             **fields  # type: Any
             ):
    # type: (...) -> None
    """Create a packet, optionally dissecting ``_pkt`` and setting fields.

    :param _pkt: raw data; when non-empty it is handed to dissect()
    :param post_transform: a callable, a list of callables, or None;
        stored (as a list) in ``post_transforms``
    :param _internal: when falsy, dissection_done() is called after
        dissecting ``_pkt``
    :param _underlayer: stored as ``underlayer``
    :param _parent: stored as ``parent``
    :param stop_dissection_after: stored as ``stop_dissection_after``
    :param fields: field values, by field name or deprecated alias
    :raises AttributeError: for a keyword that is neither a field of
        fields_desc nor a deprecated alias
    """
    self.time = time.time()  # type: Union[EDecimal, float]
    self.sent_time = None  # type: Union[EDecimal, float, None]
    if self._name is None:
        self.name = self.__class__.__name__
    else:
        self.name = self._name
    self.default_fields = {}  # type: Dict[str, Any]
    self.overload_fields = self._overload_fields
    self.overloaded_fields = {}  # type: Dict[str, Any]
    self.fields = {}  # type: Dict[str, Any]
    self.fieldtype = {}  # type: Dict[str, AnyField]
    self.packetfields = []  # type: List[AnyField]
    self.payload = NoPayload()  # type: Packet
    self.init_fields(bool(_pkt))
    self.underlayer = _underlayer
    self.parent = _parent
    if isinstance(_pkt, bytearray):
        _pkt = bytes(_pkt)
    self.original = _pkt
    self.explicit = 0
    self.raw_packet_cache = None  # type: Optional[bytes]
    self.raw_packet_cache_fields = None  # type: Optional[Dict[str, Any]]  # noqa: E501
    self.wirelen = None  # type: Optional[int]
    self.direction = None  # type: Optional[int]
    self.sniffed_on = None  # type: Optional[_GlobInterfaceType]
    self.comment = None  # type: Optional[bytes]
    self.process_information = None  # type: Optional[Dict[str, Any]]
    self.stop_dissection_after = stop_dissection_after
    if _pkt:
        self.dissect(_pkt)
        if not _internal:
            self.dissection_done(self)
    # Walk fields_desc (rather than the keyword dict) so that values are
    # applied in declaration order; this is required to always support
    # MultipleTypeField.
    for desc in self.fields_desc:
        key = desc.name
        if key not in fields:
            continue
        given = fields.pop(key)
        if isinstance(given, RawVal):
            self.fields[key] = given
        else:
            self.fields[key] = self.get_field(key).any2i(self, given)
    # Whatever is left is either a deprecated alias or unknown.
    for key in fields:
        if key not in self.deprecated_fields:
            raise AttributeError(key)
        given = fields[key]
        key = self._resolve_alias(key)
        if isinstance(given, RawVal):
            self.fields[key] = given
        else:
            self.fields[key] = self.get_field(key).any2i(self, given)
    if post_transform is None:
        self.post_transforms = []
    elif isinstance(post_transform, list):
        self.post_transforms = post_transform
    else:
        self.post_transforms = [post_transform]
# State tuple exchanged between __reduce__ and __setstate__, in order:
# (time, sent_time, direction, sniffed_on, wirelen, comment)
_PickleType = Tuple[
    Union[EDecimal, float],
    Optional[Union[EDecimal, float]],
    Optional[int],
    Optional[_GlobInterfaceType],
    Optional[int],
    Optional[bytes],
]
def __reduce__(self):
    # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType]
    """Used by pickling methods.

    Returns ``(class, (built bytes,), state)`` where state carries
    time, sent_time, direction, sniffed_on, wirelen and comment.
    """
    factory = self.__class__
    ctor_args = (self.build(),)
    state = (
        self.time,
        self.sent_time,
        self.direction,
        self.sniffed_on,
        self.wirelen,
        self.comment,
    )
    return factory, ctor_args, state
def __setstate__(self, state):
    # type: (Packet._PickleType) -> Packet
    """Restore the attributes carried in the pickled state tuple.

    The positions match the tuple produced by __reduce__.
    Returns ``self``.
    """
    restored = (
        "time", "sent_time", "direction",
        "sniffed_on", "wirelen", "comment",
    )
    for position, attr_name in enumerate(restored):
        setattr(self, attr_name, state[position])
    return self
def __deepcopy__(self,
                 memo,  # type: Any
                 ):
    # type: (...) -> Packet
    """Used by copy.deepcopy; delegates to copy() and ignores ``memo``."""
    duplicate = self.copy()
    return duplicate
def init_fields(self, for_dissect_only=False):
    # type: (bool) -> None
    """
    Initialize the fields described by fields_desc.

    Classes flagged in ``class_dont_cache`` are initialized directly via
    do_init_fields(); all others go through do_init_cached_fields().
    """
    bypass_cache = self.class_dont_cache.get(self.__class__, False)
    if not bypass_cache:
        self.do_init_cached_fields(for_dissect_only=for_dissect_only)
        return
    self.do_init_fields(self.fields_desc)
def do_init_fields(self,
                   flist,  # type: Sequence[AnyField]
                   ):
    # type: (...) -> None
    """
    Initialize every field of ``flist`` without using the class cache.

    For each field, a deep copy of its default is recorded, the field is
    registered in ``fieldtype`` and, if it holds packets, appended to
    ``packetfields``.
    """
    collected = {}
    for fld in flist:
        collected[fld.name] = copy.deepcopy(fld.default)
        self.fieldtype[fld.name] = fld
        if fld.holds_packets:
            self.packetfields.append(fld)
    # default_fields is assigned last to avoid race issues
    self.default_fields = collected
def do_init_cached_fields(self, for_dissect_only=False):
    # type: (bool) -> None
    """
    Initialize the fields from the per-class cache, building the cache
    first if this class has no entry yet.

    :param for_dissect_only: when true, skip copying mutable defaults
        into ``fields``
    """
    cache_key = self.__class__

    # Populate the cache on first use
    if Packet.class_default_fields.get(cache_key, None) is None:
        self.prepare_cached_fields(self.fields_desc)

    cached_defaults = Packet.class_default_fields.get(cache_key, None)
    if not cached_defaults:
        return

    self.default_fields = cached_defaults
    self.fieldtype = Packet.class_fieldtype[cache_key]
    self.packetfields = Packet.class_packetfields[cache_key]

    # Optimization: no need for references when only dissecting.
    if for_dissect_only:
        return

    # Give this instance its own copy of each default recorded as a
    # reference in class_default_fields_ref
    for ref_name in Packet.class_default_fields_ref[cache_key]:
        shared = self.default_fields[ref_name]
        try:
            self.fields[ref_name] = shared.copy()
        except AttributeError:
            # No .copy() available (Python 2.7 - list only): slice instead
            self.fields[ref_name] = shared[:]
def prepare_cached_fields(self, flist):
    # type: (Sequence[AnyField]) -> None
    """
    Build the per-class field caches from ``flist``.

    Nothing is cached for an empty ``flist``. If a MultipleTypeField is
    met, the class is flagged in ``class_dont_cache`` and the instance
    is initialized directly instead.
    """
    cache_key = self.__class__

    if not flist:
        return

    defaults = {}
    ref_names = []
    types_by_name = {}
    packet_holders = []

    for fld in flist:
        if isinstance(fld, MultipleTypeField):
            # Abort: this class cannot use the cache
            self.class_dont_cache[cache_key] = True
            self.do_init_fields(self.fields_desc)
            return

        defaults[fld.name] = copy.deepcopy(fld.default)
        types_by_name[fld.name] = fld
        if fld.holds_packets:
            packet_holders.append(fld)

        # Remember which defaults must be copied per instance
        if isinstance(fld.default, (list, dict, set, RandField, Packet)):
            ref_names.append(fld.name)

    # Publish the results
    Packet.class_default_fields_ref[cache_key] = ref_names
    Packet.class_fieldtype[cache_key] = types_by_name
    Packet.class_packetfields[cache_key] = packet_holders
    # Last to avoid racing issues
    Packet.class_default_fields[cache_key] = defaults
def dissection_done(self, pkt):
    # type: (Packet) -> None
    """DEV: will be called after a dissection is completed.

    Runs post_dissection() on this layer, then forwards the call to
    the payload.
    """
    self.post_dissection(pkt)
    next_layer = self.payload
    next_layer.dissection_done(pkt)
def post_dissection(self, pkt):
    # type: (Packet) -> None
    """DEV: is called after the dissection of the whole packet.

    The default implementation does nothing.
    """
    return None
def get_field(self, fld):
    # type: (str) -> AnyField
    """DEV: returns the field instance from the name of the field.

    Looks ``fld`` up in ``fieldtype``.
    """
    registry = self.fieldtype
    return registry[fld]
def add_payload(self, payload):
    # type: (Union[Packet, bytes]) -> None
    """Attach ``payload`` below the innermost layer.

    ``None`` is ignored. If this layer already has a payload, the call
    is forwarded to it. A Packet is linked directly; bytes, str,
    bytearray and memoryview are wrapped in ``conf.raw_layer``.

    :raises TypeError: for any other payload type
    """
    if payload is None:
        return
    if not isinstance(self.payload, NoPayload):
        self.payload.add_payload(payload)
        return
    if isinstance(payload, Packet):
        self.payload = payload
        payload.add_underlayer(self)
        # Take the overloads the payload declares for the first
        # matching alias type
        for alias in self.aliastypes:
            if alias in payload.overload_fields:
                self.overloaded_fields = payload.overload_fields[alias]
                break
        return
    if isinstance(payload, (bytes, str, bytearray, memoryview)):
        self.payload = conf.raw_layer(load=bytes_encode(payload))
        return
    raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload))  # noqa: E501
def remove_payload(self):
    # type: () -> None
    """Detach the payload: unlink it, install a NoPayload and reset
    the overloaded fields."""
    detached = self.payload
    detached.remove_underlayer(self)
    self.payload = NoPayload()
    self.overloaded_fields = {}
def add_underlayer(self, underlayer):
    # type: (Packet) -> None
    """Record ``underlayer`` as this packet's underlayer."""
    self.underlayer = underlayer
def remove_underlayer(self, other):
    # type: (Packet) -> None
    """Clear this packet's underlayer; ``other`` is not used."""
    self.underlayer = None
def add_parent(self, parent):
    # type: (Packet) -> None
    """Set packet parent.

    When packet is an element in PacketListField, parent field would
    point to the list owner packet.
    """
    self.parent = parent
def remove_parent(self, other):
    # type: (Packet) -> None
    """Remove packet parent.

    When packet is an element in PacketListField, parent field would
    point to the list owner packet. ``other`` is not used.
    """
    self.parent = None
420 def copy(self) -> Self:
421 """Returns a deep copy of the instance."""
422 clone = self.__class__()
423 clone.fields = self.copy_fields_dict(self.fields)
424 clone.default_fields = self.copy_fields_dict(self.default_fields)
425 clone.overloaded_fields = self.overloaded_fields.copy()
426 clone.underlayer = self.underlayer
427 clone.parent = self.parent
428 clone.explicit = self.explicit
429 clone.raw_packet_cache = self.raw_packet_cache
430 clone.raw_packet_cache_fields = self.copy_fields_dict(
431 self.raw_packet_cache_fields
432 )
433 clone.wirelen = self.wirelen
434 clone.post_transforms = self.post_transforms[:]
435 clone.payload = self.payload.copy()
436 clone.payload.add_underlayer(clone)
437 clone.time = self.time
438 clone.comment = self.comment
439 clone.direction = self.direction
440 clone.sniffed_on = self.sniffed_on
441 return clone
def _resolve_alias(self, attr):
    # type: (str) -> str
    """Return the replacement for deprecated field ``attr`` and emit a
    DeprecationWarning naming the replacement and the version."""
    replacement, since = self.deprecated_fields[attr]
    message = "%s has been deprecated in favor of %s since %s !" % (
        attr, replacement, since
    )
    warnings.warn(message, DeprecationWarning)
    return replacement
def getfieldval(self, attr):
    # type: (str) -> Any
    """Return the stored value of field ``attr``.

    Deprecated aliases are resolved first. The value is searched in
    ``fields``, then ``overloaded_fields``, then ``default_fields``;
    if absent from all three the lookup is forwarded to the payload.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    # Stores are fetched lazily, in priority order
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return store[attr]
    return self.payload.getfieldval(attr)
def getfield_and_val(self, attr):
    # type: (str) -> Tuple[AnyField, Any]
    """Return ``(field instance, stored value)`` for ``attr``.

    Deprecated aliases are resolved first. Only this layer is searched
    (``fields``, ``overloaded_fields``, ``default_fields``, in order).

    :raises ValueError: if ``attr`` is in none of the three stores
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return self.get_field(attr), store[attr]
    raise ValueError
def __getattr__(self, attr):
    # type: (str) -> Any
    """Resolve ``attr`` as a field of this layer, else ask the payload.

    RawVal values are returned as stored; other values are converted
    with the field's i2h().
    """
    try:
        field, stored = self.getfield_and_val(attr)
    except ValueError:
        return self.payload.__getattr__(attr)
    if field is None or isinstance(stored, RawVal):
        return stored
    return field.i2h(self, stored)
def setfieldval(self, attr, val):
    # type: (str, Any) -> None
    """Set field ``attr`` to ``val``.

    Deprecated aliases are resolved first. For a field of this layer
    the value is stored (RawVal as-is, anything else through the
    field's any2i()) and the build caches are invalidated. ``"payload"``
    replaces the payload. Any other name is forwarded to the payload.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    if attr in self.default_fields:
        fld = self.get_field(attr)
        if fld is None:
            convert = None
        else:
            convert = fld.any2i
        if convert is None or isinstance(val, RawVal):
            self.fields[attr] = val
        else:
            self.fields[attr] = convert(self, val)
        # The layer changed: drop anything derived from the old value
        self.explicit = 0
        self.raw_packet_cache = None
        self.raw_packet_cache_fields = None
        self.wirelen = None
        return
    if attr == "payload":
        self.remove_payload()
        self.add_payload(val)
        return
    self.payload.setfieldval(attr, val)
def __setattr__(self, attr, val):
    # type: (str, Any) -> None
    """Assign a slot directly, otherwise try to set a field.

    Names in ``__all_slots__`` bypass field handling. For other names,
    setfieldval() is tried and a plain attribute assignment is used
    only if it raises AttributeError.
    """
    if attr not in self.__all_slots__:
        try:
            return self.setfieldval(attr, val)
        except AttributeError:
            pass
    return object.__setattr__(self, attr, val)
def delfieldval(self, attr):
    # type: (str) -> None
    """Remove the explicit value of field ``attr``.

    An explicitly set field is deleted and the build caches are
    invalidated. A field that only has a default is left alone.
    ``"payload"`` removes the payload. Any other name is forwarded to
    the payload.
    """
    if attr in self.fields:
        del self.fields[attr]
        self.explicit = 0  # in case a default value must be explicit
        self.raw_packet_cache = None
        self.raw_packet_cache_fields = None
        self.wirelen = None
        return
    if attr in self.default_fields:
        return
    if attr == "payload":
        self.remove_payload()
        return
    self.payload.delfieldval(attr)
def __delattr__(self, attr):
    # type: (str) -> None
    """Delete the payload, a slot, or a field value.

    ``"payload"`` calls remove_payload(). Names in ``__all_slots__``
    are deleted as plain attributes. Other names go through
    delfieldval(), falling back to plain deletion if it raises
    AttributeError.
    """
    if attr == "payload":
        return self.remove_payload()
    if attr not in self.__all_slots__:
        try:
            return self.delfieldval(attr)
        except AttributeError:
            pass
    return object.__delattr__(self, attr)
def _superdir(self):
    # type: () -> Set[str]
    """
    Return the set of attribute names known for this instance's class:
    its ``__all_slots__`` (when present) plus every name found in the
    ``__dict__`` of each class of its MRO.
    """
    names = set()  # type: Set[str]
    klass = self.__class__
    if hasattr(klass, '__all_slots__'):
        names.update(klass.__all_slots__)
    for ancestor in klass.__mro__:
        if hasattr(ancestor, '__dict__'):
            names.update(vars(ancestor))
    return names
def __dir__(self):
    # type: () -> List[str]
    """
    Add fields to tab completion list.

    Returns the sorted names from _superdir() together with the keys
    of ``default_fields``.
    """
    names = list(self._superdir())
    names.extend(self.default_fields)
    names.sort()
    return names
def __repr__(self):
    # type: () -> str
    """Return ``<Layer  field=value ... |payload repr>`` using the
    configured color theme.

    Only explicitly set or overloaded fields are shown; conditional
    fields whose condition is false and empty list/dict/set values are
    skipped.
    """
    theme = conf.color_theme
    pieces = []
    for fld in self.fields_desc:
        if isinstance(fld, ConditionalField) and not fld._evalcond(self):
            continue
        if fld.name in self.fields:
            stored = self.fields[fld.name]
        elif fld.name in self.overloaded_fields:
            stored = self.overloaded_fields[fld.name]
        else:
            continue
        if isinstance(stored, (list, dict, set)) and len(stored) == 0:
            continue
        shown = fld.i2repr(self, stored)
        if isinstance(fld, Emph) or fld in conf.emph:
            name_style = theme.emph_field_name
            value_style = theme.emph_field_value
        else:
            name_style = theme.field_name
            value_style = theme.field_value

        pieces.append(" %s%s%s" % (name_style(fld.name),
                                   theme.punct("="),
                                   value_style(shown)))
    body = "".join(pieces)
    return "%s%s %s %s%s%s" % (theme.punct("<"),
                               theme.layer_name(self.__class__.__name__),
                               body,
                               theme.punct("|"),
                               repr(self.payload),
                               theme.punct(">"))
def __str__(self):
    # type: () -> str
    """Return the packet summary."""
    text = self.summary()
    return text
def __bytes__(self):
    # type: () -> bytes
    """Return the built packet, as produced by build()."""
    built = self.build()
    return built
def __div__(self, other):
    # type: (Any) -> Self
    """Stack ``other`` on top of a copy of this packet.

    Both operands are copied when ``other`` is a Packet. bytes, str,
    bytearray and memoryview are first wrapped in ``conf.raw_layer``.
    Anything else is given a chance through its own ``__rdiv__``.
    """
    if isinstance(other, Packet):
        lower = self.copy()
        upper = other.copy()
        lower.add_payload(upper)
        return lower
    if isinstance(other, (bytes, str, bytearray, memoryview)):
        return self / conf.raw_layer(load=bytes_encode(other))
    return other.__rdiv__(self)  # type: ignore
__truediv__ = __div__
def __rdiv__(self, other):
    # type: (Any) -> Packet
    """Handle ``other / packet`` when ``other`` is raw data.

    :raises TypeError: unless ``other`` is bytes, str, bytearray or
        memoryview
    """
    if not isinstance(other, (bytes, str, bytearray, memoryview)):
        raise TypeError
    return conf.raw_layer(load=bytes_encode(other)) / self
__rtruediv__ = __rdiv__
def __mul__(self, other):
    # type: (Any) -> List[Packet]
    """Return a list holding this packet ``other`` times.

    :raises TypeError: if ``other`` is not an int
    """
    if not isinstance(other, int):
        raise TypeError
    return [self] * other
def __rmul__(self, other):
    # type: (Any) -> List[Packet]
    """Reflected multiplication; same result as ``__mul__``."""
    repeated = self.__mul__(other)
    return repeated
def __nonzero__(self):
    # type: () -> bool
    """A packet is always truthy."""
    return True
__bool__ = __nonzero__
def __len__(self):
    # type: () -> int
    """Return the length of the built packet."""
    built = self.__bytes__()
    return len(built)
def copy_field_value(self, fieldname, value):
    # type: (str, Any) -> Any
    """Copy ``value`` using the do_copy() of the field ``fieldname``."""
    fld = self.get_field(fieldname)
    return fld.do_copy(value)
def copy_fields_dict(self, fields):
    # type: (_T) -> _T
    """Return a new dict with every value copied by copy_field_value().

    ``None`` is passed through unchanged.
    """
    if fields is None:
        return None
    duplicated = {}
    for fname, fval in fields.items():
        duplicated[fname] = self.copy_field_value(fname, fval)
    return duplicated
def _raw_packet_cache_field_value(self, fld, val, copy=False):
    # type: (AnyField, Any, bool) -> Optional[Any]
    """Get a value representative of a mutable field to detect changes.

    Packet-holding fields are summarised as ``(fields, payload raw
    cache)`` per packet; list or mutable fields are represented by the
    value itself. Other fields yield ``None``. With ``copy`` set, the
    field's do_copy() is applied to the captured data.
    """
    def snapshot(data):
        # type: (Any) -> Any
        if copy:
            return fld.do_copy(data)
        return data

    if fld.holds_packets:
        # avoid copying whole packets (perf: #GH3894)
        if not fld.islist:
            return (snapshot(val.fields), val.payload.raw_packet_cache)
        return [
            (snapshot(item.fields), item.payload.raw_packet_cache)
            for item in val
        ]
    if fld.islist or fld.ismutable:
        return snapshot(val)
    return None
def clear_cache(self):
    # type: () -> None
    """Clear the raw packet cache of this layer, of the packets held
    in its fields, and of its payload."""
    self.raw_packet_cache = None
    for fname, fval in self.fields.items():
        if not self.get_field(fname).holds_packets:
            continue
        if isinstance(fval, Packet):
            fval.clear_cache()
        elif isinstance(fval, list):
            for inner in fval:
                inner.clear_cache()
    self.payload.clear_cache()
def self_build(self):
    # type: () -> bytes
    """
    Create the default layer regarding fields_desc dict

    If a raw packet cache exists and none of the tracked mutable fields
    changed since it was recorded, the cached bytes are returned.
    Otherwise the cache is dropped and every field of fields_desc is
    appended in order (RawVal values as their bytes, others through the
    field's addfield()).

    :return: the bytes of this layer only (no payload)
    :raises Exception: whatever addfield() raised; when the first
        argument of the exception is a string, it is prefixed with the
        name of the failing field
    """
    if self.raw_packet_cache is not None and \
            self.raw_packet_cache_fields is not None:
        for fname, fval in self.raw_packet_cache_fields.items():
            fld, val = self.getfield_and_val(fname)
            if self._raw_packet_cache_field_value(fld, val) != fval:
                # A tracked field was modified: the cache is stale
                self.raw_packet_cache = None
                self.raw_packet_cache_fields = None
                self.wirelen = None
                break
        if self.raw_packet_cache is not None:
            return self.raw_packet_cache
    p = b""
    for f in self.fields_desc:
        val = self.getfieldval(f.name)
        if isinstance(val, RawVal):
            p += bytes(val)
        else:
            try:
                p = f.addfield(self, p, val)
            except Exception as ex:
                try:
                    ex.args = (
                        "While dissecting field '%s': " % f.name +
                        ex.args[0],
                    ) + ex.args[1:]
                except (AttributeError, IndexError, TypeError):
                    # Best effort only: args may be missing, empty, or
                    # start with a non-string (TypeError on the
                    # concatenation). The original error must not be
                    # masked by the decoration attempt.
                    pass
                raise
    return p
def do_build_payload(self):
    # type: () -> bytes
    """
    Create the default version of the payload layer

    :return: the result of the payload's do_build()
    """
    upper = self.payload
    return upper.do_build()
def do_build(self):
    # type: () -> bytes
    """
    Create the default version of the layer

    A non-explicit packet is first replaced by the first item obtained
    by iterating it. The layer bytes go through every post transform;
    post_build() is only applied when no raw packet cache is set.

    :return: a string of the packet with the payload
    """
    if not self.explicit:
        self = next(iter(self))
    layer_bytes = self.self_build()
    for transform in self.post_transforms:
        layer_bytes = transform(layer_bytes)
    payload_bytes = self.do_build_payload()
    if self.raw_packet_cache is not None:
        return layer_bytes + payload_bytes
    return self.post_build(layer_bytes, payload_bytes)
def build_padding(self):
    # type: () -> bytes
    """Return the padding bytes reported by the payload."""
    upper = self.payload
    return upper.build_padding()
def build(self):
    # type: () -> bytes
    """
    Create the current layer

    Concatenates do_build() and build_padding(), then passes the result
    through build_done().

    :return: string of the packet with the payload
    """
    assembled = self.do_build()
    assembled = assembled + self.build_padding()
    return self.build_done(assembled)
def post_build(self, pkt, pay):
    # type: (bytes, bytes) -> bytes
    """
    DEV: called right after the current layer is build.

    The default implementation simply concatenates both parts.

    :param str pkt: the current packet (build by self_build function)
    :param str pay: the packet payload (build by do_build_payload function)
    :return: a string of the packet with the payload
    """
    combined = pkt + pay
    return combined
def build_done(self, p):
    # type: (bytes) -> bytes
    """Forward the built bytes to the payload's build_done()."""
    upper = self.payload
    return upper.build_done(p)
def do_build_ps(self):
    # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]]  # noqa: E501
    """Build the layer field by field, recording each field's bytes.

    :return: ``(bytes, layers)`` where ``layers`` is the list returned
        by the payload's build_ps() with ``(self, field entries)``
        appended; each field entry is ``(field, repr, bytes added)``
    """
    built = b""
    previous = b""
    entries = []
    for fld in self.fields_desc:
        if isinstance(fld, ConditionalField) and not fld._evalcond(self):
            continue
        built = fld.addfield(self, built, self.getfieldval(fld.name))
        if isinstance(built, bytes):
            # Bytes contributed by this field only
            added = built[len(previous):]
            previous = built
        else:
            added = b""
        shown = fld.i2repr(self, self.getfieldval(fld.name))
        entries.append((fld, shown, added))

    payload_bytes, layers = self.payload.build_ps(internal=1)
    built += payload_bytes
    layers.append((self, entries))

    return built, layers
def build_ps(self, internal=0):
    # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]]  # noqa: E501
    """Return the ``(bytes, layers)`` pair computed by do_build_ps().

    ``internal`` is accepted but not used.
    """
    built, layers = self.do_build_ps()
    return built, layers
def canvas_dump(self, layer_shift=0, rebuild=1):
    # type: (int, int) -> pyx.canvas.canvas
    """Draw the packet on a PyX canvas.

    For each layer returned by build_ps(), the layer name and one text
    line per field (name, size, value) are drawn at ``XSTART``; the hex
    dump of each field is drawn at ``XDSTART`` and linked to its value
    by an arrow.

    :param layer_shift: added to the dump row counter after each layer
    :param rebuild: when true, the packet is rebuilt from raw(self)
        before being drawn
    :return: the PyX canvas
    :raises ImportError: if ``PYX`` is 0
    """
    if PYX == 0:
        raise ImportError("PyX and its dependencies must be installed")
    canvas = pyx.canvas.canvas()
    if rebuild:
        _, t = self.__class__(raw(self)).build_ps()
    else:
        _, t = self.build_ps()
    # One text row per layer plus one per field
    YTXTI = len(t) + sum(len(lines) for _, lines in t)
    YTXT = float(YTXTI)
    YDUMP = YTXT

    XSTART = 1
    XDSTART = 10
    y = 0.0
    XMUL = 0.55
    YMUL = 0.4

    backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
    forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)

    def hexstr(x):
        # type: (bytes) -> str
        return " ".join("%02x" % orb(c) for c in x)

    def make_dump_txt(x, y, txt):
        # type: (int, float, bytes) -> pyx.text.text
        return pyx.text.text(
            XDSTART + x * XMUL,
            (YDUMP - y) * YMUL,
            r"\tt{%s}" % hexstr(txt),
            [pyx.text.size.Large]
        )

    def make_box(o):
        # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
        return pyx.box.rect(
            o.left(), o.bottom(), o.width(), o.height(),
            relcenter=(0.5, 0.5)
        )

    def make_frame(lst):
        # type: (List[Any]) -> pyx.path.path
        if len(lst) == 1:
            b = lst[0].bbox()
            b.enlarge(pyx.unit.u_pt)
            return b.path()
        else:
            fb = lst[0].bbox()
            fb.enlarge(pyx.unit.u_pt)
            lb = lst[-1].bbox()
            lb.enlarge(pyx.unit.u_pt)
            if len(lst) == 2 and fb.left() > lb.right():
                return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.bottom()),  # noqa: E501
                                     pyx.path.lineto(fb.right(), fb.bottom()),  # noqa: E501
                                     pyx.path.moveto(lb.left(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()))  # noqa: E501
            else:
                # XXX
                gb = lst[1].bbox()
                if gb != lb:
                    gb.enlarge(pyx.unit.u_pt)
                kb = lst[-2].bbox()
                if kb != gb and kb != lb:
                    kb.enlarge(pyx.unit.u_pt)
                return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), gb.top()),
                                     pyx.path.lineto(fb.left(), gb.top()),
                                     pyx.path.closepath(),)

    def make_dump(s,   # type: bytes
                  shift=0,  # type: int
                  y=0.,  # type: float
                  col=None,  # type: pyx.color.color
                  bkcol=None,  # type: pyx.color.color
                  large=16  # type: int
                  ):
        # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float]  # noqa: E501
        c = pyx.canvas.canvas()
        tlist = []
        while s:
            dmp, s = s[:large - shift], s[large - shift:]
            txt = make_dump_txt(shift, y, dmp)
            tlist.append(txt)
            shift += len(dmp)
            # Wrap on the same row width used for slicing above
            if shift >= large:
                shift = 0
                y += 1
        if col is None:
            col = pyx.color.rgb.red
        if bkcol is None:
            bkcol = pyx.color.rgb.white
        c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick])  # noqa: E501
        for txt in tlist:
            c.insert(txt)
        return c, tlist[-1].bbox(), shift, y

    last_shift, last_y = 0, 0.0
    while t:
        bkcol = next(backcolor)
        proto, fields = t.pop()
        y += 0.5
        pt = pyx.text.text(
            XSTART,
            (YTXT - y) * YMUL,
            r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
                str(proto.name)
            ),
            [pyx.text.size.Large]
        )
        y += 1
        ptbb = pt.bbox()
        ptbb.enlarge(pyx.unit.u_pt * 2)
        canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])])  # noqa: E501
        canvas.insert(pt)
        for field, fval, fdump in fields:
            col = next(forecolor)
            ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name))  # noqa: E501
            if isinstance(field, BitField):
                fsize = '%sb' % field.size
            else:
                fsize = '%sB' % len(fdump)
            if (hasattr(field, 'field') and
                    'LE' in field.field.__class__.__name__[:3] or
                    'LE' in field.__class__.__name__[:3]):
                fsize = r'$\scriptstyle\langle$' + fsize
            st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright])  # noqa: E501
            if isinstance(fval, str):
                if len(fval) > 18:
                    fval = fval[:18] + "[...]"
            else:
                fval = ""
            vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))  # noqa: E501
            y += 1.0
            if fdump:
                dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol)  # noqa: E501

                dtb = target
                vtb = vt.bbox()
                bxvt = make_box(vtb)
                bxdt = make_box(dtb)
                dtb.enlarge(pyx.unit.u_pt)
                try:
                    # The former ``yd < 0`` alternative (absangle2=-90)
                    # could never run: yd was constant 0.0.
                    cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90)  # noqa: E501
                except Exception:
                    # Drawing the connector is best effort
                    pass
                else:
                    canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col])  # noqa: E501

                canvas.insert(dt)

            canvas.insert(ft)
            canvas.insert(st)
            canvas.insert(vt)
        last_y += layer_shift

    return canvas
def extract_padding(self, s):
    # type: (bytes) -> Tuple[bytes, Optional[bytes]]
    """
    DEV: to be overloaded to extract current layer's padding.

    The default implementation treats everything as layer data and
    reports no padding.

    :param str s: the current layer
    :return: a couple of strings (actual layer, padding)
    """
    padding = None
    return s, padding
def post_dissect(self, s):
    # type: (bytes) -> bytes
    """DEV: is called right after the current layer has been dissected.

    The default implementation returns ``s`` unchanged.
    """
    remaining = s
    return remaining
def pre_dissect(self, s):
    # type: (bytes) -> bytes
    """DEV: is called right before the current layer is dissected.

    The default implementation returns ``s`` unchanged.
    """
    incoming = s
    return incoming
def do_dissect(self, s):
    # type: (bytes) -> bytes
    """Dissect this layer's fields out of ``s``.

    Each field of fields_desc consumes data through getfield(). The
    consumed bytes are kept as the raw packet cache and the packet is
    marked explicit.

    :return: the bytes left after the last dissected field
    """
    whole = s
    self.raw_packet_cache_fields = {}
    for fld in self.fields_desc:
        s, fval = fld.getfield(self, s)
        # Skip unused ConditionalField
        if isinstance(fld, ConditionalField) and fval is None:
            continue
        # We need to track fields with mutable values to discard
        # .raw_packet_cache when needed.
        tracked = fld.islist or fld.holds_packets or fld.ismutable
        if tracked and fval is not None:
            self.raw_packet_cache_fields[fld.name] = \
                self._raw_packet_cache_field_value(fld, fval, copy=True)
        self.fields[fld.name] = fval
        if s:
            continue
        # Nothing left to dissect: stop on a MayEnd field, bare or
        # wrapped in a ConditionalField that produced a value
        if isinstance(fld, MayEnd):
            break
        if (fval is not None and isinstance(fld, ConditionalField) and
                isinstance(fld.fld, MayEnd)):
            break
    if s:
        self.raw_packet_cache = whole[:-len(s)]
    else:
        self.raw_packet_cache = whole
    self.explicit = 1
    return s
def do_dissect_payload(self, s):
    # type: (bytes) -> None
    """
    Perform the dissection of the layer's payload

    Nothing happens for empty data. If this layer is an instance of
    ``stop_dissection_after``, the data is attached as a raw layer.
    Otherwise the class returned by guess_payload_class() is
    instantiated; on failure the data falls back to a raw layer, unless
    ``conf.debug_dissector`` is set and a class was guessed, in which
    case the error is logged and re-raised.

    :param str s: the raw layer
    """
    if not s:
        return
    stop_cls = self.stop_dissection_after
    if stop_cls and isinstance(self, stop_cls):
        # stop dissection here
        self.add_payload(conf.raw_layer(s, _internal=1, _underlayer=self))
        return
    cls = self.guess_payload_class(s)
    try:
        upper = cls(
            s,
            stop_dissection_after=self.stop_dissection_after,
            _internal=1,
            _underlayer=self,
        )
    except KeyboardInterrupt:
        raise
    except Exception:
        if conf.debug_dissector:
            if issubtype(cls, Packet):
                log_runtime.error("%s dissector failed", cls.__name__)
            else:
                log_runtime.error(
                    "%s.guess_payload_class() returned [%s]",
                    self.__class__.__name__, repr(cls)
                )
            if cls is not None:
                raise
        upper = conf.raw_layer(s, _internal=1, _underlayer=self)
    self.add_payload(upper)
def dissect(self, s):
    # type: (bytes) -> None
    """Dissect ``s`` into this layer and its payload.

    Runs pre_dissect(), do_dissect() and post_dissect() in turn, splits
    the remainder with extract_padding(), dissects the payload and, if
    ``conf.padding`` is set, attaches any padding as a padding layer.
    """
    remaining = self.pre_dissect(s)
    remaining = self.do_dissect(remaining)
    remaining = self.post_dissect(remaining)

    payload_bytes, padding = self.extract_padding(remaining)
    self.do_dissect_payload(payload_bytes)
    if padding and conf.padding:
        self.add_payload(conf.padding_layer(padding))
def guess_payload_class(self, payload):
    # type: (bytes) -> Type[Packet]
    """
    DEV: Guesses the next payload class from layer bonds.
    Can be overloaded to use a different mechanism.

    Every ``(conditions, class)`` pair in the ``payload_guess`` list of each
    alias type is tried in order; the first pair whose conditions all equal
    this layer's field values wins. A pair naming a field this layer does
    not have (``AttributeError``) is skipped.

    :param str payload: the layer's payload
    :return: the payload class
    """
    for alias in self.aliastypes:
        for conditions, upper in alias.payload_guess:
            try:
                matched = all(v == self.getfieldval(k)
                              for k, v in conditions.items())
            except AttributeError:
                continue
            if matched:
                return upper  # type: ignore
    return self.default_payload_class(payload)
def default_payload_class(self, payload):
    # type: (bytes) -> Type[Packet]
    """
    DEV: Fallback used by guess_payload_class() when no binding matched.

    :param str payload: the layer's payload (not inspected here)
    :return: the raw layer class currently set in the configuration
        (``conf.raw_layer``)
    """
    fallback = conf.raw_layer
    return fallback
def hide_defaults(self):
    # type: () -> None
    """Drop explicitly-set fields whose value equals the default value,
    then do the same for the payload."""
    # Iterate over a snapshot of the keys: self.fields shrinks in the loop
    for name in list(self.fields):
        if (name in self.default_fields and
                self.default_fields[name] == self.fields[name]):
            del self.fields[name]
    self.payload.hide_defaults()
def clone_with(self, payload=None, **kargs):
    # type: (Optional[Any], **Any) -> Any
    """
    Build a new instance of this layer's class whose explicit fields are
    exactly ``kargs``.

    Default fields and the raw-packet-cache field snapshot are copied with
    ``copy_fields_dict``; overloaded fields are shallow-copied; the other
    metadata attributes are shared by reference.

    :param payload: optional payload added to the clone
    :param kargs: field values of the clone
    :return: the new packet
    """
    pkt = self.__class__()
    pkt.explicit = 1
    pkt.fields = kargs
    pkt.default_fields = self.copy_fields_dict(self.default_fields)
    pkt.overloaded_fields = self.overloaded_fields.copy()
    # Attributes handed over as-is (no copy)
    for attr in ("time", "underlayer", "parent", "post_transforms",
                 "raw_packet_cache"):
        setattr(pkt, attr, getattr(self, attr))
    pkt.raw_packet_cache_fields = self.copy_fields_dict(
        self.raw_packet_cache_fields
    )
    for attr in ("wirelen", "comment", "sniffed_on", "direction"):
        setattr(pkt, attr, getattr(self, attr))
    if payload is not None:
        pkt.add_payload(payload)
    return pkt
def __iter__(self):
    # type: () -> Iterator[Packet]
    """Iterates through all sub-packets generated by this Packet.

    Field values that are generators (``Gen``) are expanded, one value at
    a time, and combined with every packet yielded by the payload. Each
    combination is materialised with ``clone_with()``.
    """
    def loop(todo, done, self=self):
        # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
        # ``todo``: names of fields still to expand.
        # ``done``: field values chosen so far (mutated in place while
        # iterating; a copy is taken before each clone is built).
        if todo:
            eltname = todo.pop()
            elt = self.getfieldval(eltname)
            if not isinstance(elt, Gen):
                # Wrap plain values so they can be iterated like a Gen;
                # a list-field value is wrapped as one single element.
                if self.get_field(eltname).islist:
                    elt = SetGen([elt])
                else:
                    elt = SetGen(elt)
            for e in elt:
                done[eltname] = e
                # Recurse on a copy of the remaining names so sibling
                # iterations each see the full remainder.
                for x in loop(todo[:], done):
                    yield x
        else:
            # All fields fixed: combine with each payload variant.
            if isinstance(self.payload, NoPayload):
                payloads = SetGen([None])  # type: SetGen[Packet]
            else:
                payloads = self.payload
            for payl in payloads:
                # Let's make sure subpackets are consistent
                done2 = done.copy()
                for k in done2:
                    # Volatile values are resolved via _fix() before
                    # being stored in the clone.
                    if isinstance(done2[k], VolatileValue):
                        done2[k] = done2[k]._fix()
                pkt = self.clone_with(payload=payl, **done2)
                yield pkt

    if self.explicit or self.raw_packet_cache is not None:
        # Nothing to expand: the current explicit fields are used as-is.
        todo = []
        done = self.fields
    else:
        # Expand every volatile default/overloaded field plus every
        # explicitly set field.
        todo = [k for (k, v) in itertools.chain(self.default_fields.items(),
                                                self.overloaded_fields.items())
                if isinstance(v, VolatileValue)] + list(self.fields)
        done = {}
    return loop(todo, done)
def iterpayloads(self):
    # type: () -> Iterator[Packet]
    """Yield this layer, then each successive payload, stopping at the
    first payload that evaluates as false.
    Useful for DNS or 802.11 for instance.
    """
    layer = self
    yield layer
    while layer.payload:
        layer = layer.payload
        yield layer
def __gt__(self, other):
    # type: (Packet) -> int
    """True if other is an answer from self (self ==> other).

    Comparing with a Packet delegates to ``other < self``; comparing with
    ``bytes`` always gives 1; any other type raises ``TypeError``.
    """
    if isinstance(other, Packet):
        return other < self
    if isinstance(other, bytes):
        return 1
    raise TypeError((self, other))
def __lt__(self, other):
    # type: (Packet) -> int
    """True if self is an answer from other (other ==> self).

    Comparing with a Packet delegates to ``self.answers(other)``; comparing
    with ``bytes`` always gives 1; any other type raises ``TypeError``.
    """
    if isinstance(other, Packet):
        return self.answers(other)
    if isinstance(other, bytes):
        return 1
    raise TypeError((self, other))
def __eq__(self, other):
    # type: (Any) -> bool
    """Two packets are equal when ``other`` is an instance of this layer's
    class, every field descriptor of this layer is also in ``other`` with
    an equal value, and the payloads compare equal."""
    if not isinstance(other, self.__class__):
        return False
    differs = any(
        f not in other.fields_desc or
        self.getfieldval(f.name) != other.getfieldval(f.name)
        for f in self.fields_desc
    )
    if differs:
        return False
    return self.payload == other.payload
def __ne__(self, other):
    # type: (Any) -> bool
    """Logical negation of ``__eq__``."""
    equal = self.__eq__(other)
    return not equal
# Note: setting __hash__ to None is the standard way
# of making an object un-hashable (the class defines __eq__ on mutable
# state above). mypy doesn't know that, hence the "type: ignore".
__hash__ = None  # type: ignore
def hashret(self):
    # type: () -> bytes
    """DEV: returns a string that has the same value for a request
    and its answer. This base implementation defers to the payload."""
    upper = self.payload
    return upper.hashret()
def answers(self, other):
    # type: (Packet) -> int
    """DEV: true if self is an answer from other.

    Layers of different classes never answer each other (returns 0);
    layers of the same class defer the decision to their payloads.
    """
    if not (other.__class__ == self.__class__):
        return 0
    return self.payload.answers(other.payload)
def layers(self):
    # type: () -> List[Type[Packet]]
    """returns a list of layer classes (including subclasses) in this packet"""  # noqa: E501
    found = []
    current = self  # type: Optional[Packet]
    while current:
        found.append(current.__class__)
        # getlayer(0) on the payload yields the next layer, or a false
        # value once the stack is exhausted.
        current = current.payload.getlayer(0, _subclass=True)
    return found
def haslayer(self, cls, _subclass=None):
    # type: (Union[Type[Packet], str], Optional[bool]) -> int
    """
    true if self has a layer that is an instance of cls.
    Superseded by "cls in self" syntax.

    The search covers this layer, the packets held in its packet fields,
    and then the payload. ``cls`` may be a class or a name (class name or
    the layer's ``_name``); ``None`` matches any layer.
    """
    if _subclass is None:
        _subclass = self.match_subclass or None
    match = issubtype if _subclass else (lambda x, t: bool(x == t))
    own_names = [self.__class__.__name__, self._name]
    if cls is None or match(self.__class__, cls) or cls in own_names:
        return True
    for f in self.packetfields:
        held = self.getfieldval(f.name)
        if held is None:
            continue
        if not f.islist:
            held = SetGen(held, _iterpacket=0)
        for item in held:
            if not isinstance(item, Packet):
                continue
            ret = item.haslayer(cls, _subclass=_subclass)
            if ret:
                return ret
    return self.payload.haslayer(cls, _subclass=_subclass)
def getlayer(self,
             cls,  # type: Union[int, Type[Packet], str]
             nb=1,  # type: int
             _track=None,  # type: Optional[List[int]]
             _subclass=None,  # type: Optional[bool]
             **flt  # type: Any
             ):
    # type: (...) -> Optional[Packet]
    """Return the nb^th layer that is an instance of cls, matching flt
    values.

    :param cls: a layer class, a layer name, a "Layer.field" string, or an
        int (in which case the (cls + 1)-th layer of any class is sought)
    :param nb: rank (1-based) of the matching layer to return
    :param _track: internal; forwarded to the payload's getlayer()
    :param _subclass: match subclasses too; defaults to
        ``self.match_subclass``
    :param flt: field name/value pairs the layer must match
    :return: the layer (or the field value when "Layer.field" was given
        and this layer matched), or whatever the payload chain returns
    """
    if _subclass is None:
        _subclass = self.match_subclass or None
    if _subclass:
        match = issubtype
    else:
        match = lambda x, t: bool(x == t)
    # Note:
    # cls can be int, packet, str
    # string_class_name can be packet, str (packet or packet+field)
    # class_name can be packet, str (packet only)
    if isinstance(cls, int):
        # An index selects by position: any class matches.
        nb = cls + 1
        string_class_name = ""  # type: Union[Type[Packet], str]
    else:
        string_class_name = cls
    class_name = ""  # type: Union[Type[Packet], str]
    fld = None  # type: Optional[str]
    if isinstance(string_class_name, str) and "." in string_class_name:
        class_name, fld = string_class_name.split(".", 1)
    else:
        class_name, fld = string_class_name, None
    if not class_name or match(self.__class__, class_name) \
            or class_name in [self.__class__.__name__, self._name]:
        if all(self.getfieldval(fldname) == fldvalue
               for fldname, fldvalue in flt.items()):
            if nb == 1:
                if fld is None:
                    return self
                else:
                    return self.getfieldval(fld)  # type: ignore
            else:
                # This layer matched but is not the requested occurrence.
                nb -= 1
    # Search the packets held in this layer's packet fields.
    # NOTE(review): the recursive calls below pass class_name, not the
    # original "Layer.field" string, so the field part is not forwarded
    # -- confirm this is intended.
    for f in self.packetfields:
        fvalue_gen = self.getfieldval(f.name)
        if fvalue_gen is None:
            continue
        if not f.islist:
            fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
        for fvalue in fvalue_gen:
            if isinstance(fvalue, Packet):
                track = []  # type: List[int]
                ret = fvalue.getlayer(class_name, nb=nb, _track=track,
                                      _subclass=_subclass, **flt)
                if ret is not None:
                    return ret
                # Resume counting from the remaining rank reported
                # through ``track`` by the failed sub-search.
                nb = track[0]
    return self.payload.getlayer(class_name, nb=nb, _track=_track,
                                 _subclass=_subclass, **flt)
def firstlayer(self):
    # type: () -> Packet
    """Follow ``underlayer`` links down to the layer that has none and
    return it."""
    bottom = self
    while bottom.underlayer is not None:
        bottom = bottom.underlayer
    return bottom
1364 def __getitem__(self, cls):
1365 # type: (Union[Type[Packet], str]) -> Any
1366 if isinstance(cls, slice):
1367 lname = cls.start
1368 if cls.stop:
1369 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {}))
1370 else:
1371 ret = self.getlayer(cls.start, **(cls.step or {}))
1372 else:
1373 lname = cls
1374 ret = self.getlayer(cls)
1375 if ret is None:
1376 if isinstance(lname, type):
1377 name = lname.__name__
1378 elif not isinstance(lname, bytes):
1379 name = repr(lname)
1380 else:
1381 name = cast(str, lname)
1382 raise IndexError("Layer [%s] not found" % name)
1383 return ret
def __delitem__(self, cls):
    # type: (Type[Packet]) -> None
    """``del pkt[cls]``: delete the ``payload`` attribute of the layer
    located just under the selected layer."""
    selected = self[cls]
    del selected.underlayer.payload
def __setitem__(self, cls, val):
    # type: (Type[Packet], Packet) -> None
    """``pkt[cls] = val``: assign ``val`` as the payload of the layer
    located just under the selected layer."""
    selected = self[cls]
    selected.underlayer.payload = val
def __contains__(self, cls):
    # type: (Union[Type[Packet], str]) -> int
    """
    "cls in self" returns true if self has a layer which is an
    instance of cls (thin wrapper around haslayer()).
    """
    present = self.haslayer(cls)
    return present
def route(self):
    # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
    """Return the payload's route(); this layer adds nothing itself."""
    upper = self.payload
    return upper.route()
def fragment(self, *args, **kargs):
    # type: (*Any, **Any) -> List[Packet]
    """Forward the fragmentation request, with all arguments, to the
    payload."""
    upper = self.payload
    return upper.fragment(*args, **kargs)
def display(self, *args, **kargs):  # Deprecated. Use show()
    # type: (*Any, **Any) -> None
    """Deprecated alias of show(): forwards every argument to it and
    discards the result."""
    self.show(*args, **kargs)
def _show_or_dump(self,
                  dump=False,  # type: bool
                  indent=3,  # type: int
                  lvl="",  # type: str
                  label_lvl="",  # type: str
                  first_call=True  # type: bool
                  ):
    # type: (...) -> Optional[str]
    """
    Internal method that shows or dumps a hierarchical view of a packet.
    Called by show.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :param first_call: determine if the current function is the first
    :return: return a hierarchical view if dump, else print it
    """

    if dump:
        from scapy.themes import ColorTheme, AnsiColorTheme
        ct: ColorTheme = AnsiColorTheme()  # No color for dump output
    else:
        ct = conf.color_theme
    # Layer header line
    s = "%s%s %s %s\n" % (label_lvl,
                          ct.punct("###["),
                          ct.layer_name(self.name),
                          ct.punct("]###"))
    # Work on a copy: sub-fields are inserted at the front while looping
    fields = self.fields_desc.copy()
    while fields:
        f = fields.pop(0)
        # Conditional fields whose condition is false are not shown
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        if hasattr(f, "fields"):  # Field has subfields
            s += "%s %s =\n" % (
                label_lvl + lvl,
                ct.depreciate_field_name(f.name),
            )
            # Note: lvl is not reduced again afterwards, so the fields
            # that follow keep the extra indentation.
            lvl += " " * indent * self.show_indent
            # Queue the sub-fields (those that exist on this packet) so
            # they are rendered next, in their declared order.
            for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)):
                fields.insert(i, fld)
            continue
        # Pick the colour functions: emphasised or regular
        if isinstance(f, Emph) or f in conf.emph:
            ncol = ct.emph_field_name
            vcol = ct.emph_field_value
        else:
            ncol = ct.field_name
            vcol = ct.field_value
        # Pad short names so the values line up
        pad = max(0, 10 - len(f.name)) * " "
        fvalue = self.getfieldval(f.name)
        if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)):  # noqa: E501
            # Packet-valued field: print a sub-header then recurse into
            # each held packet.
            s += "%s %s%s%s%s\n" % (label_lvl + lvl,
                                    ct.punct("\\"),
                                    ncol(f.name),
                                    pad,
                                    ct.punct("\\"))
            fvalue_gen = SetGen(
                fvalue,
                _iterpacket=0
            )  # type: SetGen[Packet]
            for fvalue in fvalue_gen:
                s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False)  # noqa: E501
        else:
            begn = "%s %s%s%s " % (label_lvl + lvl,
                                   ncol(f.name),
                                   pad,
                                   ct.punct("="),)
            reprval = f.i2repr(self, fvalue)
            if isinstance(reprval, str):
                # Indent continuation lines of multi-line values
                reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) +  # noqa: E501
                                                              len(lvl) +
                                                              len(f.name) +
                                                              4))
            s += "%s%s\n" % (begn, vcol(reprval))
    if self.payload:
        # Append the payload's view, indented one level further
        s += self.payload._show_or_dump(  # type: ignore
            dump=dump,
            indent=indent,
            lvl=lvl + (" " * indent * self.show_indent),
            label_lvl=label_lvl,
            first_call=False
        )

    # Only the outermost call prints; nested calls always return text
    if first_call and not dump:
        print(s)
        return None
    else:
        return s
def show(self, dump=False, indent=3, lvl="", label_lvl=""):
    # type: (bool, int, str, str) -> Optional[Any]
    """
    Print a hierarchical view of the packet, or return it as a string
    when "dump" is true.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :return: return a hierarchical view if dump, else print it
    """
    view = self._show_or_dump(dump, indent, lvl, label_lvl)
    return view
def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
    # type: (bool, int, str, str) -> Optional[Any]
    """
    Same as show(), but on an assembled version of the packet: the packet
    is built to bytes and re-dissected first, so that automatic fields are
    calculated (checksums, etc.)

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :return: return a hierarchical view if dump, else print it
    """
    rebuilt = self.__class__(raw(self))
    return rebuilt.show(dump, indent, lvl, label_lvl)
def sprintf(self, fmt, relax=1):
    # type: (str, int) -> str
    """
    sprintf(format, [relax=1]) -> str

    Where format is a string that can include directives. A directive
    begins and ends by % and has the following format:
    ``%[fmt[r],][cls[:nb].]field%``

    :param fmt: is a classic printf directive, "r" can be appended for raw
      substitution:
      (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
      (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
      Special case : "%.time%" is the creation time.
      Ex::

        p.sprintf(
          "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
          "%03xr,IP.proto% %r,TCP.flags%"
        )

    :param relax: forwarded to the payload's sprintf() when a directive
      cannot be resolved on this layer.

    Moreover, the format string can include conditional statements. A
    conditional statement looks like : {layer:string} where layer is a
    layer name, and string is the string to insert in place of the
    condition if it is true, i.e. if layer is present. If layer is
    preceded by a "!", the result is inverted. Conditions can be
    imbricated. A valid statement can be::

      p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
      p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")

    A side effect is that, to obtain "{" and "}" characters, you must use
    "%(" and "%)".

    :raises Scapy_Exception: on a condition without ":" or with an empty
      layer name, and on a malformed directive.
    """

    escape = {"%": "%",
              "(": "{",
              ")": "}"}

    # Evaluate conditions, innermost (right-most "{") first
    while "{" in fmt:
        i = fmt.rindex("{")
        j = fmt[i + 1:].index("}")
        cond = fmt[i + 1:i + j + 1]
        k = cond.find(":")
        # k == 0 means the layer name is empty ("{:text}"): report it as
        # a bad condition instead of failing on cond[0] below.
        if k <= 0:
            raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond)  # noqa: E501
        cond, format_ = cond[:k], cond[k + 1:]
        res = False
        if cond[0] == "!":
            res = True
            cond = cond[1:]
        if self.haslayer(cond):
            res = not res
        if not res:
            format_ = ""
        fmt = fmt[:i] + format_ + fmt[i + j + 2:]

    # Evaluate directives
    s = ""
    while "%" in fmt:
        i = fmt.index("%")
        s += fmt[:i]
        fmt = fmt[i + 1:]
        # "%%", "%(" and "%)" are escapes for "%", "{" and "}"
        if fmt and fmt[0] in escape:
            s += escape[fmt[0]]
            fmt = fmt[1:]
            continue
        try:
            # Split "[fmt,][cls[:nb].]field" into its parts
            i = fmt.index("%")
            sfclsfld = fmt[:i]
            fclsfld = sfclsfld.split(",")
            if len(fclsfld) == 1:
                f = "s"
                clsfld = fclsfld[0]
            elif len(fclsfld) == 2:
                f, clsfld = fclsfld
            else:
                raise Scapy_Exception
            if "." in clsfld:
                cls, fld = clsfld.split(".")
            else:
                cls = self.__class__.__name__
                fld = clsfld
            num = 1
            if ":" in cls:
                cls, snum = cls.split(":")
                num = int(snum)
            fmt = fmt[i + 1:]
        except Exception:
            raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))  # noqa: E501
        else:
            if fld == "time":
                val = time.strftime(
                    "%H:%M:%S.%%06i",
                    time.localtime(float(self.time))
                ) % int((self.time - int(self.time)) * 1000000)
            elif cls == self.__class__.__name__ and hasattr(self, fld):
                if num > 1:
                    # Not this occurrence: ask the payload for the
                    # (num - 1)-th one; it returns a ready-made string.
                    val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax)  # noqa: E501
                    f = "s"
                else:
                    try:
                        val = self.getfieldval(fld)
                    except AttributeError:
                        val = getattr(self, fld)
                    if f[-1] == "r":  # Raw field value
                        f = f[:-1]
                        if not f:
                            f = "s"
                    else:
                        if fld in self.fieldtype:
                            val = self.fieldtype[fld].i2repr(self, val)
            else:
                # Directive targets another layer: delegate to payload
                val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
                f = "s"
            s += ("%" + f) % val

    s += fmt
    return s
def mysummary(self):
    # type: () -> str
    """DEV: can be overloaded to return a string that summarizes the layer.
    Only one mysummary() is used in a whole packet summary: the one of the upper layer,  # noqa: E501
    except if a mysummary() also returns (as a couple) a list of layers whose  # noqa: E501
    mysummary() must be called if they are present.

    This base implementation returns an empty string."""
    nothing = ""
    return nothing
def _do_summary(self):
    # type: () -> Tuple[int, str, List[Any]]
    """
    Build the summary of this layer on top of the payload's summary.

    :return: a ``(found, text, needed)`` tuple: whether a summary has been
        produced so far, the summary text, and the list of layer classes
        whose mysummary() was requested by an upper layer.
    """
    found, below, needed = self.payload._do_summary()
    text = ""
    if not found or self.__class__ in needed:
        text = self.mysummary()
        if isinstance(text, tuple):
            # mysummary() may return (text, [layers it also needs])
            text, extra = text
            needed += extra
    if text or needed:
        found = 1
    if not text:
        text = self.__class__.__name__ if self.show_summary else ""
    if self.__class__ in conf.emph:
        emphasized = [
            "%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))
            for f in self.fields_desc
            if f in conf.emph
        ]
        text = "%s [%s]" % (text, " ".join(emphasized))
    # Join with the payload summary, with a separator only when both
    # sides are non-empty.
    separator = " / " if text and below else ""
    text = "%s%s%s" % (text, separator, below)
    return found, text, needed
def summary(self, intern=0):
    # type: (int) -> str
    """Return a one line summary of a packet.

    :param intern: not used by this implementation
    """
    _found, text, _needed = self._do_summary()
    return text
def lastlayer(self, layer=None):
    # type: (Optional[Packet]) -> Packet
    """Returns the uppermost layer of the packet.

    The ``layer`` argument is not read here: each layer hands itself to
    its payload as the candidate.
    """
    upper = self.payload
    return upper.lastlayer(self)
def decode_payload_as(self, cls):
    # type: (Type[Packet]) -> None
    """Reassembles the payload and decode it using another packet class.

    The new payload's dissection_done() is then called with the bottom
    layer of the stack (the one without underlayer).
    """
    payload_bytes = raw(self.payload)
    self.payload = cls(payload_bytes, _internal=1, _underlayer=self)
    bottom = self
    while bottom.underlayer is not None:
        bottom = bottom.underlayer
    self.payload.dissection_done(bottom)
def _command(self, json=False):
    # type: (bool) -> List[Tuple[str, Any]]
    """
    Internal method used to generate command() and json()

    :param json: when true, every field of ``fields_desc`` is listed and
        values are kept as JSON-friendly objects; when false, only the
        explicitly set fields are listed and values are rendered as
        source-code strings.
    :return: a list of ``(field name, value)`` pairs
    """
    f = []
    iterator: Iterator[Tuple[str, Any]]
    if json:
        iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc)
    else:
        iterator = iter(self.fields.items())
    for fn, fv in iterator:
        fld = self.get_field(fn)
        # Skip empty containers when the field default is empty too
        if isinstance(fv, (list, dict, set)) and not fv and not fld.default:
            continue
        if isinstance(fv, Packet):
            # Single nested packet
            if json:
                fv = {k: v for (k, v) in fv._command(json=True)}
            else:
                fv = fv.command()
        elif fld.islist and fld.holds_packets and isinstance(fv, list):
            # List of nested packets
            if json:
                fv = [
                    {k: v for (k, v) in x}
                    for x in map(lambda y: Packet._command(y, json=True), fv)
                ]
            else:
                fv = "[%s]" % ",".join(map(Packet.command, fv))
        elif fld.islist and isinstance(fv, list):
            # List of plain values: use each item's command() when it
            # has one, repr() otherwise.
            if json:
                fv = [
                    getattr(x, 'command', lambda: repr(x))()
                    for x in fv
                ]
            else:
                fv = "[%s]" % ",".join(
                    getattr(x, 'command', lambda: repr(x))()
                    for x in fv
                )
        elif isinstance(fv, FlagValue):
            fv = int(fv)
        elif callable(getattr(fv, 'command', None)):
            fv = fv.command(json=json)
        else:
            if json:
                if isinstance(fv, bytes):
                    # Undecodable bytes become backslash escapes
                    fv = fv.decode("utf-8", errors="backslashreplace")
                else:
                    fv = fld.i2h(self, fv)
            else:
                fv = repr(fld.i2h(self, fv))
        f.append((fn, fv))
    return f
def command(self):
    # type: () -> str
    """
    Returns a string representing the command you have to type to
    obtain the same packet: ``Class(field=value, ...)`` for this layer,
    followed by "/" and the payload's command when it is not empty.
    """
    args = ", ".join("%s=%s" % item for item in self._command())
    text = "%s(%s)" % (self.__class__.__name__, args)
    upper = self.payload.command()
    if upper:
        text += "/" + upper
    return text
def json(self):
    # type: () -> str
    """
    Returns a JSON representing the packet.

    The fields of this layer form a JSON object; when the payload also
    serialises to a non-empty string, it is embedded under the "payload"
    key.

    Please note that this cannot be used for bijective usage: data loss WILL occur,
    so it will not make sense to try to rebuild the packet from the output.
    This must only be used for a grepping/displaying purpose.
    """
    # Imported under an alias so the stdlib module cannot be confused
    # with this method's own name.
    import json as _json
    dump = _json.dumps(dict(self._command(json=True)))
    pc = self.payload.json()
    if not pc:
        return dump
    if dump == "{}":
        # A layer without any serialised field: splicing with a leading
        # ", " would produce '{, "payload": ...}', which is not JSON.
        return "{\"payload\": %s}" % pc
    return dump[:-1] + ", \"payload\": %s}" % pc
class NoPayload(Packet):
    """Terminator of a layer stack.

    A single shared instance exists per class; it evaluates as false,
    builds to empty bytes and ignores structural operations.
    """

    def __new__(cls, *args, **kargs):
        # type: (Type[Packet], *Any, **Any) -> NoPayload
        # Singleton: created and initialised once, stored on the class.
        instance = cls.__dict__.get("__singl__")
        if instance is None:
            instance = Packet.__new__(cls)
            cls.__singl__ = instance
            Packet.__init__(instance)
        return cast(NoPayload, instance)

    def __init__(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        # Initialisation is done once in __new__.
        pass

    # -- structural operations: no-ops ---------------------------------

    def dissection_done(self, pkt):
        # type: (Packet) -> None
        pass

    def add_payload(self, payload):
        # type: (Union[Packet, bytes]) -> NoReturn
        raise Scapy_Exception("Can't add payload to NoPayload instance")

    def remove_payload(self):
        # type: () -> None
        pass

    def add_underlayer(self, underlayer):
        # type: (Any) -> None
        pass

    def remove_underlayer(self, other):
        # type: (Packet) -> None
        pass

    def add_parent(self, parent):
        # type: (Any) -> None
        pass

    def remove_parent(self, other):
        # type: (Packet) -> None
        pass

    def copy(self):
        # type: () -> NoPayload
        # The singleton is its own copy.
        return self

    def clear_cache(self):
        # type: () -> None
        pass

    # -- conversions: everything is empty ------------------------------

    def __repr__(self):
        # type: () -> str
        return ""

    def __str__(self):
        # type: () -> str
        return ""

    def __bytes__(self):
        # type: () -> bytes
        return b""

    def __nonzero__(self):
        # type: () -> bool
        return False
    __bool__ = __nonzero__

    # -- building --------------------------------------------------------

    def do_build(self):
        # type: () -> bytes
        return b""

    def build(self):
        # type: () -> bytes
        return b""

    def build_padding(self):
        # type: () -> bytes
        return b""

    def build_done(self, p):
        # type: (bytes) -> bytes
        return p

    def build_ps(self, internal=0):
        # type: (int) -> Tuple[bytes, List[Any]]
        return b"", []

    # -- field access: there are no fields -----------------------------

    def getfieldval(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def getfield_and_val(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def setfieldval(self, attr, val):
        # type: (str, Any) -> NoReturn
        raise AttributeError(attr)

    def delfieldval(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def hide_defaults(self):
        # type: () -> None
        pass

    # -- iteration, comparison, matching -------------------------------

    def __iter__(self):
        # type: () -> Iterator[Packet]
        return iter([])

    def __eq__(self, other):
        # type: (Any) -> bool
        return isinstance(other, NoPayload)

    def hashret(self):
        # type: () -> bytes
        return b""

    def answers(self, other):
        # type: (Packet) -> bool
        # The end of a stack answers another end of stack, or padding.
        return isinstance(other, (NoPayload, conf.padding_layer))  # noqa: E501

    def haslayer(self, cls, _subclass=None):
        # type: (Union[Type[Packet], str], Optional[bool]) -> int
        return 0

    def getlayer(self,
                 cls,  # type: Union[int, Type[Packet], str]
                 nb=1,  # type: int
                 _track=None,  # type: Optional[List[int]]
                 _subclass=None,  # type: Optional[bool]
                 **flt  # type: Any
                 ):
        # type: (...) -> Optional[Packet]
        # Report the rank still being sought back to the caller.
        if _track is not None:
            _track.append(nb)
        return None

    def fragment(self, *args, **kargs):
        # type: (*Any, **Any) -> List[Packet]
        raise Scapy_Exception("cannot fragment this packet")

    # -- display -----------------------------------------------------------

    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
        # type: (bool, int, str, str) -> None
        pass

    def sprintf(self, fmt, relax=1):
        # type: (str, int) -> str
        # Reaching this point means no layer could resolve the directive.
        if not relax:
            raise Scapy_Exception("Format not found [%s]" % fmt)
        return "??"

    def _do_summary(self):
        # type: () -> Tuple[int, str, List[Any]]
        return 0, "", []

    def layers(self):
        # type: () -> List[Type[Packet]]
        return []

    def lastlayer(self, layer=None):
        # type: (Optional[Packet]) -> Packet
        return layer or self

    def command(self):
        # type: () -> str
        return ""

    def json(self):
        # type: () -> str
        return ""

    def route(self):
        # type: () -> Tuple[None, None, None]
        return (None, None, None)
1972####################
1973# packet classes #
1974####################
class Raw(Packet):
    """Layer holding undissected bytes in a single ``load`` field."""
    name = "Raw"
    fields_desc = [StrField("load", b"")]

    def __init__(self, _pkt=b"", *args, **kwargs):
        # type: (bytes, *Any, **Any) -> None
        # Non-bytes input is encoded first. A tuple has its first element
        # encoded and its second element passed through untouched.
        if _pkt and not isinstance(_pkt, bytes):
            if isinstance(_pkt, tuple):
                data, bn = _pkt
                _pkt = (bytes_encode(data), bn)
            else:
                _pkt = bytes_encode(_pkt)
        super(Raw, self).__init__(_pkt, *args, **kwargs)

    def answers(self, other):
        # type: (Packet) -> int
        # Raw data is considered an answer to anything.
        return 1

    def mysummary(self):
        # type: () -> str
        # conf.raw_summary: false -> default summary; callable -> applied
        # to the load; any other true value -> repr of the load.
        cs = conf.raw_summary
        if not cs:
            return Packet.mysummary(self)
        if callable(cs):
            return "Raw %s" % cs(self.load)
        return "Raw %r" % self.load
class Padding(Raw):
    """Raw variant that contributes nothing to the regular build; its
    bytes are emitted through build_padding() instead."""
    name = "Padding"

    def self_build(self):
        # type: (Optional[Any]) -> bytes
        return b""

    def build_padding(self):
        # type: () -> bytes
        # Prefer the cached raw bytes when available, else encode the load
        if self.raw_packet_cache is None:
            own = bytes_encode(self.load)
        else:
            own = self.raw_packet_cache
        return own + self.payload.build_padding()
# Register the generic layers in the global configuration so that code
# referring to conf.raw_layer / conf.padding_layer gets these classes.
conf.raw_layer = Raw
conf.padding_layer = Padding
# Only provide a default layer-2 class when none has been set yet.
if conf.default_l2 is None:
    conf.default_l2 = Raw
2026#################
2027# Bind layers #
2028#################
def bind_bottom_up(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    r"""Bind 2 layers for dissection.
    The upper layer will be chosen for dissection on top of the lower layer, if
    ALL the passed arguments are validated. If multiple calls are made with
    the same layers, the last one will be used as default.

    The conditions may be given as keyword arguments and/or as one
    positional dict, which is merged into the keyword arguments.

    ex:
        >>> bind_bottom_up(Ether, SNAP, type=0x1234)
        >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00')  # noqa: E501
        <Ether  dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP  OUI=0x0 code=0x0 |>>  # noqa: E501
    """
    if __fval is not None:
        fval.update(__fval)
    # Rebind a fresh list on ``lower`` rather than appending in place, so
    # a list shared with other classes is left untouched.
    guesses = lower.payload_guess[:]
    lower.payload_guess = guesses
    guesses.append((fval, upper))
def bind_top_down(lower,  # type: Type[Packet]
                  upper,  # type: Type[Packet]
                  __fval=None,  # type: Optional[Any]
                  **fval  # type: Any
                  ):
    # type: (...) -> None
    """Bind 2 layers for building.
    When the upper layer is added as a payload of the lower layer, all the
    arguments will be applied to them.

    The values may be given as keyword arguments and/or as one positional
    dict, which is merged into the keyword arguments.

    ex:
        >>> bind_top_down(Ether, SNAP, type=0x1234)
        >>> Ether()/SNAP()
        <Ether  type=0x1234 |<SNAP  |>>
    """
    if __fval is not None:
        fval.update(__fval)
    # Work on a copy so a mapping shared with other classes is untouched
    overloads = upper._overload_fields.copy()
    upper._overload_fields = overloads  # type: ignore
    overloads[lower] = fval
@conf.commands.register
def bind_layers(lower,  # type: Type[Packet]
                upper,  # type: Type[Packet]
                __fval=None,  # type: Optional[Dict[str, int]]
                **fval  # type: Any
                ):
    # type: (...) -> None
    """Bind 2 layers on some specific fields' values.

    It makes the packet being built and dissected when the arguments
    are present.

    This function calls both bind_top_down and bind_bottom_up (in that
    order), with all passed arguments.

    Please have a look at their docs:
     - help(bind_bottom_up)
     - help(bind_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    for binder in (bind_top_down, bind_bottom_up):
        binder(lower, upper, **fval)
def split_bottom_up(lower,  # type: Type[Packet]
                    upper,  # type: Type[Packet]
                    __fval=None,  # type: Optional[Any]
                    **fval  # type: Any
                    ):
    # type: (...) -> None
    """This call un-links an association that was made using bind_bottom_up.
    Have a look at help(bind_bottom_up)

    Every binding of ``lower`` to ``upper`` whose conditions contain all
    the given field values is removed; other bindings are kept.
    """
    if __fval is not None:
        fval.update(__fval)

    kept = []
    for entry in lower.payload_guess:
        params, cls = entry
        # The binding is out of scope when any requested key is missing
        # from it or has another value.
        mismatch = any(
            k not in params or params[k] != v for k, v in fval.items()
        )
        if cls != upper or mismatch:
            kept.append(entry)
    lower.payload_guess = kept
def split_top_down(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    """This call un-links an association that was made using bind_top_down.
    Have a look at help(bind_top_down)

    Nothing happens when ``upper`` has no binding for ``lower``, or when
    that binding does not contain all the given field values.
    """
    if __fval is not None:
        fval.update(__fval)
    if lower not in upper._overload_fields:
        return
    bound = upper._overload_fields[lower]
    for key, value in fval.items():
        if key not in bound or bound[key] != value:
            return
    # Work on a copy so a mapping shared with other classes is untouched
    upper._overload_fields = upper._overload_fields.copy()  # type: ignore
    del upper._overload_fields[lower]
@conf.commands.register
def split_layers(lower,  # type: Type[Packet]
                 upper,  # type: Type[Packet]
                 __fval=None,  # type: Optional[Any]
                 **fval  # type: Any
                 ):
    # type: (...) -> None
    """Split 2 layers previously bound.
    This call un-links what bind_top_down and bind_bottom_up set up. It is
    the opposite of bind_layers.

    Please have a look at their docs:
     - help(split_bottom_up)
     - help(split_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    for splitter in (split_bottom_up, split_top_down):
        splitter(lower, upper, **fval)
@conf.commands.register
def explore(layer=None):
    # type: (Optional[str]) -> None
    """Function used to discover the Scapy layers and protocols.
    It helps to see which packets exists in contrib or layer files.

    params:
     - layer: If specified, the function will explore the layer. If not,
              the GUI mode will be activated, to browse the available layers

    examples:
      >>> explore()  # Launches the GUI
      >>> explore("dns")  # Explore scapy.layers.dns
      >>> explore("http2")  # Explore scapy.contrib.http2
      >>> explore(scapy.layers.bluetooth4LE)

    Note: to search a packet by name, use ls("name") rather than explore.

    :raises Scapy_Exception: if the GUI mode is requested outside of an
        interactive session, or if the requested module is unknown
    :raises ImportError: if prompt_toolkit is missing or too old (GUI mode)
    """
    if layer is None:  # GUI MODE
        if not conf.interactive:
            # The dialogs need an interactive session
            raise Scapy_Exception("explore() GUI-mode cannot be run in "
                                  "non-interactive mode. Please provide a "
                                  "'layer' parameter !")
        # 0 - Imports
        try:
            import prompt_toolkit
        except ImportError:
            raise ImportError("prompt_toolkit is not installed ! "
                              "You may install IPython, which contains it, via"
                              " `pip install ipython`")
        if not _version_checker(prompt_toolkit, (2, 0)):
            raise ImportError("prompt_toolkit >= 2.0.0 is required !")
        # Only available with prompt_toolkit > 2.0, not released on PyPi yet
        from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
            button_dialog
        from prompt_toolkit.formatted_text import HTML
        # Check for prompt_toolkit >= 3.0.0: there, the dialog helpers return
        # an object that must be run() to obtain the user's answer
        call_ptk = lambda x: cast(str, x)  # type: Callable[[Any], str]
        if _version_checker(prompt_toolkit, (3, 0)):
            call_ptk = lambda x: x.run()
        # 1 - Ask for layer or contrib
        btn_diag = button_dialog(
            title="Scapy v%s" % conf.version,
            text=HTML(
                '<style bg="white" fg="red">Choose the type of packets'
                ' you want to explore:</style>'
            ),
            buttons=[
                ("Layers", "layers"),
                ("Contribs", "contribs"),
                ("Cancel", "cancel")
            ])
        action = call_ptk(btn_diag)
        # 2 - Retrieve list of Packets
        if action == "layers":
            # Get all loaded layers
            lvalues = conf.layers.layers()
            # Restrict to layers-only (not contribs) + packet.py and asn1*.py
            values = [x for x in lvalues if ("layers" in x[0] or
                                             "packet" in x[0] or
                                             "asn1" in x[0])]
        elif action == "contribs":
            # Get all existing contribs
            from scapy.main import list_contrib
            cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
            values = [(x['name'], x['description'])
                      for x in cvalues]
            # Remove very specific modules
            values = [x for x in values if "can" not in x[0]]
        else:
            # Escape/Cancel was pressed
            return
        # Build tree
        if action == "contribs":
            # A tree is a dictionary. Each layer contains a keyword
            # _l which contains the files in the layer, and a _name
            # argument which is its name. The other keys are the subfolders,
            # which are similar dictionaries
            tree = defaultdict(list)  # type: Dict[str, Union[List[Any], Dict[str, Any]]]  # noqa: E501
            for name, desc in values:
                if "." in name:  # Folder detected
                    parts = name.split(".")
                    subtree = tree
                    for pa in parts[:-1]:
                        if pa not in subtree:
                            subtree[pa] = {}
                        # one layer deeper
                        subtree = subtree[pa]  # type: ignore
                        subtree["_name"] = pa  # type: ignore
                    if "_l" not in subtree:
                        subtree["_l"] = []
                    subtree["_l"].append((parts[-1], desc))  # type: ignore
                else:
                    tree["_l"].append((name, desc))  # type: ignore
        elif action == "layers":
            tree = {"_l": values}
        # 3 - Ask for the layer/contrib module to explore
        current = tree  # type: Any
        previous = []  # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]]  # noqa: E501
        while True:
            # Generate tests & form
            folders = list(current.keys())
            # Sub-folders are prefixed with "$" so that they can be told
            # apart from files once the user made a choice
            _radio_values = [
                ("$" + name, str('[+] ' + name.capitalize()))
                for name in folders if not name.startswith("_")
            ] + current.get("_l", [])  # type: List[Any]
            cur_path = ""
            if previous:
                # previous[0] is the root of the tree, which has no "_name"
                cur_path = ".".join(
                    itertools.chain(
                        (x["_name"] for x in previous[1:]),  # type: ignore
                        (current["_name"],)
                    )
                )
            extra_text = (
                '\n<style bg="white" fg="green">> scapy.%s</style>'
            ) % (action + ("." + cur_path if cur_path else ""))
            # Show popup
            rd_diag = radiolist_dialog(
                values=_radio_values,
                title="Scapy v%s" % conf.version,
                text=HTML(
                    (
                        '<style bg="white" fg="red">Please select a file '
                        'among the following, to see all layers contained in'
                        ' it:</style>'
                    ) + extra_text
                ),
                cancel_text="Back" if previous else "Cancel"
            )
            result = call_ptk(rd_diag)
            if result is None:
                # User pressed "Cancel/Back"
                if previous:  # Back
                    current = previous.pop()
                    continue
                else:  # Cancel
                    return
            if result.startswith("$"):
                # Enter on folder: go one level deeper
                previous.append(current)
                current = current[result[1:]]
            else:
                # Enter on layer
                if previous:  # In subfolder
                    result = cur_path + "." + result
                break
        # 4 - (Contrib only): load contrib
        if action == "contribs":
            from scapy.main import load_contrib
            load_contrib(result)
            result = "scapy.contrib." + result
    else:  # NON-GUI MODE
        # We handle layer as a short layer name, full layer name
        # or the module itself
        if isinstance(layer, types.ModuleType):
            layer = layer.__name__
        if isinstance(layer, str):
            if layer.startswith("scapy.layers."):
                result = layer
            else:
                if layer.startswith("scapy.contrib."):
                    layer = layer.replace("scapy.contrib.", "")
                from scapy.main import load_contrib
                load_contrib(layer)
                result_layer, result_contrib = (("scapy.layers.%s" % layer),
                                                ("scapy.contrib.%s" % layer))
                if result_layer in conf.layers.ldict:
                    result = result_layer
                elif result_contrib in conf.layers.ldict:
                    result = result_contrib
                else:
                    raise Scapy_Exception("Unknown scapy module '%s'" % layer)
        else:
            warning("Wrong usage ! Check out help(explore)")
            return

    # COMMON PART
    # Get the list of all Packets contained in that module
    try:
        all_layers = conf.layers.ldict[result]
    except KeyError:
        # Report the module that was actually looked up: 'layer' is None
        # when coming from the GUI mode
        raise Scapy_Exception("Unknown scapy module '%s'" % result)
    # Print
    print(conf.color_theme.layer_name("Packets contained in %s:" % result))
    rtlst = [
        (lay.__name__ or "", cast(str, lay._name) or "")
        for lay in all_layers
    ]  # type: List[Tuple[Union[str, List[str]], ...]]
    print(pretty_list(rtlst, [("Class", "Name")], borders=True))
def _pkt_ls(obj,  # type: Union[Packet, Type[Packet]]
            verbose=False,  # type: bool
            ):
    # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]]  # noqa: E501
    """Internal helper that turns `fields_desc` into displayable rows.

    :param obj: a packet object or class
    :param verbose: also collect the enumerated values / flag names
    :returns: a list containing tuples [(name, clsname, clsname_extras,
        default, long_attrs)]
    :raises ValueError: if obj is neither a Packet instance nor a Packet
        class
    """
    obj_is_instance = isinstance(obj, Packet)
    if not (obj_is_instance or issubtype(obj, Packet)):
        raise ValueError

    def _enum_lines(i2s):
        # type: (Dict[Any, Any]) -> List[str]
        # One "name: value" entry per enumerated value, sorted by value
        return [
            "%s: %d" % (label, number)
            for number, label in sorted(i2s.items())
        ]

    rows = []
    for described in obj.fields_desc:
        fld = described
        attrs = []  # type: List[str]
        long_attrs = []  # type: List[str]
        # Unwrap Emph / ConditionalField wrappers to reach the real field
        while isinstance(fld, (Emph, ConditionalField)):
            if isinstance(fld, ConditionalField):
                attrs.append(fld.__class__.__name__[:4])
            fld = fld.fld
        name = fld.name
        default = fld.default
        has_enum = (
            verbose and
            isinstance(fld, EnumField) and
            hasattr(fld, "i2s") and
            fld.i2s
        )
        if has_enum:
            # Do not flood the output with huge enumerations
            if len(fld.i2s or []) < 50:
                long_attrs.extend(_enum_lines(fld.i2s))
        elif isinstance(fld, MultiEnumField):
            # depends_on() needs a packet instance
            obj_pkt = obj if isinstance(obj, Packet) else obj()
            fld_depend = fld.depends_on(obj_pkt)
            attrs.append("Depends on %s" % fld_depend)
            if verbose:
                cur_i2s = fld.i2s_multi.get(
                    fld.depends_on(obj_pkt), {}
                )
                if len(cur_i2s) < 50:
                    long_attrs.extend(_enum_lines(cur_i2s))
        elif verbose and isinstance(fld, FlagsField):
            long_attrs.append(", ".join(fld.names))
        elif isinstance(fld, MultipleTypeField):
            # Show the default of the fallback field and list the class
            # of every candidate field, fallback last
            default = fld.dflt.default
            candidates = itertools.chain(fld.flds, [(fld.dflt,)])
            attrs.append(", ".join(
                cand[0].__class__.__name__ for cand in candidates
            ))

        class_name_extras = ""
        if attrs:
            class_name_extras = "(%s)" % ", ".join(attrs)
        if isinstance(fld, BitField):
            plural = "s" if fld.size > 1 else ""
            class_name_extras += " (%d bit%s)" % (fld.size, plural)
        rows.append(
            (name, fld.__class__, class_name_extras, repr(default), long_attrs)
        )
    return rows
@conf.commands.register
def ls(obj=None,  # type: Optional[Union[str, Packet, Type[Packet]]]
       case_sensitive=False,  # type: bool
       verbose=False  # type: bool
       ):
    # type: (...) -> None
    """List available layers, or infos on a given layer class or name.

    :param obj: Packet / packet name to use. When it is a string, it is
        used as a regular expression searched in both the class name and
        the descriptive name of each layer. When None, all layers are listed.
    :param case_sensitive: if obj is a string, is it case sensitive?
    :param verbose: also display the enumerated values / flag names of the
        fields (applies to every layer of a packet)
    """
    if obj is None or isinstance(obj, str):
        tip = False
        if obj is None:
            tip = True
            all_layers = sorted(conf.layers, key=lambda x: x.__name__)
        else:
            pattern = re.compile(
                obj,
                0 if case_sensitive else re.I
            )
            needle = obj if case_sensitive else obj.lower()

            # We first order by accuracy, then length
            def sorter(x):
                # type: (Type[Packet]) -> Tuple[int, int, int]
                cls_name = x.__name__
                haystack = cls_name if case_sensitive else cls_name.lower()
                pos = haystack.find(needle)
                if pos < 0:
                    # obj is not a plain substring of the class name: it
                    # is a regex, or the layer matched through its
                    # descriptive name only. str.index() would raise here.
                    match = pattern.search(cls_name)
                    if match is None:
                        # Matched by descriptive name only: list it last
                        return (1, 0, len(cls_name))
                    pos = match.start()
                return (0, pos, len(cls_name))

            all_layers = sorted((layer for layer in conf.layers
                                 if (isinstance(layer.__name__, str) and
                                     pattern.search(layer.__name__)) or
                                 (isinstance(layer.name, str) and
                                     pattern.search(layer.name))),
                                key=sorter)
        for layer in all_layers:
            print("%-10s : %s" % (layer.__name__, layer._name))
        if tip and conf.interactive:
            print("\nTIP: You may use explore() to navigate through all "
                  "layers using a clear GUI")
    else:
        try:
            fields = _pkt_ls(
                obj,
                verbose=verbose
            )
            is_pkt = isinstance(obj, Packet)
            # Print
            for fname, cls, clsne, dflt, long_attrs in fields:
                clsinfo = cls.__name__ + " " + clsne
                print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
                if is_pkt:
                    # For an instance, show the current value as well
                    print("%-15r" % (getattr(obj, fname),), end=' ')
                print("(%r)" % (dflt,))
                for attr in long_attrs:
                    print("%-15s%s" % ("", attr))
            # Restart for payload if any
            if is_pkt:
                obj = cast(Packet, obj)
                if isinstance(obj.payload, NoPayload):
                    return
                print("--")
                # Propagate the options so that every layer is displayed
                # the same way
                ls(obj.payload, case_sensitive=case_sensitive, verbose=verbose)
        except ValueError:
            print("Not a packet class or name. Type 'ls()' to list packet classes.")  # noqa: E501
@conf.commands.register
def rfc(cls, ret=False, legend=True):
    # type: (Type[Packet], bool, bool) -> Optional[str]
    """
    Build an RFC-style ASCII diagram of a packet definition, laid out on
    rows of 32 bits.

    :param cls: the Packet class
    :param ret: return the result instead of printing (def. False)
    :param legend: show text under the diagram (default True)
    :returns: the diagram if ret is set, None otherwise
    :raises TypeError: if cls is not a Packet class

    Ex::

        >>> rfc(Ether)

    """
    if not issubclass(cls, Packet):
        raise TypeError("Packet class expected")
    bit_pos = 0  # number of bits used on the row being filled
    row = []
    lines = []
    ident = 0  # Fields UUID

    # Generate packet groups
    def _iterfields() -> Iterator[Tuple[str, int]]:
        for f in cls.fields_desc:
            # Fancy field name, and size in bits
            yield f.name.upper().replace("_", " "), int(f.sz * 8)
            # Add padding optionally. bit_pos is read after the consumer
            # below has accounted for the field that was just yielded
            if isinstance(f, PadField):
                align = f._align[0] if isinstance(f._align, tuple) \
                    else f._align
                pad = -bit_pos % (align * 8)
                if pad:
                    yield "padding", pad

    for fname, flen in _iterfields():
        bit_pos += flen
        ident += 1
        # The field might exceed the current line or
        # take more than one line. Copy it as required
        while True:
            over = max(0, bit_pos - 32)  # bits that do not fit on this row
            # Each bit is 2 characters wide, minus the shared "|" border
            width = 2 * (flen - over) - 1
            row.append((fname[:width], width, ident))
            if bit_pos < 32:
                # End of the field, the row still has room
                break
            # Current row is full: start a new one
            lines.append(row)
            row = []
            fname = ""  # do not repeat the field name
            bit_pos = flen = over
            if not over:
                # there is no data left
                break
    # Add the last row if un-finished
    if row:
        lines.append(row)

    # Calculate separations between rows
    full_sep = "+-" * 32 + "+\n"
    seps = [full_sep]
    for above, below in zip(lines, lines[1:]):
        sep = full_sep
        # The last field of above is shared with below
        if above[-1][2] == below[0][2]:
            leading = above[:-1]
            # where the field in "above" starts
            pos_above = sum(x[1] for x in leading) + len(leading) - 1
            # where the field in "below" ends
            pos_below = below[0][1]
            if pos_above < pos_below:
                # they are overlapping: blank out the separator between
                # those two positions
                pos_above += pos_above % 2
                sep = "".join((
                    sep[:1 + pos_above],
                    " " * (pos_below - pos_above),
                    sep[1 + pos_below:],
                ))
        seps.append(sep)

    # Graph
    chunks = [
        # Tens digit of the bit index, above bits 0, 10, 20 and 30
        " " + (" " * 19).join(str(x) for x in range(4)) + "\n",
        # Units digit of the bit index
        " " + " ".join(str(x % 10) for x in range(32)) + "\n",
    ]
    # Add fields and their separations
    for line, sep in zip(lines, seps):
        chunks.append(sep)
        chunks.extend(
            "|" + text.center(width, " ") for text, width, _ in line
        )
        chunks.append("|\n")
    # Bottom border, as wide as the last row
    chunks.append("+-" * (bit_pos or 32) + "+\n")
    # Annotate with the figure name
    if legend:
        chunks.append("\n" + ("Fig. " + cls.__name__).center(66, " "))
    result = "".join(chunks)
    # return if asked for, else print
    if ret:
        return result
    print(result)
    return None
#############
# Fuzzing #
#############

# Type variable bound to Packet: fuzz() below uses it for both its argument
# and its return value, so it is typed as returning the kind of packet it
# was given.
_P = TypeVar('_P', bound=Packet)
@conf.commands.register
def fuzz(p,  # type: _P
         _inplace=0,  # type: int
         ):
    # type: (...) -> _P
    """
    Make a packet fuzzy: for every layer, replace default field values
    with random objects.

    :param p: the Packet instance to fuzz
    :param _inplace: if set, modify p itself instead of a copy (used for
        the recursion on PacketListField items)
    :return: the fuzzed packet.
    """
    if not _inplace:
        p = p.copy()
    layer = cast(Packet, p)
    while not isinstance(layer, NoPayload):
        randomized = {}
        mtf_names = []  # type: List[str]
        for desc in layer.fields_desc:
            if isinstance(desc, PacketListField):
                # Fuzz the contained packets themselves
                for sub in getattr(layer, desc.name):
                    fuzz(sub, _inplace=1)
            elif isinstance(desc, MultipleTypeField):
                # the type of the field will depend on others
                mtf_names.append(desc.name)
            elif desc.default is None:
                continue
            elif isinstance(desc, ConditionalField) and \
                    not desc._evalcond(layer):
                continue
            else:
                rnd = desc.randval()
                if rnd is not None:
                    randomized[desc.name] = rnd
        # Process packets with MultipleTypeFields
        if mtf_names:
            # freeze the other random values and apply them first, so that
            # the MultipleTypeFields are resolved against fixed values
            frozen = {}
            for key, val in randomized.items():
                if isinstance(val, VolatileValue):
                    val = val._fix()
                frozen[key] = val
            layer.default_fields.update(frozen)
            # add the random values of the MultipleTypeFields
            randomized = {}
            for name in mtf_names:
                fld = cast(MultipleTypeField, layer.get_field(name))
                rnd = fld._find_fld_pkt(layer).randval()
                if rnd is not None:
                    randomized[name] = rnd
        layer.default_fields.update(randomized)
        layer = layer.payload
    return p