Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/packet.py: 43%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Packet class
9Provides:
10 - the default Packet classes
11 - binding mechanisms
12 - fuzz() method
13 - exploration methods: explore() / ls()
14"""
16from collections import defaultdict
18import json
19import re
20import time
21import itertools
22import copy
23import types
24import warnings
26from scapy.fields import (
27 AnyField,
28 BitField,
29 ConditionalField,
30 Emph,
31 EnumField,
32 Field,
33 FlagsField,
34 FlagValue,
35 MayEnd,
36 MultiEnumField,
37 MultipleTypeField,
38 PacketListField,
39 RawVal,
40 StrField,
41)
42from scapy.config import conf, _version_checker
43from scapy.compat import raw, orb, bytes_encode
44from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \
45 _CanvasDumpExtended
46from scapy.interfaces import _GlobInterfaceType
47from scapy.volatile import RandField, VolatileValue
48from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \
49 pretty_list, EDecimal
50from scapy.error import Scapy_Exception, log_runtime, warning
51from scapy.libs.test_pyx import PYX
53# Typing imports
54from typing import (
55 Any,
56 Callable,
57 Dict,
58 Iterator,
59 List,
60 NoReturn,
61 Optional,
62 Set,
63 Tuple,
64 Type,
65 TypeVar,
66 Union,
67 Sequence,
68 cast,
69)
70from scapy.compat import Self
72try:
73 import pyx
74except ImportError:
75 pass
78_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]])
81class Packet(
82 BasePacket,
83 _CanvasDumpExtended,
84 metaclass=Packet_metaclass
85):
86 __slots__ = [
87 "time", "sent_time", "name",
88 "default_fields", "fields", "fieldtype",
89 "overload_fields", "overloaded_fields",
90 "packetfields",
91 "original", "explicit", "raw_packet_cache",
92 "raw_packet_cache_fields", "_pkt", "post_transforms",
93 "stop_dissection_after",
94 # then payload, underlayer and parent
95 "payload", "underlayer", "parent",
96 "name",
97 # used for sr()
98 "_answered",
99 # used when sniffing
100 "direction", "sniffed_on",
101 # handle snaplen Vs real length
102 "wirelen",
103 "comment"
104 ]
105 name = None
106 fields_desc = [] # type: List[AnyField]
107 deprecated_fields = {} # type: Dict[str, Tuple[str, str]]
108 overload_fields = {} # type: Dict[Type[Packet], Dict[str, Any]]
109 payload_guess = [] # type: List[Tuple[Dict[str, Any], Type[Packet]]]
110 show_indent = 1
111 show_summary = True
112 match_subclass = False
113 class_dont_cache = {} # type: Dict[Type[Packet], bool]
114 class_packetfields = {} # type: Dict[Type[Packet], Any]
115 class_default_fields = {} # type: Dict[Type[Packet], Dict[str, Any]]
116 class_default_fields_ref = {} # type: Dict[Type[Packet], List[str]]
117 class_fieldtype = {} # type: Dict[Type[Packet], Dict[str, AnyField]] # noqa: E501
119 @classmethod
120 def from_hexcap(cls):
121 # type: (Type[Packet]) -> Packet
122 return cls(import_hexcap())
124 @classmethod
125 def upper_bonds(self):
126 # type: () -> None
127 for fval, upper in self.payload_guess:
128 print(
129 "%-20s %s" % (
130 upper.__name__,
131 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
132 )
133 )
135 @classmethod
136 def lower_bonds(self):
137 # type: () -> None
138 for lower, fval in self._overload_fields.items():
139 print(
140 "%-20s %s" % (
141 lower.__name__,
142 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
143 )
144 )
146 def __init__(self,
147 _pkt=b"", # type: Union[bytes, bytearray]
148 post_transform=None, # type: Any
149 _internal=0, # type: int
150 _underlayer=None, # type: Optional[Packet]
151 _parent=None, # type: Optional[Packet]
152 stop_dissection_after=None, # type: Optional[Type[Packet]]
153 **fields # type: Any
154 ):
155 # type: (...) -> None
156 self.time = time.time() # type: Union[EDecimal, float]
157 self.sent_time = None # type: Union[EDecimal, float, None]
158 self.name = (self.__class__.__name__
159 if self._name is None else
160 self._name)
161 self.default_fields = {} # type: Dict[str, Any]
162 self.overload_fields = self._overload_fields
163 self.overloaded_fields = {} # type: Dict[str, Any]
164 self.fields = {} # type: Dict[str, Any]
165 self.fieldtype = {} # type: Dict[str, AnyField]
166 self.packetfields = [] # type: List[AnyField]
167 self.payload = NoPayload() # type: Packet
168 self.init_fields()
169 self.underlayer = _underlayer
170 self.parent = _parent
171 if isinstance(_pkt, bytearray):
172 _pkt = bytes(_pkt)
173 self.original = _pkt
174 self.explicit = 0
175 self.raw_packet_cache = None # type: Optional[bytes]
176 self.raw_packet_cache_fields = None # type: Optional[Dict[str, Any]] # noqa: E501
177 self.wirelen = None # type: Optional[int]
178 self.direction = None # type: Optional[int]
179 self.sniffed_on = None # type: Optional[_GlobInterfaceType]
180 self.comment = None # type: Optional[bytes]
181 self.stop_dissection_after = stop_dissection_after
182 if _pkt:
183 self.dissect(_pkt)
184 if not _internal:
185 self.dissection_done(self)
186 # We use this strange initialization so that the fields
187 # are initialized in their declaration order.
188 # It is required to always support MultipleTypeField
189 for field in self.fields_desc:
190 fname = field.name
191 try:
192 value = fields.pop(fname)
193 except KeyError:
194 continue
195 self.fields[fname] = value if isinstance(value, RawVal) else \
196 self.get_field(fname).any2i(self, value)
197 # The remaining fields are unknown
198 for fname in fields:
199 if fname in self.deprecated_fields:
200 # Resolve deprecated fields
201 value = fields[fname]
202 fname = self._resolve_alias(fname)
203 self.fields[fname] = value if isinstance(value, RawVal) else \
204 self.get_field(fname).any2i(self, value)
205 continue
206 raise AttributeError(fname)
207 if isinstance(post_transform, list):
208 self.post_transforms = post_transform
209 elif post_transform is None:
210 self.post_transforms = []
211 else:
212 self.post_transforms = [post_transform]
214 _PickleType = Tuple[
215 Union[EDecimal, float],
216 Optional[Union[EDecimal, float, None]],
217 Optional[int],
218 Optional[_GlobInterfaceType],
219 Optional[int],
220 Optional[bytes],
221 ]
223 def __reduce__(self):
224 # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType]
225 """Used by pickling methods"""
226 return (self.__class__, (self.build(),), (
227 self.time,
228 self.sent_time,
229 self.direction,
230 self.sniffed_on,
231 self.wirelen,
232 self.comment
233 ))
235 def __setstate__(self, state):
236 # type: (Packet._PickleType) -> Packet
237 """Rebuild state using pickable methods"""
238 self.time = state[0]
239 self.sent_time = state[1]
240 self.direction = state[2]
241 self.sniffed_on = state[3]
242 self.wirelen = state[4]
243 self.comment = state[5]
244 return self
246 def __deepcopy__(self,
247 memo, # type: Any
248 ):
249 # type: (...) -> Packet
250 """Used by copy.deepcopy"""
251 return self.copy()
253 def init_fields(self):
254 # type: () -> None
255 """
256 Initialize each fields of the fields_desc dict
257 """
259 if self.class_dont_cache.get(self.__class__, False):
260 self.do_init_fields(self.fields_desc)
261 else:
262 self.do_init_cached_fields()
264 def do_init_fields(self,
265 flist, # type: Sequence[AnyField]
266 ):
267 # type: (...) -> None
268 """
269 Initialize each fields of the fields_desc dict
270 """
271 default_fields = {}
272 for f in flist:
273 default_fields[f.name] = copy.deepcopy(f.default)
274 self.fieldtype[f.name] = f
275 if f.holds_packets:
276 self.packetfields.append(f)
277 # We set default_fields last to avoid race issues
278 self.default_fields = default_fields
280 def do_init_cached_fields(self):
281 # type: () -> None
282 """
283 Initialize each fields of the fields_desc dict, or use the cached
284 fields information
285 """
287 cls_name = self.__class__
289 # Build the fields information
290 if Packet.class_default_fields.get(cls_name, None) is None:
291 self.prepare_cached_fields(self.fields_desc)
293 # Use fields information from cache
294 default_fields = Packet.class_default_fields.get(cls_name, None)
295 if default_fields:
296 self.default_fields = default_fields
297 self.fieldtype = Packet.class_fieldtype[cls_name]
298 self.packetfields = Packet.class_packetfields[cls_name]
300 # Deepcopy default references
301 for fname in Packet.class_default_fields_ref[cls_name]:
302 value = self.default_fields[fname]
303 try:
304 self.fields[fname] = value.copy()
305 except AttributeError:
306 # Python 2.7 - list only
307 self.fields[fname] = value[:]
309 def prepare_cached_fields(self, flist):
310 # type: (Sequence[AnyField]) -> None
311 """
312 Prepare the cached fields of the fields_desc dict
313 """
315 cls_name = self.__class__
317 # Fields cache initialization
318 if not flist:
319 return
321 class_default_fields = dict()
322 class_default_fields_ref = list()
323 class_fieldtype = dict()
324 class_packetfields = list()
326 # Fields initialization
327 for f in flist:
328 if isinstance(f, MultipleTypeField):
329 # Abort
330 self.class_dont_cache[cls_name] = True
331 self.do_init_fields(self.fields_desc)
332 return
334 tmp_copy = copy.deepcopy(f.default)
335 class_default_fields[f.name] = tmp_copy
336 class_fieldtype[f.name] = f
337 if f.holds_packets:
338 class_packetfields.append(f)
340 # Remember references
341 if isinstance(f.default, (list, dict, set, RandField, Packet)):
342 class_default_fields_ref.append(f.name)
344 # Apply
345 Packet.class_default_fields_ref[cls_name] = class_default_fields_ref
346 Packet.class_fieldtype[cls_name] = class_fieldtype
347 Packet.class_packetfields[cls_name] = class_packetfields
348 # Last to avoid racing issues
349 Packet.class_default_fields[cls_name] = class_default_fields
351 def dissection_done(self, pkt):
352 # type: (Packet) -> None
353 """DEV: will be called after a dissection is completed"""
354 self.post_dissection(pkt)
355 self.payload.dissection_done(pkt)
357 def post_dissection(self, pkt):
358 # type: (Packet) -> None
359 """DEV: is called after the dissection of the whole packet"""
360 pass
362 def get_field(self, fld):
363 # type: (str) -> AnyField
364 """DEV: returns the field instance from the name of the field"""
365 return self.fieldtype[fld]
367 def add_payload(self, payload):
368 # type: (Union[Packet, bytes]) -> None
369 if payload is None:
370 return
371 elif not isinstance(self.payload, NoPayload):
372 self.payload.add_payload(payload)
373 else:
374 if isinstance(payload, Packet):
375 self.payload = payload
376 payload.add_underlayer(self)
377 for t in self.aliastypes:
378 if t in payload.overload_fields:
379 self.overloaded_fields = payload.overload_fields[t]
380 break
381 elif isinstance(payload, (bytes, str, bytearray, memoryview)):
382 self.payload = conf.raw_layer(load=bytes_encode(payload))
383 else:
384 raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload)) # noqa: E501
386 def remove_payload(self):
387 # type: () -> None
388 self.payload.remove_underlayer(self)
389 self.payload = NoPayload()
390 self.overloaded_fields = {}
392 def add_underlayer(self, underlayer):
393 # type: (Packet) -> None
394 self.underlayer = underlayer
396 def remove_underlayer(self, other):
397 # type: (Packet) -> None
398 self.underlayer = None
400 def add_parent(self, parent):
401 # type: (Packet) -> None
402 """Set packet parent.
403 When packet is an element in PacketListField, parent field would
404 point to the list owner packet."""
405 self.parent = parent
407 def remove_parent(self, other):
408 # type: (Packet) -> None
409 """Remove packet parent.
410 When packet is an element in PacketListField, parent field would
411 point to the list owner packet."""
412 self.parent = None
414 def copy(self) -> Self:
415 """Returns a deep copy of the instance."""
416 clone = self.__class__()
417 clone.fields = self.copy_fields_dict(self.fields)
418 clone.default_fields = self.copy_fields_dict(self.default_fields)
419 clone.overloaded_fields = self.overloaded_fields.copy()
420 clone.underlayer = self.underlayer
421 clone.parent = self.parent
422 clone.explicit = self.explicit
423 clone.raw_packet_cache = self.raw_packet_cache
424 clone.raw_packet_cache_fields = self.copy_fields_dict(
425 self.raw_packet_cache_fields
426 )
427 clone.wirelen = self.wirelen
428 clone.post_transforms = self.post_transforms[:]
429 clone.payload = self.payload.copy()
430 clone.payload.add_underlayer(clone)
431 clone.time = self.time
432 clone.comment = self.comment
433 clone.direction = self.direction
434 clone.sniffed_on = self.sniffed_on
435 return clone
437 def _resolve_alias(self, attr):
438 # type: (str) -> str
439 new_attr, version = self.deprecated_fields[attr]
440 warnings.warn(
441 "%s has been deprecated in favor of %s since %s !" % (
442 attr, new_attr, version
443 ), DeprecationWarning
444 )
445 return new_attr
447 def getfieldval(self, attr):
448 # type: (str) -> Any
449 if self.deprecated_fields and attr in self.deprecated_fields:
450 attr = self._resolve_alias(attr)
451 if attr in self.fields:
452 return self.fields[attr]
453 if attr in self.overloaded_fields:
454 return self.overloaded_fields[attr]
455 if attr in self.default_fields:
456 return self.default_fields[attr]
457 return self.payload.getfieldval(attr)
459 def getfield_and_val(self, attr):
460 # type: (str) -> Tuple[AnyField, Any]
461 if self.deprecated_fields and attr in self.deprecated_fields:
462 attr = self._resolve_alias(attr)
463 if attr in self.fields:
464 return self.get_field(attr), self.fields[attr]
465 if attr in self.overloaded_fields:
466 return self.get_field(attr), self.overloaded_fields[attr]
467 if attr in self.default_fields:
468 return self.get_field(attr), self.default_fields[attr]
469 raise ValueError
471 def __getattr__(self, attr):
472 # type: (str) -> Any
473 try:
474 fld, v = self.getfield_and_val(attr)
475 except ValueError:
476 return self.payload.__getattr__(attr)
477 if fld is not None:
478 return v if isinstance(v, RawVal) else fld.i2h(self, v)
479 return v
481 def setfieldval(self, attr, val):
482 # type: (str, Any) -> None
483 if self.deprecated_fields and attr in self.deprecated_fields:
484 attr = self._resolve_alias(attr)
485 if attr in self.default_fields:
486 fld = self.get_field(attr)
487 if fld is None:
488 any2i = lambda x, y: y # type: Callable[..., Any]
489 else:
490 any2i = fld.any2i
491 self.fields[attr] = val if isinstance(val, RawVal) else \
492 any2i(self, val)
493 self.explicit = 0
494 self.raw_packet_cache = None
495 self.raw_packet_cache_fields = None
496 self.wirelen = None
497 elif attr == "payload":
498 self.remove_payload()
499 self.add_payload(val)
500 else:
501 self.payload.setfieldval(attr, val)
503 def __setattr__(self, attr, val):
504 # type: (str, Any) -> None
505 if attr in self.__all_slots__:
506 return object.__setattr__(self, attr, val)
507 try:
508 return self.setfieldval(attr, val)
509 except AttributeError:
510 pass
511 return object.__setattr__(self, attr, val)
513 def delfieldval(self, attr):
514 # type: (str) -> None
515 if attr in self.fields:
516 del self.fields[attr]
517 self.explicit = 0 # in case a default value must be explicit
518 self.raw_packet_cache = None
519 self.raw_packet_cache_fields = None
520 self.wirelen = None
521 elif attr in self.default_fields:
522 pass
523 elif attr == "payload":
524 self.remove_payload()
525 else:
526 self.payload.delfieldval(attr)
528 def __delattr__(self, attr):
529 # type: (str) -> None
530 if attr == "payload":
531 return self.remove_payload()
532 if attr in self.__all_slots__:
533 return object.__delattr__(self, attr)
534 try:
535 return self.delfieldval(attr)
536 except AttributeError:
537 pass
538 return object.__delattr__(self, attr)
540 def _superdir(self):
541 # type: () -> Set[str]
542 """
543 Return a list of slots and methods, including those from subclasses.
544 """
545 attrs = set() # type: Set[str]
546 cls = self.__class__
547 if hasattr(cls, '__all_slots__'):
548 attrs.update(cls.__all_slots__)
549 for bcls in cls.__mro__:
550 if hasattr(bcls, '__dict__'):
551 attrs.update(bcls.__dict__)
552 return attrs
554 def __dir__(self):
555 # type: () -> List[str]
556 """
557 Add fields to tab completion list.
558 """
559 return sorted(itertools.chain(self._superdir(), self.default_fields))
561 def __repr__(self):
562 # type: () -> str
563 s = ""
564 ct = conf.color_theme
565 for f in self.fields_desc:
566 if isinstance(f, ConditionalField) and not f._evalcond(self):
567 continue
568 if f.name in self.fields:
569 fval = self.fields[f.name]
570 if isinstance(fval, (list, dict, set)) and len(fval) == 0:
571 continue
572 val = f.i2repr(self, fval)
573 elif f.name in self.overloaded_fields:
574 fover = self.overloaded_fields[f.name]
575 if isinstance(fover, (list, dict, set)) and len(fover) == 0:
576 continue
577 val = f.i2repr(self, fover)
578 else:
579 continue
580 if isinstance(f, Emph) or f in conf.emph:
581 ncol = ct.emph_field_name
582 vcol = ct.emph_field_value
583 else:
584 ncol = ct.field_name
585 vcol = ct.field_value
587 s += " %s%s%s" % (ncol(f.name),
588 ct.punct("="),
589 vcol(val))
590 return "%s%s %s %s%s%s" % (ct.punct("<"),
591 ct.layer_name(self.__class__.__name__),
592 s,
593 ct.punct("|"),
594 repr(self.payload),
595 ct.punct(">"))
597 def __str__(self):
598 # type: () -> str
599 return self.summary()
601 def __bytes__(self):
602 # type: () -> bytes
603 return self.build()
605 def __div__(self, other):
606 # type: (Any) -> Self
607 if isinstance(other, Packet):
608 cloneA = self.copy()
609 cloneB = other.copy()
610 cloneA.add_payload(cloneB)
611 return cloneA
612 elif isinstance(other, (bytes, str, bytearray, memoryview)):
613 return self / conf.raw_layer(load=bytes_encode(other))
614 else:
615 return other.__rdiv__(self) # type: ignore
616 __truediv__ = __div__
618 def __rdiv__(self, other):
619 # type: (Any) -> Packet
620 if isinstance(other, (bytes, str, bytearray, memoryview)):
621 return conf.raw_layer(load=bytes_encode(other)) / self
622 else:
623 raise TypeError
624 __rtruediv__ = __rdiv__
626 def __mul__(self, other):
627 # type: (Any) -> List[Packet]
628 if isinstance(other, int):
629 return [self] * other
630 else:
631 raise TypeError
633 def __rmul__(self, other):
634 # type: (Any) -> List[Packet]
635 return self.__mul__(other)
637 def __nonzero__(self):
638 # type: () -> bool
639 return True
640 __bool__ = __nonzero__
642 def __len__(self):
643 # type: () -> int
644 return len(self.__bytes__())
646 def copy_field_value(self, fieldname, value):
647 # type: (str, Any) -> Any
648 return self.get_field(fieldname).do_copy(value)
650 def copy_fields_dict(self, fields):
651 # type: (_T) -> _T
652 if fields is None:
653 return None
654 return {fname: self.copy_field_value(fname, fval)
655 for fname, fval in fields.items()}
657 def _raw_packet_cache_field_value(self, fld, val, copy=False):
658 # type: (AnyField, Any, bool) -> Optional[Any]
659 """Get a value representative of a mutable field to detect changes"""
660 _cpy = lambda x: fld.do_copy(x) if copy else x # type: Callable[[Any], Any]
661 if fld.holds_packets:
662 # avoid copying whole packets (perf: #GH3894)
663 if fld.islist:
664 return [
665 _cpy(x.fields) for x in val
666 ]
667 else:
668 return _cpy(val.fields)
669 elif fld.islist or fld.ismutable:
670 return _cpy(val)
671 return None
673 def clear_cache(self):
674 # type: () -> None
675 """Clear the raw packet cache for the field and all its subfields"""
676 self.raw_packet_cache = None
677 for fname, fval in self.fields.items():
678 fld = self.get_field(fname)
679 if fld.holds_packets:
680 if isinstance(fval, Packet):
681 fval.clear_cache()
682 elif isinstance(fval, list):
683 for fsubval in fval:
684 fsubval.clear_cache()
685 self.payload.clear_cache()
687 def self_build(self):
688 # type: () -> bytes
689 """
690 Create the default layer regarding fields_desc dict
692 :param field_pos_list:
693 """
694 if self.raw_packet_cache is not None and \
695 self.raw_packet_cache_fields is not None:
696 for fname, fval in self.raw_packet_cache_fields.items():
697 fld, val = self.getfield_and_val(fname)
698 if self._raw_packet_cache_field_value(fld, val) != fval:
699 self.raw_packet_cache = None
700 self.raw_packet_cache_fields = None
701 self.wirelen = None
702 break
703 if self.raw_packet_cache is not None:
704 return self.raw_packet_cache
705 p = b""
706 for f in self.fields_desc:
707 val = self.getfieldval(f.name)
708 if isinstance(val, RawVal):
709 p += bytes(val)
710 else:
711 try:
712 p = f.addfield(self, p, val)
713 except Exception as ex:
714 try:
715 ex.args = (
716 "While dissecting field '%s': " % f.name +
717 ex.args[0],
718 ) + ex.args[1:]
719 except (AttributeError, IndexError):
720 pass
721 raise ex
722 return p
724 def do_build_payload(self):
725 # type: () -> bytes
726 """
727 Create the default version of the payload layer
729 :return: a string of payload layer
730 """
731 return self.payload.do_build()
733 def do_build(self):
734 # type: () -> bytes
735 """
736 Create the default version of the layer
738 :return: a string of the packet with the payload
739 """
740 if not self.explicit:
741 self = next(iter(self))
742 pkt = self.self_build()
743 for t in self.post_transforms:
744 pkt = t(pkt)
745 pay = self.do_build_payload()
746 if self.raw_packet_cache is None:
747 return self.post_build(pkt, pay)
748 else:
749 return pkt + pay
751 def build_padding(self):
752 # type: () -> bytes
753 return self.payload.build_padding()
755 def build(self):
756 # type: () -> bytes
757 """
758 Create the current layer
760 :return: string of the packet with the payload
761 """
762 p = self.do_build()
763 p += self.build_padding()
764 p = self.build_done(p)
765 return p
767 def post_build(self, pkt, pay):
768 # type: (bytes, bytes) -> bytes
769 """
770 DEV: called right after the current layer is build.
772 :param str pkt: the current packet (build by self_build function)
773 :param str pay: the packet payload (build by do_build_payload function)
774 :return: a string of the packet with the payload
775 """
776 return pkt + pay
778 def build_done(self, p):
779 # type: (bytes) -> bytes
780 return self.payload.build_done(p)
782 def do_build_ps(self):
783 # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]] # noqa: E501
784 p = b""
785 pl = []
786 q = b""
787 for f in self.fields_desc:
788 if isinstance(f, ConditionalField) and not f._evalcond(self):
789 continue
790 p = f.addfield(self, p, self.getfieldval(f.name))
791 if isinstance(p, bytes):
792 r = p[len(q):]
793 q = p
794 else:
795 r = b""
796 pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r))
798 pkt, lst = self.payload.build_ps(internal=1)
799 p += pkt
800 lst.append((self, pl))
802 return p, lst
804 def build_ps(self, internal=0):
805 # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]] # noqa: E501
806 p, lst = self.do_build_ps()
807# if not internal:
808# pkt = self
809# while pkt.haslayer(conf.padding_layer):
810# pkt = pkt.getlayer(conf.padding_layer)
811# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) )
812# p += pkt.load
813# pkt = pkt.payload
814 return p, lst
816 def canvas_dump(self, layer_shift=0, rebuild=1):
817 # type: (int, int) -> pyx.canvas.canvas
818 if PYX == 0:
819 raise ImportError("PyX and its dependencies must be installed")
820 canvas = pyx.canvas.canvas()
821 if rebuild:
822 _, t = self.__class__(raw(self)).build_ps()
823 else:
824 _, t = self.build_ps()
825 YTXTI = len(t)
826 for _, l in t:
827 YTXTI += len(l)
828 YTXT = float(YTXTI)
829 YDUMP = YTXT
831 XSTART = 1
832 XDSTART = 10
833 y = 0.0
834 yd = 0.0
835 XMUL = 0.55
836 YMUL = 0.4
838 backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
839 forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
840# backcolor=makecol(0.376, 0.729, 0.525, 1.0)
842 def hexstr(x):
843 # type: (bytes) -> str
844 return " ".join("%02x" % orb(c) for c in x)
846 def make_dump_txt(x, y, txt):
847 # type: (int, float, bytes) -> pyx.text.text
848 return pyx.text.text(
849 XDSTART + x * XMUL,
850 (YDUMP - y) * YMUL,
851 r"\tt{%s}" % hexstr(txt),
852 [pyx.text.size.Large]
853 )
855 def make_box(o):
856 # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
857 return pyx.box.rect(
858 o.left(), o.bottom(), o.width(), o.height(),
859 relcenter=(0.5, 0.5)
860 )
862 def make_frame(lst):
863 # type: (List[Any]) -> pyx.path.path
864 if len(lst) == 1:
865 b = lst[0].bbox()
866 b.enlarge(pyx.unit.u_pt)
867 return b.path()
868 else:
869 fb = lst[0].bbox()
870 fb.enlarge(pyx.unit.u_pt)
871 lb = lst[-1].bbox()
872 lb.enlarge(pyx.unit.u_pt)
873 if len(lst) == 2 and fb.left() > lb.right():
874 return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
875 pyx.path.lineto(fb.left(), fb.top()),
876 pyx.path.lineto(fb.left(), fb.bottom()), # noqa: E501
877 pyx.path.lineto(fb.right(), fb.bottom()), # noqa: E501
878 pyx.path.moveto(lb.left(), lb.top()),
879 pyx.path.lineto(lb.right(), lb.top()),
880 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501
881 pyx.path.lineto(lb.left(), lb.bottom())) # noqa: E501
882 else:
883 # XXX
884 gb = lst[1].bbox()
885 if gb != lb:
886 gb.enlarge(pyx.unit.u_pt)
887 kb = lst[-2].bbox()
888 if kb != gb and kb != lb:
889 kb.enlarge(pyx.unit.u_pt)
890 return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
891 pyx.path.lineto(fb.right(), fb.top()),
892 pyx.path.lineto(fb.right(), kb.bottom()), # noqa: E501
893 pyx.path.lineto(lb.right(), kb.bottom()), # noqa: E501
894 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501
895 pyx.path.lineto(lb.left(), lb.bottom()), # noqa: E501
896 pyx.path.lineto(lb.left(), gb.top()),
897 pyx.path.lineto(fb.left(), gb.top()),
898 pyx.path.closepath(),)
900 def make_dump(s, # type: bytes
901 shift=0, # type: int
902 y=0., # type: float
903 col=None, # type: pyx.color.color
904 bkcol=None, # type: pyx.color.color
905 large=16 # type: int
906 ):
907 # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float] # noqa: E501
908 c = pyx.canvas.canvas()
909 tlist = []
910 while s:
911 dmp, s = s[:large - shift], s[large - shift:]
912 txt = make_dump_txt(shift, y, dmp)
913 tlist.append(txt)
914 shift += len(dmp)
915 if shift >= 16:
916 shift = 0
917 y += 1
918 if col is None:
919 col = pyx.color.rgb.red
920 if bkcol is None:
921 bkcol = pyx.color.rgb.white
922 c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick]) # noqa: E501
923 for txt in tlist:
924 c.insert(txt)
925 return c, tlist[-1].bbox(), shift, y
927 last_shift, last_y = 0, 0.0
928 while t:
929 bkcol = next(backcolor)
930 proto, fields = t.pop()
931 y += 0.5
932 pt = pyx.text.text(
933 XSTART,
934 (YTXT - y) * YMUL,
935 r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
936 str(proto.name)
937 ),
938 [pyx.text.size.Large]
939 )
940 y += 1
941 ptbb = pt.bbox()
942 ptbb.enlarge(pyx.unit.u_pt * 2)
943 canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])]) # noqa: E501
944 canvas.insert(pt)
945 for field, fval, fdump in fields:
946 col = next(forecolor)
947 ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name)) # noqa: E501
948 if isinstance(field, BitField):
949 fsize = '%sb' % field.size
950 else:
951 fsize = '%sB' % len(fdump)
952 if (hasattr(field, 'field') and
953 'LE' in field.field.__class__.__name__[:3] or
954 'LE' in field.__class__.__name__[:3]):
955 fsize = r'$\scriptstyle\langle$' + fsize
956 st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright]) # noqa: E501
957 if isinstance(fval, str):
958 if len(fval) > 18:
959 fval = fval[:18] + "[...]"
960 else:
961 fval = ""
962 vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) # noqa: E501
963 y += 1.0
964 if fdump:
965 dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol) # noqa: E501
967 dtb = target
968 vtb = vt.bbox()
969 bxvt = make_box(vtb)
970 bxdt = make_box(dtb)
971 dtb.enlarge(pyx.unit.u_pt)
972 try:
973 if yd < 0:
974 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90) # noqa: E501
975 else:
976 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90) # noqa: E501
977 except Exception:
978 pass
979 else:
980 canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col]) # noqa: E501
982 canvas.insert(dt)
984 canvas.insert(ft)
985 canvas.insert(st)
986 canvas.insert(vt)
987 last_y += layer_shift
989 return canvas
991 def extract_padding(self, s):
992 # type: (bytes) -> Tuple[bytes, Optional[bytes]]
993 """
994 DEV: to be overloaded to extract current layer's padding.
996 :param str s: the current layer
997 :return: a couple of strings (actual layer, padding)
998 """
999 return s, None
1001 def post_dissect(self, s):
1002 # type: (bytes) -> bytes
1003 """DEV: is called right after the current layer has been dissected"""
1004 return s
1006 def pre_dissect(self, s):
1007 # type: (bytes) -> bytes
1008 """DEV: is called right before the current layer is dissected"""
1009 return s
1011 def do_dissect(self, s):
1012 # type: (bytes) -> bytes
1013 _raw = s
1014 self.raw_packet_cache_fields = {}
1015 for f in self.fields_desc:
1016 s, fval = f.getfield(self, s)
1017 # Skip unused ConditionalField
1018 if isinstance(f, ConditionalField) and fval is None:
1019 continue
1020 # We need to track fields with mutable values to discard
1021 # .raw_packet_cache when needed.
1022 if (f.islist or f.holds_packets or f.ismutable) and fval is not None:
1023 self.raw_packet_cache_fields[f.name] = \
1024 self._raw_packet_cache_field_value(f, fval, copy=True)
1025 self.fields[f.name] = fval
1026 # Nothing left to dissect
1027 if not s and (isinstance(f, MayEnd) or
1028 (fval is not None and isinstance(f, ConditionalField) and
1029 isinstance(f.fld, MayEnd))):
1030 break
1031 self.raw_packet_cache = _raw[:-len(s)] if s else _raw
1032 self.explicit = 1
1033 return s
1035 def do_dissect_payload(self, s):
1036 # type: (bytes) -> None
1037 """
1038 Perform the dissection of the layer's payload
1040 :param str s: the raw layer
1041 """
1042 if s:
1043 if (
1044 self.stop_dissection_after and
1045 isinstance(self, self.stop_dissection_after)
1046 ):
1047 # stop dissection here
1048 p = conf.raw_layer(s, _internal=1, _underlayer=self)
1049 self.add_payload(p)
1050 return
1051 cls = self.guess_payload_class(s)
1052 try:
1053 p = cls(
1054 s,
1055 stop_dissection_after=self.stop_dissection_after,
1056 _internal=1,
1057 _underlayer=self,
1058 )
1059 except KeyboardInterrupt:
1060 raise
1061 except Exception:
1062 if conf.debug_dissector:
1063 if issubtype(cls, Packet):
1064 log_runtime.error("%s dissector failed", cls.__name__)
1065 else:
1066 log_runtime.error("%s.guess_payload_class() returned "
1067 "[%s]",
1068 self.__class__.__name__, repr(cls))
1069 if cls is not None:
1070 raise
1071 p = conf.raw_layer(s, _internal=1, _underlayer=self)
1072 self.add_payload(p)
1074 def dissect(self, s):
1075 # type: (bytes) -> None
1076 s = self.pre_dissect(s)
1078 s = self.do_dissect(s)
1080 s = self.post_dissect(s)
1082 payl, pad = self.extract_padding(s)
1083 self.do_dissect_payload(payl)
1084 if pad and conf.padding:
1085 self.add_payload(conf.padding_layer(pad))
1087 def guess_payload_class(self, payload):
1088 # type: (bytes) -> Type[Packet]
1089 """
1090 DEV: Guesses the next payload class from layer bonds.
1091 Can be overloaded to use a different mechanism.
1093 :param str payload: the layer's payload
1094 :return: the payload class
1095 """
1096 for t in self.aliastypes:
1097 for fval, cls in t.payload_guess:
1098 try:
1099 if all(v == self.getfieldval(k)
1100 for k, v in fval.items()):
1101 return cls # type: ignore
1102 except AttributeError:
1103 pass
1104 return self.default_payload_class(payload)
1106 def default_payload_class(self, payload):
1107 # type: (bytes) -> Type[Packet]
1108 """
1109 DEV: Returns the default payload class if nothing has been found by the
1110 guess_payload_class() method.
1112 :param str payload: the layer's payload
1113 :return: the default payload class define inside the configuration file
1114 """
1115 return conf.raw_layer
1117 def hide_defaults(self):
1118 # type: () -> None
1119 """Removes fields' values that are the same as default values."""
1120 # use list(): self.fields is modified in the loop
1121 for k, v in list(self.fields.items()):
1122 v = self.fields[k]
1123 if k in self.default_fields:
1124 if self.default_fields[k] == v:
1125 del self.fields[k]
1126 self.payload.hide_defaults()
1128 def clone_with(self, payload=None, **kargs):
1129 # type: (Optional[Any], **Any) -> Any
1130 pkt = self.__class__()
1131 pkt.explicit = 1
1132 pkt.fields = kargs
1133 pkt.default_fields = self.copy_fields_dict(self.default_fields)
1134 pkt.overloaded_fields = self.overloaded_fields.copy()
1135 pkt.time = self.time
1136 pkt.underlayer = self.underlayer
1137 pkt.parent = self.parent
1138 pkt.post_transforms = self.post_transforms
1139 pkt.raw_packet_cache = self.raw_packet_cache
1140 pkt.raw_packet_cache_fields = self.copy_fields_dict(
1141 self.raw_packet_cache_fields
1142 )
1143 pkt.wirelen = self.wirelen
1144 pkt.comment = self.comment
1145 pkt.sniffed_on = self.sniffed_on
1146 pkt.direction = self.direction
1147 if payload is not None:
1148 pkt.add_payload(payload)
1149 return pkt
1151 def __iter__(self):
1152 # type: () -> Iterator[Packet]
1153 """Iterates through all sub-packets generated by this Packet."""
1154 def loop(todo, done, self=self):
1155 # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
1156 if todo:
1157 eltname = todo.pop()
1158 elt = self.getfieldval(eltname)
1159 if not isinstance(elt, Gen):
1160 if self.get_field(eltname).islist:
1161 elt = SetGen([elt])
1162 else:
1163 elt = SetGen(elt)
1164 for e in elt:
1165 done[eltname] = e
1166 for x in loop(todo[:], done):
1167 yield x
1168 else:
1169 if isinstance(self.payload, NoPayload):
1170 payloads = SetGen([None]) # type: SetGen[Packet]
1171 else:
1172 payloads = self.payload
1173 for payl in payloads:
1174 # Let's make sure subpackets are consistent
1175 done2 = done.copy()
1176 for k in done2:
1177 if isinstance(done2[k], VolatileValue):
1178 done2[k] = done2[k]._fix()
1179 pkt = self.clone_with(payload=payl, **done2)
1180 yield pkt
1182 if self.explicit or self.raw_packet_cache is not None:
1183 todo = []
1184 done = self.fields
1185 else:
1186 todo = [k for (k, v) in itertools.chain(self.default_fields.items(),
1187 self.overloaded_fields.items())
1188 if isinstance(v, VolatileValue)] + list(self.fields)
1189 done = {}
1190 return loop(todo, done)
1192 def iterpayloads(self):
1193 # type: () -> Iterator[Packet]
1194 """Used to iter through the payloads of a Packet.
1195 Useful for DNS or 802.11 for instance.
1196 """
1197 yield self
1198 current = self
1199 while current.payload:
1200 current = current.payload
1201 yield current
1203 def __gt__(self, other):
1204 # type: (Packet) -> int
1205 """True if other is an answer from self (self ==> other)."""
1206 if isinstance(other, Packet):
1207 return other < self
1208 elif isinstance(other, bytes):
1209 return 1
1210 else:
1211 raise TypeError((self, other))
1213 def __lt__(self, other):
1214 # type: (Packet) -> int
1215 """True if self is an answer from other (other ==> self)."""
1216 if isinstance(other, Packet):
1217 return self.answers(other)
1218 elif isinstance(other, bytes):
1219 return 1
1220 else:
1221 raise TypeError((self, other))
1223 def __eq__(self, other):
1224 # type: (Any) -> bool
1225 if not isinstance(other, self.__class__):
1226 return False
1227 for f in self.fields_desc:
1228 if f not in other.fields_desc:
1229 return False
1230 if self.getfieldval(f.name) != other.getfieldval(f.name):
1231 return False
1232 return self.payload == other.payload
1234 def __ne__(self, other):
1235 # type: (Any) -> bool
1236 return not self.__eq__(other)
1238 # Note: setting __hash__ to None is the standard way
1239 # of making an object un-hashable. mypy doesn't know that
1240 __hash__ = None # type: ignore
1242 def hashret(self):
1243 # type: () -> bytes
1244 """DEV: returns a string that has the same value for a request
1245 and its answer."""
1246 return self.payload.hashret()
1248 def answers(self, other):
1249 # type: (Packet) -> int
1250 """DEV: true if self is an answer from other"""
1251 if other.__class__ == self.__class__:
1252 return self.payload.answers(other.payload)
1253 return 0
1255 def layers(self):
1256 # type: () -> List[Type[Packet]]
1257 """returns a list of layer classes (including subclasses) in this packet""" # noqa: E501
1258 layers = []
1259 lyr = self # type: Optional[Packet]
1260 while lyr:
1261 layers.append(lyr.__class__)
1262 lyr = lyr.payload.getlayer(0, _subclass=True)
1263 return layers
1265 def haslayer(self, cls, _subclass=None):
1266 # type: (Union[Type[Packet], str], Optional[bool]) -> int
1267 """
1268 true if self has a layer that is an instance of cls.
1269 Superseded by "cls in self" syntax.
1270 """
1271 if _subclass is None:
1272 _subclass = self.match_subclass or None
1273 if _subclass:
1274 match = issubtype
1275 else:
1276 match = lambda x, t: bool(x == t)
1277 if cls is None or match(self.__class__, cls) \
1278 or cls in [self.__class__.__name__, self._name]:
1279 return True
1280 for f in self.packetfields:
1281 fvalue_gen = self.getfieldval(f.name)
1282 if fvalue_gen is None:
1283 continue
1284 if not f.islist:
1285 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1286 for fvalue in fvalue_gen:
1287 if isinstance(fvalue, Packet):
1288 ret = fvalue.haslayer(cls, _subclass=_subclass)
1289 if ret:
1290 return ret
1291 return self.payload.haslayer(cls, _subclass=_subclass)
1293 def getlayer(self,
1294 cls, # type: Union[int, Type[Packet], str]
1295 nb=1, # type: int
1296 _track=None, # type: Optional[List[int]]
1297 _subclass=None, # type: Optional[bool]
1298 **flt # type: Any
1299 ):
1300 # type: (...) -> Optional[Packet]
1301 """Return the nb^th layer that is an instance of cls, matching flt
1302values.
1303 """
1304 if _subclass is None:
1305 _subclass = self.match_subclass or None
1306 if _subclass:
1307 match = issubtype
1308 else:
1309 match = lambda x, t: bool(x == t)
1310 # Note:
1311 # cls can be int, packet, str
1312 # string_class_name can be packet, str (packet or packet+field)
1313 # class_name can be packet, str (packet only)
1314 if isinstance(cls, int):
1315 nb = cls + 1
1316 string_class_name = "" # type: Union[Type[Packet], str]
1317 else:
1318 string_class_name = cls
1319 class_name = "" # type: Union[Type[Packet], str]
1320 fld = None # type: Optional[str]
1321 if isinstance(string_class_name, str) and "." in string_class_name:
1322 class_name, fld = string_class_name.split(".", 1)
1323 else:
1324 class_name, fld = string_class_name, None
1325 if not class_name or match(self.__class__, class_name) \
1326 or class_name in [self.__class__.__name__, self._name]:
1327 if all(self.getfieldval(fldname) == fldvalue
1328 for fldname, fldvalue in flt.items()):
1329 if nb == 1:
1330 if fld is None:
1331 return self
1332 else:
1333 return self.getfieldval(fld) # type: ignore
1334 else:
1335 nb -= 1
1336 for f in self.packetfields:
1337 fvalue_gen = self.getfieldval(f.name)
1338 if fvalue_gen is None:
1339 continue
1340 if not f.islist:
1341 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1342 for fvalue in fvalue_gen:
1343 if isinstance(fvalue, Packet):
1344 track = [] # type: List[int]
1345 ret = fvalue.getlayer(class_name, nb=nb, _track=track,
1346 _subclass=_subclass, **flt)
1347 if ret is not None:
1348 return ret
1349 nb = track[0]
1350 return self.payload.getlayer(class_name, nb=nb, _track=_track,
1351 _subclass=_subclass, **flt)
1353 def firstlayer(self):
1354 # type: () -> Packet
1355 q = self
1356 while q.underlayer is not None:
1357 q = q.underlayer
1358 return q
1360 def __getitem__(self, cls):
1361 # type: (Union[Type[Packet], str]) -> Any
1362 if isinstance(cls, slice):
1363 lname = cls.start
1364 if cls.stop:
1365 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {}))
1366 else:
1367 ret = self.getlayer(cls.start, **(cls.step or {}))
1368 else:
1369 lname = cls
1370 ret = self.getlayer(cls)
1371 if ret is None:
1372 if isinstance(lname, type):
1373 name = lname.__name__
1374 elif not isinstance(lname, bytes):
1375 name = repr(lname)
1376 else:
1377 name = cast(str, lname)
1378 raise IndexError("Layer [%s] not found" % name)
1379 return ret
1381 def __delitem__(self, cls):
1382 # type: (Type[Packet]) -> None
1383 del self[cls].underlayer.payload
1385 def __setitem__(self, cls, val):
1386 # type: (Type[Packet], Packet) -> None
1387 self[cls].underlayer.payload = val
1389 def __contains__(self, cls):
1390 # type: (Union[Type[Packet], str]) -> int
1391 """
1392 "cls in self" returns true if self has a layer which is an
1393 instance of cls.
1394 """
1395 return self.haslayer(cls)
1397 def route(self):
1398 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
1399 return self.payload.route()
1401 def fragment(self, *args, **kargs):
1402 # type: (*Any, **Any) -> List[Packet]
1403 return self.payload.fragment(*args, **kargs)
1405 def display(self, *args, **kargs): # Deprecated. Use show()
1406 # type: (*Any, **Any) -> None
1407 """Deprecated. Use show() method."""
1408 self.show(*args, **kargs)
1410 def _show_or_dump(self,
1411 dump=False, # type: bool
1412 indent=3, # type: int
1413 lvl="", # type: str
1414 label_lvl="", # type: str
1415 first_call=True # type: bool
1416 ):
1417 # type: (...) -> Optional[str]
1418 """
1419 Internal method that shows or dumps a hierarchical view of a packet.
1420 Called by show.
1422 :param dump: determine if it prints or returns the string value
1423 :param int indent: the size of indentation for each layer
1424 :param str lvl: additional information about the layer lvl
1425 :param str label_lvl: additional information about the layer fields
1426 :param first_call: determine if the current function is the first
1427 :return: return a hierarchical view if dump, else print it
1428 """
1430 if dump:
1431 from scapy.themes import ColorTheme, AnsiColorTheme
1432 ct: ColorTheme = AnsiColorTheme() # No color for dump output
1433 else:
1434 ct = conf.color_theme
1435 s = "%s%s %s %s\n" % (label_lvl,
1436 ct.punct("###["),
1437 ct.layer_name(self.name),
1438 ct.punct("]###"))
1439 fields = self.fields_desc.copy()
1440 while fields:
1441 f = fields.pop(0)
1442 if isinstance(f, ConditionalField) and not f._evalcond(self):
1443 continue
1444 if hasattr(f, "fields"): # Field has subfields
1445 s += "%s %s =\n" % (
1446 label_lvl + lvl,
1447 ct.depreciate_field_name(f.name),
1448 )
1449 lvl += " " * indent * self.show_indent
1450 for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)):
1451 fields.insert(i, fld)
1452 continue
1453 if isinstance(f, Emph) or f in conf.emph:
1454 ncol = ct.emph_field_name
1455 vcol = ct.emph_field_value
1456 else:
1457 ncol = ct.field_name
1458 vcol = ct.field_value
1459 pad = max(0, 10 - len(f.name)) * " "
1460 fvalue = self.getfieldval(f.name)
1461 if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)): # noqa: E501
1462 s += "%s %s%s%s%s\n" % (label_lvl + lvl,
1463 ct.punct("\\"),
1464 ncol(f.name),
1465 pad,
1466 ct.punct("\\"))
1467 fvalue_gen = SetGen(
1468 fvalue,
1469 _iterpacket=0
1470 ) # type: SetGen[Packet]
1471 for fvalue in fvalue_gen:
1472 s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False) # noqa: E501
1473 else:
1474 begn = "%s %s%s%s " % (label_lvl + lvl,
1475 ncol(f.name),
1476 pad,
1477 ct.punct("="),)
1478 reprval = f.i2repr(self, fvalue)
1479 if isinstance(reprval, str):
1480 reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) + # noqa: E501
1481 len(lvl) +
1482 len(f.name) +
1483 4))
1484 s += "%s%s\n" % (begn, vcol(reprval))
1485 if self.payload:
1486 s += self.payload._show_or_dump( # type: ignore
1487 dump=dump,
1488 indent=indent,
1489 lvl=lvl + (" " * indent * self.show_indent),
1490 label_lvl=label_lvl,
1491 first_call=False
1492 )
1494 if first_call and not dump:
1495 print(s)
1496 return None
1497 else:
1498 return s
1500 def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1501 # type: (bool, int, str, str) -> Optional[Any]
1502 """
1503 Prints or returns (when "dump" is true) a hierarchical view of the
1504 packet.
1506 :param dump: determine if it prints or returns the string value
1507 :param int indent: the size of indentation for each layer
1508 :param str lvl: additional information about the layer lvl
1509 :param str label_lvl: additional information about the layer fields
1510 :return: return a hierarchical view if dump, else print it
1511 """
1512 return self._show_or_dump(dump, indent, lvl, label_lvl)
1514 def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
1515 # type: (bool, int, str, str) -> Optional[Any]
1516 """
1517 Prints or returns (when "dump" is true) a hierarchical view of an
1518 assembled version of the packet, so that automatic fields are
1519 calculated (checksums, etc.)
1521 :param dump: determine if it prints or returns the string value
1522 :param int indent: the size of indentation for each layer
1523 :param str lvl: additional information about the layer lvl
1524 :param str label_lvl: additional information about the layer fields
1525 :return: return a hierarchical view if dump, else print it
1526 """
1527 return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl)
1529 def sprintf(self, fmt, relax=1):
1530 # type: (str, int) -> str
1531 """
1532 sprintf(format, [relax=1]) -> str
1534 Where format is a string that can include directives. A directive
1535 begins and ends by % and has the following format:
1536 ``%[fmt[r],][cls[:nb].]field%``
1538 :param fmt: is a classic printf directive, "r" can be appended for raw
1539 substitution:
1540 (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
1541 (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
1542 Special case : "%.time%" is the creation time.
1543 Ex::
1545 p.sprintf(
1546 "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
1547 "%03xr,IP.proto% %r,TCP.flags%"
1548 )
1550 Moreover, the format string can include conditional statements. A
1551 conditional statement looks like : {layer:string} where layer is a
1552 layer name, and string is the string to insert in place of the
1553 condition if it is true, i.e. if layer is present. If layer is
1554 preceded by a "!", the result is inverted. Conditions can be
1555 imbricated. A valid statement can be::
1557 p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
1558 p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")
1560 A side effect is that, to obtain "{" and "}" characters, you must use
1561 "%(" and "%)".
1562 """
1564 escape = {"%": "%",
1565 "(": "{",
1566 ")": "}"}
1568 # Evaluate conditions
1569 while "{" in fmt:
1570 i = fmt.rindex("{")
1571 j = fmt[i + 1:].index("}")
1572 cond = fmt[i + 1:i + j + 1]
1573 k = cond.find(":")
1574 if k < 0:
1575 raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond) # noqa: E501
1576 cond, format_ = cond[:k], cond[k + 1:]
1577 res = False
1578 if cond[0] == "!":
1579 res = True
1580 cond = cond[1:]
1581 if self.haslayer(cond):
1582 res = not res
1583 if not res:
1584 format_ = ""
1585 fmt = fmt[:i] + format_ + fmt[i + j + 2:]
1587 # Evaluate directives
1588 s = ""
1589 while "%" in fmt:
1590 i = fmt.index("%")
1591 s += fmt[:i]
1592 fmt = fmt[i + 1:]
1593 if fmt and fmt[0] in escape:
1594 s += escape[fmt[0]]
1595 fmt = fmt[1:]
1596 continue
1597 try:
1598 i = fmt.index("%")
1599 sfclsfld = fmt[:i]
1600 fclsfld = sfclsfld.split(",")
1601 if len(fclsfld) == 1:
1602 f = "s"
1603 clsfld = fclsfld[0]
1604 elif len(fclsfld) == 2:
1605 f, clsfld = fclsfld
1606 else:
1607 raise Scapy_Exception
1608 if "." in clsfld:
1609 cls, fld = clsfld.split(".")
1610 else:
1611 cls = self.__class__.__name__
1612 fld = clsfld
1613 num = 1
1614 if ":" in cls:
1615 cls, snum = cls.split(":")
1616 num = int(snum)
1617 fmt = fmt[i + 1:]
1618 except Exception:
1619 raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "...")) # noqa: E501
1620 else:
1621 if fld == "time":
1622 val = time.strftime(
1623 "%H:%M:%S.%%06i",
1624 time.localtime(float(self.time))
1625 ) % int((self.time - int(self.time)) * 1000000)
1626 elif cls == self.__class__.__name__ and hasattr(self, fld):
1627 if num > 1:
1628 val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax) # noqa: E501
1629 f = "s"
1630 else:
1631 try:
1632 val = self.getfieldval(fld)
1633 except AttributeError:
1634 val = getattr(self, fld)
1635 if f[-1] == "r": # Raw field value
1636 f = f[:-1]
1637 if not f:
1638 f = "s"
1639 else:
1640 if fld in self.fieldtype:
1641 val = self.fieldtype[fld].i2repr(self, val)
1642 else:
1643 val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
1644 f = "s"
1645 s += ("%" + f) % val
1647 s += fmt
1648 return s
1650 def mysummary(self):
1651 # type: () -> str
1652 """DEV: can be overloaded to return a string that summarizes the layer.
1653 Only one mysummary() is used in a whole packet summary: the one of the upper layer, # noqa: E501
1654 except if a mysummary() also returns (as a couple) a list of layers whose # noqa: E501
1655 mysummary() must be called if they are present."""
1656 return ""
1658 def _do_summary(self):
1659 # type: () -> Tuple[int, str, List[Any]]
1660 found, s, needed = self.payload._do_summary()
1661 ret = ""
1662 if not found or self.__class__ in needed:
1663 ret = self.mysummary()
1664 if isinstance(ret, tuple):
1665 ret, n = ret
1666 needed += n
1667 if ret or needed:
1668 found = 1
1669 if not ret:
1670 ret = self.__class__.__name__ if self.show_summary else ""
1671 if self.__class__ in conf.emph:
1672 impf = []
1673 for f in self.fields_desc:
1674 if f in conf.emph:
1675 impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) # noqa: E501
1676 ret = "%s [%s]" % (ret, " ".join(impf))
1677 if ret and s:
1678 ret = "%s / %s" % (ret, s)
1679 else:
1680 ret = "%s%s" % (ret, s)
1681 return found, ret, needed
1683 def summary(self, intern=0):
1684 # type: (int) -> str
1685 """Prints a one line summary of a packet."""
1686 return self._do_summary()[1]
1688 def lastlayer(self, layer=None):
1689 # type: (Optional[Packet]) -> Packet
1690 """Returns the uppest layer of the packet"""
1691 return self.payload.lastlayer(self)
1693 def decode_payload_as(self, cls):
1694 # type: (Type[Packet]) -> None
1695 """Reassembles the payload and decode it using another packet class"""
1696 s = raw(self.payload)
1697 self.payload = cls(s, _internal=1, _underlayer=self)
1698 pp = self
1699 while pp.underlayer is not None:
1700 pp = pp.underlayer
1701 self.payload.dissection_done(pp)
1703 def _command(self, json=False):
1704 # type: (bool) -> List[Tuple[str, Any]]
1705 """
1706 Internal method used to generate command() and json()
1707 """
1708 f = []
1709 iterator: Iterator[Tuple[str, Any]]
1710 if json:
1711 iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc)
1712 else:
1713 iterator = iter(self.fields.items())
1714 for fn, fv in iterator:
1715 fld = self.get_field(fn)
1716 if isinstance(fv, (list, dict, set)) and not fv and not fld.default:
1717 continue
1718 if isinstance(fv, Packet):
1719 if json:
1720 fv = {k: v for (k, v) in fv._command(json=True)}
1721 else:
1722 fv = fv.command()
1723 elif fld.islist and fld.holds_packets and isinstance(fv, list):
1724 if json:
1725 fv = [
1726 {k: v for (k, v) in x}
1727 for x in map(lambda y: Packet._command(y, json=True), fv)
1728 ]
1729 else:
1730 fv = "[%s]" % ",".join(map(Packet.command, fv))
1731 elif fld.islist and isinstance(fv, list):
1732 if json:
1733 fv = [
1734 getattr(x, 'command', lambda: repr(x))()
1735 for x in fv
1736 ]
1737 else:
1738 fv = "[%s]" % ",".join(
1739 getattr(x, 'command', lambda: repr(x))()
1740 for x in fv
1741 )
1742 elif isinstance(fv, FlagValue):
1743 fv = int(fv)
1744 elif callable(getattr(fv, 'command', None)):
1745 fv = fv.command(json=json)
1746 else:
1747 if json:
1748 if isinstance(fv, bytes):
1749 fv = fv.decode("utf-8", errors="backslashreplace")
1750 else:
1751 fv = fld.i2h(self, fv)
1752 else:
1753 fv = repr(fld.i2h(self, fv))
1754 f.append((fn, fv))
1755 return f
1757 def command(self):
1758 # type: () -> str
1759 """
1760 Returns a string representing the command you have to type to
1761 obtain the same packet
1762 """
1763 c = "%s(%s)" % (
1764 self.__class__.__name__,
1765 ", ".join("%s=%s" % x for x in self._command())
1766 )
1767 pc = self.payload.command()
1768 if pc:
1769 c += "/" + pc
1770 return c
1772 def json(self):
1773 # type: () -> str
1774 """
1775 Returns a JSON representing the packet.
1777 Please note that this cannot be used for bijective usage: data loss WILL occur,
1778 so it will not make sense to try to rebuild the packet from the output.
1779 This must only be used for a grepping/displaying purpose.
1780 """
1781 dump = json.dumps({k: v for (k, v) in self._command(json=True)})
1782 pc = self.payload.json()
1783 if pc:
1784 dump = dump[:-1] + ", \"payload\": %s}" % pc
1785 return dump
1788class NoPayload(Packet):
1789 def __new__(cls, *args, **kargs):
1790 # type: (Type[Packet], *Any, **Any) -> NoPayload
1791 singl = cls.__dict__.get("__singl__")
1792 if singl is None:
1793 cls.__singl__ = singl = Packet.__new__(cls)
1794 Packet.__init__(singl)
1795 return cast(NoPayload, singl)
1797 def __init__(self, *args, **kargs):
1798 # type: (*Any, **Any) -> None
1799 pass
1801 def dissection_done(self, pkt):
1802 # type: (Packet) -> None
1803 pass
1805 def add_payload(self, payload):
1806 # type: (Union[Packet, bytes]) -> NoReturn
1807 raise Scapy_Exception("Can't add payload to NoPayload instance")
1809 def remove_payload(self):
1810 # type: () -> None
1811 pass
1813 def add_underlayer(self, underlayer):
1814 # type: (Any) -> None
1815 pass
1817 def remove_underlayer(self, other):
1818 # type: (Packet) -> None
1819 pass
1821 def add_parent(self, parent):
1822 # type: (Any) -> None
1823 pass
1825 def remove_parent(self, other):
1826 # type: (Packet) -> None
1827 pass
1829 def copy(self):
1830 # type: () -> NoPayload
1831 return self
1833 def clear_cache(self):
1834 # type: () -> None
1835 pass
1837 def __repr__(self):
1838 # type: () -> str
1839 return ""
1841 def __str__(self):
1842 # type: () -> str
1843 return ""
1845 def __bytes__(self):
1846 # type: () -> bytes
1847 return b""
1849 def __nonzero__(self):
1850 # type: () -> bool
1851 return False
1852 __bool__ = __nonzero__
1854 def do_build(self):
1855 # type: () -> bytes
1856 return b""
1858 def build(self):
1859 # type: () -> bytes
1860 return b""
1862 def build_padding(self):
1863 # type: () -> bytes
1864 return b""
1866 def build_done(self, p):
1867 # type: (bytes) -> bytes
1868 return p
1870 def build_ps(self, internal=0):
1871 # type: (int) -> Tuple[bytes, List[Any]]
1872 return b"", []
1874 def getfieldval(self, attr):
1875 # type: (str) -> NoReturn
1876 raise AttributeError(attr)
1878 def getfield_and_val(self, attr):
1879 # type: (str) -> NoReturn
1880 raise AttributeError(attr)
1882 def setfieldval(self, attr, val):
1883 # type: (str, Any) -> NoReturn
1884 raise AttributeError(attr)
1886 def delfieldval(self, attr):
1887 # type: (str) -> NoReturn
1888 raise AttributeError(attr)
1890 def hide_defaults(self):
1891 # type: () -> None
1892 pass
1894 def __iter__(self):
1895 # type: () -> Iterator[Packet]
1896 return iter([])
1898 def __eq__(self, other):
1899 # type: (Any) -> bool
1900 if isinstance(other, NoPayload):
1901 return True
1902 return False
1904 def hashret(self):
1905 # type: () -> bytes
1906 return b""
1908 def answers(self, other):
1909 # type: (Packet) -> bool
1910 return isinstance(other, (NoPayload, conf.padding_layer)) # noqa: E501
1912 def haslayer(self, cls, _subclass=None):
1913 # type: (Union[Type[Packet], str], Optional[bool]) -> int
1914 return 0
1916 def getlayer(self,
1917 cls, # type: Union[int, Type[Packet], str]
1918 nb=1, # type: int
1919 _track=None, # type: Optional[List[int]]
1920 _subclass=None, # type: Optional[bool]
1921 **flt # type: Any
1922 ):
1923 # type: (...) -> Optional[Packet]
1924 if _track is not None:
1925 _track.append(nb)
1926 return None
1928 def fragment(self, *args, **kargs):
1929 # type: (*Any, **Any) -> List[Packet]
1930 raise Scapy_Exception("cannot fragment this packet")
1932 def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1933 # type: (bool, int, str, str) -> None
1934 pass
1936 def sprintf(self, fmt, relax=1):
1937 # type: (str, int) -> str
1938 if relax:
1939 return "??"
1940 else:
1941 raise Scapy_Exception("Format not found [%s]" % fmt)
1943 def _do_summary(self):
1944 # type: () -> Tuple[int, str, List[Any]]
1945 return 0, "", []
1947 def layers(self):
1948 # type: () -> List[Type[Packet]]
1949 return []
1951 def lastlayer(self, layer=None):
1952 # type: (Optional[Packet]) -> Packet
1953 return layer or self
1955 def command(self):
1956 # type: () -> str
1957 return ""
1959 def json(self):
1960 # type: () -> str
1961 return ""
1963 def route(self):
1964 # type: () -> Tuple[None, None, None]
1965 return (None, None, None)
1968####################
1969# packet classes #
1970####################
1973class Raw(Packet):
1974 name = "Raw"
1975 fields_desc = [StrField("load", b"")]
1977 def __init__(self, _pkt=b"", *args, **kwargs):
1978 # type: (bytes, *Any, **Any) -> None
1979 if _pkt and not isinstance(_pkt, bytes):
1980 if isinstance(_pkt, tuple):
1981 _pkt, bn = _pkt
1982 _pkt = bytes_encode(_pkt), bn
1983 else:
1984 _pkt = bytes_encode(_pkt)
1985 super(Raw, self).__init__(_pkt, *args, **kwargs)
1987 def answers(self, other):
1988 # type: (Packet) -> int
1989 return 1
1991 def mysummary(self):
1992 # type: () -> str
1993 cs = conf.raw_summary
1994 if cs:
1995 if callable(cs):
1996 return "Raw %s" % cs(self.load)
1997 else:
1998 return "Raw %r" % self.load
1999 return Packet.mysummary(self)
2002class Padding(Raw):
2003 name = "Padding"
2005 def self_build(self, field_pos_list=None):
2006 # type: (Optional[Any]) -> bytes
2007 return b""
2009 def build_padding(self):
2010 # type: () -> bytes
2011 return (
2012 bytes_encode(self.load) if self.raw_packet_cache is None
2013 else self.raw_packet_cache
2014 ) + self.payload.build_padding()
2017conf.raw_layer = Raw
2018conf.padding_layer = Padding
2019if conf.default_l2 is None:
2020 conf.default_l2 = Raw
2022#################
2023# Bind layers #
2024#################
2027def bind_bottom_up(lower, # type: Type[Packet]
2028 upper, # type: Type[Packet]
2029 __fval=None, # type: Optional[Any]
2030 **fval # type: Any
2031 ):
2032 # type: (...) -> None
2033 r"""Bind 2 layers for dissection.
2034 The upper layer will be chosen for dissection on top of the lower layer, if
2035 ALL the passed arguments are validated. If multiple calls are made with
2036 the same layers, the last one will be used as default.
2038 ex:
2039 >>> bind_bottom_up(Ether, SNAP, type=0x1234)
2040 >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00') # noqa: E501
2041 <Ether dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP OUI=0x0 code=0x0 |>> # noqa: E501
2042 """
2043 if __fval is not None:
2044 fval.update(__fval)
2045 lower.payload_guess = lower.payload_guess[:]
2046 lower.payload_guess.append((fval, upper))
2049def bind_top_down(lower, # type: Type[Packet]
2050 upper, # type: Type[Packet]
2051 __fval=None, # type: Optional[Any]
2052 **fval # type: Any
2053 ):
2054 # type: (...) -> None
2055 """Bind 2 layers for building.
2056 When the upper layer is added as a payload of the lower layer, all the
2057 arguments will be applied to them.
2059 ex:
2060 >>> bind_top_down(Ether, SNAP, type=0x1234)
2061 >>> Ether()/SNAP()
2062 <Ether type=0x1234 |<SNAP |>>
2063 """
2064 if __fval is not None:
2065 fval.update(__fval)
2066 upper._overload_fields = upper._overload_fields.copy() # type: ignore
2067 upper._overload_fields[lower] = fval
2070@conf.commands.register
2071def bind_layers(lower, # type: Type[Packet]
2072 upper, # type: Type[Packet]
2073 __fval=None, # type: Optional[Dict[str, int]]
2074 **fval # type: Any
2075 ):
2076 # type: (...) -> None
2077 """Bind 2 layers on some specific fields' values.
2079 It makes the packet being built and dissected when the arguments
2080 are present.
2082 This function calls both bind_bottom_up and bind_top_down, with
2083 all passed arguments.
2085 Please have a look at their docs:
2086 - help(bind_bottom_up)
2087 - help(bind_top_down)
2088 """
2089 if __fval is not None:
2090 fval.update(__fval)
2091 bind_top_down(lower, upper, **fval)
2092 bind_bottom_up(lower, upper, **fval)
2095def split_bottom_up(lower, # type: Type[Packet]
2096 upper, # type: Type[Packet]
2097 __fval=None, # type: Optional[Any]
2098 **fval # type: Any
2099 ):
2100 # type: (...) -> None
2101 """This call un-links an association that was made using bind_bottom_up.
2102 Have a look at help(bind_bottom_up)
2103 """
2104 if __fval is not None:
2105 fval.update(__fval)
2107 def do_filter(params, cls):
2108 # type: (Dict[str, int], Type[Packet]) -> bool
2109 params_is_invalid = any(
2110 k not in params or params[k] != v for k, v in fval.items()
2111 )
2112 return cls != upper or params_is_invalid
2113 lower.payload_guess = [x for x in lower.payload_guess if do_filter(*x)]
2116def split_top_down(lower, # type: Type[Packet]
2117 upper, # type: Type[Packet]
2118 __fval=None, # type: Optional[Any]
2119 **fval # type: Any
2120 ):
2121 # type: (...) -> None
2122 """This call un-links an association that was made using bind_top_down.
2123 Have a look at help(bind_top_down)
2124 """
2125 if __fval is not None:
2126 fval.update(__fval)
2127 if lower in upper._overload_fields:
2128 ofval = upper._overload_fields[lower]
2129 if any(k not in ofval or ofval[k] != v for k, v in fval.items()):
2130 return
2131 upper._overload_fields = upper._overload_fields.copy() # type: ignore
2132 del upper._overload_fields[lower]
2135@conf.commands.register
2136def split_layers(lower, # type: Type[Packet]
2137 upper, # type: Type[Packet]
2138 __fval=None, # type: Optional[Any]
2139 **fval # type: Any
2140 ):
2141 # type: (...) -> None
2142 """Split 2 layers previously bound.
2143 This call un-links calls bind_top_down and bind_bottom_up. It is the opposite of # noqa: E501
2144 bind_layers.
2146 Please have a look at their docs:
2147 - help(split_bottom_up)
2148 - help(split_top_down)
2149 """
2150 if __fval is not None:
2151 fval.update(__fval)
2152 split_bottom_up(lower, upper, **fval)
2153 split_top_down(lower, upper, **fval)
2156@conf.commands.register
2157def explore(layer=None):
2158 # type: (Optional[str]) -> None
2159 """Function used to discover the Scapy layers and protocols.
2160 It helps to see which packets exists in contrib or layer files.
2162 params:
2163 - layer: If specified, the function will explore the layer. If not,
2164 the GUI mode will be activated, to browse the available layers
2166 examples:
2167 >>> explore() # Launches the GUI
2168 >>> explore("dns") # Explore scapy.layers.dns
2169 >>> explore("http2") # Explore scapy.contrib.http2
2170 >>> explore(scapy.layers.bluetooth4LE)
2172 Note: to search a packet by name, use ls("name") rather than explore.
2173 """
2174 if layer is None: # GUI MODE
2175 if not conf.interactive:
2176 raise Scapy_Exception("explore() GUI-mode cannot be run in "
2177 "interactive mode. Please provide a "
2178 "'layer' parameter !")
2179 # 0 - Imports
2180 try:
2181 import prompt_toolkit
2182 except ImportError:
2183 raise ImportError("prompt_toolkit is not installed ! "
2184 "You may install IPython, which contains it, via"
2185 " `pip install ipython`")
2186 if not _version_checker(prompt_toolkit, (2, 0)):
2187 raise ImportError("prompt_toolkit >= 2.0.0 is required !")
2188 # Only available with prompt_toolkit > 2.0, not released on PyPi yet
2189 from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
2190 button_dialog
2191 from prompt_toolkit.formatted_text import HTML
2192 # Check for prompt_toolkit >= 3.0.0
2193 call_ptk = lambda x: cast(str, x) # type: Callable[[Any], str]
2194 if _version_checker(prompt_toolkit, (3, 0)):
2195 call_ptk = lambda x: x.run()
2196 # 1 - Ask for layer or contrib
2197 btn_diag = button_dialog(
2198 title="Scapy v%s" % conf.version,
2199 text=HTML(
2200 '<style bg="white" fg="red">Chose the type of packets'
2201 ' you want to explore:</style>'
2202 ),
2203 buttons=[
2204 ("Layers", "layers"),
2205 ("Contribs", "contribs"),
2206 ("Cancel", "cancel")
2207 ])
2208 action = call_ptk(btn_diag)
2209 # 2 - Retrieve list of Packets
2210 if action == "layers":
2211 # Get all loaded layers
2212 lvalues = conf.layers.layers()
2213 # Restrict to layers-only (not contribs) + packet.py and asn1*.py
2214 values = [x for x in lvalues if ("layers" in x[0] or
2215 "packet" in x[0] or
2216 "asn1" in x[0])]
2217 elif action == "contribs":
2218 # Get all existing contribs
2219 from scapy.main import list_contrib
2220 cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
2221 values = [(x['name'], x['description'])
2222 for x in cvalues]
2223 # Remove very specific modules
2224 values = [x for x in values if "can" not in x[0]]
2225 else:
2226 # Escape/Cancel was pressed
2227 return
2228 # Build tree
2229 if action == "contribs":
2230 # A tree is a dictionary. Each layer contains a keyword
2231 # _l which contains the files in the layer, and a _name
2232 # argument which is its name. The other keys are the subfolders,
2233 # which are similar dictionaries
2234 tree = defaultdict(list) # type: Dict[str, Union[List[Any], Dict[str, Any]]] # noqa: E501
2235 for name, desc in values:
2236 if "." in name: # Folder detected
2237 parts = name.split(".")
2238 subtree = tree
2239 for pa in parts[:-1]:
2240 if pa not in subtree:
2241 subtree[pa] = {}
2242 # one layer deeper
2243 subtree = subtree[pa] # type: ignore
2244 subtree["_name"] = pa # type: ignore
2245 if "_l" not in subtree:
2246 subtree["_l"] = []
2247 subtree["_l"].append((parts[-1], desc)) # type: ignore
2248 else:
2249 tree["_l"].append((name, desc)) # type: ignore
2250 elif action == "layers":
2251 tree = {"_l": values}
2252 # 3 - Ask for the layer/contrib module to explore
2253 current = tree # type: Any
2254 previous = [] # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]] # noqa: E501
2255 while True:
2256 # Generate tests & form
2257 folders = list(current.keys())
2258 _radio_values = [
2259 ("$" + name, str('[+] ' + name.capitalize()))
2260 for name in folders if not name.startswith("_")
2261 ] + current.get("_l", []) # type: List[str]
2262 cur_path = ""
2263 if previous:
2264 cur_path = ".".join(
2265 itertools.chain(
2266 (x["_name"] for x in previous[1:]), # type: ignore
2267 (current["_name"],)
2268 )
2269 )
2270 extra_text = (
2271 '\n<style bg="white" fg="green">> scapy.%s</style>'
2272 ) % (action + ("." + cur_path if cur_path else ""))
2273 # Show popup
2274 rd_diag = radiolist_dialog(
2275 values=_radio_values,
2276 title="Scapy v%s" % conf.version,
2277 text=HTML(
2278 (
2279 '<style bg="white" fg="red">Please select a file'
2280 'among the following, to see all layers contained in'
2281 ' it:</style>'
2282 ) + extra_text
2283 ),
2284 cancel_text="Back" if previous else "Cancel"
2285 )
2286 result = call_ptk(rd_diag)
2287 if result is None:
2288 # User pressed "Cancel/Back"
2289 if previous: # Back
2290 current = previous.pop()
2291 continue
2292 else: # Cancel
2293 return
2294 if result.startswith("$"):
2295 previous.append(current)
2296 current = current[result[1:]]
2297 else:
2298 # Enter on layer
2299 if previous: # In subfolder
2300 result = cur_path + "." + result
2301 break
2302 # 4 - (Contrib only): load contrib
2303 if action == "contribs":
2304 from scapy.main import load_contrib
2305 load_contrib(result)
2306 result = "scapy.contrib." + result
2307 else: # NON-GUI MODE
2308 # We handle layer as a short layer name, full layer name
2309 # or the module itself
2310 if isinstance(layer, types.ModuleType):
2311 layer = layer.__name__
2312 if isinstance(layer, str):
2313 if layer.startswith("scapy.layers."):
2314 result = layer
2315 else:
2316 if layer.startswith("scapy.contrib."):
2317 layer = layer.replace("scapy.contrib.", "")
2318 from scapy.main import load_contrib
2319 load_contrib(layer)
2320 result_layer, result_contrib = (("scapy.layers.%s" % layer),
2321 ("scapy.contrib.%s" % layer))
2322 if result_layer in conf.layers.ldict:
2323 result = result_layer
2324 elif result_contrib in conf.layers.ldict:
2325 result = result_contrib
2326 else:
2327 raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2328 else:
2329 warning("Wrong usage ! Check out help(explore)")
2330 return
2332 # COMMON PART
2333 # Get the list of all Packets contained in that module
2334 try:
2335 all_layers = conf.layers.ldict[result]
2336 except KeyError:
2337 raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2338 # Print
2339 print(conf.color_theme.layer_name("Packets contained in %s:" % result))
2340 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]]
2341 rtlst = [(lay.__name__ or "", cast(str, lay._name) or "") for lay in all_layers]
2342 print(pretty_list(rtlst, [("Class", "Name")], borders=True))
2345def _pkt_ls(obj, # type: Union[Packet, Type[Packet]]
2346 verbose=False, # type: bool
2347 ):
2348 # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]] # noqa: E501
2349 """Internal function used to resolve `fields_desc` to display it.
2351 :param obj: a packet object or class
2352 :returns: a list containing tuples [(name, clsname, clsname_extras,
2353 default, long_attrs)]
2354 """
2355 is_pkt = isinstance(obj, Packet)
2356 if not issubtype(obj, Packet) and not is_pkt:
2357 raise ValueError
2358 fields = []
2359 for f in obj.fields_desc:
2360 cur_fld = f
2361 attrs = [] # type: List[str]
2362 long_attrs = [] # type: List[str]
2363 while isinstance(cur_fld, (Emph, ConditionalField)):
2364 if isinstance(cur_fld, ConditionalField):
2365 attrs.append(cur_fld.__class__.__name__[:4])
2366 cur_fld = cur_fld.fld
2367 name = cur_fld.name
2368 default = cur_fld.default
2369 if verbose and isinstance(cur_fld, EnumField) \
2370 and hasattr(cur_fld, "i2s") and cur_fld.i2s:
2371 if len(cur_fld.i2s or []) < 50:
2372 long_attrs.extend(
2373 "%s: %d" % (strval, numval)
2374 for numval, strval in
2375 sorted(cur_fld.i2s.items())
2376 )
2377 elif isinstance(cur_fld, MultiEnumField):
2378 if isinstance(obj, Packet):
2379 obj_pkt = obj
2380 else:
2381 obj_pkt = obj()
2382 fld_depend = cur_fld.depends_on(obj_pkt)
2383 attrs.append("Depends on %s" % fld_depend)
2384 if verbose:
2385 cur_i2s = cur_fld.i2s_multi.get(
2386 cur_fld.depends_on(obj_pkt), {}
2387 )
2388 if len(cur_i2s) < 50:
2389 long_attrs.extend(
2390 "%s: %d" % (strval, numval)
2391 for numval, strval in
2392 sorted(cur_i2s.items())
2393 )
2394 elif verbose and isinstance(cur_fld, FlagsField):
2395 names = cur_fld.names
2396 long_attrs.append(", ".join(names))
2397 elif isinstance(cur_fld, MultipleTypeField):
2398 default = cur_fld.dflt.default
2399 attrs.append(", ".join(
2400 x[0].__class__.__name__ for x in
2401 itertools.chain(cur_fld.flds, [(cur_fld.dflt,)])
2402 ))
2404 cls = cur_fld.__class__
2405 class_name_extras = "(%s)" % (
2406 ", ".join(attrs)
2407 ) if attrs else ""
2408 if isinstance(cur_fld, BitField):
2409 class_name_extras += " (%d bit%s)" % (
2410 cur_fld.size,
2411 "s" if cur_fld.size > 1 else ""
2412 )
2413 fields.append(
2414 (name,
2415 cls,
2416 class_name_extras,
2417 repr(default),
2418 long_attrs)
2419 )
2420 return fields
2423@conf.commands.register
2424def ls(obj=None, # type: Optional[Union[str, Packet, Type[Packet]]]
2425 case_sensitive=False, # type: bool
2426 verbose=False # type: bool
2427 ):
2428 # type: (...) -> None
2429 """List available layers, or infos on a given layer class or name.
2431 :param obj: Packet / packet name to use
2432 :param case_sensitive: if obj is a string, is it case sensitive?
2433 :param verbose:
2434 """
2435 if obj is None or isinstance(obj, str):
2436 tip = False
2437 if obj is None:
2438 tip = True
2439 all_layers = sorted(conf.layers, key=lambda x: x.__name__)
2440 else:
2441 pattern = re.compile(
2442 obj,
2443 0 if case_sensitive else re.I
2444 )
2445 # We first order by accuracy, then length
2446 if case_sensitive:
2447 sorter = lambda x: (x.__name__.index(obj), len(x.__name__))
2448 else:
2449 obj = obj.lower()
2450 sorter = lambda x: (x.__name__.lower().index(obj),
2451 len(x.__name__))
2452 all_layers = sorted((layer for layer in conf.layers
2453 if (isinstance(layer.__name__, str) and
2454 pattern.search(layer.__name__)) or
2455 (isinstance(layer.name, str) and
2456 pattern.search(layer.name))),
2457 key=sorter)
2458 for layer in all_layers:
2459 print("%-10s : %s" % (layer.__name__, layer._name))
2460 if tip and conf.interactive:
2461 print("\nTIP: You may use explore() to navigate through all "
2462 "layers using a clear GUI")
2463 else:
2464 try:
2465 fields = _pkt_ls(
2466 obj,
2467 verbose=verbose
2468 )
2469 is_pkt = isinstance(obj, Packet)
2470 # Print
2471 for fname, cls, clsne, dflt, long_attrs in fields:
2472 clsinfo = cls.__name__ + " " + clsne
2473 print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
2474 if is_pkt:
2475 print("%-15r" % (getattr(obj, fname),), end=' ')
2476 print("(%r)" % (dflt,))
2477 for attr in long_attrs:
2478 print("%-15s%s" % ("", attr))
2479 # Restart for payload if any
2480 if is_pkt:
2481 obj = cast(Packet, obj)
2482 if isinstance(obj.payload, NoPayload):
2483 return
2484 print("--")
2485 ls(obj.payload)
2486 except ValueError:
2487 print("Not a packet class or name. Type 'ls()' to list packet classes.") # noqa: E501
2490@conf.commands.register
2491def rfc(cls, ret=False, legend=True):
2492 # type: (Type[Packet], bool, bool) -> Optional[str]
2493 """
2494 Generate an RFC-like representation of a packet def.
2496 :param cls: the Packet class
2497 :param ret: return the result instead of printing (def. False)
2498 :param legend: show text under the diagram (default True)
2500 Ex::
2502 >>> rfc(Ether)
2504 """
2505 if not issubclass(cls, Packet):
2506 raise TypeError("Packet class expected")
2507 cur_len = 0
2508 cur_line = []
2509 lines = []
2510 # Get the size (width) that a field will take
2511 # when formatted, from its length in bits
2512 clsize = lambda x: 2 * x - 1 # type: Callable[[int], int]
2513 ident = 0 # Fields UUID
2514 # Generate packet groups
2515 for f in cls.fields_desc:
2516 flen = int(f.sz * 8)
2517 cur_len += flen
2518 ident += 1
2519 # Fancy field name
2520 fname = f.name.upper().replace("_", " ")
2521 # The field might exceed the current line or
2522 # take more than one line. Copy it as required
2523 while True:
2524 over = max(0, cur_len - 32) # Exceed
2525 len1 = clsize(flen - over) # What fits
2526 cur_line.append((fname[:len1], len1, ident))
2527 if cur_len >= 32:
2528 # Current line is full. start a new line
2529 lines.append(cur_line)
2530 cur_len = flen = over
2531 fname = "" # do not repeat the field
2532 cur_line = []
2533 if not over:
2534 # there is no data left
2535 break
2536 else:
2537 # End of the field
2538 break
2539 # Add the last line if un-finished
2540 if cur_line:
2541 lines.append(cur_line)
2542 # Calculate separations between lines
2543 seps = []
2544 seps.append("+-" * 32 + "+\n")
2545 for i in range(len(lines) - 1):
2546 # Start with a full line
2547 sep = "+-" * 32 + "+\n"
2548 # Get the line above and below the current
2549 # separation
2550 above, below = lines[i], lines[i + 1]
2551 # The last field of above is shared with below
2552 if above[-1][2] == below[0][2]:
2553 # where the field in "above" starts
2554 pos_above = sum(x[1] for x in above[:-1]) + len(above[:-1]) - 1
2555 # where the field in "below" ends
2556 pos_below = below[0][1]
2557 if pos_above < pos_below:
2558 # they are overlapping.
2559 # Now crop the space between those pos
2560 # and fill it with " "
2561 pos_above = pos_above + pos_above % 2
2562 sep = (
2563 sep[:1 + pos_above] +
2564 " " * (pos_below - pos_above) +
2565 sep[1 + pos_below:]
2566 )
2567 # line is complete
2568 seps.append(sep)
2569 # Graph
2570 result = ""
2571 # Bytes markers
2572 result += " " + (" " * 19).join(
2573 str(x) for x in range(4)
2574 ) + "\n"
2575 # Bits markers
2576 result += " " + " ".join(
2577 str(x % 10) for x in range(32)
2578 ) + "\n"
2579 # Add fields and their separations
2580 for line, sep in zip(lines, seps):
2581 result += sep
2582 for elt, flen, _ in line:
2583 result += "|" + elt.center(flen, " ")
2584 result += "|\n"
2585 result += "+-" * (cur_len or 32) + "+\n"
2586 # Annotate with the figure name
2587 if legend:
2588 result += "\n" + ("Fig. " + cls.__name__).center(66, " ")
2589 # return if asked for, else print
2590 if ret:
2591 return result
2592 print(result)
2593 return None
2596#############
2597# Fuzzing #
2598#############
2600_P = TypeVar('_P', bound=Packet)
2603@conf.commands.register
2604def fuzz(p, # type: _P
2605 _inplace=0, # type: int
2606 ):
2607 # type: (...) -> _P
2608 """
2609 Transform a layer into a fuzzy layer by replacing some default values
2610 by random objects.
2612 :param p: the Packet instance to fuzz
2613 :return: the fuzzed packet.
2614 """
2615 if not _inplace:
2616 p = p.copy()
2617 q = cast(Packet, p)
2618 while not isinstance(q, NoPayload):
2619 new_default_fields = {}
2620 multiple_type_fields = [] # type: List[str]
2621 for f in q.fields_desc:
2622 if isinstance(f, PacketListField):
2623 for r in getattr(q, f.name):
2624 fuzz(r, _inplace=1)
2625 elif isinstance(f, MultipleTypeField):
2626 # the type of the field will depend on others
2627 multiple_type_fields.append(f.name)
2628 elif f.default is not None:
2629 if not isinstance(f, ConditionalField) or f._evalcond(q):
2630 rnd = f.randval()
2631 if rnd is not None:
2632 new_default_fields[f.name] = rnd
2633 # Process packets with MultipleTypeFields
2634 if multiple_type_fields:
2635 # freeze the other random values
2636 new_default_fields = {
2637 key: (val._fix() if isinstance(val, VolatileValue) else val)
2638 for key, val in new_default_fields.items()
2639 }
2640 q.default_fields.update(new_default_fields)
2641 new_default_fields.clear()
2642 # add the random values of the MultipleTypeFields
2643 for name in multiple_type_fields:
2644 fld = cast(MultipleTypeField, q.get_field(name))
2645 rnd = fld._find_fld_pkt(q).randval()
2646 if rnd is not None:
2647 new_default_fields[name] = rnd
2648 q.default_fields.update(new_default_fields)
2649 q = q.payload
2650 return p