Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/packet.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Packet class
9Provides:
10 - the default Packet classes
11 - binding mechanisms
12 - fuzz() method
13 - exploration methods: explore() / ls()
14"""
16from collections import defaultdict
18import json
19import re
20import time
21import itertools
22import copy
23import types
24import warnings
26from scapy.fields import (
27 AnyField,
28 BitField,
29 ConditionalField,
30 Emph,
31 EnumField,
32 Field,
33 FlagsField,
34 FlagValue,
35 MayEnd,
36 MultiEnumField,
37 MultipleTypeField,
38 PadField,
39 PacketListField,
40 RawVal,
41 StrField,
42)
43from scapy.config import conf, _version_checker
44from scapy.compat import raw, orb, bytes_encode
45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \
46 _CanvasDumpExtended
47from scapy.interfaces import _GlobInterfaceType
48from scapy.volatile import RandField, VolatileValue
49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \
50 pretty_list, EDecimal
51from scapy.error import Scapy_Exception, log_runtime, warning
52from scapy.libs.test_pyx import PYX
54# Typing imports
55from typing import (
56 Any,
57 Callable,
58 Dict,
59 Iterator,
60 List,
61 NoReturn,
62 Optional,
63 Set,
64 Tuple,
65 Type,
66 TypeVar,
67 Union,
68 Sequence,
69 cast,
70)
71from scapy.compat import Self
73try:
74 import pyx
75except ImportError:
76 pass
79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]])
82class Packet(
83 BasePacket,
84 _CanvasDumpExtended,
85 metaclass=Packet_metaclass
86):
87 __slots__ = [
88 "time", "sent_time", "name",
89 "default_fields", "fields", "fieldtype",
90 "overload_fields", "overloaded_fields",
91 "packetfields",
92 "original", "explicit", "raw_packet_cache",
93 "raw_packet_cache_fields", "_pkt", "post_transforms",
94 "stop_dissection_after",
95 # then payload, underlayer and parent
96 "payload", "underlayer", "parent",
97 "name",
98 # used for sr()
99 "_answered",
100 # used when sniffing
101 "direction", "sniffed_on",
102 # handle snaplen Vs real length
103 "wirelen",
104 "comments",
105 "process_information"
106 ]
107 name = None
108 fields_desc = [] # type: List[AnyField]
109 deprecated_fields = {} # type: Dict[str, Tuple[str, str]]
110 overload_fields = {} # type: Dict[Type[Packet], Dict[str, Any]]
111 payload_guess = [] # type: List[Tuple[Dict[str, Any], Type[Packet]]]
112 show_indent = 1
113 show_summary = True
114 match_subclass = False
115 class_dont_cache = {} # type: Dict[Type[Packet], bool]
116 class_packetfields = {} # type: Dict[Type[Packet], Any]
117 class_default_fields = {} # type: Dict[Type[Packet], Dict[str, Any]]
118 class_default_fields_ref = {} # type: Dict[Type[Packet], List[str]]
119 class_fieldtype = {} # type: Dict[Type[Packet], Dict[str, AnyField]] # noqa: E501
121 @classmethod
122 def from_hexcap(cls):
123 # type: (Type[Packet]) -> Packet
124 return cls(import_hexcap())
126 @classmethod
127 def upper_bonds(self):
128 # type: () -> None
129 for fval, upper in self.payload_guess:
130 print(
131 "%-20s %s" % (
132 upper.__name__,
133 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
134 )
135 )
137 @classmethod
138 def lower_bonds(self):
139 # type: () -> None
140 for lower, fval in self._overload_fields.items():
141 print(
142 "%-20s %s" % (
143 lower.__name__,
144 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
145 )
146 )
148 def __init__(self,
149 _pkt=b"", # type: Union[bytes, bytearray]
150 post_transform=None, # type: Any
151 _internal=0, # type: int
152 _underlayer=None, # type: Optional[Packet]
153 _parent=None, # type: Optional[Packet]
154 stop_dissection_after=None, # type: Optional[Type[Packet]]
155 **fields # type: Any
156 ):
157 # type: (...) -> None
158 self.time = time.time() # type: Union[EDecimal, float]
159 self.sent_time = None # type: Union[EDecimal, float, None]
160 self.name = (self.__class__.__name__
161 if self._name is None else
162 self._name)
163 self.default_fields = {} # type: Dict[str, Any]
164 self.overload_fields = self._overload_fields
165 self.overloaded_fields = {} # type: Dict[str, Any]
166 self.fields = {} # type: Dict[str, Any]
167 self.fieldtype = {} # type: Dict[str, AnyField]
168 self.packetfields = [] # type: List[AnyField]
169 self.payload = NoPayload() # type: Packet
170 self.init_fields(bool(_pkt))
171 self.underlayer = _underlayer
172 self.parent = _parent
173 if isinstance(_pkt, bytearray):
174 _pkt = bytes(_pkt)
175 self.original = _pkt
176 self.explicit = 0
177 self.raw_packet_cache = None # type: Optional[bytes]
178 self.raw_packet_cache_fields = None # type: Optional[Dict[str, Any]] # noqa: E501
179 self.wirelen = None # type: Optional[int]
180 self.direction = None # type: Optional[int]
181 self.sniffed_on = None # type: Optional[_GlobInterfaceType]
182 self.comments = None # type: Optional[List[bytes]]
183 self.process_information = None # type: Optional[Dict[str, Any]]
184 self.stop_dissection_after = stop_dissection_after
185 if _pkt:
186 self.dissect(_pkt)
187 if not _internal:
188 self.dissection_done(self)
189 # We use this strange initialization so that the fields
190 # are initialized in their declaration order.
191 # It is required to always support MultipleTypeField
192 for field in self.fields_desc:
193 fname = field.name
194 try:
195 value = fields.pop(fname)
196 except KeyError:
197 continue
198 self.fields[fname] = value if isinstance(value, RawVal) else \
199 self.get_field(fname).any2i(self, value)
200 # The remaining fields are unknown
201 for fname in fields:
202 if fname in self.deprecated_fields:
203 # Resolve deprecated fields
204 value = fields[fname]
205 fname = self._resolve_alias(fname)
206 self.fields[fname] = value if isinstance(value, RawVal) else \
207 self.get_field(fname).any2i(self, value)
208 continue
209 raise AttributeError(fname)
210 if isinstance(post_transform, list):
211 self.post_transforms = post_transform
212 elif post_transform is None:
213 self.post_transforms = []
214 else:
215 self.post_transforms = [post_transform]
217 _PickleType = Tuple[
218 Union[EDecimal, float],
219 Optional[Union[EDecimal, float, None]],
220 Optional[int],
221 Optional[_GlobInterfaceType],
222 Optional[int],
223 Optional[bytes],
224 ]
225 _PickleStateType = Union[_PickleType, Dict[str, Any]]
227 @property
228 def comment(self):
229 # type: () -> Optional[bytes]
230 """Get the comment of the packet"""
231 if self.comments and len(self.comments):
232 return self.comments[0]
233 return None
235 @comment.setter
236 def comment(self, value):
237 # type: (Optional[bytes]) -> None
238 """
239 Set the comment of the packet.
240 If value is None, it will clear the comments.
241 """
242 if value is not None:
243 self.comments = [value]
244 else:
245 self.comments = None
247 @classmethod
248 def _rebuild_pkt(cls, raw_packet):
249 # type: (Type[Packet], bytes) -> Packet
250 """Helper used by pickle to reconstruct Packet from raw bytes."""
251 return cls(raw_packet)
253 def __reduce__(self):
254 # type: () -> Tuple[Any, ...]
255 """Used by pickling methods"""
256 state = {
257 "pickle_state_version": 2,
258 "time": self.time,
259 "sent_time": self.sent_time,
260 "direction": self.direction,
261 "sniffed_on": self.sniffed_on,
262 "wirelen": self.wirelen,
263 # Keep both keys for compatibility with historical/transition code.
264 "comment": self.comment,
265 "comments": self.comments,
266 }
267 extra_slots = {}
268 for attr in type(self).__all_slots__ - set(Packet.__slots__):
269 if hasattr(self, attr):
270 extra_slots[attr] = getattr(self, attr)
271 if extra_slots:
272 state["extra_slots"] = extra_slots # type: ignore
273 return (type(self)._rebuild_pkt, (self.build(),), state)
275 def __setstate__(self, state):
276 # type: (Packet._PickleStateType) -> Packet
277 """Rebuild state using pickable methods"""
278 # Legacy format: tuple produced by older Packet.__reduce__.
279 if isinstance(state, tuple):
280 self.time = state[0]
281 self.sent_time = state[1]
282 self.direction = state[2]
283 self.sniffed_on = state[3]
284 self.wirelen = state[4]
285 self.comment = state[5]
286 return self
288 # New format: versioned dict metadata.
289 self.time = state.get("time", self.time)
290 self.sent_time = state.get("sent_time", self.sent_time)
291 self.direction = state.get("direction", self.direction)
292 self.sniffed_on = state.get("sniffed_on", self.sniffed_on)
293 self.wirelen = state.get("wirelen", self.wirelen)
295 if "comments" in state:
296 self.comments = state["comments"]
297 elif "comment" in state:
298 self.comment = state["comment"]
300 extra_slots = state.get("extra_slots", {})
301 if isinstance(extra_slots, dict):
302 for attr, value in extra_slots.items():
303 # Only restore known subclass slots; ignore stale/unknown entries.
304 if attr in type(self).__all_slots__ and attr not in Packet.__slots__:
305 try:
306 setattr(self, attr, value)
307 except AttributeError:
308 pass
309 return self
311 def __deepcopy__(self,
312 memo, # type: Any
313 ):
314 # type: (...) -> Packet
315 """Used by copy.deepcopy"""
316 return self.copy()
318 def init_fields(self, for_dissect_only=False):
319 # type: (bool) -> None
320 """
321 Initialize each fields of the fields_desc dict
322 """
324 if self.class_dont_cache.get(self.__class__, False):
325 self.do_init_fields(self.fields_desc)
326 else:
327 self.do_init_cached_fields(for_dissect_only=for_dissect_only)
329 def do_init_fields(self,
330 flist, # type: Sequence[AnyField]
331 ):
332 # type: (...) -> None
333 """
334 Initialize each fields of the fields_desc dict
335 """
336 default_fields = {}
337 for f in flist:
338 default_fields[f.name] = copy.deepcopy(f.default)
339 self.fieldtype[f.name] = f
340 if f.holds_packets:
341 self.packetfields.append(f)
342 # We set default_fields last to avoid race issues
343 self.default_fields = default_fields
345 def do_init_cached_fields(self, for_dissect_only=False):
346 # type: (bool) -> None
347 """
348 Initialize each fields of the fields_desc dict, or use the cached
349 fields information
350 """
352 cls_name = self.__class__
354 # Build the fields information
355 if Packet.class_default_fields.get(cls_name, None) is None:
356 self.prepare_cached_fields(self.fields_desc)
358 # Use fields information from cache
359 default_fields = Packet.class_default_fields.get(cls_name, None)
360 if default_fields:
361 self.default_fields = default_fields
362 self.fieldtype = Packet.class_fieldtype[cls_name]
363 self.packetfields = Packet.class_packetfields[cls_name]
365 # Optimization: no need for references when only dissecting.
366 if for_dissect_only:
367 return
369 # Deepcopy default references
370 for fname in Packet.class_default_fields_ref[cls_name]:
371 value = self.default_fields[fname]
372 try:
373 self.fields[fname] = value.copy()
374 except AttributeError:
375 # Python 2.7 - list only
376 self.fields[fname] = value[:]
378 def prepare_cached_fields(self, flist):
379 # type: (Sequence[AnyField]) -> None
380 """
381 Prepare the cached fields of the fields_desc dict
382 """
384 cls_name = self.__class__
386 # Fields cache initialization
387 if not flist:
388 return
390 class_default_fields = dict()
391 class_default_fields_ref = list()
392 class_fieldtype = dict()
393 class_packetfields = list()
395 # Fields initialization
396 for f in flist:
397 if isinstance(f, MultipleTypeField):
398 # Abort
399 self.class_dont_cache[cls_name] = True
400 self.do_init_fields(self.fields_desc)
401 return
403 class_default_fields[f.name] = copy.deepcopy(f.default)
404 class_fieldtype[f.name] = f
405 if f.holds_packets:
406 class_packetfields.append(f)
408 # Remember references
409 if isinstance(f.default, (list, dict, set, RandField, Packet)):
410 class_default_fields_ref.append(f.name)
412 # Apply
413 Packet.class_default_fields_ref[cls_name] = class_default_fields_ref
414 Packet.class_fieldtype[cls_name] = class_fieldtype
415 Packet.class_packetfields[cls_name] = class_packetfields
416 # Last to avoid racing issues
417 Packet.class_default_fields[cls_name] = class_default_fields
419 def dissection_done(self, pkt):
420 # type: (Packet) -> None
421 """DEV: will be called after a dissection is completed"""
422 self.post_dissection(pkt)
423 self.payload.dissection_done(pkt)
425 def post_dissection(self, pkt):
426 # type: (Packet) -> None
427 """DEV: is called after the dissection of the whole packet"""
428 pass
430 def get_field(self, fld):
431 # type: (str) -> AnyField
432 """DEV: returns the field instance from the name of the field"""
433 return self.fieldtype[fld]
435 def add_payload(self, payload):
436 # type: (Union[Packet, bytes]) -> None
437 if payload is None:
438 return
439 elif not isinstance(self.payload, NoPayload):
440 self.payload.add_payload(payload)
441 else:
442 if isinstance(payload, Packet):
443 self.payload = payload
444 payload.add_underlayer(self)
445 for t in self.aliastypes:
446 if t in payload.overload_fields:
447 self.overloaded_fields = payload.overload_fields[t]
448 break
449 elif isinstance(payload, (bytes, str, bytearray, memoryview)):
450 self.payload = conf.raw_layer(load=bytes_encode(payload))
451 else:
452 raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload)) # noqa: E501
454 def remove_payload(self):
455 # type: () -> None
456 self.payload.remove_underlayer(self)
457 self.payload = NoPayload()
458 self.overloaded_fields = {}
460 def add_underlayer(self, underlayer):
461 # type: (Packet) -> None
462 self.underlayer = underlayer
464 def remove_underlayer(self, other):
465 # type: (Packet) -> None
466 self.underlayer = None
468 def add_parent(self, parent):
469 # type: (Packet) -> None
470 """Set packet parent.
471 When packet is an element in PacketListField, parent field would
472 point to the list owner packet."""
473 self.parent = parent
475 def remove_parent(self, other):
476 # type: (Packet) -> None
477 """Remove packet parent.
478 When packet is an element in PacketListField, parent field would
479 point to the list owner packet."""
480 self.parent = None
482 def copy(self) -> Self:
483 """Returns a deep copy of the instance."""
484 clone = self.__class__()
485 clone.fields = self.copy_fields_dict(self.fields)
486 clone.default_fields = self.copy_fields_dict(self.default_fields)
487 clone.overloaded_fields = self.overloaded_fields.copy()
488 clone.underlayer = self.underlayer
489 clone.parent = self.parent
490 clone.explicit = self.explicit
491 clone.raw_packet_cache = self.raw_packet_cache
492 clone.raw_packet_cache_fields = self.copy_fields_dict(
493 self.raw_packet_cache_fields
494 )
495 clone.wirelen = self.wirelen
496 clone.post_transforms = self.post_transforms[:]
497 clone.payload = self.payload.copy()
498 clone.payload.add_underlayer(clone)
499 clone.time = self.time
500 clone.comments = self.comments
501 clone.direction = self.direction
502 clone.sniffed_on = self.sniffed_on
503 return clone
505 def _resolve_alias(self, attr):
506 # type: (str) -> str
507 new_attr, version = self.deprecated_fields[attr]
508 warnings.warn(
509 "%s has been deprecated in favor of %s since %s !" % (
510 attr, new_attr, version
511 ), DeprecationWarning
512 )
513 return new_attr
515 def getfieldval(self, attr):
516 # type: (str) -> Any
517 if self.deprecated_fields and attr in self.deprecated_fields:
518 attr = self._resolve_alias(attr)
519 if attr in self.fields:
520 return self.fields[attr]
521 if attr in self.overloaded_fields:
522 return self.overloaded_fields[attr]
523 if attr in self.default_fields:
524 return self.default_fields[attr]
525 return self.payload.getfieldval(attr)
527 def getfield_and_val(self, attr):
528 # type: (str) -> Tuple[AnyField, Any]
529 if self.deprecated_fields and attr in self.deprecated_fields:
530 attr = self._resolve_alias(attr)
531 if attr in self.fields:
532 return self.get_field(attr), self.fields[attr]
533 if attr in self.overloaded_fields:
534 return self.get_field(attr), self.overloaded_fields[attr]
535 if attr in self.default_fields:
536 return self.get_field(attr), self.default_fields[attr]
537 raise ValueError
539 def __getattr__(self, attr):
540 # type: (str) -> Any
541 try:
542 fld, v = self.getfield_and_val(attr)
543 except ValueError:
544 return self.payload.__getattr__(attr)
545 if fld is not None:
546 return v if isinstance(v, RawVal) else fld.i2h(self, v)
547 return v
549 def setfieldval(self, attr, val):
550 # type: (str, Any) -> None
551 if self.deprecated_fields and attr in self.deprecated_fields:
552 attr = self._resolve_alias(attr)
553 if attr in self.default_fields:
554 fld = self.get_field(attr)
555 if fld is None:
556 any2i = lambda x, y: y # type: Callable[..., Any]
557 else:
558 any2i = fld.any2i
559 self.fields[attr] = val if isinstance(val, RawVal) else \
560 any2i(self, val)
561 self.explicit = 0
562 self.raw_packet_cache = None
563 self.raw_packet_cache_fields = None
564 self.wirelen = None
565 elif attr == "payload":
566 self.remove_payload()
567 self.add_payload(val)
568 else:
569 self.payload.setfieldval(attr, val)
571 def __setattr__(self, attr, val):
572 # type: (str, Any) -> None
573 if attr in self.__all_slots__:
574 return object.__setattr__(self, attr, val)
575 try:
576 return self.setfieldval(attr, val)
577 except AttributeError:
578 pass
579 return object.__setattr__(self, attr, val)
581 def delfieldval(self, attr):
582 # type: (str) -> None
583 if attr in self.fields:
584 del self.fields[attr]
585 self.explicit = 0 # in case a default value must be explicit
586 self.raw_packet_cache = None
587 self.raw_packet_cache_fields = None
588 self.wirelen = None
589 elif attr in self.default_fields:
590 pass
591 elif attr == "payload":
592 self.remove_payload()
593 else:
594 self.payload.delfieldval(attr)
596 def __delattr__(self, attr):
597 # type: (str) -> None
598 if attr == "payload":
599 return self.remove_payload()
600 if attr in self.__all_slots__:
601 return object.__delattr__(self, attr)
602 try:
603 return self.delfieldval(attr)
604 except AttributeError:
605 pass
606 return object.__delattr__(self, attr)
608 def _superdir(self):
609 # type: () -> Set[str]
610 """
611 Return a list of slots and methods, including those from subclasses.
612 """
613 attrs = set() # type: Set[str]
614 cls = self.__class__
615 if hasattr(cls, '__all_slots__'):
616 attrs.update(cls.__all_slots__)
617 for bcls in cls.__mro__:
618 if hasattr(bcls, '__dict__'):
619 attrs.update(bcls.__dict__)
620 return attrs
622 def __dir__(self):
623 # type: () -> List[str]
624 """
625 Add fields to tab completion list.
626 """
627 return sorted(itertools.chain(self._superdir(), self.default_fields))
629 def __repr__(self):
630 # type: () -> str
631 s = ""
632 ct = conf.color_theme
633 for f in self.fields_desc:
634 if isinstance(f, ConditionalField) and not f._evalcond(self):
635 continue
636 if f.name in self.fields:
637 fval = self.fields[f.name]
638 if isinstance(fval, (list, dict, set)) and len(fval) == 0:
639 continue
640 val = f.i2repr(self, fval)
641 elif f.name in self.overloaded_fields:
642 fover = self.overloaded_fields[f.name]
643 if isinstance(fover, (list, dict, set)) and len(fover) == 0:
644 continue
645 val = f.i2repr(self, fover)
646 else:
647 continue
648 if isinstance(f, Emph) or f in conf.emph:
649 ncol = ct.emph_field_name
650 vcol = ct.emph_field_value
651 else:
652 ncol = ct.field_name
653 vcol = ct.field_value
655 s += " %s%s%s" % (ncol(f.name),
656 ct.punct("="),
657 vcol(val))
658 return "%s%s %s %s%s%s" % (ct.punct("<"),
659 ct.layer_name(self.__class__.__name__),
660 s,
661 ct.punct("|"),
662 repr(self.payload),
663 ct.punct(">"))
665 def __str__(self):
666 # type: () -> str
667 return self.summary()
669 def __bytes__(self):
670 # type: () -> bytes
671 return self.build()
673 def __div__(self, other):
674 # type: (Any) -> Self
675 if isinstance(other, Packet):
676 cloneA = self.copy()
677 cloneB = other.copy()
678 cloneA.add_payload(cloneB)
679 return cloneA
680 elif isinstance(other, (bytes, str, bytearray, memoryview)):
681 return self / conf.raw_layer(load=bytes_encode(other))
682 else:
683 return other.__rdiv__(self) # type: ignore
684 __truediv__ = __div__
686 def __rdiv__(self, other):
687 # type: (Any) -> Packet
688 if isinstance(other, (bytes, str, bytearray, memoryview)):
689 return conf.raw_layer(load=bytes_encode(other)) / self
690 else:
691 raise TypeError
692 __rtruediv__ = __rdiv__
694 def __mul__(self, other):
695 # type: (Any) -> List[Packet]
696 if isinstance(other, int):
697 return [self] * other
698 else:
699 raise TypeError
701 def __rmul__(self, other):
702 # type: (Any) -> List[Packet]
703 return self.__mul__(other)
705 def __nonzero__(self):
706 # type: () -> bool
707 return True
708 __bool__ = __nonzero__
710 def __len__(self):
711 # type: () -> int
712 return len(self.__bytes__())
714 def copy_field_value(self, fieldname, value):
715 # type: (str, Any) -> Any
716 return self.get_field(fieldname).do_copy(value)
718 def copy_fields_dict(self, fields):
719 # type: (_T) -> _T
720 if fields is None:
721 return None
722 return {fname: self.copy_field_value(fname, fval)
723 for fname, fval in fields.items()}
725 def _raw_packet_cache_field_value(self, fld, val, copy=False):
726 # type: (AnyField, Any, bool) -> Optional[Any]
727 """Get a value representative of a mutable field to detect changes"""
728 _cpy = lambda x: fld.do_copy(x) if copy else x # type: Callable[[Any], Any]
729 if fld.holds_packets:
730 # avoid copying whole packets (perf: #GH3894)
731 if fld.islist:
732 return [
733 (_cpy(x.fields), x.payload.raw_packet_cache) for x in val
734 ]
735 else:
736 return (_cpy(val.fields), val.payload.raw_packet_cache)
737 elif fld.islist or fld.ismutable:
738 return _cpy(val)
739 return None
741 def clear_cache(self):
742 # type: () -> None
743 """Clear the raw packet cache for the field and all its subfields"""
744 self.raw_packet_cache = None
745 for fname, fval in self.fields.items():
746 fld = self.get_field(fname)
747 if fld.holds_packets:
748 if isinstance(fval, Packet):
749 fval.clear_cache()
750 elif isinstance(fval, list):
751 for fsubval in fval:
752 fsubval.clear_cache()
753 self.payload.clear_cache()
755 def self_build(self):
756 # type: () -> bytes
757 """
758 Create the default layer regarding fields_desc dict
759 """
760 if self.raw_packet_cache is not None and \
761 self.raw_packet_cache_fields is not None:
762 for fname, fval in self.raw_packet_cache_fields.items():
763 fld, val = self.getfield_and_val(fname)
764 if self._raw_packet_cache_field_value(fld, val) != fval:
765 self.raw_packet_cache = None
766 self.raw_packet_cache_fields = None
767 self.wirelen = None
768 break
769 if self.raw_packet_cache is not None:
770 return self.raw_packet_cache
771 p = b""
772 for f in self.fields_desc:
773 val = self.getfieldval(f.name)
774 if isinstance(val, RawVal):
775 p += bytes(val)
776 else:
777 try:
778 p = f.addfield(self, p, val)
779 except Exception as ex:
780 try:
781 ex.args = (
782 "While building field '%s': " % f.name +
783 ex.args[0],
784 ) + ex.args[1:]
785 except (AttributeError, IndexError):
786 pass
787 raise ex
788 return p
790 def do_build_payload(self):
791 # type: () -> bytes
792 """
793 Create the default version of the payload layer
795 :return: a string of payload layer
796 """
797 return self.payload.do_build()
799 def do_build(self):
800 # type: () -> bytes
801 """
802 Create the default version of the layer
804 :return: a string of the packet with the payload
805 """
806 if not self.explicit:
807 self = next(iter(self))
808 pkt = self.self_build()
809 for t in self.post_transforms:
810 pkt = t(pkt)
811 pay = self.do_build_payload()
812 if self.raw_packet_cache is None:
813 return self.post_build(pkt, pay)
814 else:
815 return pkt + pay
817 def build_padding(self):
818 # type: () -> bytes
819 return self.payload.build_padding()
821 def build(self):
822 # type: () -> bytes
823 """
824 Create the current layer
826 :return: string of the packet with the payload
827 """
828 p = self.do_build()
829 p += self.build_padding()
830 p = self.build_done(p)
831 return p
833 def post_build(self, pkt, pay):
834 # type: (bytes, bytes) -> bytes
835 """
836 DEV: called right after the current layer is build.
838 :param str pkt: the current packet (build by self_build function)
839 :param str pay: the packet payload (build by do_build_payload function)
840 :return: a string of the packet with the payload
841 """
842 return pkt + pay
844 def build_done(self, p):
845 # type: (bytes) -> bytes
846 return self.payload.build_done(p)
848 def do_build_ps(self):
849 # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]] # noqa: E501
850 p = b""
851 pl = []
852 q = b""
853 for f in self.fields_desc:
854 if isinstance(f, ConditionalField) and not f._evalcond(self):
855 continue
856 p = f.addfield(self, p, self.getfieldval(f.name))
857 if isinstance(p, bytes):
858 r = p[len(q):]
859 q = p
860 else:
861 r = b""
862 pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r))
864 pkt, lst = self.payload.build_ps(internal=1)
865 p += pkt
866 lst.append((self, pl))
868 return p, lst
870 def build_ps(self, internal=0):
871 # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]] # noqa: E501
872 p, lst = self.do_build_ps()
873# if not internal:
874# pkt = self
875# while pkt.haslayer(conf.padding_layer):
876# pkt = pkt.getlayer(conf.padding_layer)
877# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) )
878# p += pkt.load
879# pkt = pkt.payload
880 return p, lst
882 def canvas_dump(self, layer_shift=0, rebuild=1):
883 # type: (int, int) -> pyx.canvas.canvas
884 if PYX == 0:
885 raise ImportError("PyX and its dependencies must be installed")
886 canvas = pyx.canvas.canvas()
887 if rebuild:
888 _, t = self.__class__(raw(self)).build_ps()
889 else:
890 _, t = self.build_ps()
891 YTXTI = len(t)
892 for _, l in t:
893 YTXTI += len(l)
894 YTXT = float(YTXTI)
895 YDUMP = YTXT
897 XSTART = 1
898 XDSTART = 10
899 y = 0.0
900 yd = 0.0
901 XMUL = 0.55
902 YMUL = 0.4
904 backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
905 forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
906# backcolor=makecol(0.376, 0.729, 0.525, 1.0)
908 def hexstr(x):
909 # type: (bytes) -> str
910 return " ".join("%02x" % orb(c) for c in x)
912 def make_dump_txt(x, y, txt):
913 # type: (int, float, bytes) -> pyx.text.text
914 return pyx.text.text(
915 XDSTART + x * XMUL,
916 (YDUMP - y) * YMUL,
917 r"\tt{%s}" % hexstr(txt),
918 [pyx.text.size.Large]
919 )
921 def make_box(o):
922 # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
923 return pyx.box.rect(
924 o.left(), o.bottom(), o.width(), o.height(),
925 relcenter=(0.5, 0.5)
926 )
928 def make_frame(lst):
929 # type: (List[Any]) -> pyx.path.path
930 if len(lst) == 1:
931 b = lst[0].bbox()
932 b.enlarge(pyx.unit.u_pt)
933 return b.path()
934 else:
935 fb = lst[0].bbox()
936 fb.enlarge(pyx.unit.u_pt)
937 lb = lst[-1].bbox()
938 lb.enlarge(pyx.unit.u_pt)
939 if len(lst) == 2 and fb.left() > lb.right():
940 return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
941 pyx.path.lineto(fb.left(), fb.top()),
942 pyx.path.lineto(fb.left(), fb.bottom()), # noqa: E501
943 pyx.path.lineto(fb.right(), fb.bottom()), # noqa: E501
944 pyx.path.moveto(lb.left(), lb.top()),
945 pyx.path.lineto(lb.right(), lb.top()),
946 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501
947 pyx.path.lineto(lb.left(), lb.bottom())) # noqa: E501
948 else:
949 # XXX
950 gb = lst[1].bbox()
951 if gb != lb:
952 gb.enlarge(pyx.unit.u_pt)
953 kb = lst[-2].bbox()
954 if kb != gb and kb != lb:
955 kb.enlarge(pyx.unit.u_pt)
956 return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
957 pyx.path.lineto(fb.right(), fb.top()),
958 pyx.path.lineto(fb.right(), kb.bottom()), # noqa: E501
959 pyx.path.lineto(lb.right(), kb.bottom()), # noqa: E501
960 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501
961 pyx.path.lineto(lb.left(), lb.bottom()), # noqa: E501
962 pyx.path.lineto(lb.left(), gb.top()),
963 pyx.path.lineto(fb.left(), gb.top()),
964 pyx.path.closepath(),)
966 def make_dump(s, # type: bytes
967 shift=0, # type: int
968 y=0., # type: float
969 col=None, # type: pyx.color.color
970 bkcol=None, # type: pyx.color.color
971 large=16 # type: int
972 ):
973 # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float] # noqa: E501
974 c = pyx.canvas.canvas()
975 tlist = []
976 while s:
977 dmp, s = s[:large - shift], s[large - shift:]
978 txt = make_dump_txt(shift, y, dmp)
979 tlist.append(txt)
980 shift += len(dmp)
981 if shift >= 16:
982 shift = 0
983 y += 1
984 if col is None:
985 col = pyx.color.rgb.red
986 if bkcol is None:
987 bkcol = pyx.color.rgb.white
988 c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick]) # noqa: E501
989 for txt in tlist:
990 c.insert(txt)
991 return c, tlist[-1].bbox(), shift, y
993 last_shift, last_y = 0, 0.0
994 while t:
995 bkcol = next(backcolor)
996 proto, fields = t.pop()
997 y += 0.5
998 pt = pyx.text.text(
999 XSTART,
1000 (YTXT - y) * YMUL,
1001 r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
1002 str(proto.name)
1003 ),
1004 [pyx.text.size.Large]
1005 )
1006 y += 1
1007 ptbb = pt.bbox()
1008 ptbb.enlarge(pyx.unit.u_pt * 2)
1009 canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])]) # noqa: E501
1010 canvas.insert(pt)
1011 for field, fval, fdump in fields:
1012 col = next(forecolor)
1013 ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name)) # noqa: E501
1014 if isinstance(field, BitField):
1015 fsize = '%sb' % field.size
1016 else:
1017 fsize = '%sB' % len(fdump)
1018 if (hasattr(field, 'field') and
1019 'LE' in field.field.__class__.__name__[:3] or
1020 'LE' in field.__class__.__name__[:3]):
1021 fsize = r'$\scriptstyle\langle$' + fsize
1022 st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright]) # noqa: E501
1023 if isinstance(fval, str):
1024 if len(fval) > 18:
1025 fval = fval[:18] + "[...]"
1026 else:
1027 fval = ""
1028 vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) # noqa: E501
1029 y += 1.0
1030 if fdump:
1031 dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol) # noqa: E501
1033 dtb = target
1034 vtb = vt.bbox()
1035 bxvt = make_box(vtb)
1036 bxdt = make_box(dtb)
1037 dtb.enlarge(pyx.unit.u_pt)
1038 try:
1039 if yd < 0:
1040 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90) # noqa: E501
1041 else:
1042 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90) # noqa: E501
1043 except Exception:
1044 pass
1045 else:
1046 canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col]) # noqa: E501
1048 canvas.insert(dt)
1050 canvas.insert(ft)
1051 canvas.insert(st)
1052 canvas.insert(vt)
1053 last_y += layer_shift
1055 return canvas
1057 def extract_padding(self, s):
1058 # type: (bytes) -> Tuple[bytes, Optional[bytes]]
1059 """
1060 DEV: to be overloaded to extract current layer's padding.
1062 :param str s: the current layer
1063 :return: a couple of strings (actual layer, padding)
1064 """
1065 return s, None
1067 def post_dissect(self, s):
1068 # type: (bytes) -> bytes
1069 """DEV: is called right after the current layer has been dissected"""
1070 return s
1072 def pre_dissect(self, s):
1073 # type: (bytes) -> bytes
1074 """DEV: is called right before the current layer is dissected"""
1075 return s
1077 def do_dissect(self, s):
1078 # type: (bytes) -> bytes
1079 _raw = s
1080 self.raw_packet_cache_fields = {}
1081 for f in self.fields_desc:
1082 s, fval = f.getfield(self, s)
1083 # Skip unused ConditionalField
1084 if isinstance(f, ConditionalField) and fval is None:
1085 continue
1086 # We need to track fields with mutable values to discard
1087 # .raw_packet_cache when needed.
1088 if (f.islist or f.holds_packets or f.ismutable) and fval is not None:
1089 self.raw_packet_cache_fields[f.name] = \
1090 self._raw_packet_cache_field_value(f, fval, copy=True)
1091 self.fields[f.name] = fval
1092 # Nothing left to dissect
1093 if not s and (isinstance(f, MayEnd) or
1094 (fval is not None and isinstance(f, ConditionalField) and
1095 isinstance(f.fld, MayEnd))):
1096 break
1097 self.raw_packet_cache = _raw[:-len(s)] if s else _raw
1098 self.explicit = 1
1099 return s
1101 def do_dissect_payload(self, s):
1102 # type: (bytes) -> None
1103 """
1104 Perform the dissection of the layer's payload
1106 :param str s: the raw layer
1107 """
1108 if s:
1109 if (
1110 self.stop_dissection_after and
1111 isinstance(self, self.stop_dissection_after)
1112 ):
1113 # stop dissection here
1114 p = conf.raw_layer(s, _internal=1, _underlayer=self)
1115 self.add_payload(p)
1116 return
1117 cls = self.guess_payload_class(s)
1118 try:
1119 p = cls(
1120 s,
1121 stop_dissection_after=self.stop_dissection_after,
1122 _internal=1,
1123 _underlayer=self,
1124 )
1125 except KeyboardInterrupt:
1126 raise
1127 except Exception:
1128 if conf.debug_dissector:
1129 if issubtype(cls, Packet):
1130 log_runtime.error("%s dissector failed", cls.__name__)
1131 else:
1132 log_runtime.error("%s.guess_payload_class() returned "
1133 "[%s]",
1134 self.__class__.__name__, repr(cls))
1135 if cls is not None:
1136 raise
1137 p = conf.raw_layer(s, _internal=1, _underlayer=self)
1138 self.add_payload(p)
1140 def dissect(self, s):
1141 # type: (bytes) -> None
1142 s = self.pre_dissect(s)
1144 s = self.do_dissect(s)
1146 s = self.post_dissect(s)
1148 payl, pad = self.extract_padding(s)
1149 self.do_dissect_payload(payl)
1150 if pad and conf.padding:
1151 self.add_payload(conf.padding_layer(pad))
1153 def guess_payload_class(self, payload):
1154 # type: (bytes) -> Type[Packet]
1155 """
1156 DEV: Guesses the next payload class from layer bonds.
1157 Can be overloaded to use a different mechanism.
1159 :param str payload: the layer's payload
1160 :return: the payload class
1161 """
1162 for t in self.aliastypes:
1163 for fval, cls in t.payload_guess:
1164 try:
1165 if all(v == self.getfieldval(k)
1166 for k, v in fval.items()):
1167 return cls # type: ignore
1168 except AttributeError:
1169 pass
1170 return self.default_payload_class(payload)
1172 def default_payload_class(self, payload):
1173 # type: (bytes) -> Type[Packet]
1174 """
1175 DEV: Returns the default payload class if nothing has been found by the
1176 guess_payload_class() method.
1178 :param str payload: the layer's payload
1179 :return: the default payload class define inside the configuration file
1180 """
1181 return conf.raw_layer
1183 def hide_defaults(self):
1184 # type: () -> None
1185 """Removes fields' values that are the same as default values."""
1186 # use list(): self.fields is modified in the loop
1187 for k, v in list(self.fields.items()):
1188 v = self.fields[k]
1189 if k in self.default_fields:
1190 if self.default_fields[k] == v:
1191 del self.fields[k]
1192 self.payload.hide_defaults()
1194 def clone_with(self, payload=None, **kargs):
1195 # type: (Optional[Any], **Any) -> Any
1196 pkt = self.__class__()
1197 pkt.explicit = 1
1198 pkt.fields = kargs
1199 pkt.default_fields = self.copy_fields_dict(self.default_fields)
1200 pkt.overloaded_fields = self.overloaded_fields.copy()
1201 pkt.time = self.time
1202 pkt.underlayer = self.underlayer
1203 pkt.parent = self.parent
1204 pkt.post_transforms = self.post_transforms
1205 pkt.raw_packet_cache = self.raw_packet_cache
1206 pkt.raw_packet_cache_fields = self.copy_fields_dict(
1207 self.raw_packet_cache_fields
1208 )
1209 pkt.wirelen = self.wirelen
1210 pkt.comments = self.comments
1211 pkt.sniffed_on = self.sniffed_on
1212 pkt.direction = self.direction
1213 if payload is not None:
1214 pkt.add_payload(payload)
1215 return pkt
1217 def __iter__(self):
1218 # type: () -> Iterator[Packet]
1219 """Iterates through all sub-packets generated by this Packet."""
1220 def loop(todo, done, self=self):
1221 # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
1222 if todo:
1223 eltname = todo.pop()
1224 elt = self.getfieldval(eltname)
1225 if not isinstance(elt, Gen):
1226 if self.get_field(eltname).islist:
1227 elt = SetGen([elt])
1228 else:
1229 elt = SetGen(elt)
1230 for e in elt:
1231 done[eltname] = e
1232 for x in loop(todo[:], done):
1233 yield x
1234 else:
1235 if isinstance(self.payload, NoPayload):
1236 payloads = SetGen([None]) # type: SetGen[Packet]
1237 else:
1238 payloads = self.payload
1239 for payl in payloads:
1240 # Let's make sure subpackets are consistent
1241 done2 = done.copy()
1242 for k in done2:
1243 if isinstance(done2[k], VolatileValue):
1244 done2[k] = done2[k]._fix()
1245 pkt = self.clone_with(payload=payl, **done2)
1246 yield pkt
1248 if self.explicit or self.raw_packet_cache is not None:
1249 todo = []
1250 done = self.fields
1251 else:
1252 todo = [k for (k, v) in itertools.chain(self.default_fields.items(),
1253 self.overloaded_fields.items())
1254 if isinstance(v, VolatileValue)] + list(self.fields)
1255 done = {}
1256 return loop(todo, done)
1258 def iterpayloads(self):
1259 # type: () -> Iterator[Packet]
1260 """Used to iter through the payloads of a Packet.
1261 Useful for DNS or 802.11 for instance.
1262 """
1263 yield self
1264 current = self
1265 while current.payload:
1266 current = current.payload
1267 yield current
1269 def __gt__(self, other):
1270 # type: (Packet) -> int
1271 """True if other is an answer from self (self ==> other)."""
1272 if isinstance(other, Packet):
1273 return other < self
1274 elif isinstance(other, bytes):
1275 return 1
1276 else:
1277 raise TypeError((self, other))
1279 def __lt__(self, other):
1280 # type: (Packet) -> int
1281 """True if self is an answer from other (other ==> self)."""
1282 if isinstance(other, Packet):
1283 return self.answers(other)
1284 elif isinstance(other, bytes):
1285 return 1
1286 else:
1287 raise TypeError((self, other))
1289 def __eq__(self, other):
1290 # type: (Any) -> bool
1291 if not isinstance(other, self.__class__):
1292 return False
1293 for f in self.fields_desc:
1294 if f not in other.fields_desc:
1295 return False
1296 if self.getfieldval(f.name) != other.getfieldval(f.name):
1297 return False
1298 return self.payload == other.payload
1300 def __ne__(self, other):
1301 # type: (Any) -> bool
1302 return not self.__eq__(other)
1304 # Note: setting __hash__ to None is the standard way
1305 # of making an object un-hashable. mypy doesn't know that
1306 __hash__ = None # type: ignore
1308 def hashret(self):
1309 # type: () -> bytes
1310 """DEV: returns a string that has the same value for a request
1311 and its answer."""
1312 return self.payload.hashret()
1314 def answers(self, other):
1315 # type: (Packet) -> int
1316 """DEV: true if self is an answer from other"""
1317 if other.__class__ == self.__class__:
1318 return self.payload.answers(other.payload)
1319 return 0
1321 def layers(self):
1322 # type: () -> List[Type[Packet]]
1323 """returns a list of layer classes (including subclasses) in this packet""" # noqa: E501
1324 layers = []
1325 lyr = self # type: Optional[Packet]
1326 while lyr:
1327 layers.append(lyr.__class__)
1328 lyr = lyr.payload.getlayer(0, _subclass=True)
1329 return layers
1331 def haslayer(self, cls, _subclass=None):
1332 # type: (Union[Type[Packet], str], Optional[bool]) -> int
1333 """
1334 true if self has a layer that is an instance of cls.
1335 Superseded by "cls in self" syntax.
1336 """
1337 if _subclass is None:
1338 _subclass = self.match_subclass or None
1339 if _subclass:
1340 match = issubtype
1341 else:
1342 match = lambda x, t: bool(x == t)
1343 if cls is None or match(self.__class__, cls) \
1344 or cls in [self.__class__.__name__, self._name]:
1345 return True
1346 for f in self.packetfields:
1347 fvalue_gen = self.getfieldval(f.name)
1348 if fvalue_gen is None:
1349 continue
1350 if not f.islist:
1351 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1352 for fvalue in fvalue_gen:
1353 if isinstance(fvalue, Packet):
1354 ret = fvalue.haslayer(cls, _subclass=_subclass)
1355 if ret:
1356 return ret
1357 return self.payload.haslayer(cls, _subclass=_subclass)
1359 def getlayer(self,
1360 cls, # type: Union[int, Type[Packet], str]
1361 nb=1, # type: int
1362 _track=None, # type: Optional[List[int]]
1363 _subclass=None, # type: Optional[bool]
1364 **flt # type: Any
1365 ):
1366 # type: (...) -> Optional[Packet]
1367 """Return the nb^th layer that is an instance of cls, matching flt
1368values.
1369 """
1370 if _subclass is None:
1371 _subclass = self.match_subclass or None
1372 if _subclass:
1373 match = issubtype
1374 else:
1375 match = lambda x, t: bool(x == t)
1376 # Note:
1377 # cls can be int, packet, str
1378 # string_class_name can be packet, str (packet or packet+field)
1379 # class_name can be packet, str (packet only)
1380 if isinstance(cls, int):
1381 nb = cls + 1
1382 string_class_name = "" # type: Union[Type[Packet], str]
1383 else:
1384 string_class_name = cls
1385 class_name = "" # type: Union[Type[Packet], str]
1386 fld = None # type: Optional[str]
1387 if isinstance(string_class_name, str) and "." in string_class_name:
1388 class_name, fld = string_class_name.split(".", 1)
1389 else:
1390 class_name, fld = string_class_name, None
1391 if not class_name or match(self.__class__, class_name) \
1392 or class_name in [self.__class__.__name__, self._name]:
1393 if all(self.getfieldval(fldname) == fldvalue
1394 for fldname, fldvalue in flt.items()):
1395 if nb == 1:
1396 if fld is None:
1397 return self
1398 else:
1399 return self.getfieldval(fld) # type: ignore
1400 else:
1401 nb -= 1
1402 for f in self.packetfields:
1403 fvalue_gen = self.getfieldval(f.name)
1404 if fvalue_gen is None:
1405 continue
1406 if not f.islist:
1407 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1408 for fvalue in fvalue_gen:
1409 if isinstance(fvalue, Packet):
1410 track = [] # type: List[int]
1411 ret = fvalue.getlayer(class_name, nb=nb, _track=track,
1412 _subclass=_subclass, **flt)
1413 if ret is not None:
1414 return ret
1415 nb = track[0]
1416 return self.payload.getlayer(class_name, nb=nb, _track=_track,
1417 _subclass=_subclass, **flt)
1419 def firstlayer(self):
1420 # type: () -> Packet
1421 q = self
1422 while q.underlayer is not None:
1423 q = q.underlayer
1424 return q
1426 def __getitem__(self, cls):
1427 # type: (Union[Type[Packet], str]) -> Any
1428 if isinstance(cls, slice):
1429 lname = cls.start
1430 if cls.stop:
1431 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {}))
1432 else:
1433 ret = self.getlayer(cls.start, **(cls.step or {}))
1434 else:
1435 lname = cls
1436 ret = self.getlayer(cls)
1437 if ret is None:
1438 if isinstance(lname, type):
1439 name = lname.__name__
1440 elif not isinstance(lname, bytes):
1441 name = repr(lname)
1442 else:
1443 name = cast(str, lname)
1444 raise IndexError("Layer [%s] not found" % name)
1445 return ret
1447 def __delitem__(self, cls):
1448 # type: (Type[Packet]) -> None
1449 del self[cls].underlayer.payload
1451 def __setitem__(self, cls, val):
1452 # type: (Type[Packet], Packet) -> None
1453 self[cls].underlayer.payload = val
1455 def __contains__(self, cls):
1456 # type: (Union[Type[Packet], str]) -> int
1457 """
1458 "cls in self" returns true if self has a layer which is an
1459 instance of cls.
1460 """
1461 return self.haslayer(cls)
1463 def route(self):
1464 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
1465 return self.payload.route()
1467 def fragment(self, *args, **kargs):
1468 # type: (*Any, **Any) -> List[Packet]
1469 return self.payload.fragment(*args, **kargs)
1471 def display(self, *args, **kargs): # Deprecated. Use show()
1472 # type: (*Any, **Any) -> None
1473 """Deprecated. Use show() method."""
1474 self.show(*args, **kargs)
1476 def _show_or_dump(self,
1477 dump=False, # type: bool
1478 indent=3, # type: int
1479 lvl="", # type: str
1480 label_lvl="", # type: str
1481 first_call=True # type: bool
1482 ):
1483 # type: (...) -> Optional[str]
1484 """
1485 Internal method that shows or dumps a hierarchical view of a packet.
1486 Called by show.
1488 :param dump: determine if it prints or returns the string value
1489 :param int indent: the size of indentation for each layer
1490 :param str lvl: additional information about the layer lvl
1491 :param str label_lvl: additional information about the layer fields
1492 :param first_call: determine if the current function is the first
1493 :return: return a hierarchical view if dump, else print it
1494 """
1496 if dump:
1497 from scapy.themes import ColorTheme, AnsiColorTheme
1498 ct: ColorTheme = AnsiColorTheme() # No color for dump output
1499 else:
1500 ct = conf.color_theme
1501 s = "%s%s %s %s\n" % (label_lvl,
1502 ct.punct("###["),
1503 ct.layer_name(self.name),
1504 ct.punct("]###"))
1505 fields = self.fields_desc.copy()
1506 while fields:
1507 f = fields.pop(0)
1508 if isinstance(f, ConditionalField) and not f._evalcond(self):
1509 continue
1510 if hasattr(f, "fields"): # Field has subfields
1511 s += "%s %s =\n" % (
1512 label_lvl + lvl,
1513 ct.depreciate_field_name(f.name),
1514 )
1515 lvl += " " * indent * self.show_indent
1516 for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)):
1517 fields.insert(i, fld)
1518 continue
1519 if isinstance(f, Emph) or f in conf.emph:
1520 ncol = ct.emph_field_name
1521 vcol = ct.emph_field_value
1522 else:
1523 ncol = ct.field_name
1524 vcol = ct.field_value
1525 pad = max(0, 10 - len(f.name)) * " "
1526 fvalue = self.getfieldval(f.name)
1527 if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)): # noqa: E501
1528 s += "%s %s%s%s%s\n" % (label_lvl + lvl,
1529 ct.punct("\\"),
1530 ncol(f.name),
1531 pad,
1532 ct.punct("\\"))
1533 fvalue_gen = SetGen(
1534 fvalue,
1535 _iterpacket=0
1536 ) # type: SetGen[Packet]
1537 for fvalue in fvalue_gen:
1538 s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False) # noqa: E501
1539 else:
1540 begn = "%s %s%s%s " % (label_lvl + lvl,
1541 ncol(f.name),
1542 pad,
1543 ct.punct("="),)
1544 reprval = f.i2repr(self, fvalue)
1545 if isinstance(reprval, str):
1546 reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) + # noqa: E501
1547 len(lvl) +
1548 len(f.name) +
1549 4))
1550 s += "%s%s\n" % (begn, vcol(reprval))
1551 if self.payload:
1552 s += self.payload._show_or_dump( # type: ignore
1553 dump=dump,
1554 indent=indent,
1555 lvl=lvl + (" " * indent * self.show_indent),
1556 label_lvl=label_lvl,
1557 first_call=False
1558 )
1560 if first_call and not dump:
1561 print(s)
1562 return None
1563 else:
1564 return s
1566 def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1567 # type: (bool, int, str, str) -> Optional[Any]
1568 """
1569 Prints or returns (when "dump" is true) a hierarchical view of the
1570 packet.
1572 :param dump: determine if it prints or returns the string value
1573 :param int indent: the size of indentation for each layer
1574 :param str lvl: additional information about the layer lvl
1575 :param str label_lvl: additional information about the layer fields
1576 :return: return a hierarchical view if dump, else print it
1577 """
1578 return self._show_or_dump(dump, indent, lvl, label_lvl)
1580 def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
1581 # type: (bool, int, str, str) -> Optional[Any]
1582 """
1583 Prints or returns (when "dump" is true) a hierarchical view of an
1584 assembled version of the packet, so that automatic fields are
1585 calculated (checksums, etc.)
1587 :param dump: determine if it prints or returns the string value
1588 :param int indent: the size of indentation for each layer
1589 :param str lvl: additional information about the layer lvl
1590 :param str label_lvl: additional information about the layer fields
1591 :return: return a hierarchical view if dump, else print it
1592 """
1593 return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl)
1595 def sprintf(self, fmt, relax=1):
1596 # type: (str, int) -> str
1597 """
1598 sprintf(format, [relax=1]) -> str
1600 Where format is a string that can include directives. A directive
1601 begins and ends by % and has the following format:
1602 ``%[fmt[r],][cls[:nb].]field%``
1604 :param fmt: is a classic printf directive, "r" can be appended for raw
1605 substitution:
1606 (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
1607 (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
1608 Special case : "%.time%" is the creation time.
1609 Ex::
1611 p.sprintf(
1612 "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
1613 "%03xr,IP.proto% %r,TCP.flags%"
1614 )
1616 Moreover, the format string can include conditional statements. A
1617 conditional statement looks like : {layer:string} where layer is a
1618 layer name, and string is the string to insert in place of the
1619 condition if it is true, i.e. if layer is present. If layer is
1620 preceded by a "!", the result is inverted. Conditions can be
1621 imbricated. A valid statement can be::
1623 p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
1624 p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")
1626 A side effect is that, to obtain "{" and "}" characters, you must use
1627 "%(" and "%)".
1628 """
1630 escape = {"%": "%",
1631 "(": "{",
1632 ")": "}"}
1634 # Evaluate conditions
1635 while "{" in fmt:
1636 i = fmt.rindex("{")
1637 j = fmt[i + 1:].index("}")
1638 cond = fmt[i + 1:i + j + 1]
1639 k = cond.find(":")
1640 if k < 0:
1641 raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond) # noqa: E501
1642 cond, format_ = cond[:k], cond[k + 1:]
1643 res = False
1644 if cond[0] == "!":
1645 res = True
1646 cond = cond[1:]
1647 if self.haslayer(cond):
1648 res = not res
1649 if not res:
1650 format_ = ""
1651 fmt = fmt[:i] + format_ + fmt[i + j + 2:]
1653 # Evaluate directives
1654 s = ""
1655 while "%" in fmt:
1656 i = fmt.index("%")
1657 s += fmt[:i]
1658 fmt = fmt[i + 1:]
1659 if fmt and fmt[0] in escape:
1660 s += escape[fmt[0]]
1661 fmt = fmt[1:]
1662 continue
1663 try:
1664 i = fmt.index("%")
1665 sfclsfld = fmt[:i]
1666 fclsfld = sfclsfld.split(",")
1667 if len(fclsfld) == 1:
1668 f = "s"
1669 clsfld = fclsfld[0]
1670 elif len(fclsfld) == 2:
1671 f, clsfld = fclsfld
1672 else:
1673 raise Scapy_Exception
1674 if "." in clsfld:
1675 cls, fld = clsfld.split(".")
1676 else:
1677 cls = self.__class__.__name__
1678 fld = clsfld
1679 num = 1
1680 if ":" in cls:
1681 cls, snum = cls.split(":")
1682 num = int(snum)
1683 fmt = fmt[i + 1:]
1684 except Exception:
1685 raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "...")) # noqa: E501
1686 else:
1687 if fld == "time":
1688 val = time.strftime(
1689 "%H:%M:%S.%%06i",
1690 time.localtime(float(self.time))
1691 ) % int((self.time - int(self.time)) * 1000000)
1692 elif cls == self.__class__.__name__ and hasattr(self, fld):
1693 if num > 1:
1694 val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax) # noqa: E501
1695 f = "s"
1696 else:
1697 try:
1698 val = self.getfieldval(fld)
1699 except AttributeError:
1700 val = getattr(self, fld)
1701 if f[-1] == "r": # Raw field value
1702 f = f[:-1]
1703 if not f:
1704 f = "s"
1705 else:
1706 if fld in self.fieldtype:
1707 val = self.fieldtype[fld].i2repr(self, val)
1708 else:
1709 val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
1710 f = "s"
1711 s += ("%" + f) % val
1713 s += fmt
1714 return s
1716 def mysummary(self):
1717 # type: () -> str
1718 """DEV: can be overloaded to return a string that summarizes the layer.
1719 Only one mysummary() is used in a whole packet summary: the one of the upper layer, # noqa: E501
1720 except if a mysummary() also returns (as a couple) a list of layers whose # noqa: E501
1721 mysummary() must be called if they are present."""
1722 return ""
1724 def _do_summary(self):
1725 # type: () -> Tuple[int, str, List[Any]]
1726 found, s, needed = self.payload._do_summary()
1727 ret = ""
1728 if not found or self.__class__ in needed:
1729 ret = self.mysummary()
1730 if isinstance(ret, tuple):
1731 ret, n = ret
1732 needed += n
1733 if ret or needed:
1734 found = 1
1735 if not ret:
1736 ret = self.__class__.__name__ if self.show_summary else ""
1737 if self.__class__ in conf.emph:
1738 impf = []
1739 for f in self.fields_desc:
1740 if f in conf.emph:
1741 impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) # noqa: E501
1742 ret = "%s [%s]" % (ret, " ".join(impf))
1743 if ret and s:
1744 ret = "%s / %s" % (ret, s)
1745 else:
1746 ret = "%s%s" % (ret, s)
1747 return found, ret, needed
1749 def summary(self, intern=0):
1750 # type: (int) -> str
1751 """Prints a one line summary of a packet."""
1752 return self._do_summary()[1]
1754 def lastlayer(self, layer=None):
1755 # type: (Optional[Packet]) -> Packet
1756 """Returns the uppest layer of the packet"""
1757 return self.payload.lastlayer(self)
1759 def decode_payload_as(self, cls):
1760 # type: (Type[Packet]) -> None
1761 """Reassembles the payload and decode it using another packet class"""
1762 s = raw(self.payload)
1763 self.payload = cls(s, _internal=1, _underlayer=self)
1764 pp = self
1765 while pp.underlayer is not None:
1766 pp = pp.underlayer
1767 self.payload.dissection_done(pp)
1769 def _command(self, json=False):
1770 # type: (bool) -> List[Tuple[str, Any]]
1771 """
1772 Internal method used to generate command() and json()
1773 """
1774 f = []
1775 iterator: Iterator[Tuple[str, Any]]
1776 if json:
1777 iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc)
1778 else:
1779 iterator = iter(self.fields.items())
1780 for fn, fv in iterator:
1781 fld = self.get_field(fn)
1782 if isinstance(fv, (list, dict, set)) and not fv and not fld.default:
1783 continue
1784 if isinstance(fv, Packet):
1785 if json:
1786 fv = {k: v for (k, v) in fv._command(json=True)}
1787 else:
1788 fv = fv.command()
1789 elif fld.islist and fld.holds_packets and isinstance(fv, list):
1790 if json:
1791 fv = [
1792 {k: v for (k, v) in x}
1793 for x in map(lambda y: Packet._command(y, json=True), fv)
1794 ]
1795 else:
1796 fv = "[%s]" % ",".join(map(Packet.command, fv))
1797 elif fld.islist and isinstance(fv, list):
1798 if json:
1799 fv = [
1800 getattr(x, 'command', lambda: repr(x))()
1801 for x in fv
1802 ]
1803 else:
1804 fv = "[%s]" % ",".join(
1805 getattr(x, 'command', lambda: repr(x))()
1806 for x in fv
1807 )
1808 elif isinstance(fv, FlagValue):
1809 fv = int(fv)
1810 elif callable(getattr(fv, 'command', None)):
1811 fv = fv.command(json=json)
1812 else:
1813 if json:
1814 if isinstance(fv, bytes):
1815 fv = fv.decode("utf-8", errors="backslashreplace")
1816 else:
1817 fv = fld.i2h(self, fv)
1818 else:
1819 fv = repr(fld.i2h(self, fv))
1820 f.append((fn, fv))
1821 return f
1823 def command(self):
1824 # type: () -> str
1825 """
1826 Returns a string representing the command you have to type to
1827 obtain the same packet
1828 """
1829 c = "%s(%s)" % (
1830 self.__class__.__name__,
1831 ", ".join("%s=%s" % x for x in self._command())
1832 )
1833 pc = self.payload.command()
1834 if pc:
1835 c += "/" + pc
1836 return c
1838 def json(self):
1839 # type: () -> str
1840 """
1841 Returns a JSON representing the packet.
1843 Please note that this cannot be used for bijective usage: data loss WILL occur,
1844 so it will not make sense to try to rebuild the packet from the output.
1845 This must only be used for a grepping/displaying purpose.
1846 """
1847 dump = json.dumps({k: v for (k, v) in self._command(json=True)})
1848 pc = self.payload.json()
1849 if pc:
1850 dump = dump[:-1] + ", \"payload\": %s}" % pc
1851 return dump
1854class NoPayload(Packet):
1855 def __new__(cls, *args, **kargs):
1856 # type: (Type[Packet], *Any, **Any) -> NoPayload
1857 singl = cls.__dict__.get("__singl__")
1858 if singl is None:
1859 cls.__singl__ = singl = Packet.__new__(cls)
1860 Packet.__init__(singl)
1861 return cast(NoPayload, singl)
1863 def __init__(self, *args, **kargs):
1864 # type: (*Any, **Any) -> None
1865 pass
1867 def dissection_done(self, pkt):
1868 # type: (Packet) -> None
1869 pass
1871 def add_payload(self, payload):
1872 # type: (Union[Packet, bytes]) -> NoReturn
1873 raise Scapy_Exception("Can't add payload to NoPayload instance")
1875 def remove_payload(self):
1876 # type: () -> None
1877 pass
1879 def add_underlayer(self, underlayer):
1880 # type: (Any) -> None
1881 pass
1883 def remove_underlayer(self, other):
1884 # type: (Packet) -> None
1885 pass
1887 def add_parent(self, parent):
1888 # type: (Any) -> None
1889 pass
1891 def remove_parent(self, other):
1892 # type: (Packet) -> None
1893 pass
1895 def copy(self):
1896 # type: () -> NoPayload
1897 return self
1899 def clear_cache(self):
1900 # type: () -> None
1901 pass
1903 def __repr__(self):
1904 # type: () -> str
1905 return ""
1907 def __str__(self):
1908 # type: () -> str
1909 return ""
1911 def __bytes__(self):
1912 # type: () -> bytes
1913 return b""
1915 def __nonzero__(self):
1916 # type: () -> bool
1917 return False
1918 __bool__ = __nonzero__
1920 def do_build(self):
1921 # type: () -> bytes
1922 return b""
1924 def build(self):
1925 # type: () -> bytes
1926 return b""
1928 def build_padding(self):
1929 # type: () -> bytes
1930 return b""
1932 def build_done(self, p):
1933 # type: (bytes) -> bytes
1934 return p
1936 def build_ps(self, internal=0):
1937 # type: (int) -> Tuple[bytes, List[Any]]
1938 return b"", []
1940 def getfieldval(self, attr):
1941 # type: (str) -> NoReturn
1942 raise AttributeError(attr)
1944 def getfield_and_val(self, attr):
1945 # type: (str) -> NoReturn
1946 raise AttributeError(attr)
1948 def setfieldval(self, attr, val):
1949 # type: (str, Any) -> NoReturn
1950 raise AttributeError(attr)
1952 def delfieldval(self, attr):
1953 # type: (str) -> NoReturn
1954 raise AttributeError(attr)
1956 def hide_defaults(self):
1957 # type: () -> None
1958 pass
1960 def __iter__(self):
1961 # type: () -> Iterator[Packet]
1962 return iter([])
1964 def __eq__(self, other):
1965 # type: (Any) -> bool
1966 if isinstance(other, NoPayload):
1967 return True
1968 return False
1970 def hashret(self):
1971 # type: () -> bytes
1972 return b""
1974 def answers(self, other):
1975 # type: (Packet) -> bool
1976 return isinstance(other, (NoPayload, conf.padding_layer)) # noqa: E501
1978 def haslayer(self, cls, _subclass=None):
1979 # type: (Union[Type[Packet], str], Optional[bool]) -> int
1980 return 0
1982 def getlayer(self,
1983 cls, # type: Union[int, Type[Packet], str]
1984 nb=1, # type: int
1985 _track=None, # type: Optional[List[int]]
1986 _subclass=None, # type: Optional[bool]
1987 **flt # type: Any
1988 ):
1989 # type: (...) -> Optional[Packet]
1990 if _track is not None:
1991 _track.append(nb)
1992 return None
1994 def fragment(self, *args, **kargs):
1995 # type: (*Any, **Any) -> List[Packet]
1996 raise Scapy_Exception("cannot fragment this packet")
1998 def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1999 # type: (bool, int, str, str) -> None
2000 pass
2002 def sprintf(self, fmt, relax=1):
2003 # type: (str, int) -> str
2004 if relax:
2005 return "??"
2006 else:
2007 raise Scapy_Exception("Format not found [%s]" % fmt)
2009 def _do_summary(self):
2010 # type: () -> Tuple[int, str, List[Any]]
2011 return 0, "", []
2013 def layers(self):
2014 # type: () -> List[Type[Packet]]
2015 return []
2017 def lastlayer(self, layer=None):
2018 # type: (Optional[Packet]) -> Packet
2019 return layer or self
2021 def command(self):
2022 # type: () -> str
2023 return ""
2025 def json(self):
2026 # type: () -> str
2027 return ""
2029 def route(self):
2030 # type: () -> Tuple[None, None, None]
2031 return (None, None, None)
2034####################
2035# packet classes #
2036####################
2039class Raw(Packet):
2040 name = "Raw"
2041 fields_desc = [StrField("load", b"")]
2043 def __init__(self, _pkt=b"", *args, **kwargs):
2044 # type: (bytes, *Any, **Any) -> None
2045 if _pkt and not isinstance(_pkt, bytes):
2046 if isinstance(_pkt, tuple):
2047 _pkt, bn = _pkt
2048 _pkt = bytes_encode(_pkt), bn
2049 else:
2050 _pkt = bytes_encode(_pkt)
2051 super(Raw, self).__init__(_pkt, *args, **kwargs)
2053 def answers(self, other):
2054 # type: (Packet) -> int
2055 return 1
2057 def mysummary(self):
2058 # type: () -> str
2059 cs = conf.raw_summary
2060 if cs:
2061 if callable(cs):
2062 return "Raw %s" % cs(self.load)
2063 else:
2064 return "Raw %r" % self.load
2065 return Packet.mysummary(self)
2068class Padding(Raw):
2069 name = "Padding"
2071 def self_build(self):
2072 # type: (Optional[Any]) -> bytes
2073 return b""
2075 def build_padding(self):
2076 # type: () -> bytes
2077 return (
2078 bytes_encode(self.load) if self.raw_packet_cache is None
2079 else self.raw_packet_cache
2080 ) + self.payload.build_padding()
2083conf.raw_layer = Raw
2084conf.padding_layer = Padding
2085if conf.default_l2 is None:
2086 conf.default_l2 = Raw
2088#################
2089# Bind layers #
2090#################
2093def bind_bottom_up(lower, # type: Type[Packet]
2094 upper, # type: Type[Packet]
2095 __fval=None, # type: Optional[Any]
2096 **fval # type: Any
2097 ):
2098 # type: (...) -> None
2099 r"""Bind 2 layers for dissection.
2100 The upper layer will be chosen for dissection on top of the lower layer, if
2101 ALL the passed arguments are validated. If multiple calls are made with
2102 the same layers, the last one will be used as default.
2104 ex:
2105 >>> bind_bottom_up(Ether, SNAP, type=0x1234)
2106 >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00') # noqa: E501
2107 <Ether dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP OUI=0x0 code=0x0 |>> # noqa: E501
2108 """
2109 if __fval is not None:
2110 fval.update(__fval)
2111 lower.payload_guess = lower.payload_guess[:]
2112 lower.payload_guess.append((fval, upper))
2115def bind_top_down(lower, # type: Type[Packet]
2116 upper, # type: Type[Packet]
2117 __fval=None, # type: Optional[Any]
2118 **fval # type: Any
2119 ):
2120 # type: (...) -> None
2121 """Bind 2 layers for building.
2122 When the upper layer is added as a payload of the lower layer, all the
2123 arguments will be applied to them.
2125 ex:
2126 >>> bind_top_down(Ether, SNAP, type=0x1234)
2127 >>> Ether()/SNAP()
2128 <Ether type=0x1234 |<SNAP |>>
2129 """
2130 if __fval is not None:
2131 fval.update(__fval)
2132 upper._overload_fields = upper._overload_fields.copy() # type: ignore
2133 upper._overload_fields[lower] = fval
2136@conf.commands.register
2137def bind_layers(lower, # type: Type[Packet]
2138 upper, # type: Type[Packet]
2139 __fval=None, # type: Optional[Dict[str, int]]
2140 **fval # type: Any
2141 ):
2142 # type: (...) -> None
2143 """Bind 2 layers on some specific fields' values.
2145 It makes the packet being built and dissected when the arguments
2146 are present.
2148 This function calls both bind_bottom_up and bind_top_down, with
2149 all passed arguments.
2151 Please have a look at their docs:
2152 - help(bind_bottom_up)
2153 - help(bind_top_down)
2154 """
2155 if __fval is not None:
2156 fval.update(__fval)
2157 bind_top_down(lower, upper, **fval)
2158 bind_bottom_up(lower, upper, **fval)
2161def split_bottom_up(lower, # type: Type[Packet]
2162 upper, # type: Type[Packet]
2163 __fval=None, # type: Optional[Any]
2164 **fval # type: Any
2165 ):
2166 # type: (...) -> None
2167 """This call un-links an association that was made using bind_bottom_up.
2168 Have a look at help(bind_bottom_up)
2169 """
2170 if __fval is not None:
2171 fval.update(__fval)
2173 def do_filter(params, cls):
2174 # type: (Dict[str, int], Type[Packet]) -> bool
2175 params_is_invalid = any(
2176 k not in params or params[k] != v for k, v in fval.items()
2177 )
2178 return cls != upper or params_is_invalid
2179 lower.payload_guess = [x for x in lower.payload_guess if do_filter(*x)]
2182def split_top_down(lower, # type: Type[Packet]
2183 upper, # type: Type[Packet]
2184 __fval=None, # type: Optional[Any]
2185 **fval # type: Any
2186 ):
2187 # type: (...) -> None
2188 """This call un-links an association that was made using bind_top_down.
2189 Have a look at help(bind_top_down)
2190 """
2191 if __fval is not None:
2192 fval.update(__fval)
2193 if lower in upper._overload_fields:
2194 ofval = upper._overload_fields[lower]
2195 if any(k not in ofval or ofval[k] != v for k, v in fval.items()):
2196 return
2197 upper._overload_fields = upper._overload_fields.copy() # type: ignore
2198 del upper._overload_fields[lower]
2201@conf.commands.register
2202def split_layers(lower, # type: Type[Packet]
2203 upper, # type: Type[Packet]
2204 __fval=None, # type: Optional[Any]
2205 **fval # type: Any
2206 ):
2207 # type: (...) -> None
2208 """Split 2 layers previously bound.
2209 This call un-links calls bind_top_down and bind_bottom_up. It is the opposite of # noqa: E501
2210 bind_layers.
2212 Please have a look at their docs:
2213 - help(split_bottom_up)
2214 - help(split_top_down)
2215 """
2216 if __fval is not None:
2217 fval.update(__fval)
2218 split_bottom_up(lower, upper, **fval)
2219 split_top_down(lower, upper, **fval)
2222@conf.commands.register
2223def explore(layer=None):
2224 # type: (Optional[str]) -> None
2225 """Function used to discover the Scapy layers and protocols.
2226 It helps to see which packets exists in contrib or layer files.
2228 params:
2229 - layer: If specified, the function will explore the layer. If not,
2230 the GUI mode will be activated, to browse the available layers
2232 examples:
2233 >>> explore() # Launches the GUI
2234 >>> explore("dns") # Explore scapy.layers.dns
2235 >>> explore("http2") # Explore scapy.contrib.http2
2236 >>> explore(scapy.layers.bluetooth4LE)
2238 Note: to search a packet by name, use ls("name") rather than explore.
2239 """
2240 if layer is None: # GUI MODE
2241 if not conf.interactive:
2242 raise Scapy_Exception("explore() GUI-mode cannot be run in "
2243 "interactive mode. Please provide a "
2244 "'layer' parameter !")
2245 # 0 - Imports
2246 try:
2247 import prompt_toolkit
2248 except ImportError:
2249 raise ImportError("prompt_toolkit is not installed ! "
2250 "You may install IPython, which contains it, via"
2251 " `pip install ipython`")
2252 if not _version_checker(prompt_toolkit, (2, 0)):
2253 raise ImportError("prompt_toolkit >= 2.0.0 is required !")
2254 # Only available with prompt_toolkit > 2.0, not released on PyPi yet
2255 from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
2256 button_dialog
2257 from prompt_toolkit.formatted_text import HTML
2258 # Check for prompt_toolkit >= 3.0.0
2259 call_ptk = lambda x: cast(str, x) # type: Callable[[Any], str]
2260 if _version_checker(prompt_toolkit, (3, 0)):
2261 call_ptk = lambda x: x.run()
2262 # 1 - Ask for layer or contrib
2263 btn_diag = button_dialog(
2264 title="Scapy v%s" % conf.version,
2265 text=HTML(
2266 '<style bg="white" fg="red">Chose the type of packets'
2267 ' you want to explore:</style>'
2268 ),
2269 buttons=[
2270 ("Layers", "layers"),
2271 ("Contribs", "contribs"),
2272 ("Cancel", "cancel")
2273 ])
2274 action = call_ptk(btn_diag)
2275 # 2 - Retrieve list of Packets
2276 if action == "layers":
2277 # Get all loaded layers
2278 lvalues = conf.layers.layers()
2279 # Restrict to layers-only (not contribs) + packet.py and asn1*.py
2280 values = [x for x in lvalues if ("layers" in x[0] or
2281 "packet" in x[0] or
2282 "asn1" in x[0])]
2283 elif action == "contribs":
2284 # Get all existing contribs
2285 from scapy.main import list_contrib
2286 cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
2287 values = [(x['name'], x['description'])
2288 for x in cvalues]
2289 # Remove very specific modules
2290 values = [x for x in values if "can" not in x[0]]
2291 else:
2292 # Escape/Cancel was pressed
2293 return
2294 # Build tree
2295 if action == "contribs":
2296 # A tree is a dictionary. Each layer contains a keyword
2297 # _l which contains the files in the layer, and a _name
2298 # argument which is its name. The other keys are the subfolders,
2299 # which are similar dictionaries
2300 tree = defaultdict(list) # type: Dict[str, Union[List[Any], Dict[str, Any]]] # noqa: E501
2301 for name, desc in values:
2302 if "." in name: # Folder detected
2303 parts = name.split(".")
2304 subtree = tree
2305 for pa in parts[:-1]:
2306 if pa not in subtree:
2307 subtree[pa] = {}
2308 # one layer deeper
2309 subtree = subtree[pa] # type: ignore
2310 subtree["_name"] = pa # type: ignore
2311 if "_l" not in subtree:
2312 subtree["_l"] = []
2313 subtree["_l"].append((parts[-1], desc)) # type: ignore
2314 else:
2315 tree["_l"].append((name, desc)) # type: ignore
2316 elif action == "layers":
2317 tree = {"_l": values}
2318 # 3 - Ask for the layer/contrib module to explore
2319 current = tree # type: Any
2320 previous = [] # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]] # noqa: E501
2321 while True:
2322 # Generate tests & form
2323 folders = list(current.keys())
2324 _radio_values = [
2325 ("$" + name, str('[+] ' + name.capitalize()))
2326 for name in folders if not name.startswith("_")
2327 ] + current.get("_l", []) # type: List[str]
2328 cur_path = ""
2329 if previous:
2330 cur_path = ".".join(
2331 itertools.chain(
2332 (x["_name"] for x in previous[1:]), # type: ignore
2333 (current["_name"],)
2334 )
2335 )
2336 extra_text = (
2337 '\n<style bg="white" fg="green">> scapy.%s</style>'
2338 ) % (action + ("." + cur_path if cur_path else ""))
2339 # Show popup
2340 rd_diag = radiolist_dialog(
2341 values=_radio_values,
2342 title="Scapy v%s" % conf.version,
2343 text=HTML(
2344 (
2345 '<style bg="white" fg="red">Please select a file'
2346 'among the following, to see all layers contained in'
2347 ' it:</style>'
2348 ) + extra_text
2349 ),
2350 cancel_text="Back" if previous else "Cancel"
2351 )
2352 result = call_ptk(rd_diag)
2353 if result is None:
2354 # User pressed "Cancel/Back"
2355 if previous: # Back
2356 current = previous.pop()
2357 continue
2358 else: # Cancel
2359 return
2360 if result.startswith("$"):
2361 previous.append(current)
2362 current = current[result[1:]]
2363 else:
2364 # Enter on layer
2365 if previous: # In subfolder
2366 result = cur_path + "." + result
2367 break
2368 # 4 - (Contrib only): load contrib
2369 if action == "contribs":
2370 from scapy.main import load_contrib
2371 load_contrib(result)
2372 result = "scapy.contrib." + result
2373 else: # NON-GUI MODE
2374 # We handle layer as a short layer name, full layer name
2375 # or the module itself
2376 if isinstance(layer, types.ModuleType):
2377 layer = layer.__name__
2378 if isinstance(layer, str):
2379 if layer.startswith("scapy.layers."):
2380 result = layer
2381 else:
2382 if layer.startswith("scapy.contrib."):
2383 layer = layer.replace("scapy.contrib.", "")
2384 from scapy.main import load_contrib
2385 load_contrib(layer)
2386 result_layer, result_contrib = (("scapy.layers.%s" % layer),
2387 ("scapy.contrib.%s" % layer))
2388 if result_layer in conf.layers.ldict:
2389 result = result_layer
2390 elif result_contrib in conf.layers.ldict:
2391 result = result_contrib
2392 else:
2393 raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2394 else:
2395 warning("Wrong usage ! Check out help(explore)")
2396 return
2398 # COMMON PART
2399 # Get the list of all Packets contained in that module
2400 try:
2401 all_layers = conf.layers.ldict[result]
2402 except KeyError:
2403 raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2404 # Print
2405 print(conf.color_theme.layer_name("Packets contained in %s:" % result))
2406 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]]
2407 rtlst = [(lay.__name__ or "", cast(str, lay._name) or "") for lay in all_layers]
2408 print(pretty_list(rtlst, [("Class", "Name")], borders=True))
2411def _pkt_ls(obj, # type: Union[Packet, Type[Packet]]
2412 verbose=False, # type: bool
2413 ):
2414 # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]] # noqa: E501
2415 """Internal function used to resolve `fields_desc` to display it.
2417 :param obj: a packet object or class
2418 :returns: a list containing tuples [(name, clsname, clsname_extras,
2419 default, long_attrs)]
2420 """
2421 is_pkt = isinstance(obj, Packet)
2422 if not issubtype(obj, Packet) and not is_pkt:
2423 raise ValueError
2424 fields = []
2425 for f in obj.fields_desc:
2426 cur_fld = f
2427 attrs = [] # type: List[str]
2428 long_attrs = [] # type: List[str]
2429 while isinstance(cur_fld, (Emph, ConditionalField)):
2430 if isinstance(cur_fld, ConditionalField):
2431 attrs.append(cur_fld.__class__.__name__[:4])
2432 cur_fld = cur_fld.fld
2433 name = cur_fld.name
2434 default = cur_fld.default
2435 if verbose and isinstance(cur_fld, EnumField) \
2436 and hasattr(cur_fld, "i2s") and cur_fld.i2s:
2437 if len(cur_fld.i2s or []) < 50:
2438 long_attrs.extend(
2439 "%s: %d" % (strval, numval)
2440 for numval, strval in
2441 sorted(cur_fld.i2s.items())
2442 )
2443 elif isinstance(cur_fld, MultiEnumField):
2444 if isinstance(obj, Packet):
2445 obj_pkt = obj
2446 else:
2447 obj_pkt = obj()
2448 fld_depend = cur_fld.depends_on(obj_pkt)
2449 attrs.append("Depends on %s" % fld_depend)
2450 if verbose:
2451 cur_i2s = cur_fld.i2s_multi.get(
2452 cur_fld.depends_on(obj_pkt), {}
2453 )
2454 if len(cur_i2s) < 50:
2455 long_attrs.extend(
2456 "%s: %d" % (strval, numval)
2457 for numval, strval in
2458 sorted(cur_i2s.items())
2459 )
2460 elif verbose and isinstance(cur_fld, FlagsField):
2461 names = cur_fld.names
2462 long_attrs.append(", ".join(names))
2463 elif isinstance(cur_fld, MultipleTypeField):
2464 default = cur_fld.dflt.default
2465 attrs.append(", ".join(
2466 x[0].__class__.__name__ for x in
2467 itertools.chain(cur_fld.flds, [(cur_fld.dflt,)])
2468 ))
2470 cls = cur_fld.__class__
2471 class_name_extras = "(%s)" % (
2472 ", ".join(attrs)
2473 ) if attrs else ""
2474 if isinstance(cur_fld, BitField):
2475 class_name_extras += " (%d bit%s)" % (
2476 cur_fld.size,
2477 "s" if cur_fld.size > 1 else ""
2478 )
2479 fields.append(
2480 (name,
2481 cls,
2482 class_name_extras,
2483 repr(default),
2484 long_attrs)
2485 )
2486 return fields
2489@conf.commands.register
2490def ls(obj=None, # type: Optional[Union[str, Packet, Type[Packet]]]
2491 case_sensitive=False, # type: bool
2492 verbose=False # type: bool
2493 ):
2494 # type: (...) -> None
2495 """List available layers, or infos on a given layer class or name.
2497 :param obj: Packet / packet name to use
2498 :param case_sensitive: if obj is a string, is it case sensitive?
2499 :param verbose:
2500 """
2501 if obj is None or isinstance(obj, str):
2502 tip = False
2503 if obj is None:
2504 tip = True
2505 all_layers = sorted(conf.layers, key=lambda x: x.__name__)
2506 else:
2507 pattern = re.compile(
2508 obj,
2509 0 if case_sensitive else re.I
2510 )
2511 # We first order by accuracy, then length
2512 if case_sensitive:
2513 sorter = lambda x: (x.__name__.index(obj), len(x.__name__))
2514 else:
2515 obj = obj.lower()
2516 sorter = lambda x: (x.__name__.lower().index(obj),
2517 len(x.__name__))
2518 all_layers = sorted((layer for layer in conf.layers
2519 if (isinstance(layer.__name__, str) and
2520 pattern.search(layer.__name__)) or
2521 (isinstance(layer.name, str) and
2522 pattern.search(layer.name))),
2523 key=sorter)
2524 for layer in all_layers:
2525 print("%-10s : %s" % (layer.__name__, layer._name))
2526 if tip and conf.interactive:
2527 print("\nTIP: You may use explore() to navigate through all "
2528 "layers using a clear GUI")
2529 else:
2530 try:
2531 fields = _pkt_ls(
2532 obj,
2533 verbose=verbose
2534 )
2535 is_pkt = isinstance(obj, Packet)
2536 # Print
2537 for fname, cls, clsne, dflt, long_attrs in fields:
2538 clsinfo = cls.__name__ + " " + clsne
2539 print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
2540 if is_pkt:
2541 print("%-15r" % (getattr(obj, fname),), end=' ')
2542 print("(%r)" % (dflt,))
2543 for attr in long_attrs:
2544 print("%-15s%s" % ("", attr))
2545 # Restart for payload if any
2546 if is_pkt:
2547 obj = cast(Packet, obj)
2548 if isinstance(obj.payload, NoPayload):
2549 return
2550 print("--")
2551 ls(obj.payload)
2552 except ValueError:
2553 print("Not a packet class or name. Type 'ls()' to list packet classes.") # noqa: E501
2556@conf.commands.register
2557def rfc(cls, ret=False, legend=True):
2558 # type: (Type[Packet], bool, bool) -> Optional[str]
2559 """
2560 Generate an RFC-like representation of a packet def.
2562 :param cls: the Packet class
2563 :param ret: return the result instead of printing (def. False)
2564 :param legend: show text under the diagram (default True)
2566 Ex::
2568 >>> rfc(Ether)
2570 """
2571 if not issubclass(cls, Packet):
2572 raise TypeError("Packet class expected")
2573 cur_len = 0
2574 cur_line = []
2575 lines = []
2576 # Get the size (width) that a field will take
2577 # when formatted, from its length in bits
2578 clsize = lambda x: 2 * x - 1 # type: Callable[[int], int]
2579 ident = 0 # Fields UUID
2581 # Generate packet groups
2582 def _iterfields() -> Iterator[Tuple[str, int]]:
2583 for f in cls.fields_desc:
2584 # Fancy field name
2585 fname = f.name.upper().replace("_", " ")
2586 fsize = int(f.sz * 8)
2587 yield fname, fsize
2588 # Add padding optionally
2589 if isinstance(f, PadField):
2590 if isinstance(f._align, tuple):
2591 pad = - cur_len % (f._align[0] * 8)
2592 else:
2593 pad = - cur_len % (f._align * 8)
2594 if pad:
2595 yield "padding", pad
2596 for fname, flen in _iterfields():
2597 cur_len += flen
2598 ident += 1
2599 # The field might exceed the current line or
2600 # take more than one line. Copy it as required
2601 while True:
2602 over = max(0, cur_len - 32) # Exceed
2603 len1 = clsize(flen - over) # What fits
2604 cur_line.append((fname[:len1], len1, ident))
2605 if cur_len >= 32:
2606 # Current line is full. start a new line
2607 lines.append(cur_line)
2608 cur_len = flen = over
2609 fname = "" # do not repeat the field
2610 cur_line = []
2611 if not over:
2612 # there is no data left
2613 break
2614 else:
2615 # End of the field
2616 break
2617 # Add the last line if un-finished
2618 if cur_line:
2619 lines.append(cur_line)
2620 # Calculate separations between lines
2621 seps = []
2622 seps.append("+-" * 32 + "+\n")
2623 for i in range(len(lines) - 1):
2624 # Start with a full line
2625 sep = "+-" * 32 + "+\n"
2626 # Get the line above and below the current
2627 # separation
2628 above, below = lines[i], lines[i + 1]
2629 # The last field of above is shared with below
2630 if above[-1][2] == below[0][2]:
2631 # where the field in "above" starts
2632 pos_above = sum(x[1] for x in above[:-1]) + len(above[:-1]) - 1
2633 # where the field in "below" ends
2634 pos_below = below[0][1]
2635 if pos_above < pos_below:
2636 # they are overlapping.
2637 # Now crop the space between those pos
2638 # and fill it with " "
2639 pos_above = pos_above + pos_above % 2
2640 sep = (
2641 sep[:1 + pos_above] +
2642 " " * (pos_below - pos_above) +
2643 sep[1 + pos_below:]
2644 )
2645 # line is complete
2646 seps.append(sep)
2647 # Graph
2648 result = ""
2649 # Bytes markers
2650 result += " " + (" " * 19).join(
2651 str(x) for x in range(4)
2652 ) + "\n"
2653 # Bits markers
2654 result += " " + " ".join(
2655 str(x % 10) for x in range(32)
2656 ) + "\n"
2657 # Add fields and their separations
2658 for line, sep in zip(lines, seps):
2659 result += sep
2660 for elt, flen, _ in line:
2661 result += "|" + elt.center(flen, " ")
2662 result += "|\n"
2663 result += "+-" * (cur_len or 32) + "+\n"
2664 # Annotate with the figure name
2665 if legend:
2666 result += "\n" + ("Fig. " + cls.__name__).center(66, " ")
2667 # return if asked for, else print
2668 if ret:
2669 return result
2670 print(result)
2671 return None
2674#############
2675# Fuzzing #
2676#############
2678_P = TypeVar('_P', bound=Packet)
2681@conf.commands.register
2682def fuzz(p, # type: _P
2683 _inplace=0, # type: int
2684 ):
2685 # type: (...) -> _P
2686 """
2687 Transform a layer into a fuzzy layer by replacing some default values
2688 by random objects.
2690 :param p: the Packet instance to fuzz
2691 :return: the fuzzed packet.
2692 """
2693 if not _inplace:
2694 p = p.copy()
2695 q = cast(Packet, p)
2696 while not isinstance(q, NoPayload):
2697 new_default_fields = {}
2698 multiple_type_fields = [] # type: List[str]
2699 for f in q.fields_desc:
2700 if isinstance(f, PacketListField):
2701 for r in getattr(q, f.name):
2702 fuzz(r, _inplace=1)
2703 elif isinstance(f, MultipleTypeField):
2704 # the type of the field will depend on others
2705 multiple_type_fields.append(f.name)
2706 elif f.default is not None:
2707 if not isinstance(f, ConditionalField) or f._evalcond(q):
2708 rnd = f.randval()
2709 if rnd is not None:
2710 new_default_fields[f.name] = rnd
2711 # Process packets with MultipleTypeFields
2712 if multiple_type_fields:
2713 # freeze the other random values
2714 new_default_fields = {
2715 key: (val._fix() if isinstance(val, VolatileValue) else val)
2716 for key, val in new_default_fields.items()
2717 }
2718 q.default_fields.update(new_default_fields)
2719 new_default_fields.clear()
2720 # add the random values of the MultipleTypeFields
2721 for name in multiple_type_fields:
2722 fld = cast(MultipleTypeField, q.get_field(name))
2723 rnd = fld._find_fld_pkt(q).randval()
2724 if rnd is not None:
2725 new_default_fields[name] = rnd
2726 q.default_fields.update(new_default_fields)
2727 q = q.payload
2728 return p