Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/fields.py: 63%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Copyright (C) Michael Farrell <micolous+git@gmail.com>
8"""
9Fields: basic data structures that make up parts of packets.
10"""
12import calendar
13import collections
14import copy
15import datetime
16import inspect
17import math
18import socket
19import struct
20import time
21import warnings
23from types import MethodType
24from uuid import UUID
25from enum import Enum
27from scapy.config import conf
28from scapy.dadict import DADict
29from scapy.volatile import RandBin, RandByte, RandEnumKeys, RandInt, \
30 RandIP, RandIP6, RandLong, RandMAC, RandNum, RandShort, RandSInt, \
31 RandSByte, RandTermString, RandUUID, VolatileValue, RandSShort, \
32 RandSLong, RandFloat
33from scapy.data import EPOCH
34from scapy.error import log_runtime, Scapy_Exception
35from scapy.compat import bytes_hex, chb, orb, plain_str, raw, bytes_encode
36from scapy.pton_ntop import inet_ntop, inet_pton
37from scapy.utils import inet_aton, inet_ntoa, lhex, mac2str, str2mac, EDecimal
38from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \
39 in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo
40from scapy.base_classes import Gen, Net, BasePacket, Field_metaclass
41from scapy.error import warning
43# Typing imports
44from typing import (
45 Any,
46 AnyStr,
47 Callable,
48 Dict,
49 List,
50 Generic,
51 Optional,
52 Set,
53 Tuple,
54 Type,
55 TypeVar,
56 Union,
57 # func
58 cast,
59 TYPE_CHECKING,
60)
62if TYPE_CHECKING:
63 # Do not import on runtime ! (import loop)
64 from scapy.packet import Packet
67class RawVal:
68 r"""
69 A raw value that will not be processed by the field and inserted
70 as-is in the packet string.
72 Example::
74 >>> a = IP(len=RawVal("####"))
75 >>> bytes(a)
76 b'F\x00####\x00\x01\x00\x005\xb5\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00'
78 """
80 def __init__(self, val=b""):
81 # type: (bytes) -> None
82 self.val = bytes_encode(val)
84 def __str__(self):
85 # type: () -> str
86 return str(self.val)
88 def __bytes__(self):
89 # type: () -> bytes
90 return self.val
92 def __len__(self):
93 # type: () -> int
94 return len(self.val)
96 def __repr__(self):
97 # type: () -> str
98 return "<RawVal [%r]>" % self.val
101class ObservableDict(Dict[int, str]):
102 """
103 Helper class to specify a protocol extendable for runtime modifications
104 """
106 def __init__(self, *args, **kw):
107 # type: (*Dict[int, str], **Any) -> None
108 self.observers = [] # type: List[_EnumField[Any]]
109 super(ObservableDict, self).__init__(*args, **kw)
111 def observe(self, observer):
112 # type: (_EnumField[Any]) -> None
113 self.observers.append(observer)
115 def __setitem__(self, key, value):
116 # type: (int, str) -> None
117 for o in self.observers:
118 o.notify_set(self, key, value)
119 super(ObservableDict, self).__setitem__(key, value)
121 def __delitem__(self, key):
122 # type: (int) -> None
123 for o in self.observers:
124 o.notify_del(self, key)
125 super(ObservableDict, self).__delitem__(key)
127 def update(self, anotherDict): # type: ignore
128 for k in anotherDict:
129 self[k] = anotherDict[k]
132############
133# Fields #
134############
136I = TypeVar('I') # Internal storage # noqa: E741
137M = TypeVar('M') # Machine storage
140class Field(Generic[I, M], metaclass=Field_metaclass):
141 """
142 For more information on how this works, please refer to the
143 'Adding new protocols' chapter in the online documentation:
145 https://scapy.readthedocs.io/en/stable/build_dissect.html
146 """
147 __slots__ = [
148 "name",
149 "fmt",
150 "default",
151 "sz",
152 "owners",
153 "struct"
154 ]
155 islist = 0
156 ismutable = False
157 holds_packets = 0
159 def __init__(self, name, default, fmt="H"):
160 # type: (str, Any, str) -> None
161 if not isinstance(name, str):
162 raise ValueError("name should be a string")
163 self.name = name
164 if fmt[0] in "@=<>!":
165 self.fmt = fmt
166 else:
167 self.fmt = "!" + fmt
168 self.struct = struct.Struct(self.fmt)
169 self.default = self.any2i(None, default)
170 self.sz = struct.calcsize(self.fmt) # type: int
171 self.owners = [] # type: List[Type[Packet]]
173 def register_owner(self, cls):
174 # type: (Type[Packet]) -> None
175 self.owners.append(cls)
177 def i2len(self,
178 pkt, # type: Packet
179 x, # type: Any
180 ):
181 # type: (...) -> int
182 """Convert internal value to a length usable by a FieldLenField"""
183 if isinstance(x, RawVal):
184 return len(x)
185 return self.sz
187 def i2count(self, pkt, x):
188 # type: (Optional[Packet], I) -> int
189 """Convert internal value to a number of elements usable by a FieldLenField.
190 Always 1 except for list fields"""
191 return 1
193 def h2i(self, pkt, x):
194 # type: (Optional[Packet], Any) -> I
195 """Convert human value to internal value"""
196 return cast(I, x)
198 def i2h(self, pkt, x):
199 # type: (Optional[Packet], I) -> Any
200 """Convert internal value to human value"""
201 return x
203 def m2i(self, pkt, x):
204 # type: (Optional[Packet], M) -> I
205 """Convert machine value to internal value"""
206 return cast(I, x)
208 def i2m(self, pkt, x):
209 # type: (Optional[Packet], Optional[I]) -> M
210 """Convert internal value to machine value"""
211 if x is None:
212 return cast(M, 0)
213 elif isinstance(x, str):
214 return cast(M, bytes_encode(x))
215 return cast(M, x)
217 def any2i(self, pkt, x):
218 # type: (Optional[Packet], Any) -> Optional[I]
219 """Try to understand the most input values possible and make an internal value from them""" # noqa: E501
220 return self.h2i(pkt, x)
222 def i2repr(self, pkt, x):
223 # type: (Optional[Packet], I) -> str
224 """Convert internal value to a nice representation"""
225 return repr(self.i2h(pkt, x))
227 def addfield(self, pkt, s, val):
228 # type: (Packet, bytes, Optional[I]) -> bytes
229 """Add an internal value to a string
231 Copy the network representation of field `val` (belonging to layer
232 `pkt`) to the raw string packet `s`, and return the new string packet.
233 """
234 try:
235 return s + self.struct.pack(self.i2m(pkt, val))
236 except struct.error as ex:
237 raise ValueError(
238 "Incorrect type of value for field %s:\n" % self.name +
239 "struct.error('%s')\n" % ex +
240 "To inject bytes into the field regardless of the type, " +
241 "use RawVal. See help(RawVal)"
242 )
244 def getfield(self, pkt, s):
245 # type: (Packet, bytes) -> Tuple[bytes, I]
246 """Extract an internal value from a string
248 Extract from the raw packet `s` the field value belonging to layer
249 `pkt`.
251 Returns a two-element list,
252 first the raw packet string after having removed the extracted field,
253 second the extracted field itself in internal representation.
254 """
255 return s[self.sz:], self.m2i(pkt, self.struct.unpack(s[:self.sz])[0])
257 def do_copy(self, x):
258 # type: (I) -> I
259 if isinstance(x, list):
260 x = x[:] # type: ignore
261 for i in range(len(x)):
262 if isinstance(x[i], BasePacket):
263 x[i] = x[i].copy()
264 return x # type: ignore
265 if hasattr(x, "copy"):
266 return x.copy() # type: ignore
267 return x
269 def __repr__(self):
270 # type: () -> str
271 return "<%s (%s).%s>" % (
272 self.__class__.__name__,
273 ",".join(x.__name__ for x in self.owners),
274 self.name
275 )
277 def copy(self):
278 # type: () -> Field[I, M]
279 return copy.copy(self)
281 def randval(self):
282 # type: () -> VolatileValue[Any]
283 """Return a volatile object whose value is both random and suitable for this field""" # noqa: E501
284 fmtt = self.fmt[-1]
285 if fmtt in "BbHhIiQq":
286 return {"B": RandByte, "b": RandSByte,
287 "H": RandShort, "h": RandSShort,
288 "I": RandInt, "i": RandSInt,
289 "Q": RandLong, "q": RandSLong}[fmtt]()
290 elif fmtt == "s":
291 if self.fmt[0] in "0123456789":
292 value = int(self.fmt[:-1])
293 else:
294 value = int(self.fmt[1:-1])
295 return RandBin(value)
296 else:
297 raise ValueError(
298 "no random class for [%s] (fmt=%s)." % (
299 self.name, self.fmt
300 )
301 )
304class _FieldContainer(object):
305 """
306 A field that acts as a container for another field
307 """
308 __slots__ = ["fld"]
310 def __getattr__(self, attr):
311 # type: (str) -> Any
312 return getattr(self.fld, attr)
315AnyField = Union[Field[Any, Any], _FieldContainer]
318class Emph(_FieldContainer):
319 """Empathize sub-layer for display"""
320 __slots__ = ["fld"]
322 def __init__(self, fld):
323 # type: (Any) -> None
324 self.fld = fld
326 def __eq__(self, other):
327 # type: (Any) -> bool
328 return bool(self.fld == other)
330 def __hash__(self):
331 # type: () -> int
332 return hash(self.fld)
335class MayEnd(_FieldContainer):
336 """
337 Allow packet dissection to end after the dissection of this field
338 if no bytes are left.
340 A good example would be a length field that can be 0 or a set value,
341 and where it would be too annoying to use multiple ConditionalFields
343 Important note: any field below this one MUST default
344 to an empty value, else the behavior will be unexpected.
345 """
346 __slots__ = ["fld"]
348 def __init__(self, fld):
349 # type: (Any) -> None
350 self.fld = fld
352 def __eq__(self, other):
353 # type: (Any) -> bool
354 return bool(self.fld == other)
356 def __hash__(self):
357 # type: () -> int
358 return hash(self.fld)
361class ActionField(_FieldContainer):
362 __slots__ = ["fld", "_action_method", "_privdata"]
364 def __init__(self, fld, action_method, **kargs):
365 # type: (Field[Any, Any], str, **Any) -> None
366 self.fld = fld
367 self._action_method = action_method
368 self._privdata = kargs
370 def any2i(self, pkt, val):
371 # type: (Optional[Packet], int) -> Any
372 getattr(pkt, self._action_method)(val, self.fld, **self._privdata)
373 return getattr(self.fld, "any2i")(pkt, val)
376class ConditionalField(_FieldContainer):
377 __slots__ = ["fld", "cond"]
379 def __init__(self,
380 fld, # type: AnyField
381 cond # type: Callable[[Packet], bool]
382 ):
383 # type: (...) -> None
384 self.fld = fld
385 self.cond = cond
387 def _evalcond(self, pkt):
388 # type: (Packet) -> bool
389 return bool(self.cond(pkt))
391 def any2i(self, pkt, x):
392 # type: (Optional[Packet], Any) -> Any
393 # BACKWARD COMPATIBILITY
394 # Note: we shouldn't need this function. (it's not correct)
395 # However, having i2h implemented (#2364), it changes the default
396 # behavior and broke all packets that wrongly use two ConditionalField
397 # with the same name. Those packets are the problem: they are wrongly
398 # built (they should either be re-using the same conditional field, or
399 # using a MultipleTypeField).
400 # But I don't want to dive into fixing all of them just yet,
401 # so for now, let's keep this this way, even though it's not correct.
402 if type(self.fld) is Field:
403 return x
404 return self.fld.any2i(pkt, x)
406 def i2h(self, pkt, val):
407 # type: (Optional[Packet], Any) -> Any
408 if pkt and not self._evalcond(pkt):
409 return None
410 return self.fld.i2h(pkt, val)
412 def getfield(self, pkt, s):
413 # type: (Packet, bytes) -> Tuple[bytes, Any]
414 if self._evalcond(pkt):
415 return self.fld.getfield(pkt, s)
416 else:
417 return s, None
419 def addfield(self, pkt, s, val):
420 # type: (Packet, bytes, Any) -> bytes
421 if self._evalcond(pkt):
422 return self.fld.addfield(pkt, s, val)
423 else:
424 return s
426 def __getattr__(self, attr):
427 # type: (str) -> Any
428 return getattr(self.fld, attr)
431class MultipleTypeField(_FieldContainer):
432 """
433 MultipleTypeField are used for fields that can be implemented by
434 various Field subclasses, depending on conditions on the packet.
436 It is initialized with `flds` and `dflt`.
438 :param dflt: is the default field type, to be used when none of the
439 conditions matched the current packet.
440 :param flds: is a list of tuples (`fld`, `cond`) or (`fld`, `cond`, `hint`)
441 where `fld` if a field type, and `cond` a "condition" to
442 determine if `fld` is the field type that should be used.
444 ``cond`` is either:
446 - a callable `cond_pkt` that accepts one argument (the packet) and
447 returns True if `fld` should be used, False otherwise.
448 - a tuple (`cond_pkt`, `cond_pkt_val`), where `cond_pkt` is the same
449 as in the previous case and `cond_pkt_val` is a callable that
450 accepts two arguments (the packet, and the value to be set) and
451 returns True if `fld` should be used, False otherwise.
453 See scapy.layers.l2.ARP (type "help(ARP)" in Scapy) for an example of
454 use.
455 """
457 __slots__ = ["flds", "dflt", "hints", "name", "default"]
459 def __init__(
460 self,
461 flds: List[Union[
462 Tuple[Field[Any, Any], Any, str],
463 Tuple[Field[Any, Any], Any]
464 ]],
465 dflt: Field[Any, Any]
466 ) -> None:
467 self.hints = {
468 x[0]: x[2]
469 for x in flds
470 if len(x) == 3
471 }
472 self.flds = [
473 (x[0], x[1]) for x in flds
474 ]
475 self.dflt = dflt
476 self.default = None # So that we can detect changes in defaults
477 self.name = self.dflt.name
478 if any(x[0].name != self.name for x in self.flds):
479 warnings.warn(
480 ("All fields should have the same name in a "
481 "MultipleTypeField (%s). Use hints.") % self.name,
482 SyntaxWarning
483 )
485 def _iterate_fields_cond(self, pkt, val, use_val):
486 # type: (Optional[Packet], Any, bool) -> Field[Any, Any]
487 """Internal function used by _find_fld_pkt & _find_fld_pkt_val"""
488 # Iterate through the fields
489 for fld, cond in self.flds:
490 if isinstance(cond, tuple):
491 if use_val:
492 if val is None:
493 val = self.dflt.default
494 if cond[1](pkt, val):
495 return fld
496 continue
497 else:
498 cond = cond[0]
499 if cond(pkt):
500 return fld
501 return self.dflt
503 def _find_fld_pkt(self, pkt):
504 # type: (Optional[Packet]) -> Field[Any, Any]
505 """Given a Packet instance `pkt`, returns the Field subclass to be
506used. If you know the value to be set (e.g., in .addfield()), use
507._find_fld_pkt_val() instead.
509 """
510 return self._iterate_fields_cond(pkt, None, False)
512 def _find_fld_pkt_val(self,
513 pkt, # type: Optional[Packet]
514 val, # type: Any
515 ):
516 # type: (...) -> Tuple[Field[Any, Any], Any]
517 """Given a Packet instance `pkt` and the value `val` to be set,
518returns the Field subclass to be used, and the updated `val` if necessary.
520 """
521 fld = self._iterate_fields_cond(pkt, val, True)
522 if val is None:
523 val = fld.default
524 return fld, val
526 def _find_fld(self):
527 # type: () -> Field[Any, Any]
528 """Returns the Field subclass to be used, depending on the Packet
529instance, or the default subclass.
531DEV: since the Packet instance is not provided, we have to use a hack
532to guess it. It should only be used if you cannot provide the current
533Packet instance (for example, because of the current Scapy API).
535If you have the current Packet instance, use ._find_fld_pkt_val() (if
536the value to set is also known) of ._find_fld_pkt() instead.
538 """
539 # Hack to preserve current Scapy API
540 # See https://stackoverflow.com/a/7272464/3223422
541 frame = inspect.currentframe().f_back.f_back # type: ignore
542 while frame is not None:
543 try:
544 pkt = frame.f_locals['self']
545 except KeyError:
546 pass
547 else:
548 if isinstance(pkt, tuple(self.dflt.owners)):
549 if not pkt.default_fields:
550 # Packet not initialized
551 return self.dflt
552 return self._find_fld_pkt(pkt)
553 frame = frame.f_back
554 return self.dflt
556 def getfield(self,
557 pkt, # type: Packet
558 s, # type: bytes
559 ):
560 # type: (...) -> Tuple[bytes, Any]
561 return self._find_fld_pkt(pkt).getfield(pkt, s)
563 def addfield(self, pkt, s, val):
564 # type: (Packet, bytes, Any) -> bytes
565 fld, val = self._find_fld_pkt_val(pkt, val)
566 return fld.addfield(pkt, s, val)
568 def any2i(self, pkt, val):
569 # type: (Optional[Packet], Any) -> Any
570 fld, val = self._find_fld_pkt_val(pkt, val)
571 return fld.any2i(pkt, val)
573 def h2i(self, pkt, val):
574 # type: (Optional[Packet], Any) -> Any
575 fld, val = self._find_fld_pkt_val(pkt, val)
576 return fld.h2i(pkt, val)
578 def i2h(self,
579 pkt, # type: Packet
580 val, # type: Any
581 ):
582 # type: (...) -> Any
583 fld, val = self._find_fld_pkt_val(pkt, val)
584 return fld.i2h(pkt, val)
586 def i2m(self, pkt, val):
587 # type: (Optional[Packet], Optional[Any]) -> Any
588 fld, val = self._find_fld_pkt_val(pkt, val)
589 return fld.i2m(pkt, val)
591 def i2len(self, pkt, val):
592 # type: (Packet, Any) -> int
593 fld, val = self._find_fld_pkt_val(pkt, val)
594 return fld.i2len(pkt, val)
596 def i2repr(self, pkt, val):
597 # type: (Optional[Packet], Any) -> str
598 fld, val = self._find_fld_pkt_val(pkt, val)
599 hint = ""
600 if fld in self.hints:
601 hint = " (%s)" % self.hints[fld]
602 return fld.i2repr(pkt, val) + hint
604 def register_owner(self, cls):
605 # type: (Type[Packet]) -> None
606 for fld, _ in self.flds:
607 fld.owners.append(cls)
608 self.dflt.owners.append(cls)
610 def get_fields_list(self):
611 # type: () -> List[Any]
612 return [self]
614 @property
615 def fld(self):
616 # type: () -> Field[Any, Any]
617 return self._find_fld()
620class PadField(_FieldContainer):
621 """Add bytes after the proxified field so that it ends at the specified
622 alignment from its beginning"""
623 __slots__ = ["fld", "_align", "_padwith"]
625 def __init__(self, fld, align, padwith=None):
626 # type: (AnyField, int, Optional[bytes]) -> None
627 self.fld = fld
628 self._align = align
629 self._padwith = padwith or b"\x00"
631 def padlen(self, flen, pkt):
632 # type: (int, Packet) -> int
633 return -flen % self._align
635 def getfield(self,
636 pkt, # type: Packet
637 s, # type: bytes
638 ):
639 # type: (...) -> Tuple[bytes, Any]
640 remain, val = self.fld.getfield(pkt, s)
641 padlen = self.padlen(len(s) - len(remain), pkt)
642 return remain[padlen:], val
644 def addfield(self,
645 pkt, # type: Packet
646 s, # type: bytes
647 val, # type: Any
648 ):
649 # type: (...) -> bytes
650 sval = self.fld.addfield(pkt, b"", val)
651 return s + sval + (
652 self.padlen(len(sval), pkt) * self._padwith
653 )
656class ReversePadField(PadField):
657 """Add bytes BEFORE the proxified field so that it starts at the specified
658 alignment from its beginning"""
660 def original_length(self, pkt):
661 # type: (Packet) -> int
662 return len(pkt.original)
664 def getfield(self,
665 pkt, # type: Packet
666 s, # type: bytes
667 ):
668 # type: (...) -> Tuple[bytes, Any]
669 # We need to get the length that has already been dissected
670 padlen = self.padlen(self.original_length(pkt) - len(s), pkt)
671 return self.fld.getfield(pkt, s[padlen:])
673 def addfield(self,
674 pkt, # type: Packet
675 s, # type: bytes
676 val, # type: Any
677 ):
678 # type: (...) -> bytes
679 sval = self.fld.addfield(pkt, b"", val)
680 return s + struct.pack("%is" % (
681 self.padlen(len(s), pkt)
682 ), self._padwith) + sval
685class TrailerBytes(bytes):
686 """
687 Reverses slice operations to take from the back of the packet,
688 not the front
689 """
691 def __getitem__(self, item): # type: ignore
692 # type: (Union[int, slice]) -> Union[int, bytes]
693 if isinstance(item, int):
694 if item < 0:
695 item = 1 + item
696 else:
697 item = len(self) - 1 - item
698 elif isinstance(item, slice):
699 start, stop, step = item.start, item.stop, item.step
700 new_start = -stop if stop else None
701 new_stop = -start if start else None
702 item = slice(new_start, new_stop, step)
703 return super(self.__class__, self).__getitem__(item)
706class TrailerField(_FieldContainer):
707 """Special Field that gets its value from the end of the *packet*
708 (Note: not layer, but packet).
710 Mostly used for FCS
711 """
712 __slots__ = ["fld"]
714 def __init__(self, fld):
715 # type: (Field[Any, Any]) -> None
716 self.fld = fld
718 # Note: this is ugly. Very ugly.
719 # Do not copy this crap elsewhere, so that if one day we get
720 # brave enough to refactor it, it'll be easier.
722 def getfield(self, pkt, s):
723 # type: (Packet, bytes) -> Tuple[bytes, int]
724 previous_post_dissect = pkt.post_dissect
726 def _post_dissect(self, s):
727 # type: (Packet, bytes) -> bytes
728 # Reset packet to allow post_build
729 self.raw_packet_cache = None
730 self.post_dissect = previous_post_dissect # type: ignore
731 return previous_post_dissect(s)
732 pkt.post_dissect = MethodType(_post_dissect, pkt) # type: ignore
733 s = TrailerBytes(s)
734 s, val = self.fld.getfield(pkt, s)
735 return bytes(s), val
737 def addfield(self, pkt, s, val):
738 # type: (Packet, bytes, Optional[int]) -> bytes
739 previous_post_build = pkt.post_build
740 value = self.fld.addfield(pkt, b"", val)
742 def _post_build(self, p, pay):
743 # type: (Packet, bytes, bytes) -> bytes
744 pay += value
745 self.post_build = previous_post_build # type: ignore
746 return previous_post_build(p, pay)
747 pkt.post_build = MethodType(_post_build, pkt) # type: ignore
748 return s
751class FCSField(TrailerField):
752 """
753 A FCS field that gets appended at the end of the *packet* (not layer).
754 """
756 def __init__(self, *args, **kwargs):
757 # type: (*Any, **Any) -> None
758 super(FCSField, self).__init__(Field(*args, **kwargs))
760 def i2repr(self, pkt, x):
761 # type: (Optional[Packet], int) -> str
762 return lhex(self.i2h(pkt, x))
765class DestField(Field[str, bytes]):
766 __slots__ = ["defaultdst"]
767 # Each subclass must have its own bindings attribute
768 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]]
770 def __init__(self, name, default):
771 # type: (str, str) -> None
772 self.defaultdst = default
774 def dst_from_pkt(self, pkt):
775 # type: (Packet) -> str
776 for addr, condition in self.bindings.get(pkt.payload.__class__, []):
777 try:
778 if all(pkt.payload.getfieldval(field) == value
779 for field, value in condition.items()):
780 return addr # type: ignore
781 except AttributeError:
782 pass
783 return self.defaultdst
785 @classmethod
786 def bind_addr(cls, layer, addr, **condition):
787 # type: (Type[Packet], str, **Any) -> None
788 cls.bindings.setdefault(layer, []).append( # type: ignore
789 (addr, condition)
790 )
793class MACField(Field[Optional[str], bytes]):
794 def __init__(self, name, default):
795 # type: (str, Optional[Any]) -> None
796 Field.__init__(self, name, default, "6s")
798 def i2m(self, pkt, x):
799 # type: (Optional[Packet], Optional[str]) -> bytes
800 if not x:
801 return b"\0\0\0\0\0\0"
802 try:
803 y = mac2str(x)
804 except (struct.error, OverflowError):
805 y = bytes_encode(x)
806 return y
808 def m2i(self, pkt, x):
809 # type: (Optional[Packet], bytes) -> str
810 return str2mac(x)
812 def any2i(self, pkt, x):
813 # type: (Optional[Packet], Any) -> str
814 if isinstance(x, bytes) and len(x) == 6:
815 return self.m2i(pkt, x)
816 return cast(str, x)
818 def i2repr(self, pkt, x):
819 # type: (Optional[Packet], Optional[str]) -> str
820 x = self.i2h(pkt, x)
821 if x is None:
822 return repr(x)
823 if self in conf.resolve:
824 x = conf.manufdb._resolve_MAC(x)
825 return x
827 def randval(self):
828 # type: () -> RandMAC
829 return RandMAC()
832class LEMACField(MACField):
833 def i2m(self, pkt, x):
834 # type: (Optional[Packet], Optional[str]) -> bytes
835 return MACField.i2m(self, pkt, x)[::-1]
837 def m2i(self, pkt, x):
838 # type: (Optional[Packet], bytes) -> str
839 return MACField.m2i(self, pkt, x[::-1])
842class IPField(Field[Union[str, Net], bytes]):
843 def __init__(self, name, default):
844 # type: (str, Optional[str]) -> None
845 Field.__init__(self, name, default, "4s")
847 def h2i(self, pkt, x):
848 # type: (Optional[Packet], Union[AnyStr, List[AnyStr]]) -> Any
849 if isinstance(x, bytes):
850 x = plain_str(x) # type: ignore
851 if isinstance(x, str):
852 try:
853 inet_aton(x)
854 except socket.error:
855 return Net(x)
856 elif isinstance(x, tuple):
857 if len(x) != 2:
858 raise ValueError("Invalid IP format")
859 return Net(*x)
860 elif isinstance(x, list):
861 return [self.h2i(pkt, n) for n in x]
862 return x
864 def i2h(self, pkt, x):
865 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str
866 return cast(str, x)
868 def resolve(self, x):
869 # type: (str) -> str
870 if self in conf.resolve:
871 try:
872 ret = socket.gethostbyaddr(x)[0]
873 except Exception:
874 pass
875 else:
876 if ret:
877 return ret
878 return x
880 def i2m(self, pkt, x):
881 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
882 if x is None:
883 return b'\x00\x00\x00\x00'
884 return inet_aton(plain_str(x))
886 def m2i(self, pkt, x):
887 # type: (Optional[Packet], bytes) -> str
888 return inet_ntoa(x)
890 def any2i(self, pkt, x):
891 # type: (Optional[Packet], Any) -> Any
892 return self.h2i(pkt, x)
894 def i2repr(self, pkt, x):
895 # type: (Optional[Packet], Union[str, Net]) -> str
896 r = self.resolve(self.i2h(pkt, x))
897 return r if isinstance(r, str) else repr(r)
899 def randval(self):
900 # type: () -> RandIP
901 return RandIP()
904class SourceIPField(IPField):
905 __slots__ = ["dstname"]
907 def __init__(self, name, dstname):
908 # type: (str, Optional[str]) -> None
909 IPField.__init__(self, name, None)
910 self.dstname = dstname
912 def __findaddr(self, pkt):
913 # type: (Packet) -> str
914 if conf.route is None:
915 # unused import, only to initialize conf.route
916 import scapy.route # noqa: F401
917 dst = ("0.0.0.0" if self.dstname is None
918 else getattr(pkt, self.dstname) or "0.0.0.0")
919 if isinstance(dst, (Gen, list)):
920 r = {
921 conf.route.route(str(daddr))
922 for daddr in dst
923 } # type: Set[Tuple[str, str, str]]
924 if len(r) > 1:
925 warning("More than one possible route for %r" % (dst,))
926 return min(r)[1]
927 return conf.route.route(dst)[1]
929 def i2m(self, pkt, x):
930 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
931 if x is None and pkt is not None:
932 x = self.__findaddr(pkt)
933 return super(SourceIPField, self).i2m(pkt, x)
935 def i2h(self, pkt, x):
936 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str
937 if x is None and pkt is not None:
938 x = self.__findaddr(pkt)
939 return super(SourceIPField, self).i2h(pkt, x)
942class IP6Field(Field[Optional[Union[str, Net6]], bytes]):
943 def __init__(self, name, default):
944 # type: (str, Optional[str]) -> None
945 Field.__init__(self, name, default, "16s")
947 def h2i(self, pkt, x):
948 # type: (Optional[Packet], Optional[str]) -> str
949 if isinstance(x, bytes):
950 x = plain_str(x)
951 if isinstance(x, str):
952 try:
953 x = in6_ptop(x)
954 except socket.error:
955 return Net6(x) # type: ignore
956 elif isinstance(x, tuple):
957 if len(x) != 2:
958 raise ValueError("Invalid IPv6 format")
959 return Net6(*x)
960 elif isinstance(x, list):
961 x = [self.h2i(pkt, n) for n in x]
962 return x # type: ignore
964 def i2h(self, pkt, x):
965 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
966 return cast(str, x)
968 def i2m(self, pkt, x):
969 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
970 if x is None:
971 x = "::"
972 return inet_pton(socket.AF_INET6, plain_str(x))
974 def m2i(self, pkt, x):
975 # type: (Optional[Packet], bytes) -> str
976 return inet_ntop(socket.AF_INET6, x)
978 def any2i(self, pkt, x):
979 # type: (Optional[Packet], Optional[str]) -> str
980 return self.h2i(pkt, x)
982 def i2repr(self, pkt, x):
983 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
984 if x is None:
985 return self.i2h(pkt, x)
986 elif not isinstance(x, Net6) and not isinstance(x, list):
987 if in6_isaddrTeredo(x): # print Teredo info
988 server, _, maddr, mport = teredoAddrExtractInfo(x)
989 return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr, mport) # noqa: E501
990 elif in6_isaddr6to4(x): # print encapsulated address
991 vaddr = in6_6to4ExtractAddr(x)
992 return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr)
993 r = self.i2h(pkt, x) # No specific information to return
994 return r if isinstance(r, str) else repr(r)
996 def randval(self):
997 # type: () -> RandIP6
998 return RandIP6()
1001class SourceIP6Field(IP6Field):
1002 __slots__ = ["dstname"]
1004 def __init__(self, name, dstname):
1005 # type: (str, str) -> None
1006 IP6Field.__init__(self, name, None)
1007 self.dstname = dstname
1009 def i2m(self, pkt, x):
1010 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
1011 if x is None:
1012 dst = ("::" if self.dstname is None else
1013 getattr(pkt, self.dstname) or "::")
1014 iff, x, nh = conf.route6.route(dst)
1015 return super(SourceIP6Field, self).i2m(pkt, x)
1017 def i2h(self, pkt, x):
1018 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
1019 if x is None:
1020 if conf.route6 is None:
1021 # unused import, only to initialize conf.route6
1022 import scapy.route6 # noqa: F401
1023 dst = ("::" if self.dstname is None else getattr(pkt, self.dstname)) # noqa: E501
1024 if isinstance(dst, (Gen, list)):
1025 r = {conf.route6.route(str(daddr))
1026 for daddr in dst}
1027 if len(r) > 1:
1028 warning("More than one possible route for %r" % (dst,))
1029 x = min(r)[1]
1030 else:
1031 x = conf.route6.route(dst)[1]
1032 return super(SourceIP6Field, self).i2h(pkt, x)
1035class DestIP6Field(IP6Field, DestField):
1036 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]]
1038 def __init__(self, name, default):
1039 # type: (str, str) -> None
1040 IP6Field.__init__(self, name, None)
1041 DestField.__init__(self, name, default)
1043 def i2m(self, pkt, x):
1044 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
1045 if x is None and pkt is not None:
1046 x = self.dst_from_pkt(pkt)
1047 return IP6Field.i2m(self, pkt, x)
1049 def i2h(self, pkt, x):
1050 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
1051 if x is None and pkt is not None:
1052 x = self.dst_from_pkt(pkt)
1053 return super(DestIP6Field, self).i2h(pkt, x)
1056class ByteField(Field[int, int]):
1057 def __init__(self, name, default):
1058 # type: (str, Optional[int]) -> None
1059 Field.__init__(self, name, default, "B")
1062class XByteField(ByteField):
1063 def i2repr(self, pkt, x):
1064 # type: (Optional[Packet], int) -> str
1065 return lhex(self.i2h(pkt, x))
1068# XXX Unused field: at least add some tests
1069class OByteField(ByteField):
1070 def i2repr(self, pkt, x):
1071 # type: (Optional[Packet], int) -> str
1072 return "%03o" % self.i2h(pkt, x)
1075class ThreeBytesField(Field[int, int]):
1076 def __init__(self, name, default):
1077 # type: (str, int) -> None
1078 Field.__init__(self, name, default, "!I")
1080 def addfield(self, pkt, s, val):
1081 # type: (Packet, bytes, Optional[int]) -> bytes
1082 return s + struct.pack(self.fmt, self.i2m(pkt, val))[1:4]
1084 def getfield(self, pkt, s):
1085 # type: (Packet, bytes) -> Tuple[bytes, int]
1086 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00" + s[:3])[0]) # noqa: E501
1089class X3BytesField(ThreeBytesField, XByteField):
1090 def i2repr(self, pkt, x):
1091 # type: (Optional[Packet], int) -> str
1092 return XByteField.i2repr(self, pkt, x)
1095class LEThreeBytesField(ByteField):
1096 def __init__(self, name, default):
1097 # type: (str, Optional[int]) -> None
1098 Field.__init__(self, name, default, "<I")
1100 def addfield(self, pkt, s, val):
1101 # type: (Packet, bytes, Optional[int]) -> bytes
1102 return s + struct.pack(self.fmt, self.i2m(pkt, val))[:3]
1104 def getfield(self, pkt, s):
1105 # type: (Optional[Packet], bytes) -> Tuple[bytes, int]
1106 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, s[:3] + b"\x00")[0]) # noqa: E501
1109class XLE3BytesField(LEThreeBytesField, XByteField):
1110 def i2repr(self, pkt, x):
1111 # type: (Optional[Packet], int) -> str
1112 return XByteField.i2repr(self, pkt, x)
1115def LEX3BytesField(*args, **kwargs):
1116 # type: (*Any, **Any) -> Any
1117 warnings.warn(
1118 "LEX3BytesField is deprecated. Use XLE3BytesField",
1119 DeprecationWarning
1120 )
1121 return XLE3BytesField(*args, **kwargs)
1124class NBytesField(Field[int, List[int]]):
1125 def __init__(self, name, default, sz):
1126 # type: (str, Optional[int], int) -> None
1127 Field.__init__(self, name, default, "<" + "B" * sz)
1129 def i2m(self, pkt, x):
1130 # type: (Optional[Packet], Optional[int]) -> List[int]
1131 if x is None:
1132 return [0] * self.sz
1133 x2m = list()
1134 for _ in range(self.sz):
1135 x2m.append(x % 256)
1136 x //= 256
1137 return x2m[::-1]
1139 def m2i(self, pkt, x):
1140 # type: (Optional[Packet], Union[List[int], int]) -> int
1141 if isinstance(x, int):
1142 return x
1143 # x can be a tuple when coming from struct.unpack (from getfield)
1144 if isinstance(x, (list, tuple)):
1145 return sum(d * (256 ** i) for i, d in enumerate(reversed(x)))
1146 return 0
1148 def i2repr(self, pkt, x):
1149 # type: (Optional[Packet], int) -> str
1150 if isinstance(x, int):
1151 return '%i' % x
1152 return super(NBytesField, self).i2repr(pkt, x)
1154 def addfield(self, pkt, s, val):
1155 # type: (Optional[Packet], bytes, Optional[int]) -> bytes
1156 return s + self.struct.pack(*self.i2m(pkt, val))
1158 def getfield(self, pkt, s):
1159 # type: (Optional[Packet], bytes) -> Tuple[bytes, int]
1160 return (s[self.sz:],
1161 self.m2i(pkt, self.struct.unpack(s[:self.sz]))) # type: ignore
1163 def randval(self):
1164 # type: () -> RandNum
1165 return RandNum(0, 2 ** (self.sz * 8) - 1)
1168class XNBytesField(NBytesField):
1169 def i2repr(self, pkt, x):
1170 # type: (Optional[Packet], int) -> str
1171 if isinstance(x, int):
1172 return '0x%x' % x
1173 # x can be a tuple when coming from struct.unpack (from getfield)
1174 if isinstance(x, (list, tuple)):
1175 return "0x" + "".join("%02x" % b for b in x)
1176 return super(XNBytesField, self).i2repr(pkt, x)
1179class SignedByteField(Field[int, int]):
1180 def __init__(self, name, default):
1181 # type: (str, Optional[int]) -> None
1182 Field.__init__(self, name, default, "b")
1185class FieldValueRangeException(Scapy_Exception):
1186 pass
1189class MaximumItemsCount(Scapy_Exception):
1190 pass
1193class FieldAttributeException(Scapy_Exception):
1194 pass
1197class YesNoByteField(ByteField):
1198 """
1199 A byte based flag field that shows representation of its number
1200 based on a given association
1202 In its default configuration the following representation is generated:
1203 x == 0 : 'no'
1204 x != 0 : 'yes'
1206 In more sophisticated use-cases (e.g. yes/no/invalid) one can use the
1207 config attribute to configure.
1208 Key-value, key-range and key-value-set associations that will be used to
1209 generate the values representation.
1211 - A range is given by a tuple (<first-val>, <last-value>) including the
1212 last value.
1213 - A single-value tuple is treated as scalar.
1214 - A list defines a set of (probably non consecutive) values that should be
1215 associated to a given key.
1217 All values not associated with a key will be shown as number of type
1218 unsigned byte.
1220 **For instance**::
1222 config = {
1223 'no' : 0,
1224 'foo' : (1,22),
1225 'yes' : 23,
1226 'bar' : [24,25, 42, 48, 87, 253]
1227 }
1229 Generates the following representations::
1231 x == 0 : 'no'
1232 x == 15: 'foo'
1233 x == 23: 'yes'
1234 x == 42: 'bar'
1235 x == 43: 43
1237 Another example, using the config attribute one could also revert
1238 the stock-yes-no-behavior::
1240 config = {
1241 'yes' : 0,
1242 'no' : (1,255)
1243 }
1245 Will generate the following value representation::
1247 x == 0 : 'yes'
1248 x != 0 : 'no'
1250 """
1251 __slots__ = ['eval_fn']
1253 def _build_config_representation(self, config):
1254 # type: (Dict[str, Any]) -> None
1255 assoc_table = dict()
1256 for key in config:
1257 value_spec = config[key]
1259 value_spec_type = type(value_spec)
1261 if value_spec_type is int:
1262 if value_spec < 0 or value_spec > 255:
1263 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501
1264 'must be in range [0..255]'.format(value_spec)) # noqa: E501
1265 assoc_table[value_spec] = key
1267 elif value_spec_type is list:
1268 for value in value_spec:
1269 if value < 0 or value > 255:
1270 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501
1271 'must be in range [0..255]'.format(value)) # noqa: E501
1272 assoc_table[value] = key
1274 elif value_spec_type is tuple:
1275 value_spec_len = len(value_spec)
1276 if value_spec_len != 2:
1277 raise FieldAttributeException('invalid length {} of given config item tuple {} - must be ' # noqa: E501
1278 '(<start-range>, <end-range>).'.format(value_spec_len, value_spec)) # noqa: E501
1280 value_range_start = value_spec[0]
1281 if value_range_start < 0 or value_range_start > 255:
1282 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501
1283 'must be in range [0..255]'.format(value_range_start)) # noqa: E501
1285 value_range_end = value_spec[1]
1286 if value_range_end < 0 or value_range_end > 255:
1287 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501
1288 'must be in range [0..255]'.format(value_range_end)) # noqa: E501
1290 for value in range(value_range_start, value_range_end + 1):
1292 assoc_table[value] = key
1294 self.eval_fn = lambda x: assoc_table[x] if x in assoc_table else x
1296 def __init__(self, name, default, config=None):
1297 # type: (str, int, Optional[Dict[str, Any]]) -> None
1299 if not config:
1300 # this represents the common use case and therefore it is kept small # noqa: E501
1301 self.eval_fn = lambda x: 'no' if x == 0 else 'yes'
1302 else:
1303 self._build_config_representation(config)
1304 ByteField.__init__(self, name, default)
1306 def i2repr(self, pkt, x):
1307 # type: (Optional[Packet], int) -> str
1308 return self.eval_fn(x) # type: ignore
1311class ShortField(Field[int, int]):
1312 def __init__(self, name, default):
1313 # type: (str, Optional[int]) -> None
1314 Field.__init__(self, name, default, "H")
1317class SignedShortField(Field[int, int]):
1318 def __init__(self, name, default):
1319 # type: (str, Optional[int]) -> None
1320 Field.__init__(self, name, default, "h")
1323class LEShortField(Field[int, int]):
1324 def __init__(self, name, default):
1325 # type: (str, Optional[int]) -> None
1326 Field.__init__(self, name, default, "<H")
1329class LESignedShortField(Field[int, int]):
1330 def __init__(self, name, default):
1331 # type: (str, Optional[int]) -> None
1332 Field.__init__(self, name, default, "<h")
1335class XShortField(ShortField):
1336 def i2repr(self, pkt, x):
1337 # type: (Optional[Packet], int) -> str
1338 return lhex(self.i2h(pkt, x))
1341class IntField(Field[int, int]):
1342 def __init__(self, name, default):
1343 # type: (str, Optional[int]) -> None
1344 Field.__init__(self, name, default, "I")
1347class SignedIntField(Field[int, int]):
1348 def __init__(self, name, default):
1349 # type: (str, int) -> None
1350 Field.__init__(self, name, default, "i")
1353class LEIntField(Field[int, int]):
1354 def __init__(self, name, default):
1355 # type: (str, Optional[int]) -> None
1356 Field.__init__(self, name, default, "<I")
1359class LESignedIntField(Field[int, int]):
1360 def __init__(self, name, default):
1361 # type: (str, int) -> None
1362 Field.__init__(self, name, default, "<i")
1365class XIntField(IntField):
1366 def i2repr(self, pkt, x):
1367 # type: (Optional[Packet], int) -> str
1368 return lhex(self.i2h(pkt, x))
1371class XLEIntField(LEIntField, XIntField):
1372 def i2repr(self, pkt, x):
1373 # type: (Optional[Packet], int) -> str
1374 return XIntField.i2repr(self, pkt, x)
1377class XLEShortField(LEShortField, XShortField):
1378 def i2repr(self, pkt, x):
1379 # type: (Optional[Packet], int) -> str
1380 return XShortField.i2repr(self, pkt, x)
1383class LongField(Field[int, int]):
1384 def __init__(self, name, default):
1385 # type: (str, int) -> None
1386 Field.__init__(self, name, default, "Q")
1389class SignedLongField(Field[int, int]):
1390 def __init__(self, name, default):
1391 # type: (str, Optional[int]) -> None
1392 Field.__init__(self, name, default, "q")
1395class LELongField(LongField):
1396 def __init__(self, name, default):
1397 # type: (str, Optional[int]) -> None
1398 Field.__init__(self, name, default, "<Q")
1401class LESignedLongField(Field[int, int]):
1402 def __init__(self, name, default):
1403 # type: (str, Optional[Any]) -> None
1404 Field.__init__(self, name, default, "<q")
1407class XLongField(LongField):
1408 def i2repr(self, pkt, x):
1409 # type: (Optional[Packet], int) -> str
1410 return lhex(self.i2h(pkt, x))
1413class XLELongField(LELongField, XLongField):
1414 def i2repr(self, pkt, x):
1415 # type: (Optional[Packet], int) -> str
1416 return XLongField.i2repr(self, pkt, x)
1419class IEEEFloatField(Field[int, int]):
1420 def __init__(self, name, default):
1421 # type: (str, Optional[int]) -> None
1422 Field.__init__(self, name, default, "f")
1425class IEEEDoubleField(Field[int, int]):
1426 def __init__(self, name, default):
1427 # type: (str, Optional[int]) -> None
1428 Field.__init__(self, name, default, "d")
1431class _StrField(Field[I, bytes]):
1432 __slots__ = ["remain"]
1434 def __init__(self, name, default, fmt="H", remain=0):
1435 # type: (str, Optional[I], str, int) -> None
1436 Field.__init__(self, name, default, fmt)
1437 self.remain = remain
1439 def i2len(self, pkt, x):
1440 # type: (Optional[Packet], Any) -> int
1441 if x is None:
1442 return 0
1443 return len(x)
1445 def any2i(self, pkt, x):
1446 # type: (Optional[Packet], Any) -> I
1447 if isinstance(x, str):
1448 x = bytes_encode(x)
1449 return super(_StrField, self).any2i(pkt, x) # type: ignore
1451 def i2repr(self, pkt, x):
1452 # type: (Optional[Packet], I) -> str
1453 if x and isinstance(x, bytes):
1454 return repr(x)
1455 return super(_StrField, self).i2repr(pkt, x)
1457 def i2m(self, pkt, x):
1458 # type: (Optional[Packet], Optional[I]) -> bytes
1459 if x is None:
1460 return b""
1461 if not isinstance(x, bytes):
1462 return bytes_encode(x)
1463 return x
1465 def addfield(self, pkt, s, val):
1466 # type: (Packet, bytes, Optional[I]) -> bytes
1467 return s + self.i2m(pkt, val)
1469 def getfield(self, pkt, s):
1470 # type: (Packet, bytes) -> Tuple[bytes, I]
1471 if self.remain == 0:
1472 return b"", self.m2i(pkt, s)
1473 else:
1474 return s[-self.remain:], self.m2i(pkt, s[:-self.remain])
1476 def randval(self):
1477 # type: () -> RandBin
1478 return RandBin(RandNum(0, 1200))
1481class StrField(_StrField[bytes]):
1482 pass
1485class StrFieldUtf16(StrField):
1486 def any2i(self, pkt, x):
1487 # type: (Optional[Packet], Optional[str]) -> bytes
1488 if isinstance(x, str):
1489 return self.h2i(pkt, x)
1490 return super(StrFieldUtf16, self).any2i(pkt, x)
1492 def i2repr(self, pkt, x):
1493 # type: (Optional[Packet], bytes) -> str
1494 return plain_str(self.i2h(pkt, x))
1496 def h2i(self, pkt, x):
1497 # type: (Optional[Packet], Optional[str]) -> bytes
1498 return plain_str(x).encode('utf-16-le', errors="replace")
1500 def i2h(self, pkt, x):
1501 # type: (Optional[Packet], bytes) -> str
1502 return bytes_encode(x).decode('utf-16-le', errors="replace")
1505class _StrEnumField:
1506 def __init__(self, **kwargs):
1507 # type: (**Any) -> None
1508 self.enum = kwargs.pop("enum", {})
1510 def i2repr(self, pkt, v):
1511 # type: (Optional[Packet], bytes) -> str
1512 r = v.rstrip(b"\0")
1513 rr = repr(r)
1514 if self.enum:
1515 if v in self.enum:
1516 rr = "%s (%s)" % (rr, self.enum[v])
1517 elif r in self.enum:
1518 rr = "%s (%s)" % (rr, self.enum[r])
1519 return rr
1522class StrEnumField(_StrEnumField, StrField):
1523 __slots__ = ["enum"]
1525 def __init__(
1526 self,
1527 name, # type: str
1528 default, # type: bytes
1529 enum=None, # type: Optional[Dict[str, str]]
1530 **kwargs # type: Any
1531 ):
1532 # type: (...) -> None
1533 StrField.__init__(self, name, default, **kwargs) # type: ignore
1534 self.enum = enum
1537K = TypeVar('K', List[BasePacket], BasePacket, Optional[BasePacket])
1540class _PacketField(_StrField[K]):
1541 __slots__ = ["cls"]
1542 holds_packets = 1
1544 def __init__(self,
1545 name, # type: str
1546 default, # type: Optional[K]
1547 pkt_cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501
1548 ):
1549 # type: (...) -> None
1550 super(_PacketField, self).__init__(name, default)
1551 self.cls = pkt_cls
1553 def i2m(self,
1554 pkt, # type: Optional[Packet]
1555 i, # type: Any
1556 ):
1557 # type: (...) -> bytes
1558 if i is None:
1559 return b""
1560 return raw(i)
1562 def m2i(self, pkt, m): # type: ignore
1563 # type: (Optional[Packet], bytes) -> Packet
1564 try:
1565 # we want to set parent wherever possible
1566 return self.cls(m, _parent=pkt) # type: ignore
1567 except TypeError:
1568 return self.cls(m)
1571class _PacketFieldSingle(_PacketField[K]):
1572 def any2i(self, pkt, x):
1573 # type: (Optional[Packet], Any) -> K
1574 if x and pkt and hasattr(x, "add_parent"):
1575 cast("Packet", x).add_parent(pkt)
1576 return super(_PacketFieldSingle, self).any2i(pkt, x)
1578 def getfield(self,
1579 pkt, # type: Packet
1580 s, # type: bytes
1581 ):
1582 # type: (...) -> Tuple[bytes, K]
1583 i = self.m2i(pkt, s)
1584 remain = b""
1585 if conf.padding_layer in i:
1586 r = i[conf.padding_layer]
1587 del r.underlayer.payload
1588 remain = r.load
1589 return remain, i # type: ignore
1592class PacketField(_PacketFieldSingle[BasePacket]):
1593 def randval(self): # type: ignore
1594 # type: () -> Packet
1595 from scapy.packet import fuzz
1596 return fuzz(self.cls()) # type: ignore
1599class PacketLenField(_PacketFieldSingle[Optional[BasePacket]]):
1600 __slots__ = ["length_from"]
1602 def __init__(self,
1603 name, # type: str
1604 default, # type: Packet
1605 cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501
1606 length_from=None # type: Optional[Callable[[Packet], int]] # noqa: E501
1607 ):
1608 # type: (...) -> None
1609 super(PacketLenField, self).__init__(name, default, cls)
1610 self.length_from = length_from or (lambda x: 0)
1612 def getfield(self,
1613 pkt, # type: Packet
1614 s, # type: bytes
1615 ):
1616 # type: (...) -> Tuple[bytes, Optional[BasePacket]]
1617 len_pkt = self.length_from(pkt)
1618 i = None
1619 if len_pkt:
1620 try:
1621 i = self.m2i(pkt, s[:len_pkt])
1622 except Exception:
1623 if conf.debug_dissector:
1624 raise
1625 i = conf.raw_layer(load=s[:len_pkt])
1626 return s[len_pkt:], i
1629class PacketListField(_PacketField[List[BasePacket]]):
1630 """PacketListField represents a list containing a series of Packet instances
1631 that might occur right in the middle of another Packet field.
1632 This field type may also be used to indicate that a series of Packet
1633 instances have a sibling semantic instead of a parent/child relationship
1634 (i.e. a stack of layers). All elements in PacketListField have current
1635 packet referenced in parent field.
1636 """
1637 __slots__ = ["count_from", "length_from", "next_cls_cb", "max_count"]
1638 islist = 1
1640 def __init__(
1641 self,
1642 name, # type: str
1643 default, # type: Optional[List[BasePacket]]
1644 pkt_cls=None, # type: Optional[Union[Callable[[bytes], Packet], Type[Packet]]] # noqa: E501
1645 count_from=None, # type: Optional[Callable[[Packet], int]]
1646 length_from=None, # type: Optional[Callable[[Packet], int]]
1647 next_cls_cb=None, # type: Optional[Callable[[Packet, List[BasePacket], Optional[Packet], bytes], Type[Packet]]] # noqa: E501
1648 max_count=None, # type: Optional[int]
1649 ):
1650 # type: (...) -> None
1651 """
1652 The number of Packet instances that are dissected by this field can
1653 be parametrized using one of three different mechanisms/parameters:
1655 * count_from: a callback that returns the number of Packet
1656 instances to dissect. The callback prototype is::
1658 count_from(pkt:Packet) -> int
1660 * length_from: a callback that returns the number of bytes that
1661 must be dissected by this field. The callback prototype is::
1663 length_from(pkt:Packet) -> int
1665 * next_cls_cb: a callback that enables a Scapy developer to
1666 dynamically discover if another Packet instance should be
1667 dissected or not. See below for this callback prototype.
1669 The bytes that are not consumed during the dissection of this field
1670 are passed to the next field of the current packet.
1672 For the serialization of such a field, the list of Packets that are
1673 contained in a PacketListField can be heterogeneous and is
1674 unrestricted.
1676 The type of the Packet instances that are dissected with this field is
1677 specified or discovered using one of the following mechanism:
1679 * the pkt_cls parameter may contain a callable that returns an
1680 instance of the dissected Packet. This may either be a
1681 reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py)
1682 to generate an homogeneous PacketListField or a function
1683 deciding the type of the Packet instance
1684 (e.g. _CDPGuessAddrRecord in contrib/cdp.py)
1686 * the pkt_cls parameter may contain a class object with a defined
1687 ``dispatch_hook`` classmethod. That method must return a Packet
1688 instance. The ``dispatch_hook`` callmethod must implement the
1689 following prototype::
1691 dispatch_hook(cls,
1692 _pkt:Optional[Packet],
1693 *args, **kargs
1694 ) -> Type[Packet]
1696 The _pkt parameter may contain a reference to the packet
1697 instance containing the PacketListField that is being
1698 dissected.
1700 * the ``next_cls_cb`` parameter may contain a callable whose
1701 prototype is::
1703 cbk(pkt:Packet,
1704 lst:List[Packet],
1705 cur:Optional[Packet],
1706 remain:str
1707 ) -> Optional[Type[Packet]]
1709 The pkt argument contains a reference to the Packet instance
1710 containing the PacketListField that is being dissected.
1711 The lst argument is the list of all Packet instances that were
1712 previously parsed during the current ``PacketListField``
1713 dissection, saved for the very last Packet instance.
1714 The cur argument contains a reference to that very last parsed
1715 ``Packet`` instance. The remain argument contains the bytes
1716 that may still be consumed by the current PacketListField
1717 dissection operation.
1719 This callback returns either the type of the next Packet to
1720 dissect or None to indicate that no more Packet are to be
1721 dissected.
1723 These four arguments allows a variety of dynamic discovery of
1724 the number of Packet to dissect and of the type of each one of
1725 these Packets, including: type determination based on current
1726 Packet instances or its underlayers, continuation based on the
1727 previously parsed Packet instances within that PacketListField,
1728 continuation based on a look-ahead on the bytes to be
1729 dissected...
1731 The pkt_cls and next_cls_cb parameters are semantically exclusive,
1732 although one could specify both. If both are specified, pkt_cls is
1733 silently ignored. The same is true for count_from and next_cls_cb.
1735 length_from and next_cls_cb are compatible and the dissection will
1736 end, whichever of the two stop conditions comes first.
1738 :param name: the name of the field
1739 :param default: the default value of this field; generally an empty
1740 Python list
1741 :param pkt_cls: either a callable returning a Packet instance or a
1742 class object defining a ``dispatch_hook`` class method
1743 :param count_from: a callback returning the number of Packet
1744 instances to dissect.
1745 :param length_from: a callback returning the number of bytes to dissect
1746 :param next_cls_cb: a callback returning either None or the type of
1747 the next Packet to dissect.
1748 :param max_count: an int containing the max amount of results. This is
1749 a safety mechanism, exceeding this value will raise a Scapy_Exception.
1750 """
1751 if default is None:
1752 default = [] # Create a new list for each instance
1753 super(PacketListField, self).__init__(
1754 name,
1755 default,
1756 pkt_cls # type: ignore
1757 )
1758 self.count_from = count_from
1759 self.length_from = length_from
1760 self.next_cls_cb = next_cls_cb
1761 self.max_count = max_count
1763 def any2i(self, pkt, x):
1764 # type: (Optional[Packet], Any) -> List[BasePacket]
1765 if not isinstance(x, list):
1766 if x and pkt and hasattr(x, "add_parent"):
1767 x.add_parent(pkt)
1768 return [x]
1769 elif pkt:
1770 for i in x:
1771 if not i or not hasattr(i, "add_parent"):
1772 continue
1773 i.add_parent(pkt)
1774 return x
1776 def i2count(self,
1777 pkt, # type: Optional[Packet]
1778 val, # type: List[BasePacket]
1779 ):
1780 # type: (...) -> int
1781 if isinstance(val, list):
1782 return len(val)
1783 return 1
1785 def i2len(self,
1786 pkt, # type: Optional[Packet]
1787 val, # type: List[Packet]
1788 ):
1789 # type: (...) -> int
1790 return sum(len(self.i2m(pkt, p)) for p in val)
1792 def getfield(self, pkt, s):
1793 # type: (Packet, bytes) -> Tuple[bytes, List[BasePacket]]
1794 c = len_pkt = cls = None
1795 if self.length_from is not None:
1796 len_pkt = self.length_from(pkt)
1797 elif self.count_from is not None:
1798 c = self.count_from(pkt)
1799 if self.next_cls_cb is not None:
1800 cls = self.next_cls_cb(pkt, [], None, s)
1801 c = 1
1802 if cls is None:
1803 c = 0
1805 lst = [] # type: List[BasePacket]
1806 ret = b""
1807 remain = s
1808 if len_pkt is not None:
1809 remain, ret = s[:len_pkt], s[len_pkt:]
1810 while remain:
1811 if c is not None:
1812 if c <= 0:
1813 break
1814 c -= 1
1815 try:
1816 if cls is not None:
1817 try:
1818 # we want to set parent wherever possible
1819 p = cls(remain, _parent=pkt)
1820 except TypeError:
1821 p = cls(remain)
1822 else:
1823 p = self.m2i(pkt, remain)
1824 except Exception:
1825 if conf.debug_dissector:
1826 raise
1827 p = conf.raw_layer(load=remain)
1828 remain = b""
1829 else:
1830 if conf.padding_layer in p:
1831 pad = p[conf.padding_layer]
1832 remain = pad.load
1833 del pad.underlayer.payload
1834 if self.next_cls_cb is not None:
1835 cls = self.next_cls_cb(pkt, lst, p, remain)
1836 if cls is not None:
1837 c = 0 if c is None else c
1838 c += 1
1839 else:
1840 remain = b""
1841 lst.append(p)
1842 if len(lst) > (self.max_count or conf.max_list_count):
1843 raise MaximumItemsCount(
1844 "Maximum amount of items reached in PacketListField: %s "
1845 "(defaults to conf.max_list_count)"
1846 % (self.max_count or conf.max_list_count)
1847 )
1849 if isinstance(remain, tuple):
1850 remain, nb = remain
1851 return (remain + ret, nb), lst
1852 else:
1853 return remain + ret, lst
1855 def i2m(self,
1856 pkt, # type: Optional[Packet]
1857 i, # type: Any
1858 ):
1859 # type: (...) -> bytes
1860 return bytes_encode(i)
1862 def addfield(self, pkt, s, val):
1863 # type: (Packet, bytes, Any) -> bytes
1864 return s + b"".join(self.i2m(pkt, v) for v in val)
1867class StrFixedLenField(StrField):
1868 __slots__ = ["length_from"]
1870 def __init__(
1871 self,
1872 name, # type: str
1873 default, # type: Optional[bytes]
1874 length=None, # type: Optional[int]
1875 length_from=None, # type: Optional[Callable[[Packet], int]] # noqa: E501
1876 ):
1877 # type: (...) -> None
1878 super(StrFixedLenField, self).__init__(name, default)
1879 self.length_from = length_from or (lambda x: 0)
1880 if length is not None:
1881 self.sz = length
1882 self.length_from = lambda x, length=length: length # type: ignore
1884 def i2repr(self,
1885 pkt, # type: Optional[Packet]
1886 v, # type: bytes
1887 ):
1888 # type: (...) -> str
1889 if isinstance(v, bytes):
1890 v = v.rstrip(b"\0")
1891 return super(StrFixedLenField, self).i2repr(pkt, v)
1893 def getfield(self, pkt, s):
1894 # type: (Packet, bytes) -> Tuple[bytes, bytes]
1895 len_pkt = self.length_from(pkt)
1896 if len_pkt == 0:
1897 return s, b""
1898 return s[len_pkt:], self.m2i(pkt, s[:len_pkt])
1900 def addfield(self, pkt, s, val):
1901 # type: (Packet, bytes, Optional[bytes]) -> bytes
1902 len_pkt = self.length_from(pkt)
1903 if len_pkt is None:
1904 return s + self.i2m(pkt, val)
1905 return s + struct.pack("%is" % len_pkt, self.i2m(pkt, val))
1907 def randval(self):
1908 # type: () -> RandBin
1909 try:
1910 return RandBin(self.length_from(None)) # type: ignore
1911 except Exception:
1912 return RandBin(RandNum(0, 200))
1915class StrFixedLenFieldUtf16(StrFixedLenField, StrFieldUtf16):
1916 pass
1919class StrFixedLenEnumField(_StrEnumField, StrFixedLenField):
1920 __slots__ = ["enum"]
1922 def __init__(
1923 self,
1924 name, # type: str
1925 default, # type: bytes
1926 enum=None, # type: Optional[Dict[str, str]]
1927 length=None, # type: Optional[int]
1928 length_from=None # type: Optional[Callable[[Optional[Packet]], int]] # noqa: E501
1929 ):
1930 # type: (...) -> None
1931 StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from) # noqa: E501
1932 self.enum = enum
1935class NetBIOSNameField(StrFixedLenField):
1936 def __init__(self, name, default, length=31):
1937 # type: (str, bytes, int) -> None
1938 StrFixedLenField.__init__(self, name, default, length)
1940 def i2m(self, pkt, y):
1941 # type: (Optional[Packet], Optional[bytes]) -> bytes
1942 if pkt:
1943 len_pkt = self.length_from(pkt) // 2
1944 else:
1945 len_pkt = 0
1946 x = bytes_encode(y or b"") # type: bytes
1947 x += b" " * len_pkt
1948 x = x[:len_pkt]
1949 x = b"".join(
1950 chb(0x41 + (orb(b) >> 4)) +
1951 chb(0x41 + (orb(b) & 0xf))
1952 for b in x
1953 ) # noqa: E501
1954 return b" " + x
1956 def m2i(self, pkt, x):
1957 # type: (Optional[Packet], bytes) -> bytes
1958 x = x[1:].strip(b"\x00").strip(b" ")
1959 return b"".join(map(
1960 lambda x, y: chb(
1961 (((orb(x) - 1) & 0xf) << 4) + ((orb(y) - 1) & 0xf)
1962 ),
1963 x[::2], x[1::2]
1964 ))
1967class StrLenField(StrField):
1968 """
1969 StrField with a length
1971 :param length_from: a function that returns the size of the string
1972 :param max_length: max size to use as randval
1973 """
1974 __slots__ = ["length_from", "max_length"]
1975 ON_WIRE_SIZE_UTF16 = True
1977 def __init__(
1978 self,
1979 name, # type: str
1980 default, # type: bytes
1981 length_from=None, # type: Optional[Callable[[Packet], int]]
1982 max_length=None, # type: Optional[Any]
1983 ):
1984 # type: (...) -> None
1985 super(StrLenField, self).__init__(name, default)
1986 self.length_from = length_from
1987 self.max_length = max_length
1989 def getfield(self, pkt, s):
1990 # type: (Any, bytes) -> Tuple[bytes, bytes]
1991 len_pkt = (self.length_from or (lambda x: 0))(pkt)
1992 if not self.ON_WIRE_SIZE_UTF16:
1993 len_pkt *= 2
1994 if len_pkt == 0:
1995 return s, b""
1996 return s[len_pkt:], self.m2i(pkt, s[:len_pkt])
1998 def randval(self):
1999 # type: () -> RandBin
2000 return RandBin(RandNum(0, self.max_length or 1200))
2003class _XStrField(Field[bytes, bytes]):
2004 def i2repr(self, pkt, x):
2005 # type: (Optional[Packet], bytes) -> str
2006 if isinstance(x, bytes):
2007 return bytes_hex(x).decode()
2008 return super(_XStrField, self).i2repr(pkt, x)
2011class XStrField(_XStrField, StrField):
2012 """
2013 StrField which value is printed as hexadecimal.
2014 """
2017class XStrLenField(_XStrField, StrLenField):
2018 """
2019 StrLenField which value is printed as hexadecimal.
2020 """
2023class XStrFixedLenField(_XStrField, StrFixedLenField):
2024 """
2025 StrFixedLenField which value is printed as hexadecimal.
2026 """
2029class XLEStrLenField(XStrLenField):
2030 def i2m(self, pkt, x):
2031 # type: (Optional[Packet], Optional[bytes]) -> bytes
2032 if not x:
2033 return b""
2034 return x[:: -1]
2036 def m2i(self, pkt, x):
2037 # type: (Optional[Packet], bytes) -> bytes
2038 return x[:: -1]
2041class StrLenFieldUtf16(StrLenField, StrFieldUtf16):
2042 pass
2045class StrLenEnumField(_StrEnumField, StrLenField):
2046 __slots__ = ["enum"]
2048 def __init__(
2049 self,
2050 name, # type: str
2051 default, # type: bytes
2052 enum=None, # type: Optional[Dict[str, str]]
2053 **kwargs # type: Any
2054 ):
2055 # type: (...) -> None
2056 StrLenField.__init__(self, name, default, **kwargs)
2057 self.enum = enum
2060class BoundStrLenField(StrLenField):
2061 __slots__ = ["minlen", "maxlen"]
2063 def __init__(
2064 self,
2065 name, # type: str
2066 default, # type: bytes
2067 minlen=0, # type: int
2068 maxlen=255, # type: int
2069 length_from=None # type: Optional[Callable[[Packet], int]]
2070 ):
2071 # type: (...) -> None
2072 StrLenField.__init__(self, name, default, length_from=length_from)
2073 self.minlen = minlen
2074 self.maxlen = maxlen
2076 def randval(self):
2077 # type: () -> RandBin
2078 return RandBin(RandNum(self.minlen, self.maxlen))
2081class FieldListField(Field[List[Any], List[Any]]):
2082 __slots__ = ["field", "count_from", "length_from", "max_count"]
2083 islist = 1
2085 def __init__(
2086 self,
2087 name, # type: str
2088 default, # type: Optional[List[AnyField]]
2089 field, # type: AnyField
2090 length_from=None, # type: Optional[Callable[[Packet], int]]
2091 count_from=None, # type: Optional[Callable[[Packet], int]]
2092 max_count=None, # type: Optional[int]
2093 ):
2094 # type: (...) -> None
2095 if default is None:
2096 default = [] # Create a new list for each instance
2097 self.field = field
2098 Field.__init__(self, name, default)
2099 self.count_from = count_from
2100 self.length_from = length_from
2101 self.max_count = max_count
2103 def i2count(self, pkt, val):
2104 # type: (Optional[Packet], List[Any]) -> int
2105 if isinstance(val, list):
2106 return len(val)
2107 return 1
2109 def i2len(self, pkt, val):
2110 # type: (Packet, List[Any]) -> int
2111 return int(sum(self.field.i2len(pkt, v) for v in val))
2113 def any2i(self, pkt, x):
2114 # type: (Optional[Packet], List[Any]) -> List[Any]
2115 if not isinstance(x, list):
2116 return [self.field.any2i(pkt, x)]
2117 else:
2118 return [self.field.any2i(pkt, e) for e in x]
2120 def i2repr(self,
2121 pkt, # type: Optional[Packet]
2122 x, # type: List[Any]
2123 ):
2124 # type: (...) -> str
2125 return "[%s]" % ", ".join(self.field.i2repr(pkt, v) for v in x)
2127 def addfield(self,
2128 pkt, # type: Packet
2129 s, # type: bytes
2130 val, # type: Optional[List[Any]]
2131 ):
2132 # type: (...) -> bytes
2133 val = self.i2m(pkt, val)
2134 for v in val:
2135 s = self.field.addfield(pkt, s, v)
2136 return s
2138 def getfield(self,
2139 pkt, # type: Packet
2140 s, # type: bytes
2141 ):
2142 # type: (...) -> Any
2143 c = len_pkt = None
2144 if self.length_from is not None:
2145 len_pkt = self.length_from(pkt)
2146 elif self.count_from is not None:
2147 c = self.count_from(pkt)
2149 val = []
2150 ret = b""
2151 if len_pkt is not None:
2152 s, ret = s[:len_pkt], s[len_pkt:]
2154 while s:
2155 if c is not None:
2156 if c <= 0:
2157 break
2158 c -= 1
2159 s, v = self.field.getfield(pkt, s)
2160 val.append(v)
2161 if len(val) > (self.max_count or conf.max_list_count):
2162 raise MaximumItemsCount(
2163 "Maximum amount of items reached in FieldListField: %s "
2164 "(defaults to conf.max_list_count)"
2165 % (self.max_count or conf.max_list_count)
2166 )
2168 if isinstance(s, tuple):
2169 s, bn = s
2170 return (s + ret, bn), val
2171 else:
2172 return s + ret, val
2175class FieldLenField(Field[int, int]):
2176 __slots__ = ["length_of", "count_of", "adjust"]
2178 def __init__(
2179 self,
2180 name, # type: str
2181 default, # type: Optional[Any]
2182 length_of=None, # type: Optional[str]
2183 fmt="H", # type: str
2184 count_of=None, # type: Optional[str]
2185 adjust=lambda pkt, x: x, # type: Callable[[Packet, int], int]
2186 ):
2187 # type: (...) -> None
2188 Field.__init__(self, name, default, fmt)
2189 self.length_of = length_of
2190 self.count_of = count_of
2191 self.adjust = adjust
2193 def i2m(self, pkt, x):
2194 # type: (Optional[Packet], Optional[int]) -> int
2195 if x is None and pkt is not None:
2196 if self.length_of is not None:
2197 fld, fval = pkt.getfield_and_val(self.length_of)
2198 f = fld.i2len(pkt, fval)
2199 elif self.count_of is not None:
2200 fld, fval = pkt.getfield_and_val(self.count_of)
2201 f = fld.i2count(pkt, fval)
2202 else:
2203 raise ValueError(
2204 "Field should have either length_of or count_of"
2205 )
2206 x = self.adjust(pkt, f)
2207 elif x is None:
2208 x = 0
2209 return x
2212class StrNullField(StrField):
2213 DELIMITER = b"\x00"
2215 def addfield(self, pkt, s, val):
2216 # type: (Packet, bytes, Optional[bytes]) -> bytes
2217 return s + self.i2m(pkt, val) + self.DELIMITER
2219 def getfield(self,
2220 pkt, # type: Packet
2221 s, # type: bytes
2222 ):
2223 # type: (...) -> Tuple[bytes, bytes]
2224 len_str = 0
2225 while True:
2226 len_str = s.find(self.DELIMITER, len_str)
2227 if len_str < 0:
2228 # DELIMITER not found: return empty
2229 return b"", s
2230 if len_str % len(self.DELIMITER):
2231 len_str += 1
2232 else:
2233 break
2234 return s[len_str + len(self.DELIMITER):], self.m2i(pkt, s[:len_str])
2236 def randval(self):
2237 # type: () -> RandTermString
2238 return RandTermString(RandNum(0, 1200), self.DELIMITER)
2240 def i2len(self, pkt, x):
2241 # type: (Optional[Packet], Any) -> int
2242 return super(StrNullField, self).i2len(pkt, x) + 1
2245class StrNullFieldUtf16(StrNullField, StrFieldUtf16):
2246 DELIMITER = b"\x00\x00"
2249class StrStopField(StrField):
2250 __slots__ = ["stop", "additional"]
2252 def __init__(self, name, default, stop, additional=0):
2253 # type: (str, str, bytes, int) -> None
2254 Field.__init__(self, name, default)
2255 self.stop = stop
2256 self.additional = additional
2258 def getfield(self, pkt, s):
2259 # type: (Optional[Packet], bytes) -> Tuple[bytes, bytes]
2260 len_str = s.find(self.stop)
2261 if len_str < 0:
2262 return b"", s
2263 len_str += len(self.stop) + self.additional
2264 return s[len_str:], s[:len_str]
2266 def randval(self):
2267 # type: () -> RandTermString
2268 return RandTermString(RandNum(0, 1200), self.stop)
2271class LenField(Field[int, int]):
2272 """
2273 If None, will be filled with the size of the payload
2274 """
2275 __slots__ = ["adjust"]
2277 def __init__(self, name, default, fmt="H", adjust=lambda x: x):
2278 # type: (str, Optional[Any], str, Callable[[int], int]) -> None
2279 Field.__init__(self, name, default, fmt)
2280 self.adjust = adjust
2282 def i2m(self,
2283 pkt, # type: Optional[Packet]
2284 x, # type: Optional[int]
2285 ):
2286 # type: (...) -> int
2287 if x is None:
2288 x = 0
2289 if pkt is not None:
2290 x = self.adjust(len(pkt.payload))
2291 return x
2294class BCDFloatField(Field[float, int]):
2295 def i2m(self, pkt, x):
2296 # type: (Optional[Packet], Optional[float]) -> int
2297 if x is None:
2298 return 0
2299 return int(256 * x)
2301 def m2i(self, pkt, x):
2302 # type: (Optional[Packet], int) -> float
2303 return x / 256.0
2306class _BitField(Field[I, int]):
2307 """
2308 Field to handle bits.
2310 :param name: name of the field
2311 :param default: default value
2312 :param size: size (in bits). If negative, Low endian
2313 :param tot_size: size of the total group of bits (in bytes) the bitfield
2314 is in. If negative, Low endian.
2315 :param end_tot_size: same but for the BitField ending a group.
2317 Example - normal usage::
2319 0 1 2 3
2320 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
2321 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
2322 | A | B | C |
2323 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
2325 Fig. TestPacket
2327 class TestPacket(Packet):
2328 fields_desc = [
2329 BitField("a", 0, 14),
2330 BitField("b", 0, 16),
2331 BitField("c", 0, 2),
2332 ]
2334 Example - Low endian stored as 16 bits on the network::
2336 x x x x x x x x x x x x x x x x
2337 a [b] [ c ] [ a ]
2339 Will first get reversed during dissecion:
2341 x x x x x x x x x x x x x x x x
2342 [ a ] [b] [ c ]
2344 class TestPacket(Packet):
2345 fields_desc = [
2346 BitField("a", 0, 9, tot_size=-2),
2347 BitField("b", 0, 2),
2348 BitField("c", 0, 5, end_tot_size=-2)
2349 ]
2351 """
2352 __slots__ = ["rev", "size", "tot_size", "end_tot_size"]
2354 def __init__(self, name, default, size,
2355 tot_size=0, end_tot_size=0):
2356 # type: (str, Optional[I], int, int, int) -> None
2357 Field.__init__(self, name, default)
2358 if callable(size):
2359 size = size(self)
2360 self.rev = size < 0 or tot_size < 0 or end_tot_size < 0
2361 self.size = abs(size)
2362 if not tot_size:
2363 tot_size = self.size // 8
2364 self.tot_size = abs(tot_size)
2365 if not end_tot_size:
2366 end_tot_size = self.size // 8
2367 self.end_tot_size = abs(end_tot_size)
2368 # Fields always have a round sz except BitField
2369 # so to keep it simple, we'll ignore it here.
2370 self.sz = self.size / 8. # type: ignore
2372 # We need to # type: ignore a few things because of how special
2373 # BitField is
2374 def addfield(self, # type: ignore
2375 pkt, # type: Packet
2376 s, # type: Union[Tuple[bytes, int, int], bytes]
2377 ival, # type: I
2378 ):
2379 # type: (...) -> Union[Tuple[bytes, int, int], bytes]
2380 val = self.i2m(pkt, ival)
2381 if isinstance(s, tuple):
2382 s, bitsdone, v = s
2383 else:
2384 bitsdone = 0
2385 v = 0
2386 v <<= self.size
2387 v |= val & ((1 << self.size) - 1)
2388 bitsdone += self.size
2389 while bitsdone >= 8:
2390 bitsdone -= 8
2391 s = s + struct.pack("!B", v >> bitsdone)
2392 v &= (1 << bitsdone) - 1
2393 if bitsdone:
2394 return s, bitsdone, v
2395 else:
2396 # Apply LE if necessary
2397 if self.rev and self.end_tot_size > 1:
2398 s = s[:-self.end_tot_size] + s[-self.end_tot_size:][::-1]
2399 return s
2401 def getfield(self, # type: ignore
2402 pkt, # type: Packet
2403 s, # type: Union[Tuple[bytes, int], bytes]
2404 ):
2405 # type: (...) -> Union[Tuple[Tuple[bytes, int], I], Tuple[bytes, I]] # noqa: E501
2406 if isinstance(s, tuple):
2407 s, bn = s
2408 else:
2409 bn = 0
2410 # Apply LE if necessary
2411 if self.rev and self.tot_size > 1:
2412 s = s[:self.tot_size][::-1] + s[self.tot_size:]
2414 # we don't want to process all the string
2415 nb_bytes = (self.size + bn - 1) // 8 + 1
2416 w = s[:nb_bytes]
2418 # split the substring byte by byte
2419 _bytes = struct.unpack('!%dB' % nb_bytes, w)
2421 b = 0
2422 for c in range(nb_bytes):
2423 b |= int(_bytes[c]) << (nb_bytes - c - 1) * 8
2425 # get rid of high order bits
2426 b &= (1 << (nb_bytes * 8 - bn)) - 1
2428 # remove low order bits
2429 b = b >> (nb_bytes * 8 - self.size - bn)
2431 bn += self.size
2432 s = s[bn // 8:]
2433 bn = bn % 8
2434 b2 = self.m2i(pkt, b)
2435 if bn:
2436 return (s, bn), b2
2437 else:
2438 return s, b2
2440 def randval(self):
2441 # type: () -> RandNum
2442 return RandNum(0, 2**self.size - 1)
2444 def i2len(self, pkt, x): # type: ignore
2445 # type: (Optional[Packet], Optional[float]) -> float
2446 return float(self.size) / 8
2449class BitField(_BitField[int]):
2450 __doc__ = _BitField.__doc__
2453class BitLenField(BitField):
2454 __slots__ = ["length_from"]
2456 def __init__(self,
2457 name, # type: str
2458 default, # type: Optional[int]
2459 length_from # type: Callable[[Packet], int]
2460 ):
2461 # type: (...) -> None
2462 self.length_from = length_from
2463 super(BitLenField, self).__init__(name, default, 0)
2465 def getfield(self, # type: ignore
2466 pkt, # type: Packet
2467 s, # type: Union[Tuple[bytes, int], bytes]
2468 ):
2469 # type: (...) -> Union[Tuple[Tuple[bytes, int], int], Tuple[bytes, int]] # noqa: E501
2470 self.size = self.length_from(pkt)
2471 return super(BitLenField, self).getfield(pkt, s)
2473 def addfield(self, # type: ignore
2474 pkt, # type: Packet
2475 s, # type: Union[Tuple[bytes, int, int], bytes]
2476 val # type: int
2477 ):
2478 # type: (...) -> Union[Tuple[bytes, int, int], bytes]
2479 self.size = self.length_from(pkt)
2480 return super(BitLenField, self).addfield(pkt, s, val)
2483class BitFieldLenField(BitField):
2484 __slots__ = ["length_of", "count_of", "adjust", "tot_size", "end_tot_size"]
2486 def __init__(self,
2487 name, # type: str
2488 default, # type: Optional[int]
2489 size, # type: int
2490 length_of=None, # type: Optional[Union[Callable[[Optional[Packet]], int], str]] # noqa: E501
2491 count_of=None, # type: Optional[str]
2492 adjust=lambda pkt, x: x, # type: Callable[[Optional[Packet], int], int] # noqa: E501
2493 tot_size=0, # type: int
2494 end_tot_size=0, # type: int
2495 ):
2496 # type: (...) -> None
2497 super(BitFieldLenField, self).__init__(name, default, size,
2498 tot_size, end_tot_size)
2499 self.length_of = length_of
2500 self.count_of = count_of
2501 self.adjust = adjust
2503 def i2m(self, pkt, x):
2504 # type: (Optional[Packet], Optional[Any]) -> int
2505 return FieldLenField.i2m(self, pkt, x) # type: ignore
2508class XBitField(BitField):
2509 def i2repr(self, pkt, x):
2510 # type: (Optional[Packet], int) -> str
2511 return lhex(self.i2h(pkt, x))
2514class _EnumField(Field[Union[List[I], I], I]):
2515 def __init__(self,
2516 name, # type: str
2517 default, # type: Optional[I]
2518 enum, # type: Union[Dict[I, str], Dict[str, I], List[str], DADict[I, str], Type[Enum], Tuple[Callable[[I], str], Callable[[str], I]]] # noqa: E501
2519 fmt="H", # type: str
2520 ):
2521 # type: (...) -> None
2522 """ Initializes enum fields.
2524 @param name: name of this field
2525 @param default: default value of this field
2526 @param enum: either an enum, a dict or a tuple of two callables.
2527 Dict keys are the internal values, while the dict
2528 values are the user-friendly representations. If the
2529 tuple is provided, the first callable receives the
2530 internal value as parameter and returns the
2531 user-friendly representation and the second callable
2532 does the converse. The first callable may return None
2533 to default to a literal string (repr()) representation.
2534 @param fmt: struct.pack format used to parse and serialize the
2535 internal value from and to machine representation.
2536 """
2537 if isinstance(enum, ObservableDict):
2538 cast(ObservableDict, enum).observe(self)
2540 if isinstance(enum, tuple):
2541 self.i2s_cb = enum[0] # type: Optional[Callable[[I], str]]
2542 self.s2i_cb = enum[1] # type: Optional[Callable[[str], I]]
2543 self.i2s = None # type: Optional[Dict[I, str]]
2544 self.s2i = None # type: Optional[Dict[str, I]]
2545 elif isinstance(enum, type) and issubclass(enum, Enum):
2546 # Python's Enum
2547 i2s = self.i2s = {}
2548 s2i = self.s2i = {}
2549 self.i2s_cb = None
2550 self.s2i_cb = None
2551 names = [x.name for x in enum]
2552 for n in names:
2553 value = enum[n].value
2554 i2s[value] = n
2555 s2i[n] = value
2556 else:
2557 i2s = self.i2s = {}
2558 s2i = self.s2i = {}
2559 self.i2s_cb = None
2560 self.s2i_cb = None
2561 keys = [] # type: List[I]
2562 if isinstance(enum, list):
2563 keys = list(range(len(enum))) # type: ignore
2564 elif isinstance(enum, DADict):
2565 keys = enum.keys()
2566 else:
2567 keys = list(enum) # type: ignore
2568 if any(isinstance(x, str) for x in keys):
2569 i2s, s2i = s2i, i2s # type: ignore
2570 for k in keys:
2571 value = cast(str, enum[k]) # type: ignore
2572 i2s[k] = value
2573 s2i[value] = k
2574 Field.__init__(self, name, default, fmt)
2576 def any2i_one(self, pkt, x):
2577 # type: (Optional[Packet], Any) -> I
2578 if isinstance(x, Enum):
2579 return cast(I, x.value)
2580 elif isinstance(x, str):
2581 if self.s2i:
2582 x = self.s2i[x]
2583 elif self.s2i_cb:
2584 x = self.s2i_cb(x)
2585 return cast(I, x)
2587 def _i2repr(self, pkt, x):
2588 # type: (Optional[Packet], I) -> str
2589 return repr(x)
2591 def i2repr_one(self, pkt, x):
2592 # type: (Optional[Packet], I) -> str
2593 if self not in conf.noenum and not isinstance(x, VolatileValue):
2594 if self.i2s:
2595 try:
2596 return self.i2s[x]
2597 except KeyError:
2598 pass
2599 elif self.i2s_cb:
2600 ret = self.i2s_cb(x)
2601 if ret is not None:
2602 return ret
2603 return self._i2repr(pkt, x)
2605 def any2i(self, pkt, x):
2606 # type: (Optional[Packet], Any) -> Union[I, List[I]]
2607 if isinstance(x, list):
2608 return [self.any2i_one(pkt, z) for z in x]
2609 else:
2610 return self.any2i_one(pkt, x)
2612 def i2repr(self, pkt, x): # type: ignore
2613 # type: (Optional[Packet], Any) -> Union[List[str], str]
2614 if isinstance(x, list):
2615 return [self.i2repr_one(pkt, z) for z in x]
2616 else:
2617 return self.i2repr_one(pkt, x)
2619 def notify_set(self, enum, key, value):
2620 # type: (ObservableDict, I, str) -> None
2621 ks = "0x%x" if isinstance(key, int) else "%s"
2622 log_runtime.debug(
2623 "At %s: Change to %s at " + ks, self, value, key
2624 )
2625 if self.i2s is not None and self.s2i is not None:
2626 self.i2s[key] = value
2627 self.s2i[value] = key
2629 def notify_del(self, enum, key):
2630 # type: (ObservableDict, I) -> None
2631 ks = "0x%x" if isinstance(key, int) else "%s"
2632 log_runtime.debug("At %s: Delete value at " + ks, self, key)
2633 if self.i2s is not None and self.s2i is not None:
2634 value = self.i2s[key]
2635 del self.i2s[key]
2636 del self.s2i[value]
2639class EnumField(_EnumField[I]):
2640 __slots__ = ["i2s", "s2i", "s2i_cb", "i2s_cb"]
2643class CharEnumField(EnumField[str]):
2644 def __init__(self,
2645 name, # type: str
2646 default, # type: str
2647 enum, # type: Union[Dict[str, str], Tuple[Callable[[str], str], Callable[[str], str]]] # noqa: E501
2648 fmt="1s", # type: str
2649 ):
2650 # type: (...) -> None
2651 super(CharEnumField, self).__init__(name, default, enum, fmt)
2652 if self.i2s is not None:
2653 k = list(self.i2s)
2654 if k and len(k[0]) != 1:
2655 self.i2s, self.s2i = self.s2i, self.i2s
2657 def any2i_one(self, pkt, x):
2658 # type: (Optional[Packet], str) -> str
2659 if len(x) != 1:
2660 if self.s2i:
2661 x = self.s2i[x]
2662 elif self.s2i_cb:
2663 x = self.s2i_cb(x)
2664 return x
2667class BitEnumField(_BitField[Union[List[int], int]], _EnumField[int]):
2668 __slots__ = EnumField.__slots__
2670 def __init__(self, name, default, size, enum, **kwargs):
2671 # type: (str, Optional[int], int, Dict[int, str], **Any) -> None
2672 _EnumField.__init__(self, name, default, enum)
2673 _BitField.__init__(self, name, default, size, **kwargs)
2675 def any2i(self, pkt, x):
2676 # type: (Optional[Packet], Any) -> Union[List[int], int]
2677 return _EnumField.any2i(self, pkt, x)
2679 def i2repr(self,
2680 pkt, # type: Optional[Packet]
2681 x, # type: Union[List[int], int]
2682 ):
2683 # type: (...) -> Any
2684 return _EnumField.i2repr(self, pkt, x)
2687class BitLenEnumField(BitLenField, _EnumField[int]):
2688 __slots__ = EnumField.__slots__
2690 def __init__(self,
2691 name, # type: str
2692 default, # type: Optional[int]
2693 length_from, # type: Callable[[Packet], int]
2694 enum, # type: Dict[int, str]
2695 **kwargs, # type: Any
2696 ):
2697 # type: (...) -> None
2698 _EnumField.__init__(self, name, default, enum)
2699 BitLenField.__init__(self, name, default, length_from, **kwargs)
2701 def any2i(self, pkt, x):
2702 # type: (Optional[Packet], Any) -> int
2703 return _EnumField.any2i(self, pkt, x) # type: ignore
2705 def i2repr(self,
2706 pkt, # type: Optional[Packet]
2707 x, # type: Union[List[int], int]
2708 ):
2709 # type: (...) -> Any
2710 return _EnumField.i2repr(self, pkt, x)
2713class ShortEnumField(EnumField[int]):
2714 __slots__ = EnumField.__slots__
2716 def __init__(self,
2717 name, # type: str
2718 default, # type: int
2719 enum, # type: Union[Dict[int, str], Dict[str, int], Tuple[Callable[[int], str], Callable[[str], int]], DADict[int, str]] # noqa: E501
2720 ):
2721 # type: (...) -> None
2722 super(ShortEnumField, self).__init__(name, default, enum, "H")
2725class LEShortEnumField(EnumField[int]):
2726 def __init__(self, name, default, enum):
2727 # type: (str, int, Union[Dict[int, str], List[str]]) -> None
2728 super(LEShortEnumField, self).__init__(name, default, enum, "<H")
2731class LELongEnumField(EnumField[int]):
2732 def __init__(self, name, default, enum):
2733 # type: (str, int, Union[Dict[int, str], List[str]]) -> None
2734 super(LELongEnumField, self).__init__(name, default, enum, "<Q")
2737class ByteEnumField(EnumField[int]):
2738 def __init__(self, name, default, enum):
2739 # type: (str, Optional[int], Dict[int, str]) -> None
2740 super(ByteEnumField, self).__init__(name, default, enum, "B")
2743class XByteEnumField(ByteEnumField):
2744 def i2repr_one(self, pkt, x):
2745 # type: (Optional[Packet], int) -> str
2746 if self not in conf.noenum and not isinstance(x, VolatileValue):
2747 if self.i2s:
2748 try:
2749 return self.i2s[x]
2750 except KeyError:
2751 pass
2752 elif self.i2s_cb:
2753 ret = self.i2s_cb(x)
2754 if ret is not None:
2755 return ret
2756 return lhex(x)
2759class IntEnumField(EnumField[int]):
2760 def __init__(self, name, default, enum):
2761 # type: (str, Optional[int], Dict[int, str]) -> None
2762 super(IntEnumField, self).__init__(name, default, enum, "I")
2765class SignedIntEnumField(EnumField[int]):
2766 def __init__(self, name, default, enum):
2767 # type: (str, Optional[int], Dict[int, str]) -> None
2768 super(SignedIntEnumField, self).__init__(name, default, enum, "i")
2771class LEIntEnumField(EnumField[int]):
2772 def __init__(self, name, default, enum):
2773 # type: (str, int, Dict[int, str]) -> None
2774 super(LEIntEnumField, self).__init__(name, default, enum, "<I")
2777class XShortEnumField(ShortEnumField):
2778 def _i2repr(self, pkt, x):
2779 # type: (Optional[Packet], Any) -> str
2780 return lhex(x)
2783class LE3BytesEnumField(LEThreeBytesField, _EnumField[int]):
2784 __slots__ = EnumField.__slots__
2786 def __init__(self, name, default, enum):
2787 # type: (str, Optional[int], Dict[int, str]) -> None
2788 _EnumField.__init__(self, name, default, enum)
2789 LEThreeBytesField.__init__(self, name, default)
2791 def any2i(self, pkt, x):
2792 # type: (Optional[Packet], Any) -> int
2793 return _EnumField.any2i(self, pkt, x) # type: ignore
2795 def i2repr(self, pkt, x): # type: ignore
2796 # type: (Optional[Packet], Any) -> Union[List[str], str]
2797 return _EnumField.i2repr(self, pkt, x)
2800class XLE3BytesEnumField(LE3BytesEnumField):
2801 def _i2repr(self, pkt, x):
2802 # type: (Optional[Packet], Any) -> str
2803 return lhex(x)
2806class _MultiEnumField(_EnumField[I]):
2807 def __init__(self,
2808 name, # type: str
2809 default, # type: int
2810 enum, # type: Dict[I, Dict[I, str]]
2811 depends_on, # type: Callable[[Optional[Packet]], I]
2812 fmt="H" # type: str
2813 ):
2814 # type: (...) -> None
2816 self.depends_on = depends_on
2817 self.i2s_multi = enum
2818 self.s2i_multi = {} # type: Dict[I, Dict[str, I]]
2819 self.s2i_all = {} # type: Dict[str, I]
2820 for m in enum:
2821 s2i = {} # type: Dict[str, I]
2822 self.s2i_multi[m] = s2i
2823 for k, v in enum[m].items():
2824 s2i[v] = k
2825 self.s2i_all[v] = k
2826 Field.__init__(self, name, default, fmt)
2828 def any2i_one(self, pkt, x):
2829 # type: (Optional[Packet], Any) -> I
2830 if isinstance(x, str):
2831 v = self.depends_on(pkt)
2832 if v in self.s2i_multi:
2833 s2i = self.s2i_multi[v]
2834 if x in s2i:
2835 return s2i[x]
2836 return self.s2i_all[x]
2837 return cast(I, x)
2839 def i2repr_one(self, pkt, x):
2840 # type: (Optional[Packet], I) -> str
2841 v = self.depends_on(pkt)
2842 if isinstance(v, VolatileValue):
2843 return repr(v)
2844 if v in self.i2s_multi:
2845 return str(self.i2s_multi[v].get(x, x))
2846 return str(x)
2849class MultiEnumField(_MultiEnumField[int], EnumField[int]):
2850 __slots__ = ["depends_on", "i2s_multi", "s2i_multi", "s2i_all"]
2853class BitMultiEnumField(_BitField[Union[List[int], int]],
2854 _MultiEnumField[int]):
2855 __slots__ = EnumField.__slots__ + MultiEnumField.__slots__
2857 def __init__(
2858 self,
2859 name, # type: str
2860 default, # type: int
2861 size, # type: int
2862 enum, # type: Dict[int, Dict[int, str]]
2863 depends_on # type: Callable[[Optional[Packet]], int]
2864 ):
2865 # type: (...) -> None
2866 _MultiEnumField.__init__(self, name, default, enum, depends_on)
2867 self.rev = size < 0
2868 self.size = abs(size)
2869 self.sz = self.size / 8. # type: ignore
2871 def any2i(self, pkt, x):
2872 # type: (Optional[Packet], Any) -> Union[List[int], int]
2873 return _MultiEnumField[int].any2i(
2874 self, # type: ignore
2875 pkt,
2876 x
2877 )
2879 def i2repr( # type: ignore
2880 self,
2881 pkt, # type: Optional[Packet]
2882 x # type: Union[List[int], int]
2883 ):
2884 # type: (...) -> Union[str, List[str]]
2885 return _MultiEnumField[int].i2repr(
2886 self, # type: ignore
2887 pkt,
2888 x
2889 )
2892class ByteEnumKeysField(ByteEnumField):
2893 """ByteEnumField that picks valid values when fuzzed. """
2895 def randval(self):
2896 # type: () -> RandEnumKeys
2897 return RandEnumKeys(self.i2s or {})
2900class ShortEnumKeysField(ShortEnumField):
2901 """ShortEnumField that picks valid values when fuzzed. """
2903 def randval(self):
2904 # type: () -> RandEnumKeys
2905 return RandEnumKeys(self.i2s or {})
2908class IntEnumKeysField(IntEnumField):
2909 """IntEnumField that picks valid values when fuzzed. """
2911 def randval(self):
2912 # type: () -> RandEnumKeys
2913 return RandEnumKeys(self.i2s or {})
2916# Little endian fixed length field
2919class LEFieldLenField(FieldLenField):
2920 def __init__(
2921 self,
2922 name, # type: str
2923 default, # type: Optional[Any]
2924 length_of=None, # type: Optional[str]
2925 fmt="<H", # type: str
2926 count_of=None, # type: Optional[str]
2927 adjust=lambda pkt, x: x, # type: Callable[[Packet, int], int]
2928 ):
2929 # type: (...) -> None
2930 FieldLenField.__init__(
2931 self, name, default,
2932 length_of=length_of,
2933 fmt=fmt,
2934 count_of=count_of,
2935 adjust=adjust
2936 )
2939class FlagValueIter(object):
2941 __slots__ = ["flagvalue", "cursor"]
2943 def __init__(self, flagvalue):
2944 # type: (FlagValue) -> None
2945 self.flagvalue = flagvalue
2946 self.cursor = 0
2948 def __iter__(self):
2949 # type: () -> FlagValueIter
2950 return self
2952 def __next__(self):
2953 # type: () -> str
2954 x = int(self.flagvalue)
2955 x >>= self.cursor
2956 while x:
2957 self.cursor += 1
2958 if x & 1:
2959 return self.flagvalue.names[self.cursor - 1]
2960 x >>= 1
2961 raise StopIteration
2963 next = __next__
2966class FlagValue(object):
2967 __slots__ = ["value", "names", "multi"]
2969 def _fixvalue(self, value):
2970 # type: (Any) -> int
2971 if not value:
2972 return 0
2973 if isinstance(value, str):
2974 value = value.split('+') if self.multi else list(value)
2975 if isinstance(value, list):
2976 y = 0
2977 for i in value:
2978 y |= 1 << self.names.index(i)
2979 value = y
2980 return int(value)
2982 def __init__(self, value, names):
2983 # type: (Union[List[str], int, str], Union[List[str], str]) -> None
2984 self.multi = isinstance(names, list)
2985 self.names = names
2986 self.value = self._fixvalue(value)
2988 def __hash__(self):
2989 # type: () -> int
2990 return hash(self.value)
2992 def __int__(self):
2993 # type: () -> int
2994 return self.value
2996 def __eq__(self, other):
2997 # type: (Any) -> bool
2998 return self.value == self._fixvalue(other)
3000 def __lt__(self, other):
3001 # type: (Any) -> bool
3002 return self.value < self._fixvalue(other)
3004 def __le__(self, other):
3005 # type: (Any) -> bool
3006 return self.value <= self._fixvalue(other)
3008 def __gt__(self, other):
3009 # type: (Any) -> bool
3010 return self.value > self._fixvalue(other)
3012 def __ge__(self, other):
3013 # type: (Any) -> bool
3014 return self.value >= self._fixvalue(other)
3016 def __ne__(self, other):
3017 # type: (Any) -> bool
3018 return self.value != self._fixvalue(other)
3020 def __and__(self, other):
3021 # type: (int) -> FlagValue
3022 return self.__class__(self.value & self._fixvalue(other), self.names)
3023 __rand__ = __and__
3025 def __or__(self, other):
3026 # type: (int) -> FlagValue
3027 return self.__class__(self.value | self._fixvalue(other), self.names)
3028 __ror__ = __or__
3029 __add__ = __or__ # + is an alias for |
3031 def __sub__(self, other):
3032 # type: (int) -> FlagValue
3033 return self.__class__(
3034 self.value & (2 ** len(self.names) - 1 - self._fixvalue(other)),
3035 self.names
3036 )
3038 def __xor__(self, other):
3039 # type: (int) -> FlagValue
3040 return self.__class__(self.value ^ self._fixvalue(other), self.names)
3042 def __lshift__(self, other):
3043 # type: (int) -> int
3044 return self.value << self._fixvalue(other)
3046 def __rshift__(self, other):
3047 # type: (int) -> int
3048 return self.value >> self._fixvalue(other)
3050 def __nonzero__(self):
3051 # type: () -> bool
3052 return bool(self.value)
3053 __bool__ = __nonzero__
3055 def flagrepr(self):
3056 # type: () -> str
3057 warnings.warn(
3058 "obj.flagrepr() is obsolete. Use str(obj) instead.",
3059 DeprecationWarning
3060 )
3061 return str(self)
3063 def __str__(self):
3064 # type: () -> str
3065 i = 0
3066 r = []
3067 x = int(self)
3068 while x:
3069 if x & 1:
3070 try:
3071 name = self.names[i]
3072 except IndexError:
3073 name = "?"
3074 r.append(name)
3075 i += 1
3076 x >>= 1
3077 return ("+" if self.multi else "").join(r)
3079 def __iter__(self):
3080 # type: () -> FlagValueIter
3081 return FlagValueIter(self)
3083 def __repr__(self):
3084 # type: () -> str
3085 return "<Flag %d (%s)>" % (self, self)
3087 def __deepcopy__(self, memo):
3088 # type: (Dict[Any, Any]) -> FlagValue
3089 return self.__class__(int(self), self.names)
3091 def __getattr__(self, attr):
3092 # type: (str) -> Any
3093 if attr in self.__slots__:
3094 return super(FlagValue, self).__getattribute__(attr)
3095 try:
3096 if self.multi:
3097 return bool((2 ** self.names.index(attr)) & int(self))
3098 return all(bool((2 ** self.names.index(flag)) & int(self))
3099 for flag in attr)
3100 except ValueError:
3101 if '_' in attr:
3102 try:
3103 return self.__getattr__(attr.replace('_', '-'))
3104 except AttributeError:
3105 pass
3106 return super(FlagValue, self).__getattribute__(attr)
3108 def __setattr__(self, attr, value):
3109 # type: (str, Union[List[str], int, str]) -> None
3110 if attr == "value" and not isinstance(value, int):
3111 raise ValueError(value)
3112 if attr in self.__slots__:
3113 return super(FlagValue, self).__setattr__(attr, value)
3114 if attr in self.names:
3115 if value:
3116 self.value |= (2 ** self.names.index(attr))
3117 else:
3118 self.value &= ~(2 ** self.names.index(attr))
3119 else:
3120 return super(FlagValue, self).__setattr__(attr, value)
3122 def copy(self):
3123 # type: () -> FlagValue
3124 return self.__class__(self.value, self.names)
3127class FlagsField(_BitField[Optional[Union[int, FlagValue]]]):
3128 """ Handle Flag type field
3130 Make sure all your flags have a label
3132 Example (list):
3133 >>> from scapy.packet import Packet
3134 >>> class FlagsTest(Packet):
3135 fields_desc = [FlagsField("flags", 0, 8, ["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"])] # noqa: E501
3136 >>> FlagsTest(flags=9).show2()
3137 ###[ FlagsTest ]###
3138 flags = f0+f3
3140 Example (str):
3141 >>> from scapy.packet import Packet
3142 >>> class TCPTest(Packet):
3143 fields_desc = [
3144 BitField("reserved", 0, 7),
3145 FlagsField("flags", 0x2, 9, "FSRPAUECN")
3146 ]
3147 >>> TCPTest(flags=3).show2()
3148 ###[ FlagsTest ]###
3149 reserved = 0
3150 flags = FS
3152 Example (dict):
3153 >>> from scapy.packet import Packet
3154 >>> class FlagsTest2(Packet):
3155 fields_desc = [
3156 FlagsField("flags", 0x2, 16, {
3157 0x0001: "A",
3158 0x0008: "B",
3159 })
3160 ]
3162 :param name: field's name
3163 :param default: default value for the field
3164 :param size: number of bits in the field (in bits). if negative, LE
3165 :param names: (list or str or dict) label for each flag
3166 If it's a str or a list, the least Significant Bit tag's name
3167 is written first.
3168 """
3169 ismutable = True
3170 __slots__ = ["names"]
3172 def __init__(self,
3173 name, # type: str
3174 default, # type: Optional[Union[int, FlagValue]]
3175 size, # type: int
3176 names # type: Union[List[str], str, Dict[int, str]]
3177 ):
3178 # type: (...) -> None
3179 # Convert the dict to a list
3180 if isinstance(names, dict):
3181 tmp = ["bit_%d" % i for i in range(abs(size))]
3182 for i, v in names.items():
3183 tmp[int(math.floor(math.log(i, 2)))] = v
3184 names = tmp
3185 # Store the names as str or list
3186 self.names = names
3187 super(FlagsField, self).__init__(name, default, size)
3189 def _fixup_val(self, x):
3190 # type: (Any) -> Optional[FlagValue]
3191 """Returns a FlagValue instance when needed. Internal method, to be
3192used in *2i() and i2*() methods.
3194 """
3195 if isinstance(x, (FlagValue, VolatileValue)):
3196 return x # type: ignore
3197 if x is None:
3198 return None
3199 return FlagValue(x, self.names)
3201 def any2i(self, pkt, x):
3202 # type: (Optional[Packet], Any) -> Optional[FlagValue]
3203 return self._fixup_val(super(FlagsField, self).any2i(pkt, x))
3205 def m2i(self, pkt, x):
3206 # type: (Optional[Packet], int) -> Optional[FlagValue]
3207 return self._fixup_val(super(FlagsField, self).m2i(pkt, x))
3209 def i2h(self, pkt, x):
3210 # type: (Optional[Packet], Any) -> Optional[FlagValue]
3211 return self._fixup_val(super(FlagsField, self).i2h(pkt, x))
3213 def i2repr(self,
3214 pkt, # type: Optional[Packet]
3215 x, # type: Any
3216 ):
3217 # type: (...) -> str
3218 if isinstance(x, (list, tuple)):
3219 return repr(type(x)(
3220 "None" if v is None else str(self._fixup_val(v)) for v in x
3221 ))
3222 return "None" if x is None else str(self._fixup_val(x))
3225MultiFlagsEntry = collections.namedtuple('MultiFlagsEntry', ['short', 'long'])
3228class MultiFlagsField(_BitField[Set[str]]):
3229 __slots__ = FlagsField.__slots__ + ["depends_on"]
3231 def __init__(self,
3232 name, # type: str
3233 default, # type: Set[str]
3234 size, # type: int
3235 names, # type: Dict[int, Dict[int, MultiFlagsEntry]]
3236 depends_on, # type: Callable[[Optional[Packet]], int]
3237 ):
3238 # type: (...) -> None
3239 self.names = names
3240 self.depends_on = depends_on
3241 super(MultiFlagsField, self).__init__(name, default, size)
3243 def any2i(self, pkt, x):
3244 # type: (Optional[Packet], Any) -> Set[str]
3245 if not isinstance(x, (set, int)):
3246 raise ValueError('set expected')
3248 if pkt is not None:
3249 if isinstance(x, int):
3250 return self.m2i(pkt, x)
3251 else:
3252 v = self.depends_on(pkt)
3253 if v is not None:
3254 assert v in self.names, 'invalid dependency'
3255 these_names = self.names[v]
3256 s = set()
3257 for i in x:
3258 for val in these_names.values():
3259 if val.short == i:
3260 s.add(i)
3261 break
3262 else:
3263 assert False, 'Unknown flag "{}" with this dependency'.format(i) # noqa: E501
3264 continue
3265 return s
3266 if isinstance(x, int):
3267 return set()
3268 return x
3270 def i2m(self, pkt, x):
3271 # type: (Optional[Packet], Optional[Set[str]]) -> int
3272 v = self.depends_on(pkt)
3273 these_names = self.names.get(v, {})
3275 r = 0
3276 if x is None:
3277 return r
3278 for flag_set in x:
3279 for i, val in these_names.items():
3280 if val.short == flag_set:
3281 r |= 1 << i
3282 break
3283 else:
3284 r |= 1 << int(flag_set[len('bit '):])
3285 return r
3287 def m2i(self, pkt, x):
3288 # type: (Optional[Packet], int) -> Set[str]
3289 v = self.depends_on(pkt)
3290 these_names = self.names.get(v, {})
3292 r = set()
3293 i = 0
3294 while x:
3295 if x & 1:
3296 if i in these_names:
3297 r.add(these_names[i].short)
3298 else:
3299 r.add('bit {}'.format(i))
3300 x >>= 1
3301 i += 1
3302 return r
3304 def i2repr(self, pkt, x):
3305 # type: (Optional[Packet], Set[str]) -> str
3306 v = self.depends_on(pkt)
3307 these_names = self.names.get(v, {})
3309 r = set()
3310 for flag_set in x:
3311 for i in these_names.values():
3312 if i.short == flag_set:
3313 r.add("{} ({})".format(i.long, i.short))
3314 break
3315 else:
3316 r.add(flag_set)
3317 return repr(r)
3320class FixedPointField(BitField):
3321 __slots__ = ['frac_bits']
3323 def __init__(self, name, default, size, frac_bits=16):
3324 # type: (str, int, int, int) -> None
3325 self.frac_bits = frac_bits
3326 super(FixedPointField, self).__init__(name, default, size)
3328 def any2i(self, pkt, val):
3329 # type: (Optional[Packet], Optional[float]) -> Optional[int]
3330 if val is None:
3331 return val
3332 ival = int(val)
3333 fract = int((val - ival) * 2**self.frac_bits)
3334 return (ival << self.frac_bits) | fract
3336 def i2h(self, pkt, val):
3337 # type: (Optional[Packet], Optional[int]) -> Optional[EDecimal]
3338 # A bit of trickery to get precise floats
3339 if val is None:
3340 return val
3341 int_part = val >> self.frac_bits
3342 pw = 2.0**self.frac_bits
3343 frac_part = EDecimal(val & (1 << self.frac_bits) - 1)
3344 frac_part /= pw # type: ignore
3345 return int_part + frac_part.normalize(int(math.log10(pw)))
3347 def i2repr(self, pkt, val):
3348 # type: (Optional[Packet], int) -> str
3349 return str(self.i2h(pkt, val))
3352# Base class for IPv4 and IPv6 Prefixes inspired by IPField and IP6Field.
3353# Machine values are encoded in a multiple of wordbytes bytes.
3354class _IPPrefixFieldBase(Field[Tuple[str, int], Tuple[bytes, int]]):
3355 __slots__ = ["wordbytes", "maxbytes", "aton", "ntoa", "length_from"]
3357 def __init__(
3358 self,
3359 name, # type: str
3360 default, # type: Tuple[str, int]
3361 wordbytes, # type: int
3362 maxbytes, # type: int
3363 aton, # type: Callable[..., Any]
3364 ntoa, # type: Callable[..., Any]
3365 length_from=None # type: Optional[Callable[[Packet], int]]
3366 ):
3367 # type: (...) -> None
3368 self.wordbytes = wordbytes
3369 self.maxbytes = maxbytes
3370 self.aton = aton
3371 self.ntoa = ntoa
3372 Field.__init__(self, name, default, "%is" % self.maxbytes)
3373 if length_from is None:
3374 length_from = lambda x: 0
3375 self.length_from = length_from
3377 def _numbytes(self, pfxlen):
3378 # type: (int) -> int
3379 wbits = self.wordbytes * 8
3380 return ((pfxlen + (wbits - 1)) // wbits) * self.wordbytes
3382 def h2i(self, pkt, x):
3383 # type: (Optional[Packet], str) -> Tuple[str, int]
3384 # "fc00:1::1/64" -> ("fc00:1::1", 64)
3385 [pfx, pfxlen] = x.split('/')
3386 self.aton(pfx) # check for validity
3387 return (pfx, int(pfxlen))
3389 def i2h(self, pkt, x):
3390 # type: (Optional[Packet], Tuple[str, int]) -> str
3391 # ("fc00:1::1", 64) -> "fc00:1::1/64"
3392 (pfx, pfxlen) = x
3393 return "%s/%i" % (pfx, pfxlen)
3395 def i2m(self,
3396 pkt, # type: Optional[Packet]
3397 x # type: Optional[Tuple[str, int]]
3398 ):
3399 # type: (...) -> Tuple[bytes, int]
3400 # ("fc00:1::1", 64) -> (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) # noqa: E501
3401 if x is None:
3402 pfx, pfxlen = "", 0
3403 else:
3404 (pfx, pfxlen) = x
3405 s = self.aton(pfx)
3406 return (s[:self._numbytes(pfxlen)], pfxlen)
3408 def m2i(self, pkt, x):
3409 # type: (Optional[Packet], Tuple[bytes, int]) -> Tuple[str, int]
3410 # (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64) # noqa: E501
3411 (s, pfxlen) = x
3413 if len(s) < self.maxbytes:
3414 s = s + (b"\0" * (self.maxbytes - len(s)))
3415 return (self.ntoa(s), pfxlen)
3417 def any2i(self, pkt, x):
3418 # type: (Optional[Packet], Optional[Any]) -> Tuple[str, int]
3419 if x is None:
3420 return (self.ntoa(b"\0" * self.maxbytes), 1)
3422 return self.h2i(pkt, x)
3424 def i2len(self, pkt, x):
3425 # type: (Packet, Tuple[str, int]) -> int
3426 (_, pfxlen) = x
3427 return pfxlen
3429 def addfield(self, pkt, s, val):
3430 # type: (Packet, bytes, Optional[Tuple[str, int]]) -> bytes
3431 (rawpfx, pfxlen) = self.i2m(pkt, val)
3432 fmt = "!%is" % self._numbytes(pfxlen)
3433 return s + struct.pack(fmt, rawpfx)
3435 def getfield(self, pkt, s):
3436 # type: (Packet, bytes) -> Tuple[bytes, Tuple[str, int]]
3437 pfxlen = self.length_from(pkt)
3438 numbytes = self._numbytes(pfxlen)
3439 fmt = "!%is" % numbytes
3440 return s[numbytes:], self.m2i(pkt, (struct.unpack(fmt, s[:numbytes])[0], pfxlen)) # noqa: E501
3443class IPPrefixField(_IPPrefixFieldBase):
3444 def __init__(
3445 self,
3446 name, # type: str
3447 default, # type: Tuple[str, int]
3448 wordbytes=1, # type: int
3449 length_from=None # type: Optional[Callable[[Packet], int]]
3450 ):
3451 _IPPrefixFieldBase.__init__(
3452 self,
3453 name,
3454 default,
3455 wordbytes,
3456 4,
3457 inet_aton,
3458 inet_ntoa,
3459 length_from
3460 )
3463class IP6PrefixField(_IPPrefixFieldBase):
3464 def __init__(
3465 self,
3466 name, # type: str
3467 default, # type: Tuple[str, int]
3468 wordbytes=1, # type: int
3469 length_from=None # type: Optional[Callable[[Packet], int]]
3470 ):
3471 # type: (...) -> None
3472 _IPPrefixFieldBase.__init__(
3473 self,
3474 name,
3475 default,
3476 wordbytes,
3477 16,
3478 lambda a: inet_pton(socket.AF_INET6, a),
3479 lambda n: inet_ntop(socket.AF_INET6, n),
3480 length_from
3481 )
3484class UTCTimeField(Field[float, int]):
3485 __slots__ = ["epoch", "delta", "strf",
3486 "use_msec", "use_micro", "use_nano", "custom_scaling"]
3488 def __init__(self,
3489 name, # type: str
3490 default, # type: int
3491 use_msec=False, # type: bool
3492 use_micro=False, # type: bool
3493 use_nano=False, # type: bool
3494 epoch=None, # type: Optional[Tuple[int, int, int, int, int, int, int, int, int]] # noqa: E501
3495 strf="%a, %d %b %Y %H:%M:%S %z", # type: str
3496 custom_scaling=None, # type: Optional[int]
3497 fmt="I" # type: str
3498 ):
3499 # type: (...) -> None
3500 Field.__init__(self, name, default, fmt=fmt)
3501 mk_epoch = EPOCH if epoch is None else calendar.timegm(epoch)
3502 self.epoch = mk_epoch
3503 self.delta = mk_epoch - EPOCH
3504 self.strf = strf
3505 self.use_msec = use_msec
3506 self.use_micro = use_micro
3507 self.use_nano = use_nano
3508 self.custom_scaling = custom_scaling
3510 def i2repr(self, pkt, x):
3511 # type: (Optional[Packet], float) -> str
3512 if x is None:
3513 x = time.time() - self.delta
3514 elif self.use_msec:
3515 x = x / 1e3
3516 elif self.use_micro:
3517 x = x / 1e6
3518 elif self.use_nano:
3519 x = x / 1e9
3520 elif self.custom_scaling:
3521 x = x / self.custom_scaling
3522 x += self.delta
3523 # To make negative timestamps work on all plateforms (e.g. Windows),
3524 # we need a trick.
3525 t = (
3526 datetime.datetime(1970, 1, 1) +
3527 datetime.timedelta(seconds=x)
3528 ).strftime(self.strf)
3529 return "%s (%d)" % (t, int(x))
3531 def i2m(self, pkt, x):
3532 # type: (Optional[Packet], Optional[float]) -> int
3533 if x is None:
3534 x = time.time() - self.delta
3535 if self.use_msec:
3536 x = x * 1e3
3537 elif self.use_micro:
3538 x = x * 1e6
3539 elif self.use_nano:
3540 x = x * 1e9
3541 elif self.custom_scaling:
3542 x = x * self.custom_scaling
3543 return int(x)
3544 return int(x)
3547class SecondsIntField(Field[float, int]):
3548 __slots__ = ["use_msec", "use_micro", "use_nano"]
3550 def __init__(self, name, default,
3551 use_msec=False,
3552 use_micro=False,
3553 use_nano=False):
3554 # type: (str, int, bool, bool, bool) -> None
3555 Field.__init__(self, name, default, "I")
3556 self.use_msec = use_msec
3557 self.use_micro = use_micro
3558 self.use_nano = use_nano
3560 def i2repr(self, pkt, x):
3561 # type: (Optional[Packet], Optional[float]) -> str
3562 if x is None:
3563 y = 0 # type: Union[int, float]
3564 elif self.use_msec:
3565 y = x / 1e3
3566 elif self.use_micro:
3567 y = x / 1e6
3568 elif self.use_nano:
3569 y = x / 1e9
3570 else:
3571 y = x
3572 return "%s sec" % y
3575class _ScalingField(object):
3576 def __init__(self,
3577 name, # type: str
3578 default, # type: float
3579 scaling=1, # type: Union[int, float]
3580 unit="", # type: str
3581 offset=0, # type: Union[int, float]
3582 ndigits=3, # type: int
3583 fmt="B", # type: str
3584 ):
3585 # type: (...) -> None
3586 self.scaling = scaling
3587 self.unit = unit
3588 self.offset = offset
3589 self.ndigits = ndigits
3590 Field.__init__(self, name, default, fmt) # type: ignore
3592 def i2m(self,
3593 pkt, # type: Optional[Packet]
3594 x # type: Optional[Union[int, float]]
3595 ):
3596 # type: (...) -> Union[int, float]
3597 if x is None:
3598 x = 0
3599 x = (x - self.offset) / self.scaling
3600 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore
3601 x = int(round(x))
3602 return x
3604 def m2i(self, pkt, x):
3605 # type: (Optional[Packet], Union[int, float]) -> Union[int, float]
3606 x = x * self.scaling + self.offset
3607 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore
3608 x = round(x, self.ndigits)
3609 return x
3611 def any2i(self, pkt, x):
3612 # type: (Optional[Packet], Any) -> Union[int, float]
3613 if isinstance(x, (str, bytes)):
3614 x = struct.unpack(self.fmt, bytes_encode(x))[0] # type: ignore
3615 x = self.m2i(pkt, x)
3616 if not isinstance(x, (int, float)):
3617 raise ValueError("Unknown type")
3618 return x
3620 def i2repr(self, pkt, x):
3621 # type: (Optional[Packet], Union[int, float]) -> str
3622 return "%s %s" % (
3623 self.i2h(pkt, x), # type: ignore
3624 self.unit
3625 )
3627 def randval(self):
3628 # type: () -> RandFloat
3629 value = Field.randval(self) # type: ignore
3630 if value is not None:
3631 min_val = round(value.min * self.scaling + self.offset,
3632 self.ndigits)
3633 max_val = round(value.max * self.scaling + self.offset,
3634 self.ndigits)
3636 return RandFloat(min(min_val, max_val), max(min_val, max_val))
3639class ScalingField(_ScalingField,
3640 Field[Union[int, float], Union[int, float]]):
3641 """ Handle physical values which are scaled and/or offset for communication
3643 Example:
3644 >>> from scapy.packet import Packet
3645 >>> class ScalingFieldTest(Packet):
3646 fields_desc = [ScalingField('data', 0, scaling=0.1, offset=-1, unit='mV')] # noqa: E501
3647 >>> ScalingFieldTest(data=10).show2()
3648 ###[ ScalingFieldTest ]###
3649 data= 10.0 mV
3650 >>> hexdump(ScalingFieldTest(data=10))
3651 0000 6E n
3652 >>> hexdump(ScalingFieldTest(data=b"\x6D"))
3653 0000 6D m
3654 >>> ScalingFieldTest(data=b"\x6D").show2()
3655 ###[ ScalingFieldTest ]###
3656 data= 9.9 mV
3658 bytes(ScalingFieldTest(...)) will produce 0x6E in this example.
3659 0x6E is 110 (decimal). This is calculated through the scaling factor
3660 and the offset. "data" was set to 10, which means, we want to transfer
3661 the physical value 10 mV. To calculate the value, which has to be
3662 sent on the bus, the offset has to subtracted and the scaling has to be
3663 applied by division through the scaling factor.
3664 bytes = (data - offset) / scaling
3665 bytes = ( 10 - (-1) ) / 0.1
3666 bytes = 110 = 0x6E
3668 If you want to force a certain internal value, you can assign a byte-
3669 string to the field (data=b"\x6D"). If a string of a bytes object is
3670 given to the field, no internal value conversion will be applied
3672 :param name: field's name
3673 :param default: default value for the field
3674 :param scaling: scaling factor for the internal value conversion
3675 :param unit: string for the unit representation of the internal value
3676 :param offset: value to offset the internal value during conversion
3677 :param ndigits: number of fractional digits for the internal conversion
3678 :param fmt: struct.pack format used to parse and serialize the internal value from and to machine representation # noqa: E501
3679 """
3682class BitScalingField(_ScalingField, BitField): # type: ignore
3683 """
3684 A ScalingField that is a BitField
3685 """
3687 def __init__(self, name, default, size, *args, **kwargs):
3688 # type: (str, int, int, *Any, **Any) -> None
3689 _ScalingField.__init__(self, name, default, *args, **kwargs)
3690 BitField.__init__(self, name, default, size) # type: ignore
3693class OUIField(X3BytesField):
3694 """
3695 A field designed to carry a OUI (3 bytes)
3696 """
3698 def i2repr(self, pkt, val):
3699 # type: (Optional[Packet], int) -> str
3700 by_val = struct.pack("!I", val or 0)[1:]
3701 oui = str2mac(by_val + b"\0" * 3)[:8]
3702 if conf.manufdb:
3703 fancy = conf.manufdb._get_manuf(oui)
3704 if fancy != oui:
3705 return "%s (%s)" % (fancy, oui)
3706 return oui
3709class UUIDField(Field[UUID, bytes]):
3710 """Field for UUID storage, wrapping Python's uuid.UUID type.
3712 The internal storage format of this field is ``uuid.UUID`` from the Python
3713 standard library.
3715 There are three formats (``uuid_fmt``) for this field type:
3717 * ``FORMAT_BE`` (default): the UUID is six fields in big-endian byte order,
3718 per RFC 4122.
3720 This format is used by DHCPv6 (RFC 6355) and most network protocols.
3722 * ``FORMAT_LE``: the UUID is six fields, with ``time_low``, ``time_mid``
3723 and ``time_high_version`` in little-endian byte order. This *doesn't*
3724 change the arrangement of the fields from RFC 4122.
3726 This format is used by Microsoft's COM/OLE libraries.
3728 * ``FORMAT_REV``: the UUID is a single 128-bit integer in little-endian
3729 byte order. This *changes the arrangement* of the fields.
3731 This format is used by Bluetooth Low Energy.
3733 Note: You should use the constants here.
3735 The "human encoding" of this field supports a number of different input
3736 formats, and wraps Python's ``uuid.UUID`` library appropriately:
3738 * Given a bytearray, bytes or str of 16 bytes, this class decodes UUIDs in
3739 wire format.
3741 * Given a bytearray, bytes or str of other lengths, this delegates to
3742 ``uuid.UUID`` the Python standard library. This supports a number of
3743 different encoding options -- see the Python standard library
3744 documentation for more details.
3746 * Given an int or long, presumed to be a 128-bit integer to pass to
3747 ``uuid.UUID``.
3749 * Given a tuple:
3751 * Tuples of 11 integers are treated as having the last 6 integers forming
3752 the ``node`` field, and are merged before being passed as a tuple of 6
3753 integers to ``uuid.UUID``.
3755 * Otherwise, the tuple is passed as the ``fields`` parameter to
3756 ``uuid.UUID`` directly without modification.
3758 ``uuid.UUID`` expects a tuple of 6 integers.
3760 Other types (such as ``uuid.UUID``) are passed through.
3761 """
3763 __slots__ = ["uuid_fmt"]
3765 FORMAT_BE = 0
3766 FORMAT_LE = 1
3767 FORMAT_REV = 2
3769 # Change this when we get new formats
3770 FORMATS = (FORMAT_BE, FORMAT_LE, FORMAT_REV)
3772 def __init__(self, name, default, uuid_fmt=FORMAT_BE):
3773 # type: (str, Optional[int], int) -> None
3774 self.uuid_fmt = uuid_fmt
3775 self._check_uuid_fmt()
3776 Field.__init__(self, name, default, "16s")
3778 def _check_uuid_fmt(self):
3779 # type: () -> None
3780 """Checks .uuid_fmt, and raises an exception if it is not valid."""
3781 if self.uuid_fmt not in UUIDField.FORMATS:
3782 raise FieldValueRangeException(
3783 "Unsupported uuid_fmt ({})".format(self.uuid_fmt))
3785 def i2m(self, pkt, x):
3786 # type: (Optional[Packet], Optional[UUID]) -> bytes
3787 self._check_uuid_fmt()
3788 if x is None:
3789 return b'\0' * 16
3790 if self.uuid_fmt == UUIDField.FORMAT_BE:
3791 return x.bytes
3792 elif self.uuid_fmt == UUIDField.FORMAT_LE:
3793 return x.bytes_le
3794 elif self.uuid_fmt == UUIDField.FORMAT_REV:
3795 return x.bytes[::-1]
3796 else:
3797 raise FieldAttributeException("Unknown fmt")
3799 def m2i(self,
3800 pkt, # type: Optional[Packet]
3801 x, # type: bytes
3802 ):
3803 # type: (...) -> UUID
3804 self._check_uuid_fmt()
3805 if self.uuid_fmt == UUIDField.FORMAT_BE:
3806 return UUID(bytes=x)
3807 elif self.uuid_fmt == UUIDField.FORMAT_LE:
3808 return UUID(bytes_le=x)
3809 elif self.uuid_fmt == UUIDField.FORMAT_REV:
3810 return UUID(bytes=x[::-1])
3811 else:
3812 raise FieldAttributeException("Unknown fmt")
3814 def any2i(self,
3815 pkt, # type: Optional[Packet]
3816 x # type: Any # noqa: E501
3817 ):
3818 # type: (...) -> Optional[UUID]
3819 # Python's uuid doesn't handle bytearray, so convert to an immutable
3820 # type first.
3821 if isinstance(x, bytearray):
3822 x = bytes_encode(x)
3824 if isinstance(x, int):
3825 u = UUID(int=x)
3826 elif isinstance(x, tuple):
3827 if len(x) == 11:
3828 # For compatibility with dce_rpc: this packs into a tuple where
3829 # elements 7..10 are the 48-bit node ID.
3830 node = 0
3831 for i in x[5:]:
3832 node = (node << 8) | i
3834 x = (x[0], x[1], x[2], x[3], x[4], node)
3836 u = UUID(fields=x)
3837 elif isinstance(x, (str, bytes)):
3838 if len(x) == 16:
3839 # Raw bytes
3840 u = self.m2i(pkt, bytes_encode(x))
3841 else:
3842 u = UUID(plain_str(x))
3843 elif isinstance(x, (UUID, RandUUID)):
3844 u = cast(UUID, x)
3845 else:
3846 return None
3847 return u
3849 @staticmethod
3850 def randval():
3851 # type: () -> RandUUID
3852 return RandUUID()
3855class UUIDEnumField(UUIDField, _EnumField[UUID]):
3856 __slots__ = EnumField.__slots__
3858 def __init__(self, name, default, enum, uuid_fmt=0):
3859 # type: (str, Optional[int], Any, int) -> None
3860 _EnumField.__init__(self, name, default, enum, "16s") # type: ignore
3861 UUIDField.__init__(self, name, default, uuid_fmt=uuid_fmt)
3863 def any2i(self, pkt, x):
3864 # type: (Optional[Packet], Any) -> UUID
3865 return _EnumField.any2i(self, pkt, x) # type: ignore
3867 def i2repr(self,
3868 pkt, # type: Optional[Packet]
3869 x, # type: UUID
3870 ):
3871 # type: (...) -> Any
3872 return _EnumField.i2repr(self, pkt, x)
3875class BitExtendedField(Field[Optional[int], bytes]):
3876 """
3877 Bit Extended Field
3879 This type of field has a variable number of bytes. Each byte is defined
3880 as follows:
3881 - 7 bits of data
3882 - 1 bit an an extension bit:
3884 + 0 means it is last byte of the field ("stopping bit")
3885 + 1 means there is another byte after this one ("forwarding bit")
3887 To get the actual data, it is necessary to hop the binary data byte per
3888 byte and to check the extension bit until 0
3889 """
3891 __slots__ = ["extension_bit"]
3893 def prepare_byte(self, x):
3894 # type: (int) -> int
3895 # Moves the forwarding bit to the LSB
3896 x = int(x)
3897 fx_bit = (x & 2**self.extension_bit) >> self.extension_bit
3898 lsb_bits = x & 2**self.extension_bit - 1
3899 msb_bits = x >> (self.extension_bit + 1)
3900 x = (msb_bits << (self.extension_bit + 1)) + (lsb_bits << 1) + fx_bit
3901 return x
3903 def str2extended(self, x=b""):
3904 # type: (bytes) -> Tuple[bytes, Optional[int]]
3905 # For convenience, we reorder the byte so that the forwarding
3906 # bit is always the LSB. We then apply the same algorithm
3907 # whatever the real forwarding bit position
3909 # First bit is the stopping bit at zero
3910 bits = 0b0
3911 end = None
3913 # We retrieve 7 bits.
3914 # If "forwarding bit" is 1 then we continue on another byte
3915 i = 0
3916 for c in bytearray(x):
3917 c = self.prepare_byte(c)
3918 bits = bits << 7 | (int(c) >> 1)
3919 if not int(c) & 0b1:
3920 end = x[i + 1:]
3921 break
3922 i = i + 1
3923 if end is None:
3924 # We reached the end of the data but there was no
3925 # "ending bit". This is not normal.
3926 return b"", None
3927 else:
3928 return end, bits
3930 def extended2str(self, x):
3931 # type: (Optional[int]) -> bytes
3932 if x is None:
3933 return b""
3934 x = int(x)
3935 s = []
3936 LSByte = True
3937 FX_Missing = True
3938 bits = 0b0
3939 i = 0
3940 while (x > 0 or FX_Missing):
3941 if i == 8:
3942 # End of byte
3943 i = 0
3944 s.append(bits)
3945 bits = 0b0
3946 FX_Missing = True
3947 else:
3948 if i % 8 == self.extension_bit:
3949 # This is extension bit
3950 if LSByte:
3951 bits = bits | 0b0 << i
3952 LSByte = False
3953 else:
3954 bits = bits | 0b1 << i
3955 FX_Missing = False
3956 else:
3957 bits = bits | (x & 0b1) << i
3958 x = x >> 1
3959 # Still some bits
3960 i = i + 1
3961 s.append(bits)
3963 result = "".encode()
3964 for x in s[:: -1]:
3965 result = result + struct.pack(">B", x)
3966 return result
3968 def __init__(self, name, default, extension_bit):
3969 # type: (str, Optional[Any], int) -> None
3970 Field.__init__(self, name, default, "B")
3971 self.extension_bit = extension_bit
3973 def i2m(self, pkt, x):
3974 # type: (Optional[Any], Optional[int]) -> bytes
3975 return self.extended2str(x)
3977 def m2i(self, pkt, x):
3978 # type: (Optional[Any], bytes) -> Optional[int]
3979 return self.str2extended(x)[1]
3981 def addfield(self, pkt, s, val):
3982 # type: (Optional[Packet], bytes, Optional[int]) -> bytes
3983 return s + self.i2m(pkt, val)
3985 def getfield(self, pkt, s):
3986 # type: (Optional[Any], bytes) -> Tuple[bytes, Optional[int]]
3987 return self.str2extended(s)
3990class LSBExtendedField(BitExtendedField):
3991 # This is a BitExtendedField with the extension bit on LSB
3992 def __init__(self, name, default):
3993 # type: (str, Optional[Any]) -> None
3994 BitExtendedField.__init__(self, name, default, extension_bit=0)
3997class MSBExtendedField(BitExtendedField):
3998 # This is a BitExtendedField with the extension bit on MSB
3999 def __init__(self, name, default):
4000 # type: (str, Optional[Any]) -> None
4001 BitExtendedField.__init__(self, name, default, extension_bit=7)