Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/packet.py: 47%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Packet class
9Provides:
10 - the default Packet classes
11 - binding mechanisms
12 - fuzz() method
13 - exploration methods: explore() / ls()
14"""
16from collections import defaultdict
18import json
19import re
20import time
21import itertools
22import copy
23import types
24import warnings
26from scapy.fields import (
27 AnyField,
28 BitField,
29 ConditionalField,
30 Emph,
31 EnumField,
32 Field,
33 FlagsField,
34 FlagValue,
35 MayEnd,
36 MultiEnumField,
37 MultipleTypeField,
38 PadField,
39 PacketListField,
40 RawVal,
41 StrField,
42)
43from scapy.config import conf, _version_checker
44from scapy.compat import raw, orb, bytes_encode
45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \
46 _CanvasDumpExtended
47from scapy.interfaces import _GlobInterfaceType
48from scapy.volatile import RandField, VolatileValue
49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \
50 pretty_list, EDecimal
51from scapy.error import Scapy_Exception, log_runtime, warning
52from scapy.libs.test_pyx import PYX
54# Typing imports
55from typing import (
56 Any,
57 Callable,
58 Dict,
59 Iterator,
60 List,
61 NoReturn,
62 Optional,
63 Set,
64 Tuple,
65 Type,
66 TypeVar,
67 Union,
68 Sequence,
69 cast,
70)
71from scapy.compat import Self
73try:
74 import pyx
75except ImportError:
76 pass
79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]])
82class Packet(
83 BasePacket,
84 _CanvasDumpExtended,
85 metaclass=Packet_metaclass
86):
87 __slots__ = [
88 "time", "sent_time", "name",
89 "default_fields", "fields", "fieldtype",
90 "overload_fields", "overloaded_fields",
91 "packetfields",
92 "original", "explicit", "raw_packet_cache",
93 "raw_packet_cache_fields", "_pkt", "post_transforms",
94 "stop_dissection_after",
95 # then payload, underlayer and parent
96 "payload", "underlayer", "parent",
97 "name",
98 # used for sr()
99 "_answered",
100 # used when sniffing
101 "direction", "sniffed_on",
102 # handle snaplen Vs real length
103 "wirelen",
104 "comments",
105 "process_information"
106 ]
107 name = None
108 fields_desc = [] # type: List[AnyField]
109 deprecated_fields = {} # type: Dict[str, Tuple[str, str]]
110 overload_fields = {} # type: Dict[Type[Packet], Dict[str, Any]]
111 payload_guess = [] # type: List[Tuple[Dict[str, Any], Type[Packet]]]
112 show_indent = 1
113 show_summary = True
114 match_subclass = False
115 class_dont_cache = {} # type: Dict[Type[Packet], bool]
116 class_packetfields = {} # type: Dict[Type[Packet], Any]
117 class_default_fields = {} # type: Dict[Type[Packet], Dict[str, Any]]
118 class_default_fields_ref = {} # type: Dict[Type[Packet], List[str]]
119 class_fieldtype = {} # type: Dict[Type[Packet], Dict[str, AnyField]] # noqa: E501
121 @classmethod
122 def from_hexcap(cls):
123 # type: (Type[Packet]) -> Packet
124 return cls(import_hexcap())
126 @classmethod
127 def upper_bonds(self):
128 # type: () -> None
129 for fval, upper in self.payload_guess:
130 print(
131 "%-20s %s" % (
132 upper.__name__,
133 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
134 )
135 )
137 @classmethod
138 def lower_bonds(self):
139 # type: () -> None
140 for lower, fval in self._overload_fields.items():
141 print(
142 "%-20s %s" % (
143 lower.__name__,
144 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
145 )
146 )
148 def __init__(self,
149 _pkt=b"", # type: Union[bytes, bytearray]
150 post_transform=None, # type: Any
151 _internal=0, # type: int
152 _underlayer=None, # type: Optional[Packet]
153 _parent=None, # type: Optional[Packet]
154 stop_dissection_after=None, # type: Optional[Type[Packet]]
155 **fields # type: Any
156 ):
157 # type: (...) -> None
158 self.time = time.time() # type: Union[EDecimal, float]
159 self.sent_time = None # type: Union[EDecimal, float, None]
160 self.name = (self.__class__.__name__
161 if self._name is None else
162 self._name)
163 self.default_fields = {} # type: Dict[str, Any]
164 self.overload_fields = self._overload_fields
165 self.overloaded_fields = {} # type: Dict[str, Any]
166 self.fields = {} # type: Dict[str, Any]
167 self.fieldtype = {} # type: Dict[str, AnyField]
168 self.packetfields = [] # type: List[AnyField]
169 self.payload = NoPayload() # type: Packet
170 self.init_fields(bool(_pkt))
171 self.underlayer = _underlayer
172 self.parent = _parent
173 if isinstance(_pkt, bytearray):
174 _pkt = bytes(_pkt)
175 self.original = _pkt
176 self.explicit = 0
177 self.raw_packet_cache = None # type: Optional[bytes]
178 self.raw_packet_cache_fields = None # type: Optional[Dict[str, Any]] # noqa: E501
179 self.wirelen = None # type: Optional[int]
180 self.direction = None # type: Optional[int]
181 self.sniffed_on = None # type: Optional[_GlobInterfaceType]
182 self.comments = None # type: Optional[List[bytes]]
183 self.process_information = None # type: Optional[Dict[str, Any]]
184 self.stop_dissection_after = stop_dissection_after
185 if _pkt:
186 self.dissect(_pkt)
187 if not _internal:
188 self.dissection_done(self)
189 # We use this strange initialization so that the fields
190 # are initialized in their declaration order.
191 # It is required to always support MultipleTypeField
192 for field in self.fields_desc:
193 fname = field.name
194 try:
195 value = fields.pop(fname)
196 except KeyError:
197 continue
198 self.fields[fname] = value if isinstance(value, RawVal) else \
199 self.get_field(fname).any2i(self, value)
200 # The remaining fields are unknown
201 for fname in fields:
202 if fname in self.deprecated_fields:
203 # Resolve deprecated fields
204 value = fields[fname]
205 fname = self._resolve_alias(fname)
206 self.fields[fname] = value if isinstance(value, RawVal) else \
207 self.get_field(fname).any2i(self, value)
208 continue
209 raise AttributeError(fname)
210 if isinstance(post_transform, list):
211 self.post_transforms = post_transform
212 elif post_transform is None:
213 self.post_transforms = []
214 else:
215 self.post_transforms = [post_transform]
217 _PickleType = Tuple[
218 Union[EDecimal, float],
219 Optional[Union[EDecimal, float, None]],
220 Optional[int],
221 Optional[_GlobInterfaceType],
222 Optional[int],
223 Optional[bytes],
224 ]
226 @property
227 def comment(self):
228 # type: () -> Optional[bytes]
229 """Get the comment of the packet"""
230 if self.comments and len(self.comments):
231 return self.comments[0]
232 return None
234 @comment.setter
235 def comment(self, value):
236 # type: (Optional[bytes]) -> None
237 """
238 Set the comment of the packet.
239 If value is None, it will clear the comments.
240 """
241 if value is not None:
242 self.comments = [value]
243 else:
244 self.comments = None
246 def __reduce__(self):
247 # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType]
248 """Used by pickling methods"""
249 return (self.__class__, (self.build(),), (
250 self.time,
251 self.sent_time,
252 self.direction,
253 self.sniffed_on,
254 self.wirelen,
255 self.comment
256 ))
258 def __setstate__(self, state):
259 # type: (Packet._PickleType) -> Packet
260 """Rebuild state using pickable methods"""
261 self.time = state[0]
262 self.sent_time = state[1]
263 self.direction = state[2]
264 self.sniffed_on = state[3]
265 self.wirelen = state[4]
266 self.comment = state[5]
267 return self
269 def __deepcopy__(self,
270 memo, # type: Any
271 ):
272 # type: (...) -> Packet
273 """Used by copy.deepcopy"""
274 return self.copy()
276 def init_fields(self, for_dissect_only=False):
277 # type: (bool) -> None
278 """
279 Initialize each fields of the fields_desc dict
280 """
282 if self.class_dont_cache.get(self.__class__, False):
283 self.do_init_fields(self.fields_desc)
284 else:
285 self.do_init_cached_fields(for_dissect_only=for_dissect_only)
287 def do_init_fields(self,
288 flist, # type: Sequence[AnyField]
289 ):
290 # type: (...) -> None
291 """
292 Initialize each fields of the fields_desc dict
293 """
294 default_fields = {}
295 for f in flist:
296 default_fields[f.name] = copy.deepcopy(f.default)
297 self.fieldtype[f.name] = f
298 if f.holds_packets:
299 self.packetfields.append(f)
300 # We set default_fields last to avoid race issues
301 self.default_fields = default_fields
303 def do_init_cached_fields(self, for_dissect_only=False):
304 # type: (bool) -> None
305 """
306 Initialize each fields of the fields_desc dict, or use the cached
307 fields information
308 """
310 cls_name = self.__class__
312 # Build the fields information
313 if Packet.class_default_fields.get(cls_name, None) is None:
314 self.prepare_cached_fields(self.fields_desc)
316 # Use fields information from cache
317 default_fields = Packet.class_default_fields.get(cls_name, None)
318 if default_fields:
319 self.default_fields = default_fields
320 self.fieldtype = Packet.class_fieldtype[cls_name]
321 self.packetfields = Packet.class_packetfields[cls_name]
323 # Optimization: no need for references when only dissecting.
324 if for_dissect_only:
325 return
327 # Deepcopy default references
328 for fname in Packet.class_default_fields_ref[cls_name]:
329 value = self.default_fields[fname]
330 try:
331 self.fields[fname] = value.copy()
332 except AttributeError:
333 # Python 2.7 - list only
334 self.fields[fname] = value[:]
336 def prepare_cached_fields(self, flist):
337 # type: (Sequence[AnyField]) -> None
338 """
339 Prepare the cached fields of the fields_desc dict
340 """
342 cls_name = self.__class__
344 # Fields cache initialization
345 if not flist:
346 return
348 class_default_fields = dict()
349 class_default_fields_ref = list()
350 class_fieldtype = dict()
351 class_packetfields = list()
353 # Fields initialization
354 for f in flist:
355 if isinstance(f, MultipleTypeField):
356 # Abort
357 self.class_dont_cache[cls_name] = True
358 self.do_init_fields(self.fields_desc)
359 return
361 class_default_fields[f.name] = copy.deepcopy(f.default)
362 class_fieldtype[f.name] = f
363 if f.holds_packets:
364 class_packetfields.append(f)
366 # Remember references
367 if isinstance(f.default, (list, dict, set, RandField, Packet)):
368 class_default_fields_ref.append(f.name)
370 # Apply
371 Packet.class_default_fields_ref[cls_name] = class_default_fields_ref
372 Packet.class_fieldtype[cls_name] = class_fieldtype
373 Packet.class_packetfields[cls_name] = class_packetfields
374 # Last to avoid racing issues
375 Packet.class_default_fields[cls_name] = class_default_fields
377 def dissection_done(self, pkt):
378 # type: (Packet) -> None
379 """DEV: will be called after a dissection is completed"""
380 self.post_dissection(pkt)
381 self.payload.dissection_done(pkt)
383 def post_dissection(self, pkt):
384 # type: (Packet) -> None
385 """DEV: is called after the dissection of the whole packet"""
386 pass
388 def get_field(self, fld):
389 # type: (str) -> AnyField
390 """DEV: returns the field instance from the name of the field"""
391 return self.fieldtype[fld]
393 def add_payload(self, payload):
394 # type: (Union[Packet, bytes]) -> None
395 if payload is None:
396 return
397 elif not isinstance(self.payload, NoPayload):
398 self.payload.add_payload(payload)
399 else:
400 if isinstance(payload, Packet):
401 self.payload = payload
402 payload.add_underlayer(self)
403 for t in self.aliastypes:
404 if t in payload.overload_fields:
405 self.overloaded_fields = payload.overload_fields[t]
406 break
407 elif isinstance(payload, (bytes, str, bytearray, memoryview)):
408 self.payload = conf.raw_layer(load=bytes_encode(payload))
409 else:
410 raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload)) # noqa: E501
412 def remove_payload(self):
413 # type: () -> None
414 self.payload.remove_underlayer(self)
415 self.payload = NoPayload()
416 self.overloaded_fields = {}
418 def add_underlayer(self, underlayer):
419 # type: (Packet) -> None
420 self.underlayer = underlayer
422 def remove_underlayer(self, other):
423 # type: (Packet) -> None
424 self.underlayer = None
426 def add_parent(self, parent):
427 # type: (Packet) -> None
428 """Set packet parent.
429 When packet is an element in PacketListField, parent field would
430 point to the list owner packet."""
431 self.parent = parent
433 def remove_parent(self, other):
434 # type: (Packet) -> None
435 """Remove packet parent.
436 When packet is an element in PacketListField, parent field would
437 point to the list owner packet."""
438 self.parent = None
440 def copy(self) -> Self:
441 """Returns a deep copy of the instance."""
442 clone = self.__class__()
443 clone.fields = self.copy_fields_dict(self.fields)
444 clone.default_fields = self.copy_fields_dict(self.default_fields)
445 clone.overloaded_fields = self.overloaded_fields.copy()
446 clone.underlayer = self.underlayer
447 clone.parent = self.parent
448 clone.explicit = self.explicit
449 clone.raw_packet_cache = self.raw_packet_cache
450 clone.raw_packet_cache_fields = self.copy_fields_dict(
451 self.raw_packet_cache_fields
452 )
453 clone.wirelen = self.wirelen
454 clone.post_transforms = self.post_transforms[:]
455 clone.payload = self.payload.copy()
456 clone.payload.add_underlayer(clone)
457 clone.time = self.time
458 clone.comments = self.comments
459 clone.direction = self.direction
460 clone.sniffed_on = self.sniffed_on
461 return clone
463 def _resolve_alias(self, attr):
464 # type: (str) -> str
465 new_attr, version = self.deprecated_fields[attr]
466 warnings.warn(
467 "%s has been deprecated in favor of %s since %s !" % (
468 attr, new_attr, version
469 ), DeprecationWarning
470 )
471 return new_attr
473 def getfieldval(self, attr):
474 # type: (str) -> Any
475 if self.deprecated_fields and attr in self.deprecated_fields:
476 attr = self._resolve_alias(attr)
477 if attr in self.fields:
478 return self.fields[attr]
479 if attr in self.overloaded_fields:
480 return self.overloaded_fields[attr]
481 if attr in self.default_fields:
482 return self.default_fields[attr]
483 return self.payload.getfieldval(attr)
485 def getfield_and_val(self, attr):
486 # type: (str) -> Tuple[AnyField, Any]
487 if self.deprecated_fields and attr in self.deprecated_fields:
488 attr = self._resolve_alias(attr)
489 if attr in self.fields:
490 return self.get_field(attr), self.fields[attr]
491 if attr in self.overloaded_fields:
492 return self.get_field(attr), self.overloaded_fields[attr]
493 if attr in self.default_fields:
494 return self.get_field(attr), self.default_fields[attr]
495 raise ValueError
497 def __getattr__(self, attr):
498 # type: (str) -> Any
499 try:
500 fld, v = self.getfield_and_val(attr)
501 except ValueError:
502 return self.payload.__getattr__(attr)
503 if fld is not None:
504 return v if isinstance(v, RawVal) else fld.i2h(self, v)
505 return v
507 def setfieldval(self, attr, val):
508 # type: (str, Any) -> None
509 if self.deprecated_fields and attr in self.deprecated_fields:
510 attr = self._resolve_alias(attr)
511 if attr in self.default_fields:
512 fld = self.get_field(attr)
513 if fld is None:
514 any2i = lambda x, y: y # type: Callable[..., Any]
515 else:
516 any2i = fld.any2i
517 self.fields[attr] = val if isinstance(val, RawVal) else \
518 any2i(self, val)
519 self.explicit = 0
520 self.raw_packet_cache = None
521 self.raw_packet_cache_fields = None
522 self.wirelen = None
523 elif attr == "payload":
524 self.remove_payload()
525 self.add_payload(val)
526 else:
527 self.payload.setfieldval(attr, val)
529 def __setattr__(self, attr, val):
530 # type: (str, Any) -> None
531 if attr in self.__all_slots__:
532 return object.__setattr__(self, attr, val)
533 try:
534 return self.setfieldval(attr, val)
535 except AttributeError:
536 pass
537 return object.__setattr__(self, attr, val)
539 def delfieldval(self, attr):
540 # type: (str) -> None
541 if attr in self.fields:
542 del self.fields[attr]
543 self.explicit = 0 # in case a default value must be explicit
544 self.raw_packet_cache = None
545 self.raw_packet_cache_fields = None
546 self.wirelen = None
547 elif attr in self.default_fields:
548 pass
549 elif attr == "payload":
550 self.remove_payload()
551 else:
552 self.payload.delfieldval(attr)
554 def __delattr__(self, attr):
555 # type: (str) -> None
556 if attr == "payload":
557 return self.remove_payload()
558 if attr in self.__all_slots__:
559 return object.__delattr__(self, attr)
560 try:
561 return self.delfieldval(attr)
562 except AttributeError:
563 pass
564 return object.__delattr__(self, attr)
566 def _superdir(self):
567 # type: () -> Set[str]
568 """
569 Return a list of slots and methods, including those from subclasses.
570 """
571 attrs = set() # type: Set[str]
572 cls = self.__class__
573 if hasattr(cls, '__all_slots__'):
574 attrs.update(cls.__all_slots__)
575 for bcls in cls.__mro__:
576 if hasattr(bcls, '__dict__'):
577 attrs.update(bcls.__dict__)
578 return attrs
580 def __dir__(self):
581 # type: () -> List[str]
582 """
583 Add fields to tab completion list.
584 """
585 return sorted(itertools.chain(self._superdir(), self.default_fields))
587 def __repr__(self):
588 # type: () -> str
589 s = ""
590 ct = conf.color_theme
591 for f in self.fields_desc:
592 if isinstance(f, ConditionalField) and not f._evalcond(self):
593 continue
594 if f.name in self.fields:
595 fval = self.fields[f.name]
596 if isinstance(fval, (list, dict, set)) and len(fval) == 0:
597 continue
598 val = f.i2repr(self, fval)
599 elif f.name in self.overloaded_fields:
600 fover = self.overloaded_fields[f.name]
601 if isinstance(fover, (list, dict, set)) and len(fover) == 0:
602 continue
603 val = f.i2repr(self, fover)
604 else:
605 continue
606 if isinstance(f, Emph) or f in conf.emph:
607 ncol = ct.emph_field_name
608 vcol = ct.emph_field_value
609 else:
610 ncol = ct.field_name
611 vcol = ct.field_value
613 s += " %s%s%s" % (ncol(f.name),
614 ct.punct("="),
615 vcol(val))
616 return "%s%s %s %s%s%s" % (ct.punct("<"),
617 ct.layer_name(self.__class__.__name__),
618 s,
619 ct.punct("|"),
620 repr(self.payload),
621 ct.punct(">"))
623 def __str__(self):
624 # type: () -> str
625 return self.summary()
627 def __bytes__(self):
628 # type: () -> bytes
629 return self.build()
631 def __div__(self, other):
632 # type: (Any) -> Self
633 if isinstance(other, Packet):
634 cloneA = self.copy()
635 cloneB = other.copy()
636 cloneA.add_payload(cloneB)
637 return cloneA
638 elif isinstance(other, (bytes, str, bytearray, memoryview)):
639 return self / conf.raw_layer(load=bytes_encode(other))
640 else:
641 return other.__rdiv__(self) # type: ignore
642 __truediv__ = __div__
644 def __rdiv__(self, other):
645 # type: (Any) -> Packet
646 if isinstance(other, (bytes, str, bytearray, memoryview)):
647 return conf.raw_layer(load=bytes_encode(other)) / self
648 else:
649 raise TypeError
650 __rtruediv__ = __rdiv__
652 def __mul__(self, other):
653 # type: (Any) -> List[Packet]
654 if isinstance(other, int):
655 return [self] * other
656 else:
657 raise TypeError
659 def __rmul__(self, other):
660 # type: (Any) -> List[Packet]
661 return self.__mul__(other)
663 def __nonzero__(self):
664 # type: () -> bool
665 return True
666 __bool__ = __nonzero__
668 def __len__(self):
669 # type: () -> int
670 return len(self.__bytes__())
672 def copy_field_value(self, fieldname, value):
673 # type: (str, Any) -> Any
674 return self.get_field(fieldname).do_copy(value)
676 def copy_fields_dict(self, fields):
677 # type: (_T) -> _T
678 if fields is None:
679 return None
680 return {fname: self.copy_field_value(fname, fval)
681 for fname, fval in fields.items()}
683 def _raw_packet_cache_field_value(self, fld, val, copy=False):
684 # type: (AnyField, Any, bool) -> Optional[Any]
685 """Get a value representative of a mutable field to detect changes"""
686 _cpy = lambda x: fld.do_copy(x) if copy else x # type: Callable[[Any], Any]
687 if fld.holds_packets:
688 # avoid copying whole packets (perf: #GH3894)
689 if fld.islist:
690 return [
691 (_cpy(x.fields), x.payload.raw_packet_cache) for x in val
692 ]
693 else:
694 return (_cpy(val.fields), val.payload.raw_packet_cache)
695 elif fld.islist or fld.ismutable:
696 return _cpy(val)
697 return None
699 def clear_cache(self):
700 # type: () -> None
701 """Clear the raw packet cache for the field and all its subfields"""
702 self.raw_packet_cache = None
703 for fname, fval in self.fields.items():
704 fld = self.get_field(fname)
705 if fld.holds_packets:
706 if isinstance(fval, Packet):
707 fval.clear_cache()
708 elif isinstance(fval, list):
709 for fsubval in fval:
710 fsubval.clear_cache()
711 self.payload.clear_cache()
713 def self_build(self):
714 # type: () -> bytes
715 """
716 Create the default layer regarding fields_desc dict
717 """
718 if self.raw_packet_cache is not None and \
719 self.raw_packet_cache_fields is not None:
720 for fname, fval in self.raw_packet_cache_fields.items():
721 fld, val = self.getfield_and_val(fname)
722 if self._raw_packet_cache_field_value(fld, val) != fval:
723 self.raw_packet_cache = None
724 self.raw_packet_cache_fields = None
725 self.wirelen = None
726 break
727 if self.raw_packet_cache is not None:
728 return self.raw_packet_cache
729 p = b""
730 for f in self.fields_desc:
731 val = self.getfieldval(f.name)
732 if isinstance(val, RawVal):
733 p += bytes(val)
734 else:
735 try:
736 p = f.addfield(self, p, val)
737 except Exception as ex:
738 try:
739 ex.args = (
740 "While building field '%s': " % f.name +
741 ex.args[0],
742 ) + ex.args[1:]
743 except (AttributeError, IndexError):
744 pass
745 raise ex
746 return p
748 def do_build_payload(self):
749 # type: () -> bytes
750 """
751 Create the default version of the payload layer
753 :return: a string of payload layer
754 """
755 return self.payload.do_build()
757 def do_build(self):
758 # type: () -> bytes
759 """
760 Create the default version of the layer
762 :return: a string of the packet with the payload
763 """
764 if not self.explicit:
765 self = next(iter(self))
766 pkt = self.self_build()
767 for t in self.post_transforms:
768 pkt = t(pkt)
769 pay = self.do_build_payload()
770 if self.raw_packet_cache is None:
771 return self.post_build(pkt, pay)
772 else:
773 return pkt + pay
775 def build_padding(self):
776 # type: () -> bytes
777 return self.payload.build_padding()
779 def build(self):
780 # type: () -> bytes
781 """
782 Create the current layer
784 :return: string of the packet with the payload
785 """
786 p = self.do_build()
787 p += self.build_padding()
788 p = self.build_done(p)
789 return p
791 def post_build(self, pkt, pay):
792 # type: (bytes, bytes) -> bytes
793 """
794 DEV: called right after the current layer is build.
796 :param str pkt: the current packet (build by self_build function)
797 :param str pay: the packet payload (build by do_build_payload function)
798 :return: a string of the packet with the payload
799 """
800 return pkt + pay
802 def build_done(self, p):
803 # type: (bytes) -> bytes
804 return self.payload.build_done(p)
806 def do_build_ps(self):
807 # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]] # noqa: E501
808 p = b""
809 pl = []
810 q = b""
811 for f in self.fields_desc:
812 if isinstance(f, ConditionalField) and not f._evalcond(self):
813 continue
814 p = f.addfield(self, p, self.getfieldval(f.name))
815 if isinstance(p, bytes):
816 r = p[len(q):]
817 q = p
818 else:
819 r = b""
820 pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r))
822 pkt, lst = self.payload.build_ps(internal=1)
823 p += pkt
824 lst.append((self, pl))
826 return p, lst
828 def build_ps(self, internal=0):
829 # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]] # noqa: E501
830 p, lst = self.do_build_ps()
831# if not internal:
832# pkt = self
833# while pkt.haslayer(conf.padding_layer):
834# pkt = pkt.getlayer(conf.padding_layer)
835# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) )
836# p += pkt.load
837# pkt = pkt.payload
838 return p, lst
840 def canvas_dump(self, layer_shift=0, rebuild=1):
841 # type: (int, int) -> pyx.canvas.canvas
842 if PYX == 0:
843 raise ImportError("PyX and its dependencies must be installed")
844 canvas = pyx.canvas.canvas()
845 if rebuild:
846 _, t = self.__class__(raw(self)).build_ps()
847 else:
848 _, t = self.build_ps()
849 YTXTI = len(t)
850 for _, l in t:
851 YTXTI += len(l)
852 YTXT = float(YTXTI)
853 YDUMP = YTXT
855 XSTART = 1
856 XDSTART = 10
857 y = 0.0
858 yd = 0.0
859 XMUL = 0.55
860 YMUL = 0.4
862 backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
863 forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
864# backcolor=makecol(0.376, 0.729, 0.525, 1.0)
866 def hexstr(x):
867 # type: (bytes) -> str
868 return " ".join("%02x" % orb(c) for c in x)
870 def make_dump_txt(x, y, txt):
871 # type: (int, float, bytes) -> pyx.text.text
872 return pyx.text.text(
873 XDSTART + x * XMUL,
874 (YDUMP - y) * YMUL,
875 r"\tt{%s}" % hexstr(txt),
876 [pyx.text.size.Large]
877 )
879 def make_box(o):
880 # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
881 return pyx.box.rect(
882 o.left(), o.bottom(), o.width(), o.height(),
883 relcenter=(0.5, 0.5)
884 )
886 def make_frame(lst):
887 # type: (List[Any]) -> pyx.path.path
888 if len(lst) == 1:
889 b = lst[0].bbox()
890 b.enlarge(pyx.unit.u_pt)
891 return b.path()
892 else:
893 fb = lst[0].bbox()
894 fb.enlarge(pyx.unit.u_pt)
895 lb = lst[-1].bbox()
896 lb.enlarge(pyx.unit.u_pt)
897 if len(lst) == 2 and fb.left() > lb.right():
898 return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
899 pyx.path.lineto(fb.left(), fb.top()),
900 pyx.path.lineto(fb.left(), fb.bottom()), # noqa: E501
901 pyx.path.lineto(fb.right(), fb.bottom()), # noqa: E501
902 pyx.path.moveto(lb.left(), lb.top()),
903 pyx.path.lineto(lb.right(), lb.top()),
904 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501
905 pyx.path.lineto(lb.left(), lb.bottom())) # noqa: E501
906 else:
907 # XXX
908 gb = lst[1].bbox()
909 if gb != lb:
910 gb.enlarge(pyx.unit.u_pt)
911 kb = lst[-2].bbox()
912 if kb != gb and kb != lb:
913 kb.enlarge(pyx.unit.u_pt)
914 return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
915 pyx.path.lineto(fb.right(), fb.top()),
916 pyx.path.lineto(fb.right(), kb.bottom()), # noqa: E501
917 pyx.path.lineto(lb.right(), kb.bottom()), # noqa: E501
918 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501
919 pyx.path.lineto(lb.left(), lb.bottom()), # noqa: E501
920 pyx.path.lineto(lb.left(), gb.top()),
921 pyx.path.lineto(fb.left(), gb.top()),
922 pyx.path.closepath(),)
924 def make_dump(s, # type: bytes
925 shift=0, # type: int
926 y=0., # type: float
927 col=None, # type: pyx.color.color
928 bkcol=None, # type: pyx.color.color
929 large=16 # type: int
930 ):
931 # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float] # noqa: E501
932 c = pyx.canvas.canvas()
933 tlist = []
934 while s:
935 dmp, s = s[:large - shift], s[large - shift:]
936 txt = make_dump_txt(shift, y, dmp)
937 tlist.append(txt)
938 shift += len(dmp)
939 if shift >= 16:
940 shift = 0
941 y += 1
942 if col is None:
943 col = pyx.color.rgb.red
944 if bkcol is None:
945 bkcol = pyx.color.rgb.white
946 c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick]) # noqa: E501
947 for txt in tlist:
948 c.insert(txt)
949 return c, tlist[-1].bbox(), shift, y
951 last_shift, last_y = 0, 0.0
952 while t:
953 bkcol = next(backcolor)
954 proto, fields = t.pop()
955 y += 0.5
956 pt = pyx.text.text(
957 XSTART,
958 (YTXT - y) * YMUL,
959 r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
960 str(proto.name)
961 ),
962 [pyx.text.size.Large]
963 )
964 y += 1
965 ptbb = pt.bbox()
966 ptbb.enlarge(pyx.unit.u_pt * 2)
967 canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])]) # noqa: E501
968 canvas.insert(pt)
969 for field, fval, fdump in fields:
970 col = next(forecolor)
971 ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name)) # noqa: E501
972 if isinstance(field, BitField):
973 fsize = '%sb' % field.size
974 else:
975 fsize = '%sB' % len(fdump)
976 if (hasattr(field, 'field') and
977 'LE' in field.field.__class__.__name__[:3] or
978 'LE' in field.__class__.__name__[:3]):
979 fsize = r'$\scriptstyle\langle$' + fsize
980 st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright]) # noqa: E501
981 if isinstance(fval, str):
982 if len(fval) > 18:
983 fval = fval[:18] + "[...]"
984 else:
985 fval = ""
986 vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) # noqa: E501
987 y += 1.0
988 if fdump:
989 dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol) # noqa: E501
991 dtb = target
992 vtb = vt.bbox()
993 bxvt = make_box(vtb)
994 bxdt = make_box(dtb)
995 dtb.enlarge(pyx.unit.u_pt)
996 try:
997 if yd < 0:
998 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90) # noqa: E501
999 else:
1000 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90) # noqa: E501
1001 except Exception:
1002 pass
1003 else:
1004 canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col]) # noqa: E501
1006 canvas.insert(dt)
1008 canvas.insert(ft)
1009 canvas.insert(st)
1010 canvas.insert(vt)
1011 last_y += layer_shift
1013 return canvas
1015 def extract_padding(self, s):
1016 # type: (bytes) -> Tuple[bytes, Optional[bytes]]
1017 """
1018 DEV: to be overloaded to extract current layer's padding.
1020 :param str s: the current layer
1021 :return: a couple of strings (actual layer, padding)
1022 """
1023 return s, None
1025 def post_dissect(self, s):
1026 # type: (bytes) -> bytes
1027 """DEV: is called right after the current layer has been dissected"""
1028 return s
1030 def pre_dissect(self, s):
1031 # type: (bytes) -> bytes
1032 """DEV: is called right before the current layer is dissected"""
1033 return s
1035 def do_dissect(self, s):
1036 # type: (bytes) -> bytes
1037 _raw = s
1038 self.raw_packet_cache_fields = {}
1039 for f in self.fields_desc:
1040 s, fval = f.getfield(self, s)
1041 # Skip unused ConditionalField
1042 if isinstance(f, ConditionalField) and fval is None:
1043 continue
1044 # We need to track fields with mutable values to discard
1045 # .raw_packet_cache when needed.
1046 if (f.islist or f.holds_packets or f.ismutable) and fval is not None:
1047 self.raw_packet_cache_fields[f.name] = \
1048 self._raw_packet_cache_field_value(f, fval, copy=True)
1049 self.fields[f.name] = fval
1050 # Nothing left to dissect
1051 if not s and (isinstance(f, MayEnd) or
1052 (fval is not None and isinstance(f, ConditionalField) and
1053 isinstance(f.fld, MayEnd))):
1054 break
1055 self.raw_packet_cache = _raw[:-len(s)] if s else _raw
1056 self.explicit = 1
1057 return s
1059 def do_dissect_payload(self, s):
1060 # type: (bytes) -> None
1061 """
1062 Perform the dissection of the layer's payload
1064 :param str s: the raw layer
1065 """
1066 if s:
1067 if (
1068 self.stop_dissection_after and
1069 isinstance(self, self.stop_dissection_after)
1070 ):
1071 # stop dissection here
1072 p = conf.raw_layer(s, _internal=1, _underlayer=self)
1073 self.add_payload(p)
1074 return
1075 cls = self.guess_payload_class(s)
1076 try:
1077 p = cls(
1078 s,
1079 stop_dissection_after=self.stop_dissection_after,
1080 _internal=1,
1081 _underlayer=self,
1082 )
1083 except KeyboardInterrupt:
1084 raise
1085 except Exception:
1086 if conf.debug_dissector:
1087 if issubtype(cls, Packet):
1088 log_runtime.error("%s dissector failed", cls.__name__)
1089 else:
1090 log_runtime.error("%s.guess_payload_class() returned "
1091 "[%s]",
1092 self.__class__.__name__, repr(cls))
1093 if cls is not None:
1094 raise
1095 p = conf.raw_layer(s, _internal=1, _underlayer=self)
1096 self.add_payload(p)
1098 def dissect(self, s):
1099 # type: (bytes) -> None
1100 s = self.pre_dissect(s)
1102 s = self.do_dissect(s)
1104 s = self.post_dissect(s)
1106 payl, pad = self.extract_padding(s)
1107 self.do_dissect_payload(payl)
1108 if pad and conf.padding:
1109 self.add_payload(conf.padding_layer(pad))
1111 def guess_payload_class(self, payload):
1112 # type: (bytes) -> Type[Packet]
1113 """
1114 DEV: Guesses the next payload class from layer bonds.
1115 Can be overloaded to use a different mechanism.
1117 :param str payload: the layer's payload
1118 :return: the payload class
1119 """
1120 for t in self.aliastypes:
1121 for fval, cls in t.payload_guess:
1122 try:
1123 if all(v == self.getfieldval(k)
1124 for k, v in fval.items()):
1125 return cls # type: ignore
1126 except AttributeError:
1127 pass
1128 return self.default_payload_class(payload)
1130 def default_payload_class(self, payload):
1131 # type: (bytes) -> Type[Packet]
1132 """
1133 DEV: Returns the default payload class if nothing has been found by the
1134 guess_payload_class() method.
1136 :param str payload: the layer's payload
1137 :return: the default payload class define inside the configuration file
1138 """
1139 return conf.raw_layer
1141 def hide_defaults(self):
1142 # type: () -> None
1143 """Removes fields' values that are the same as default values."""
1144 # use list(): self.fields is modified in the loop
1145 for k, v in list(self.fields.items()):
1146 v = self.fields[k]
1147 if k in self.default_fields:
1148 if self.default_fields[k] == v:
1149 del self.fields[k]
1150 self.payload.hide_defaults()
1152 def clone_with(self, payload=None, **kargs):
1153 # type: (Optional[Any], **Any) -> Any
1154 pkt = self.__class__()
1155 pkt.explicit = 1
1156 pkt.fields = kargs
1157 pkt.default_fields = self.copy_fields_dict(self.default_fields)
1158 pkt.overloaded_fields = self.overloaded_fields.copy()
1159 pkt.time = self.time
1160 pkt.underlayer = self.underlayer
1161 pkt.parent = self.parent
1162 pkt.post_transforms = self.post_transforms
1163 pkt.raw_packet_cache = self.raw_packet_cache
1164 pkt.raw_packet_cache_fields = self.copy_fields_dict(
1165 self.raw_packet_cache_fields
1166 )
1167 pkt.wirelen = self.wirelen
1168 pkt.comments = self.comments
1169 pkt.sniffed_on = self.sniffed_on
1170 pkt.direction = self.direction
1171 if payload is not None:
1172 pkt.add_payload(payload)
1173 return pkt
1175 def __iter__(self):
1176 # type: () -> Iterator[Packet]
1177 """Iterates through all sub-packets generated by this Packet."""
1178 def loop(todo, done, self=self):
1179 # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
1180 if todo:
1181 eltname = todo.pop()
1182 elt = self.getfieldval(eltname)
1183 if not isinstance(elt, Gen):
1184 if self.get_field(eltname).islist:
1185 elt = SetGen([elt])
1186 else:
1187 elt = SetGen(elt)
1188 for e in elt:
1189 done[eltname] = e
1190 for x in loop(todo[:], done):
1191 yield x
1192 else:
1193 if isinstance(self.payload, NoPayload):
1194 payloads = SetGen([None]) # type: SetGen[Packet]
1195 else:
1196 payloads = self.payload
1197 for payl in payloads:
1198 # Let's make sure subpackets are consistent
1199 done2 = done.copy()
1200 for k in done2:
1201 if isinstance(done2[k], VolatileValue):
1202 done2[k] = done2[k]._fix()
1203 pkt = self.clone_with(payload=payl, **done2)
1204 yield pkt
1206 if self.explicit or self.raw_packet_cache is not None:
1207 todo = []
1208 done = self.fields
1209 else:
1210 todo = [k for (k, v) in itertools.chain(self.default_fields.items(),
1211 self.overloaded_fields.items())
1212 if isinstance(v, VolatileValue)] + list(self.fields)
1213 done = {}
1214 return loop(todo, done)
1216 def iterpayloads(self):
1217 # type: () -> Iterator[Packet]
1218 """Used to iter through the payloads of a Packet.
1219 Useful for DNS or 802.11 for instance.
1220 """
1221 yield self
1222 current = self
1223 while current.payload:
1224 current = current.payload
1225 yield current
1227 def __gt__(self, other):
1228 # type: (Packet) -> int
1229 """True if other is an answer from self (self ==> other)."""
1230 if isinstance(other, Packet):
1231 return other < self
1232 elif isinstance(other, bytes):
1233 return 1
1234 else:
1235 raise TypeError((self, other))
1237 def __lt__(self, other):
1238 # type: (Packet) -> int
1239 """True if self is an answer from other (other ==> self)."""
1240 if isinstance(other, Packet):
1241 return self.answers(other)
1242 elif isinstance(other, bytes):
1243 return 1
1244 else:
1245 raise TypeError((self, other))
1247 def __eq__(self, other):
1248 # type: (Any) -> bool
1249 if not isinstance(other, self.__class__):
1250 return False
1251 for f in self.fields_desc:
1252 if f not in other.fields_desc:
1253 return False
1254 if self.getfieldval(f.name) != other.getfieldval(f.name):
1255 return False
1256 return self.payload == other.payload
1258 def __ne__(self, other):
1259 # type: (Any) -> bool
1260 return not self.__eq__(other)
1262 # Note: setting __hash__ to None is the standard way
1263 # of making an object un-hashable. mypy doesn't know that
1264 __hash__ = None # type: ignore
1266 def hashret(self):
1267 # type: () -> bytes
1268 """DEV: returns a string that has the same value for a request
1269 and its answer."""
1270 return self.payload.hashret()
1272 def answers(self, other):
1273 # type: (Packet) -> int
1274 """DEV: true if self is an answer from other"""
1275 if other.__class__ == self.__class__:
1276 return self.payload.answers(other.payload)
1277 return 0
1279 def layers(self):
1280 # type: () -> List[Type[Packet]]
1281 """returns a list of layer classes (including subclasses) in this packet""" # noqa: E501
1282 layers = []
1283 lyr = self # type: Optional[Packet]
1284 while lyr:
1285 layers.append(lyr.__class__)
1286 lyr = lyr.payload.getlayer(0, _subclass=True)
1287 return layers
1289 def haslayer(self, cls, _subclass=None):
1290 # type: (Union[Type[Packet], str], Optional[bool]) -> int
1291 """
1292 true if self has a layer that is an instance of cls.
1293 Superseded by "cls in self" syntax.
1294 """
1295 if _subclass is None:
1296 _subclass = self.match_subclass or None
1297 if _subclass:
1298 match = issubtype
1299 else:
1300 match = lambda x, t: bool(x == t)
1301 if cls is None or match(self.__class__, cls) \
1302 or cls in [self.__class__.__name__, self._name]:
1303 return True
1304 for f in self.packetfields:
1305 fvalue_gen = self.getfieldval(f.name)
1306 if fvalue_gen is None:
1307 continue
1308 if not f.islist:
1309 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1310 for fvalue in fvalue_gen:
1311 if isinstance(fvalue, Packet):
1312 ret = fvalue.haslayer(cls, _subclass=_subclass)
1313 if ret:
1314 return ret
1315 return self.payload.haslayer(cls, _subclass=_subclass)
1317 def getlayer(self,
1318 cls, # type: Union[int, Type[Packet], str]
1319 nb=1, # type: int
1320 _track=None, # type: Optional[List[int]]
1321 _subclass=None, # type: Optional[bool]
1322 **flt # type: Any
1323 ):
1324 # type: (...) -> Optional[Packet]
1325 """Return the nb^th layer that is an instance of cls, matching flt
1326values.
1327 """
1328 if _subclass is None:
1329 _subclass = self.match_subclass or None
1330 if _subclass:
1331 match = issubtype
1332 else:
1333 match = lambda x, t: bool(x == t)
1334 # Note:
1335 # cls can be int, packet, str
1336 # string_class_name can be packet, str (packet or packet+field)
1337 # class_name can be packet, str (packet only)
1338 if isinstance(cls, int):
1339 nb = cls + 1
1340 string_class_name = "" # type: Union[Type[Packet], str]
1341 else:
1342 string_class_name = cls
1343 class_name = "" # type: Union[Type[Packet], str]
1344 fld = None # type: Optional[str]
1345 if isinstance(string_class_name, str) and "." in string_class_name:
1346 class_name, fld = string_class_name.split(".", 1)
1347 else:
1348 class_name, fld = string_class_name, None
1349 if not class_name or match(self.__class__, class_name) \
1350 or class_name in [self.__class__.__name__, self._name]:
1351 if all(self.getfieldval(fldname) == fldvalue
1352 for fldname, fldvalue in flt.items()):
1353 if nb == 1:
1354 if fld is None:
1355 return self
1356 else:
1357 return self.getfieldval(fld) # type: ignore
1358 else:
1359 nb -= 1
1360 for f in self.packetfields:
1361 fvalue_gen = self.getfieldval(f.name)
1362 if fvalue_gen is None:
1363 continue
1364 if not f.islist:
1365 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1366 for fvalue in fvalue_gen:
1367 if isinstance(fvalue, Packet):
1368 track = [] # type: List[int]
1369 ret = fvalue.getlayer(class_name, nb=nb, _track=track,
1370 _subclass=_subclass, **flt)
1371 if ret is not None:
1372 return ret
1373 nb = track[0]
1374 return self.payload.getlayer(class_name, nb=nb, _track=_track,
1375 _subclass=_subclass, **flt)
1377 def firstlayer(self):
1378 # type: () -> Packet
1379 q = self
1380 while q.underlayer is not None:
1381 q = q.underlayer
1382 return q
1384 def __getitem__(self, cls):
1385 # type: (Union[Type[Packet], str]) -> Any
1386 if isinstance(cls, slice):
1387 lname = cls.start
1388 if cls.stop:
1389 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {}))
1390 else:
1391 ret = self.getlayer(cls.start, **(cls.step or {}))
1392 else:
1393 lname = cls
1394 ret = self.getlayer(cls)
1395 if ret is None:
1396 if isinstance(lname, type):
1397 name = lname.__name__
1398 elif not isinstance(lname, bytes):
1399 name = repr(lname)
1400 else:
1401 name = cast(str, lname)
1402 raise IndexError("Layer [%s] not found" % name)
1403 return ret
1405 def __delitem__(self, cls):
1406 # type: (Type[Packet]) -> None
1407 del self[cls].underlayer.payload
1409 def __setitem__(self, cls, val):
1410 # type: (Type[Packet], Packet) -> None
1411 self[cls].underlayer.payload = val
1413 def __contains__(self, cls):
1414 # type: (Union[Type[Packet], str]) -> int
1415 """
1416 "cls in self" returns true if self has a layer which is an
1417 instance of cls.
1418 """
1419 return self.haslayer(cls)
1421 def route(self):
1422 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
1423 return self.payload.route()
1425 def fragment(self, *args, **kargs):
1426 # type: (*Any, **Any) -> List[Packet]
1427 return self.payload.fragment(*args, **kargs)
1429 def display(self, *args, **kargs): # Deprecated. Use show()
1430 # type: (*Any, **Any) -> None
1431 """Deprecated. Use show() method."""
1432 self.show(*args, **kargs)
1434 def _show_or_dump(self,
1435 dump=False, # type: bool
1436 indent=3, # type: int
1437 lvl="", # type: str
1438 label_lvl="", # type: str
1439 first_call=True # type: bool
1440 ):
1441 # type: (...) -> Optional[str]
1442 """
1443 Internal method that shows or dumps a hierarchical view of a packet.
1444 Called by show.
1446 :param dump: determine if it prints or returns the string value
1447 :param int indent: the size of indentation for each layer
1448 :param str lvl: additional information about the layer lvl
1449 :param str label_lvl: additional information about the layer fields
1450 :param first_call: determine if the current function is the first
1451 :return: return a hierarchical view if dump, else print it
1452 """
1454 if dump:
1455 from scapy.themes import ColorTheme, AnsiColorTheme
1456 ct: ColorTheme = AnsiColorTheme() # No color for dump output
1457 else:
1458 ct = conf.color_theme
1459 s = "%s%s %s %s\n" % (label_lvl,
1460 ct.punct("###["),
1461 ct.layer_name(self.name),
1462 ct.punct("]###"))
1463 fields = self.fields_desc.copy()
1464 while fields:
1465 f = fields.pop(0)
1466 if isinstance(f, ConditionalField) and not f._evalcond(self):
1467 continue
1468 if hasattr(f, "fields"): # Field has subfields
1469 s += "%s %s =\n" % (
1470 label_lvl + lvl,
1471 ct.depreciate_field_name(f.name),
1472 )
1473 lvl += " " * indent * self.show_indent
1474 for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)):
1475 fields.insert(i, fld)
1476 continue
1477 if isinstance(f, Emph) or f in conf.emph:
1478 ncol = ct.emph_field_name
1479 vcol = ct.emph_field_value
1480 else:
1481 ncol = ct.field_name
1482 vcol = ct.field_value
1483 pad = max(0, 10 - len(f.name)) * " "
1484 fvalue = self.getfieldval(f.name)
1485 if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)): # noqa: E501
1486 s += "%s %s%s%s%s\n" % (label_lvl + lvl,
1487 ct.punct("\\"),
1488 ncol(f.name),
1489 pad,
1490 ct.punct("\\"))
1491 fvalue_gen = SetGen(
1492 fvalue,
1493 _iterpacket=0
1494 ) # type: SetGen[Packet]
1495 for fvalue in fvalue_gen:
1496 s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False) # noqa: E501
1497 else:
1498 begn = "%s %s%s%s " % (label_lvl + lvl,
1499 ncol(f.name),
1500 pad,
1501 ct.punct("="),)
1502 reprval = f.i2repr(self, fvalue)
1503 if isinstance(reprval, str):
1504 reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) + # noqa: E501
1505 len(lvl) +
1506 len(f.name) +
1507 4))
1508 s += "%s%s\n" % (begn, vcol(reprval))
1509 if self.payload:
1510 s += self.payload._show_or_dump( # type: ignore
1511 dump=dump,
1512 indent=indent,
1513 lvl=lvl + (" " * indent * self.show_indent),
1514 label_lvl=label_lvl,
1515 first_call=False
1516 )
1518 if first_call and not dump:
1519 print(s)
1520 return None
1521 else:
1522 return s
1524 def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1525 # type: (bool, int, str, str) -> Optional[Any]
1526 """
1527 Prints or returns (when "dump" is true) a hierarchical view of the
1528 packet.
1530 :param dump: determine if it prints or returns the string value
1531 :param int indent: the size of indentation for each layer
1532 :param str lvl: additional information about the layer lvl
1533 :param str label_lvl: additional information about the layer fields
1534 :return: return a hierarchical view if dump, else print it
1535 """
1536 return self._show_or_dump(dump, indent, lvl, label_lvl)
1538 def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
1539 # type: (bool, int, str, str) -> Optional[Any]
1540 """
1541 Prints or returns (when "dump" is true) a hierarchical view of an
1542 assembled version of the packet, so that automatic fields are
1543 calculated (checksums, etc.)
1545 :param dump: determine if it prints or returns the string value
1546 :param int indent: the size of indentation for each layer
1547 :param str lvl: additional information about the layer lvl
1548 :param str label_lvl: additional information about the layer fields
1549 :return: return a hierarchical view if dump, else print it
1550 """
1551 return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl)
1553 def sprintf(self, fmt, relax=1):
1554 # type: (str, int) -> str
1555 """
1556 sprintf(format, [relax=1]) -> str
1558 Where format is a string that can include directives. A directive
1559 begins and ends by % and has the following format:
1560 ``%[fmt[r],][cls[:nb].]field%``
1562 :param fmt: is a classic printf directive, "r" can be appended for raw
1563 substitution:
1564 (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
1565 (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
1566 Special case : "%.time%" is the creation time.
1567 Ex::
1569 p.sprintf(
1570 "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
1571 "%03xr,IP.proto% %r,TCP.flags%"
1572 )
1574 Moreover, the format string can include conditional statements. A
1575 conditional statement looks like : {layer:string} where layer is a
1576 layer name, and string is the string to insert in place of the
1577 condition if it is true, i.e. if layer is present. If layer is
1578 preceded by a "!", the result is inverted. Conditions can be
1579 imbricated. A valid statement can be::
1581 p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
1582 p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")
1584 A side effect is that, to obtain "{" and "}" characters, you must use
1585 "%(" and "%)".
1586 """
1588 escape = {"%": "%",
1589 "(": "{",
1590 ")": "}"}
1592 # Evaluate conditions
1593 while "{" in fmt:
1594 i = fmt.rindex("{")
1595 j = fmt[i + 1:].index("}")
1596 cond = fmt[i + 1:i + j + 1]
1597 k = cond.find(":")
1598 if k < 0:
1599 raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond) # noqa: E501
1600 cond, format_ = cond[:k], cond[k + 1:]
1601 res = False
1602 if cond[0] == "!":
1603 res = True
1604 cond = cond[1:]
1605 if self.haslayer(cond):
1606 res = not res
1607 if not res:
1608 format_ = ""
1609 fmt = fmt[:i] + format_ + fmt[i + j + 2:]
1611 # Evaluate directives
1612 s = ""
1613 while "%" in fmt:
1614 i = fmt.index("%")
1615 s += fmt[:i]
1616 fmt = fmt[i + 1:]
1617 if fmt and fmt[0] in escape:
1618 s += escape[fmt[0]]
1619 fmt = fmt[1:]
1620 continue
1621 try:
1622 i = fmt.index("%")
1623 sfclsfld = fmt[:i]
1624 fclsfld = sfclsfld.split(",")
1625 if len(fclsfld) == 1:
1626 f = "s"
1627 clsfld = fclsfld[0]
1628 elif len(fclsfld) == 2:
1629 f, clsfld = fclsfld
1630 else:
1631 raise Scapy_Exception
1632 if "." in clsfld:
1633 cls, fld = clsfld.split(".")
1634 else:
1635 cls = self.__class__.__name__
1636 fld = clsfld
1637 num = 1
1638 if ":" in cls:
1639 cls, snum = cls.split(":")
1640 num = int(snum)
1641 fmt = fmt[i + 1:]
1642 except Exception:
1643 raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "...")) # noqa: E501
1644 else:
1645 if fld == "time":
1646 val = time.strftime(
1647 "%H:%M:%S.%%06i",
1648 time.localtime(float(self.time))
1649 ) % int((self.time - int(self.time)) * 1000000)
1650 elif cls == self.__class__.__name__ and hasattr(self, fld):
1651 if num > 1:
1652 val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax) # noqa: E501
1653 f = "s"
1654 else:
1655 try:
1656 val = self.getfieldval(fld)
1657 except AttributeError:
1658 val = getattr(self, fld)
1659 if f[-1] == "r": # Raw field value
1660 f = f[:-1]
1661 if not f:
1662 f = "s"
1663 else:
1664 if fld in self.fieldtype:
1665 val = self.fieldtype[fld].i2repr(self, val)
1666 else:
1667 val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
1668 f = "s"
1669 s += ("%" + f) % val
1671 s += fmt
1672 return s
1674 def mysummary(self):
1675 # type: () -> str
1676 """DEV: can be overloaded to return a string that summarizes the layer.
1677 Only one mysummary() is used in a whole packet summary: the one of the upper layer, # noqa: E501
1678 except if a mysummary() also returns (as a couple) a list of layers whose # noqa: E501
1679 mysummary() must be called if they are present."""
1680 return ""
1682 def _do_summary(self):
1683 # type: () -> Tuple[int, str, List[Any]]
1684 found, s, needed = self.payload._do_summary()
1685 ret = ""
1686 if not found or self.__class__ in needed:
1687 ret = self.mysummary()
1688 if isinstance(ret, tuple):
1689 ret, n = ret
1690 needed += n
1691 if ret or needed:
1692 found = 1
1693 if not ret:
1694 ret = self.__class__.__name__ if self.show_summary else ""
1695 if self.__class__ in conf.emph:
1696 impf = []
1697 for f in self.fields_desc:
1698 if f in conf.emph:
1699 impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) # noqa: E501
1700 ret = "%s [%s]" % (ret, " ".join(impf))
1701 if ret and s:
1702 ret = "%s / %s" % (ret, s)
1703 else:
1704 ret = "%s%s" % (ret, s)
1705 return found, ret, needed
1707 def summary(self, intern=0):
1708 # type: (int) -> str
1709 """Prints a one line summary of a packet."""
1710 return self._do_summary()[1]
1712 def lastlayer(self, layer=None):
1713 # type: (Optional[Packet]) -> Packet
1714 """Returns the uppest layer of the packet"""
1715 return self.payload.lastlayer(self)
1717 def decode_payload_as(self, cls):
1718 # type: (Type[Packet]) -> None
1719 """Reassembles the payload and decode it using another packet class"""
1720 s = raw(self.payload)
1721 self.payload = cls(s, _internal=1, _underlayer=self)
1722 pp = self
1723 while pp.underlayer is not None:
1724 pp = pp.underlayer
1725 self.payload.dissection_done(pp)
1727 def _command(self, json=False):
1728 # type: (bool) -> List[Tuple[str, Any]]
1729 """
1730 Internal method used to generate command() and json()
1731 """
1732 f = []
1733 iterator: Iterator[Tuple[str, Any]]
1734 if json:
1735 iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc)
1736 else:
1737 iterator = iter(self.fields.items())
1738 for fn, fv in iterator:
1739 fld = self.get_field(fn)
1740 if isinstance(fv, (list, dict, set)) and not fv and not fld.default:
1741 continue
1742 if isinstance(fv, Packet):
1743 if json:
1744 fv = {k: v for (k, v) in fv._command(json=True)}
1745 else:
1746 fv = fv.command()
1747 elif fld.islist and fld.holds_packets and isinstance(fv, list):
1748 if json:
1749 fv = [
1750 {k: v for (k, v) in x}
1751 for x in map(lambda y: Packet._command(y, json=True), fv)
1752 ]
1753 else:
1754 fv = "[%s]" % ",".join(map(Packet.command, fv))
1755 elif fld.islist and isinstance(fv, list):
1756 if json:
1757 fv = [
1758 getattr(x, 'command', lambda: repr(x))()
1759 for x in fv
1760 ]
1761 else:
1762 fv = "[%s]" % ",".join(
1763 getattr(x, 'command', lambda: repr(x))()
1764 for x in fv
1765 )
1766 elif isinstance(fv, FlagValue):
1767 fv = int(fv)
1768 elif callable(getattr(fv, 'command', None)):
1769 fv = fv.command(json=json)
1770 else:
1771 if json:
1772 if isinstance(fv, bytes):
1773 fv = fv.decode("utf-8", errors="backslashreplace")
1774 else:
1775 fv = fld.i2h(self, fv)
1776 else:
1777 fv = repr(fld.i2h(self, fv))
1778 f.append((fn, fv))
1779 return f
1781 def command(self):
1782 # type: () -> str
1783 """
1784 Returns a string representing the command you have to type to
1785 obtain the same packet
1786 """
1787 c = "%s(%s)" % (
1788 self.__class__.__name__,
1789 ", ".join("%s=%s" % x for x in self._command())
1790 )
1791 pc = self.payload.command()
1792 if pc:
1793 c += "/" + pc
1794 return c
1796 def json(self):
1797 # type: () -> str
1798 """
1799 Returns a JSON representing the packet.
1801 Please note that this cannot be used for bijective usage: data loss WILL occur,
1802 so it will not make sense to try to rebuild the packet from the output.
1803 This must only be used for a grepping/displaying purpose.
1804 """
1805 dump = json.dumps({k: v for (k, v) in self._command(json=True)})
1806 pc = self.payload.json()
1807 if pc:
1808 dump = dump[:-1] + ", \"payload\": %s}" % pc
1809 return dump
1812class NoPayload(Packet):
1813 def __new__(cls, *args, **kargs):
1814 # type: (Type[Packet], *Any, **Any) -> NoPayload
1815 singl = cls.__dict__.get("__singl__")
1816 if singl is None:
1817 cls.__singl__ = singl = Packet.__new__(cls)
1818 Packet.__init__(singl)
1819 return cast(NoPayload, singl)
1821 def __init__(self, *args, **kargs):
1822 # type: (*Any, **Any) -> None
1823 pass
1825 def dissection_done(self, pkt):
1826 # type: (Packet) -> None
1827 pass
1829 def add_payload(self, payload):
1830 # type: (Union[Packet, bytes]) -> NoReturn
1831 raise Scapy_Exception("Can't add payload to NoPayload instance")
1833 def remove_payload(self):
1834 # type: () -> None
1835 pass
1837 def add_underlayer(self, underlayer):
1838 # type: (Any) -> None
1839 pass
1841 def remove_underlayer(self, other):
1842 # type: (Packet) -> None
1843 pass
1845 def add_parent(self, parent):
1846 # type: (Any) -> None
1847 pass
1849 def remove_parent(self, other):
1850 # type: (Packet) -> None
1851 pass
1853 def copy(self):
1854 # type: () -> NoPayload
1855 return self
1857 def clear_cache(self):
1858 # type: () -> None
1859 pass
1861 def __repr__(self):
1862 # type: () -> str
1863 return ""
1865 def __str__(self):
1866 # type: () -> str
1867 return ""
1869 def __bytes__(self):
1870 # type: () -> bytes
1871 return b""
1873 def __nonzero__(self):
1874 # type: () -> bool
1875 return False
1876 __bool__ = __nonzero__
1878 def do_build(self):
1879 # type: () -> bytes
1880 return b""
1882 def build(self):
1883 # type: () -> bytes
1884 return b""
1886 def build_padding(self):
1887 # type: () -> bytes
1888 return b""
1890 def build_done(self, p):
1891 # type: (bytes) -> bytes
1892 return p
1894 def build_ps(self, internal=0):
1895 # type: (int) -> Tuple[bytes, List[Any]]
1896 return b"", []
1898 def getfieldval(self, attr):
1899 # type: (str) -> NoReturn
1900 raise AttributeError(attr)
1902 def getfield_and_val(self, attr):
1903 # type: (str) -> NoReturn
1904 raise AttributeError(attr)
1906 def setfieldval(self, attr, val):
1907 # type: (str, Any) -> NoReturn
1908 raise AttributeError(attr)
1910 def delfieldval(self, attr):
1911 # type: (str) -> NoReturn
1912 raise AttributeError(attr)
1914 def hide_defaults(self):
1915 # type: () -> None
1916 pass
1918 def __iter__(self):
1919 # type: () -> Iterator[Packet]
1920 return iter([])
1922 def __eq__(self, other):
1923 # type: (Any) -> bool
1924 if isinstance(other, NoPayload):
1925 return True
1926 return False
1928 def hashret(self):
1929 # type: () -> bytes
1930 return b""
1932 def answers(self, other):
1933 # type: (Packet) -> bool
1934 return isinstance(other, (NoPayload, conf.padding_layer)) # noqa: E501
1936 def haslayer(self, cls, _subclass=None):
1937 # type: (Union[Type[Packet], str], Optional[bool]) -> int
1938 return 0
1940 def getlayer(self,
1941 cls, # type: Union[int, Type[Packet], str]
1942 nb=1, # type: int
1943 _track=None, # type: Optional[List[int]]
1944 _subclass=None, # type: Optional[bool]
1945 **flt # type: Any
1946 ):
1947 # type: (...) -> Optional[Packet]
1948 if _track is not None:
1949 _track.append(nb)
1950 return None
1952 def fragment(self, *args, **kargs):
1953 # type: (*Any, **Any) -> List[Packet]
1954 raise Scapy_Exception("cannot fragment this packet")
1956 def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1957 # type: (bool, int, str, str) -> None
1958 pass
1960 def sprintf(self, fmt, relax=1):
1961 # type: (str, int) -> str
1962 if relax:
1963 return "??"
1964 else:
1965 raise Scapy_Exception("Format not found [%s]" % fmt)
1967 def _do_summary(self):
1968 # type: () -> Tuple[int, str, List[Any]]
1969 return 0, "", []
1971 def layers(self):
1972 # type: () -> List[Type[Packet]]
1973 return []
1975 def lastlayer(self, layer=None):
1976 # type: (Optional[Packet]) -> Packet
1977 return layer or self
1979 def command(self):
1980 # type: () -> str
1981 return ""
1983 def json(self):
1984 # type: () -> str
1985 return ""
1987 def route(self):
1988 # type: () -> Tuple[None, None, None]
1989 return (None, None, None)
1992####################
1993# packet classes #
1994####################
1997class Raw(Packet):
1998 name = "Raw"
1999 fields_desc = [StrField("load", b"")]
2001 def __init__(self, _pkt=b"", *args, **kwargs):
2002 # type: (bytes, *Any, **Any) -> None
2003 if _pkt and not isinstance(_pkt, bytes):
2004 if isinstance(_pkt, tuple):
2005 _pkt, bn = _pkt
2006 _pkt = bytes_encode(_pkt), bn
2007 else:
2008 _pkt = bytes_encode(_pkt)
2009 super(Raw, self).__init__(_pkt, *args, **kwargs)
2011 def answers(self, other):
2012 # type: (Packet) -> int
2013 return 1
2015 def mysummary(self):
2016 # type: () -> str
2017 cs = conf.raw_summary
2018 if cs:
2019 if callable(cs):
2020 return "Raw %s" % cs(self.load)
2021 else:
2022 return "Raw %r" % self.load
2023 return Packet.mysummary(self)
2026class Padding(Raw):
2027 name = "Padding"
2029 def self_build(self):
2030 # type: (Optional[Any]) -> bytes
2031 return b""
2033 def build_padding(self):
2034 # type: () -> bytes
2035 return (
2036 bytes_encode(self.load) if self.raw_packet_cache is None
2037 else self.raw_packet_cache
2038 ) + self.payload.build_padding()
2041conf.raw_layer = Raw
2042conf.padding_layer = Padding
2043if conf.default_l2 is None:
2044 conf.default_l2 = Raw
2046#################
2047# Bind layers #
2048#################
2051def bind_bottom_up(lower, # type: Type[Packet]
2052 upper, # type: Type[Packet]
2053 __fval=None, # type: Optional[Any]
2054 **fval # type: Any
2055 ):
2056 # type: (...) -> None
2057 r"""Bind 2 layers for dissection.
2058 The upper layer will be chosen for dissection on top of the lower layer, if
2059 ALL the passed arguments are validated. If multiple calls are made with
2060 the same layers, the last one will be used as default.
2062 ex:
2063 >>> bind_bottom_up(Ether, SNAP, type=0x1234)
2064 >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00') # noqa: E501
2065 <Ether dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP OUI=0x0 code=0x0 |>> # noqa: E501
2066 """
2067 if __fval is not None:
2068 fval.update(__fval)
2069 lower.payload_guess = lower.payload_guess[:]
2070 lower.payload_guess.append((fval, upper))
2073def bind_top_down(lower, # type: Type[Packet]
2074 upper, # type: Type[Packet]
2075 __fval=None, # type: Optional[Any]
2076 **fval # type: Any
2077 ):
2078 # type: (...) -> None
2079 """Bind 2 layers for building.
2080 When the upper layer is added as a payload of the lower layer, all the
2081 arguments will be applied to them.
2083 ex:
2084 >>> bind_top_down(Ether, SNAP, type=0x1234)
2085 >>> Ether()/SNAP()
2086 <Ether type=0x1234 |<SNAP |>>
2087 """
2088 if __fval is not None:
2089 fval.update(__fval)
2090 upper._overload_fields = upper._overload_fields.copy() # type: ignore
2091 upper._overload_fields[lower] = fval
2094@conf.commands.register
2095def bind_layers(lower, # type: Type[Packet]
2096 upper, # type: Type[Packet]
2097 __fval=None, # type: Optional[Dict[str, int]]
2098 **fval # type: Any
2099 ):
2100 # type: (...) -> None
2101 """Bind 2 layers on some specific fields' values.
2103 It makes the packet being built and dissected when the arguments
2104 are present.
2106 This function calls both bind_bottom_up and bind_top_down, with
2107 all passed arguments.
2109 Please have a look at their docs:
2110 - help(bind_bottom_up)
2111 - help(bind_top_down)
2112 """
2113 if __fval is not None:
2114 fval.update(__fval)
2115 bind_top_down(lower, upper, **fval)
2116 bind_bottom_up(lower, upper, **fval)
2119def split_bottom_up(lower, # type: Type[Packet]
2120 upper, # type: Type[Packet]
2121 __fval=None, # type: Optional[Any]
2122 **fval # type: Any
2123 ):
2124 # type: (...) -> None
2125 """This call un-links an association that was made using bind_bottom_up.
2126 Have a look at help(bind_bottom_up)
2127 """
2128 if __fval is not None:
2129 fval.update(__fval)
2131 def do_filter(params, cls):
2132 # type: (Dict[str, int], Type[Packet]) -> bool
2133 params_is_invalid = any(
2134 k not in params or params[k] != v for k, v in fval.items()
2135 )
2136 return cls != upper or params_is_invalid
2137 lower.payload_guess = [x for x in lower.payload_guess if do_filter(*x)]
2140def split_top_down(lower, # type: Type[Packet]
2141 upper, # type: Type[Packet]
2142 __fval=None, # type: Optional[Any]
2143 **fval # type: Any
2144 ):
2145 # type: (...) -> None
2146 """This call un-links an association that was made using bind_top_down.
2147 Have a look at help(bind_top_down)
2148 """
2149 if __fval is not None:
2150 fval.update(__fval)
2151 if lower in upper._overload_fields:
2152 ofval = upper._overload_fields[lower]
2153 if any(k not in ofval or ofval[k] != v for k, v in fval.items()):
2154 return
2155 upper._overload_fields = upper._overload_fields.copy() # type: ignore
2156 del upper._overload_fields[lower]
2159@conf.commands.register
2160def split_layers(lower, # type: Type[Packet]
2161 upper, # type: Type[Packet]
2162 __fval=None, # type: Optional[Any]
2163 **fval # type: Any
2164 ):
2165 # type: (...) -> None
2166 """Split 2 layers previously bound.
2167 This call un-links calls bind_top_down and bind_bottom_up. It is the opposite of # noqa: E501
2168 bind_layers.
2170 Please have a look at their docs:
2171 - help(split_bottom_up)
2172 - help(split_top_down)
2173 """
2174 if __fval is not None:
2175 fval.update(__fval)
2176 split_bottom_up(lower, upper, **fval)
2177 split_top_down(lower, upper, **fval)
2180@conf.commands.register
2181def explore(layer=None):
2182 # type: (Optional[str]) -> None
2183 """Function used to discover the Scapy layers and protocols.
2184 It helps to see which packets exists in contrib or layer files.
2186 params:
2187 - layer: If specified, the function will explore the layer. If not,
2188 the GUI mode will be activated, to browse the available layers
2190 examples:
2191 >>> explore() # Launches the GUI
2192 >>> explore("dns") # Explore scapy.layers.dns
2193 >>> explore("http2") # Explore scapy.contrib.http2
2194 >>> explore(scapy.layers.bluetooth4LE)
2196 Note: to search a packet by name, use ls("name") rather than explore.
2197 """
2198 if layer is None: # GUI MODE
2199 if not conf.interactive:
2200 raise Scapy_Exception("explore() GUI-mode cannot be run in "
2201 "interactive mode. Please provide a "
2202 "'layer' parameter !")
2203 # 0 - Imports
2204 try:
2205 import prompt_toolkit
2206 except ImportError:
2207 raise ImportError("prompt_toolkit is not installed ! "
2208 "You may install IPython, which contains it, via"
2209 " `pip install ipython`")
2210 if not _version_checker(prompt_toolkit, (2, 0)):
2211 raise ImportError("prompt_toolkit >= 2.0.0 is required !")
2212 # Only available with prompt_toolkit > 2.0, not released on PyPi yet
2213 from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
2214 button_dialog
2215 from prompt_toolkit.formatted_text import HTML
2216 # Check for prompt_toolkit >= 3.0.0
2217 call_ptk = lambda x: cast(str, x) # type: Callable[[Any], str]
2218 if _version_checker(prompt_toolkit, (3, 0)):
2219 call_ptk = lambda x: x.run()
2220 # 1 - Ask for layer or contrib
2221 btn_diag = button_dialog(
2222 title="Scapy v%s" % conf.version,
2223 text=HTML(
2224 '<style bg="white" fg="red">Chose the type of packets'
2225 ' you want to explore:</style>'
2226 ),
2227 buttons=[
2228 ("Layers", "layers"),
2229 ("Contribs", "contribs"),
2230 ("Cancel", "cancel")
2231 ])
2232 action = call_ptk(btn_diag)
2233 # 2 - Retrieve list of Packets
2234 if action == "layers":
2235 # Get all loaded layers
2236 lvalues = conf.layers.layers()
2237 # Restrict to layers-only (not contribs) + packet.py and asn1*.py
2238 values = [x for x in lvalues if ("layers" in x[0] or
2239 "packet" in x[0] or
2240 "asn1" in x[0])]
2241 elif action == "contribs":
2242 # Get all existing contribs
2243 from scapy.main import list_contrib
2244 cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
2245 values = [(x['name'], x['description'])
2246 for x in cvalues]
2247 # Remove very specific modules
2248 values = [x for x in values if "can" not in x[0]]
2249 else:
2250 # Escape/Cancel was pressed
2251 return
2252 # Build tree
2253 if action == "contribs":
2254 # A tree is a dictionary. Each layer contains a keyword
2255 # _l which contains the files in the layer, and a _name
2256 # argument which is its name. The other keys are the subfolders,
2257 # which are similar dictionaries
2258 tree = defaultdict(list) # type: Dict[str, Union[List[Any], Dict[str, Any]]] # noqa: E501
2259 for name, desc in values:
2260 if "." in name: # Folder detected
2261 parts = name.split(".")
2262 subtree = tree
2263 for pa in parts[:-1]:
2264 if pa not in subtree:
2265 subtree[pa] = {}
2266 # one layer deeper
2267 subtree = subtree[pa] # type: ignore
2268 subtree["_name"] = pa # type: ignore
2269 if "_l" not in subtree:
2270 subtree["_l"] = []
2271 subtree["_l"].append((parts[-1], desc)) # type: ignore
2272 else:
2273 tree["_l"].append((name, desc)) # type: ignore
2274 elif action == "layers":
2275 tree = {"_l": values}
2276 # 3 - Ask for the layer/contrib module to explore
2277 current = tree # type: Any
2278 previous = [] # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]] # noqa: E501
2279 while True:
2280 # Generate tests & form
2281 folders = list(current.keys())
2282 _radio_values = [
2283 ("$" + name, str('[+] ' + name.capitalize()))
2284 for name in folders if not name.startswith("_")
2285 ] + current.get("_l", []) # type: List[str]
2286 cur_path = ""
2287 if previous:
2288 cur_path = ".".join(
2289 itertools.chain(
2290 (x["_name"] for x in previous[1:]), # type: ignore
2291 (current["_name"],)
2292 )
2293 )
2294 extra_text = (
2295 '\n<style bg="white" fg="green">> scapy.%s</style>'
2296 ) % (action + ("." + cur_path if cur_path else ""))
2297 # Show popup
2298 rd_diag = radiolist_dialog(
2299 values=_radio_values,
2300 title="Scapy v%s" % conf.version,
2301 text=HTML(
2302 (
2303 '<style bg="white" fg="red">Please select a file'
2304 'among the following, to see all layers contained in'
2305 ' it:</style>'
2306 ) + extra_text
2307 ),
2308 cancel_text="Back" if previous else "Cancel"
2309 )
2310 result = call_ptk(rd_diag)
2311 if result is None:
2312 # User pressed "Cancel/Back"
2313 if previous: # Back
2314 current = previous.pop()
2315 continue
2316 else: # Cancel
2317 return
2318 if result.startswith("$"):
2319 previous.append(current)
2320 current = current[result[1:]]
2321 else:
2322 # Enter on layer
2323 if previous: # In subfolder
2324 result = cur_path + "." + result
2325 break
2326 # 4 - (Contrib only): load contrib
2327 if action == "contribs":
2328 from scapy.main import load_contrib
2329 load_contrib(result)
2330 result = "scapy.contrib." + result
2331 else: # NON-GUI MODE
2332 # We handle layer as a short layer name, full layer name
2333 # or the module itself
2334 if isinstance(layer, types.ModuleType):
2335 layer = layer.__name__
2336 if isinstance(layer, str):
2337 if layer.startswith("scapy.layers."):
2338 result = layer
2339 else:
2340 if layer.startswith("scapy.contrib."):
2341 layer = layer.replace("scapy.contrib.", "")
2342 from scapy.main import load_contrib
2343 load_contrib(layer)
2344 result_layer, result_contrib = (("scapy.layers.%s" % layer),
2345 ("scapy.contrib.%s" % layer))
2346 if result_layer in conf.layers.ldict:
2347 result = result_layer
2348 elif result_contrib in conf.layers.ldict:
2349 result = result_contrib
2350 else:
2351 raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2352 else:
2353 warning("Wrong usage ! Check out help(explore)")
2354 return
2356 # COMMON PART
2357 # Get the list of all Packets contained in that module
2358 try:
2359 all_layers = conf.layers.ldict[result]
2360 except KeyError:
2361 raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2362 # Print
2363 print(conf.color_theme.layer_name("Packets contained in %s:" % result))
2364 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]]
2365 rtlst = [(lay.__name__ or "", cast(str, lay._name) or "") for lay in all_layers]
2366 print(pretty_list(rtlst, [("Class", "Name")], borders=True))
2369def _pkt_ls(obj, # type: Union[Packet, Type[Packet]]
2370 verbose=False, # type: bool
2371 ):
2372 # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]] # noqa: E501
2373 """Internal function used to resolve `fields_desc` to display it.
2375 :param obj: a packet object or class
2376 :returns: a list containing tuples [(name, clsname, clsname_extras,
2377 default, long_attrs)]
2378 """
2379 is_pkt = isinstance(obj, Packet)
2380 if not issubtype(obj, Packet) and not is_pkt:
2381 raise ValueError
2382 fields = []
2383 for f in obj.fields_desc:
2384 cur_fld = f
2385 attrs = [] # type: List[str]
2386 long_attrs = [] # type: List[str]
2387 while isinstance(cur_fld, (Emph, ConditionalField)):
2388 if isinstance(cur_fld, ConditionalField):
2389 attrs.append(cur_fld.__class__.__name__[:4])
2390 cur_fld = cur_fld.fld
2391 name = cur_fld.name
2392 default = cur_fld.default
2393 if verbose and isinstance(cur_fld, EnumField) \
2394 and hasattr(cur_fld, "i2s") and cur_fld.i2s:
2395 if len(cur_fld.i2s or []) < 50:
2396 long_attrs.extend(
2397 "%s: %d" % (strval, numval)
2398 for numval, strval in
2399 sorted(cur_fld.i2s.items())
2400 )
2401 elif isinstance(cur_fld, MultiEnumField):
2402 if isinstance(obj, Packet):
2403 obj_pkt = obj
2404 else:
2405 obj_pkt = obj()
2406 fld_depend = cur_fld.depends_on(obj_pkt)
2407 attrs.append("Depends on %s" % fld_depend)
2408 if verbose:
2409 cur_i2s = cur_fld.i2s_multi.get(
2410 cur_fld.depends_on(obj_pkt), {}
2411 )
2412 if len(cur_i2s) < 50:
2413 long_attrs.extend(
2414 "%s: %d" % (strval, numval)
2415 for numval, strval in
2416 sorted(cur_i2s.items())
2417 )
2418 elif verbose and isinstance(cur_fld, FlagsField):
2419 names = cur_fld.names
2420 long_attrs.append(", ".join(names))
2421 elif isinstance(cur_fld, MultipleTypeField):
2422 default = cur_fld.dflt.default
2423 attrs.append(", ".join(
2424 x[0].__class__.__name__ for x in
2425 itertools.chain(cur_fld.flds, [(cur_fld.dflt,)])
2426 ))
2428 cls = cur_fld.__class__
2429 class_name_extras = "(%s)" % (
2430 ", ".join(attrs)
2431 ) if attrs else ""
2432 if isinstance(cur_fld, BitField):
2433 class_name_extras += " (%d bit%s)" % (
2434 cur_fld.size,
2435 "s" if cur_fld.size > 1 else ""
2436 )
2437 fields.append(
2438 (name,
2439 cls,
2440 class_name_extras,
2441 repr(default),
2442 long_attrs)
2443 )
2444 return fields
2447@conf.commands.register
2448def ls(obj=None, # type: Optional[Union[str, Packet, Type[Packet]]]
2449 case_sensitive=False, # type: bool
2450 verbose=False # type: bool
2451 ):
2452 # type: (...) -> None
2453 """List available layers, or infos on a given layer class or name.
2455 :param obj: Packet / packet name to use
2456 :param case_sensitive: if obj is a string, is it case sensitive?
2457 :param verbose:
2458 """
2459 if obj is None or isinstance(obj, str):
2460 tip = False
2461 if obj is None:
2462 tip = True
2463 all_layers = sorted(conf.layers, key=lambda x: x.__name__)
2464 else:
2465 pattern = re.compile(
2466 obj,
2467 0 if case_sensitive else re.I
2468 )
2469 # We first order by accuracy, then length
2470 if case_sensitive:
2471 sorter = lambda x: (x.__name__.index(obj), len(x.__name__))
2472 else:
2473 obj = obj.lower()
2474 sorter = lambda x: (x.__name__.lower().index(obj),
2475 len(x.__name__))
2476 all_layers = sorted((layer for layer in conf.layers
2477 if (isinstance(layer.__name__, str) and
2478 pattern.search(layer.__name__)) or
2479 (isinstance(layer.name, str) and
2480 pattern.search(layer.name))),
2481 key=sorter)
2482 for layer in all_layers:
2483 print("%-10s : %s" % (layer.__name__, layer._name))
2484 if tip and conf.interactive:
2485 print("\nTIP: You may use explore() to navigate through all "
2486 "layers using a clear GUI")
2487 else:
2488 try:
2489 fields = _pkt_ls(
2490 obj,
2491 verbose=verbose
2492 )
2493 is_pkt = isinstance(obj, Packet)
2494 # Print
2495 for fname, cls, clsne, dflt, long_attrs in fields:
2496 clsinfo = cls.__name__ + " " + clsne
2497 print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
2498 if is_pkt:
2499 print("%-15r" % (getattr(obj, fname),), end=' ')
2500 print("(%r)" % (dflt,))
2501 for attr in long_attrs:
2502 print("%-15s%s" % ("", attr))
2503 # Restart for payload if any
2504 if is_pkt:
2505 obj = cast(Packet, obj)
2506 if isinstance(obj.payload, NoPayload):
2507 return
2508 print("--")
2509 ls(obj.payload)
2510 except ValueError:
2511 print("Not a packet class or name. Type 'ls()' to list packet classes.") # noqa: E501
2514@conf.commands.register
2515def rfc(cls, ret=False, legend=True):
2516 # type: (Type[Packet], bool, bool) -> Optional[str]
2517 """
2518 Generate an RFC-like representation of a packet def.
2520 :param cls: the Packet class
2521 :param ret: return the result instead of printing (def. False)
2522 :param legend: show text under the diagram (default True)
2524 Ex::
2526 >>> rfc(Ether)
2528 """
2529 if not issubclass(cls, Packet):
2530 raise TypeError("Packet class expected")
2531 cur_len = 0
2532 cur_line = []
2533 lines = []
2534 # Get the size (width) that a field will take
2535 # when formatted, from its length in bits
2536 clsize = lambda x: 2 * x - 1 # type: Callable[[int], int]
2537 ident = 0 # Fields UUID
2539 # Generate packet groups
2540 def _iterfields() -> Iterator[Tuple[str, int]]:
2541 for f in cls.fields_desc:
2542 # Fancy field name
2543 fname = f.name.upper().replace("_", " ")
2544 fsize = int(f.sz * 8)
2545 yield fname, fsize
2546 # Add padding optionally
2547 if isinstance(f, PadField):
2548 if isinstance(f._align, tuple):
2549 pad = - cur_len % (f._align[0] * 8)
2550 else:
2551 pad = - cur_len % (f._align * 8)
2552 if pad:
2553 yield "padding", pad
2554 for fname, flen in _iterfields():
2555 cur_len += flen
2556 ident += 1
2557 # The field might exceed the current line or
2558 # take more than one line. Copy it as required
2559 while True:
2560 over = max(0, cur_len - 32) # Exceed
2561 len1 = clsize(flen - over) # What fits
2562 cur_line.append((fname[:len1], len1, ident))
2563 if cur_len >= 32:
2564 # Current line is full. start a new line
2565 lines.append(cur_line)
2566 cur_len = flen = over
2567 fname = "" # do not repeat the field
2568 cur_line = []
2569 if not over:
2570 # there is no data left
2571 break
2572 else:
2573 # End of the field
2574 break
2575 # Add the last line if un-finished
2576 if cur_line:
2577 lines.append(cur_line)
2578 # Calculate separations between lines
2579 seps = []
2580 seps.append("+-" * 32 + "+\n")
2581 for i in range(len(lines) - 1):
2582 # Start with a full line
2583 sep = "+-" * 32 + "+\n"
2584 # Get the line above and below the current
2585 # separation
2586 above, below = lines[i], lines[i + 1]
2587 # The last field of above is shared with below
2588 if above[-1][2] == below[0][2]:
2589 # where the field in "above" starts
2590 pos_above = sum(x[1] for x in above[:-1]) + len(above[:-1]) - 1
2591 # where the field in "below" ends
2592 pos_below = below[0][1]
2593 if pos_above < pos_below:
2594 # they are overlapping.
2595 # Now crop the space between those pos
2596 # and fill it with " "
2597 pos_above = pos_above + pos_above % 2
2598 sep = (
2599 sep[:1 + pos_above] +
2600 " " * (pos_below - pos_above) +
2601 sep[1 + pos_below:]
2602 )
2603 # line is complete
2604 seps.append(sep)
2605 # Graph
2606 result = ""
2607 # Bytes markers
2608 result += " " + (" " * 19).join(
2609 str(x) for x in range(4)
2610 ) + "\n"
2611 # Bits markers
2612 result += " " + " ".join(
2613 str(x % 10) for x in range(32)
2614 ) + "\n"
2615 # Add fields and their separations
2616 for line, sep in zip(lines, seps):
2617 result += sep
2618 for elt, flen, _ in line:
2619 result += "|" + elt.center(flen, " ")
2620 result += "|\n"
2621 result += "+-" * (cur_len or 32) + "+\n"
2622 # Annotate with the figure name
2623 if legend:
2624 result += "\n" + ("Fig. " + cls.__name__).center(66, " ")
2625 # return if asked for, else print
2626 if ret:
2627 return result
2628 print(result)
2629 return None
2632#############
2633# Fuzzing #
2634#############
2636_P = TypeVar('_P', bound=Packet)
2639@conf.commands.register
2640def fuzz(p, # type: _P
2641 _inplace=0, # type: int
2642 ):
2643 # type: (...) -> _P
2644 """
2645 Transform a layer into a fuzzy layer by replacing some default values
2646 by random objects.
2648 :param p: the Packet instance to fuzz
2649 :return: the fuzzed packet.
2650 """
2651 if not _inplace:
2652 p = p.copy()
2653 q = cast(Packet, p)
2654 while not isinstance(q, NoPayload):
2655 new_default_fields = {}
2656 multiple_type_fields = [] # type: List[str]
2657 for f in q.fields_desc:
2658 if isinstance(f, PacketListField):
2659 for r in getattr(q, f.name):
2660 fuzz(r, _inplace=1)
2661 elif isinstance(f, MultipleTypeField):
2662 # the type of the field will depend on others
2663 multiple_type_fields.append(f.name)
2664 elif f.default is not None:
2665 if not isinstance(f, ConditionalField) or f._evalcond(q):
2666 rnd = f.randval()
2667 if rnd is not None:
2668 new_default_fields[f.name] = rnd
2669 # Process packets with MultipleTypeFields
2670 if multiple_type_fields:
2671 # freeze the other random values
2672 new_default_fields = {
2673 key: (val._fix() if isinstance(val, VolatileValue) else val)
2674 for key, val in new_default_fields.items()
2675 }
2676 q.default_fields.update(new_default_fields)
2677 new_default_fields.clear()
2678 # add the random values of the MultipleTypeFields
2679 for name in multiple_type_fields:
2680 fld = cast(MultipleTypeField, q.get_field(name))
2681 rnd = fld._find_fld_pkt(q).randval()
2682 if rnd is not None:
2683 new_default_fields[name] = rnd
2684 q.default_fields.update(new_default_fields)
2685 q = q.payload
2686 return p