Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/fields.py: 69%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Copyright (C) Michael Farrell <micolous+git@gmail.com>
8"""
9Fields: basic data structures that make up parts of packets.
10"""
12import calendar
13import collections
14import copy
15import datetime
16import inspect
17import math
18import socket
19import struct
20import time
21import warnings
23from types import MethodType
24from uuid import UUID
25from enum import Enum
27from scapy.config import conf
28from scapy.dadict import DADict
29from scapy.volatile import RandBin, RandByte, RandEnumKeys, RandInt, \
30 RandIP, RandIP6, RandLong, RandMAC, RandNum, RandShort, RandSInt, \
31 RandSByte, RandTermString, RandUUID, VolatileValue, RandSShort, \
32 RandSLong, RandFloat
33from scapy.data import EPOCH
34from scapy.error import log_runtime, Scapy_Exception
35from scapy.compat import bytes_hex, plain_str, raw, bytes_encode
36from scapy.pton_ntop import inet_ntop, inet_pton
37from scapy.utils import inet_aton, inet_ntoa, lhex, mac2str, str2mac, EDecimal
38from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \
39 in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo
40from scapy.base_classes import (
41 _ScopedIP,
42 BasePacket,
43 Field_metaclass,
44 Net,
45 ScopedIP,
46)
48# Typing imports
49from typing import (
50 Any,
51 AnyStr,
52 Callable,
53 Dict,
54 List,
55 Generic,
56 Optional,
57 Set,
58 Tuple,
59 Type,
60 TypeVar,
61 Union,
62 # func
63 cast,
64 TYPE_CHECKING,
65)
67if TYPE_CHECKING:
68 # Do not import on runtime ! (import loop)
69 from scapy.packet import Packet
class RawVal:
    r"""
    Wrapper for a value that must bypass field processing and be copied
    verbatim into the built packet.

    Example::

        >>> a = IP(len=RawVal("####"))
        >>> bytes(a)
        b'F\x00####\x00\x01\x00\x005\xb5\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00'

    """

    def __init__(self, val=b""):
        # type: (bytes) -> None
        # Normalise once at construction time so every accessor below can
        # rely on ``self.val`` being whatever bytes_encode() produced.
        encoded = bytes_encode(val)
        self.val = encoded

    def __bytes__(self):
        # type: () -> bytes
        """Return the stored payload unchanged."""
        return self.val

    def __len__(self):
        # type: () -> int
        """Return the length of the stored payload."""
        return len(self.val)

    def __str__(self):
        # type: () -> str
        """Return ``str()`` of the stored payload."""
        return str(self.val)

    def __repr__(self):
        # type: () -> str
        """Return a debug representation embedding the stored payload."""
        return "<RawVal [%r]>" % (self.val,)
class ObservableDict(Dict[int, str]):
    """
    Dictionary that tells registered observers about item assignments and
    deletions, so a protocol table can be extended at runtime.

    Observers must provide ``notify_set(dict, key, value)`` and
    ``notify_del(dict, key)``; both are invoked *before* the dictionary
    itself is modified. Items passed to the constructor are stored
    without notification.
    """

    def __init__(self, *args, **kw):
        # type: (*Dict[int, str], **Any) -> None
        # The observer list must exist before any item can be assigned.
        self.observers = []  # type: List[_EnumField[Any]]
        super().__init__(*args, **kw)

    def observe(self, observer):
        # type: (_EnumField[Any]) -> None
        """Register ``observer`` for future set/delete notifications."""
        self.observers.append(observer)

    def __setitem__(self, key, value):
        # type: (int, str) -> None
        for watcher in self.observers:
            watcher.notify_set(self, key, value)
        super().__setitem__(key, value)

    def __delitem__(self, key):
        # type: (int) -> None
        for watcher in self.observers:
            watcher.notify_del(self, key)
        super().__delitem__(key)

    def update(self, anotherDict):  # type: ignore
        """Copy every item of ``anotherDict``, notifying observers for each."""
        # Go through item assignment so that __setitem__ fires per key.
        for key in anotherDict:
            self[key] = anotherDict[key]
137############
138# Fields #
139############
# Type parameters of Field[I, M].
I = TypeVar("I")  # Internal storage  # noqa: E741
M = TypeVar("M")  # Machine storage
class Field(Generic[I, M], metaclass=Field_metaclass):
    """
    Base class for packet fields.

    A field value exists in three forms: *human* (h), *internal* (i) and
    *machine* (m). The ``x2y`` methods convert between them, while
    ``addfield`` / ``getfield`` pack and unpack the machine form with the
    ``struct`` format supplied at construction time.

    For more information on how this works, please refer to the
    'Adding new protocols' chapter in the online documentation:

    https://scapy.readthedocs.io/en/stable/build_dissect.html
    """
    __slots__ = [
        "name",
        "fmt",
        "default",
        "sz",
        "owners",
        "struct"
    ]
    islist = 0
    ismutable = False
    holds_packets = 0

    def __init__(self, name, default, fmt="H"):
        # type: (str, Any, str) -> None
        """
        :param name: field name; must be a ``str``.
        :param default: default value, converted through ``any2i``.
        :param fmt: ``struct`` format. ``"!"`` (network byte order) is
            prepended when no byte-order character is given.
        :raises ValueError: if ``name`` is not a string.
        """
        if not isinstance(name, str):
            raise ValueError("name should be a string")
        self.name = name
        if fmt[0] in "@=<>!":
            self.fmt = fmt
        else:
            self.fmt = "!" + fmt
        self.struct = struct.Struct(self.fmt)
        # any2i may be overridden by subclasses, so it runs only once
        # fmt/struct are in place.
        self.default = self.any2i(None, default)
        self.sz = struct.calcsize(self.fmt)  # type: int
        self.owners = []  # type: List[Type[Packet]]

    def register_owner(self, cls):
        # type: (Type[Packet]) -> None
        """Record ``cls`` as a packet class that uses this field."""
        self.owners.append(cls)

    def i2len(self,
              pkt,  # type: Packet
              x,  # type: Any
              ):
        # type: (...) -> int
        """Convert internal value to a length usable by a FieldLenField"""
        if isinstance(x, RawVal):
            return len(x)
        return self.sz

    def i2count(self, pkt, x):
        # type: (Optional[Packet], I) -> int
        """Convert internal value to a number of elements usable by a FieldLenField.
        Always 1 except for list fields"""
        return 1

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """Convert human value to internal value"""
        return cast(I, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], I) -> Any
        """Convert internal value to human value"""
        return x

    def m2i(self, pkt, x):
        # type: (Optional[Packet], M) -> I
        """Convert machine value to internal value"""
        return cast(I, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[I]) -> M
        """Convert internal value to machine value

        ``None`` becomes ``0`` and ``str`` values go through bytes_encode().
        """
        if x is None:
            return cast(M, 0)
        elif isinstance(x, str):
            return cast(M, bytes_encode(x))
        return cast(M, x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[I]
        """Try to understand the most input values possible and make an internal value from them"""  # noqa: E501
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """Convert internal value to a nice representation"""
        return repr(self.i2h(pkt, x))

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[I]) -> bytes
        """Add an internal value to a string

        Copy the network representation of field `val` (belonging to layer
        `pkt`) to the raw string packet `s`, and return the new string packet.

        :raises ValueError: if the machine value cannot be packed with
            this field's struct format.
        """
        try:
            return s + self.struct.pack(self.i2m(pkt, val))
        except struct.error as ex:
            # Chain explicitly so the original struct.error is reported as
            # the direct cause of the ValueError.
            raise ValueError(
                "Incorrect type of value for field %s:\n" % self.name +
                "struct.error('%s')\n" % ex +
                "To inject bytes into the field regardless of the type, " +
                "use RawVal. See help(RawVal)"
            ) from ex

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, I]
        """Extract an internal value from a string

        Extract from the raw packet `s` the field value belonging to layer
        `pkt`.

        Returns a two-element tuple,
        first the raw packet string after having removed the extracted field,
        second the extracted field itself in internal representation.
        """
        return s[self.sz:], self.m2i(pkt, self.struct.unpack(s[:self.sz])[0])

    def do_copy(self, x):
        # type: (I) -> I
        """Return a copy of the internal value ``x``.

        Lists are copied shallowly, except that contained BasePacket
        instances are copied through their own ``copy()``. Other objects
        exposing ``copy`` are copied through it; anything else is returned
        as is.
        """
        if isinstance(x, list):
            x = x[:]  # type: ignore
            for i, item in enumerate(x):
                if isinstance(item, BasePacket):
                    x[i] = item.copy()
            return x  # type: ignore
        if hasattr(x, "copy"):
            return x.copy()  # type: ignore
        return x

    def __repr__(self):
        # type: () -> str
        return "<%s (%s).%s>" % (
            self.__class__.__name__,
            ",".join(x.__name__ for x in self.owners),
            self.name
        )

    def copy(self):
        # type: () -> Field[I, M]
        """Return a shallow copy of this field object."""
        return copy.copy(self)

    def randval(self):
        # type: () -> VolatileValue[Any]
        """Return a volatile object whose value is both random and suitable for this field

        :raises ValueError: if no random class matches the struct format.
        """  # noqa: E501
        fmtt = self.fmt[-1]
        if fmtt in "BbHhIiQq":
            return {"B": RandByte, "b": RandSByte,
                    "H": RandShort, "h": RandSShort,
                    "I": RandInt, "i": RandSInt,
                    "Q": RandLong, "q": RandSLong}[fmtt]()
        elif fmtt == "s":
            # Everything between the optional byte-order character and the
            # trailing "s" is the byte count.
            if self.fmt[0] in "0123456789":
                count = self.fmt[:-1]
            else:
                count = self.fmt[1:-1]
            # struct treats a count-less "s" as a single byte; mirror that
            # instead of failing on int("").
            value = int(count) if count else 1
            return RandBin(value)
        else:
            raise ValueError(
                "no random class for [%s] (fmt=%s)." % (
                    self.name, self.fmt
                )
            )
class _FieldContainer(object):
    """
    A field that acts as a container for another field

    Any attribute not found on the container itself is looked up on the
    wrapped field stored in ``fld``.
    """
    __slots__ = ["fld"]

    def __getattr__(self, attr):
        # type: (str) -> Any
        # __getattr__ only runs after normal lookup failed. If the missing
        # attribute is "fld" itself (slot not assigned yet, e.g. while
        # copy/pickle probe a freshly created instance), delegating to
        # self.fld would re-enter this method forever. Report it as a
        # regular missing attribute instead.
        if attr == "fld":
            raise AttributeError(attr)
        return getattr(self.fld, attr)
# Either a plain Field or a container wrapping one.
AnyField = Union[Field[Any, Any], _FieldContainer]
class Emph(_FieldContainer):
    """Emphasize sub-layer for display

    The wrapper stores the field in ``fld`` and compares and hashes
    exactly like that wrapped field.
    """
    __slots__ = ["fld"]

    def __init__(self, fld):
        # type: (Any) -> None
        self.fld = fld

    def __hash__(self):
        # type: () -> int
        # Hash like the wrapped field so both are interchangeable as keys.
        return hash(self.fld)

    def __eq__(self, other):
        # type: (Any) -> bool
        same = self.fld == other
        return bool(same)
class MayEnd(_FieldContainer):
    """
    Marks a field after which packet dissection is allowed to stop when
    no bytes are left.

    A good example would be a length field that can be 0 or a set value,
    and where it would be too annoying to use multiple ConditionalFields.

    Important note: any field below this one MUST default
    to an empty value, else the behavior will be unexpected.
    """
    __slots__ = ["fld"]

    def __init__(self, fld):
        # type: (Any) -> None
        self.fld = fld

    def __hash__(self):
        # type: () -> int
        # Hash like the wrapped field so both are interchangeable as keys.
        return hash(self.fld)

    def __eq__(self, other):
        # type: (Any) -> bool
        same = self.fld == other
        return bool(same)
class ActionField(_FieldContainer):
    """
    Wraps a field and calls a named method of the packet each time a
    value goes through ``any2i``.

    The packet method is called as ``method(val, fld, **kargs)``, where
    ``kargs`` are the extra keyword arguments given to the constructor.
    """
    __slots__ = ["fld", "_action_method", "_privdata"]

    def __init__(self, fld, action_method, **kargs):
        # type: (Field[Any, Any], str, **Any) -> None
        self.fld = fld
        self._action_method = action_method
        self._privdata = kargs

    def any2i(self, pkt, val):
        # type: (Optional[Packet], int) -> Any
        # Fire the packet-side hook first, then let the wrapped field do
        # the actual conversion.
        hook = getattr(pkt, self._action_method)
        hook(val, self.fld, **self._privdata)
        return self.fld.any2i(pkt, val)
class ConditionalField(_FieldContainer):
    """
    Wraps a field that is only present when ``cond(pkt)`` is true.

    When the condition is false, ``getfield`` consumes nothing and yields
    ``None``, ``addfield`` adds nothing, and ``i2h`` returns ``None``.
    """
    __slots__ = ["fld", "cond"]

    def __init__(self,
                 fld,  # type: AnyField
                 cond  # type: Callable[[Packet], bool]
                 ):
        # type: (...) -> None
        self.fld = fld
        self.cond = cond

    def _evalcond(self, pkt):
        # type: (Packet) -> bool
        """Evaluate the condition for ``pkt`` and coerce it to ``bool``."""
        return bool(self.cond(pkt))

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Any
        # BACKWARD COMPATIBILITY
        # Note: we shouldn't need this function. (it's not correct)
        # However, having i2h implemented (#2364), it changes the default
        # behavior and broke all packets that wrongly use two ConditionalField
        # with the same name. Those packets are the problem: they are wrongly
        # built (they should either be reusing the same conditional field, or
        # using a MultipleTypeField).
        # But I don't want to dive into fixing all of them just yet,
        # so for now, let's keep this this way, even though it's not correct.
        if type(self.fld) is Field:
            return x
        return self.fld.any2i(pkt, x)

    def i2h(self, pkt, val):
        # type: (Optional[Packet], Any) -> Any
        # Without a packet the condition cannot be evaluated; fall through
        # to the wrapped field in that case.
        if pkt and not self._evalcond(pkt):
            return None
        return self.fld.i2h(pkt, val)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, Any]
        if self._evalcond(pkt):
            return self.fld.getfield(pkt, s)
        else:
            return s, None

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Any) -> bytes
        if self._evalcond(pkt):
            return self.fld.addfield(pkt, s, val)
        else:
            return s

    def __getattr__(self, attr):
        # type: (str) -> Any
        # Reached only when normal lookup failed. If "fld" itself is the
        # missing attribute (slot not assigned yet, e.g. during copy or
        # unpickling), delegating to self.fld would recurse without end,
        # so report it as a plain missing attribute.
        if attr == "fld":
            raise AttributeError(attr)
        return getattr(self.fld, attr)
class MultipleTypeField(_FieldContainer):
    """
    MultipleTypeField are used for fields that can be implemented by
    various Field subclasses, depending on conditions on the packet.

    It is initialized with `flds` and `dflt`.

    :param dflt: is the default field type, to be used when none of the
                 conditions matched the current packet.
    :param flds: is a list of tuples (`fld`, `cond`) or (`fld`, `cond`, `hint`)
                 where `fld` is a field type, and `cond` a "condition" to
                 determine if `fld` is the field type that should be used.
                 When given, `hint` is appended to the field's
                 representation by ``i2repr``.

    ``cond`` is either:

    - a callable `cond_pkt` that accepts one argument (the packet) and
      returns True if `fld` should be used, False otherwise.
    - a tuple (`cond_pkt`, `cond_pkt_val`), where `cond_pkt` is the same
      as in the previous case and `cond_pkt_val` is a callable that
      accepts two arguments (the packet, and the value to be set) and
      returns True if `fld` should be used, False otherwise.

    See scapy.layers.l2.ARP (type "help(ARP)" in Scapy) for an example of
    use.
    """

    __slots__ = ["flds", "dflt", "hints", "name", "default"]

    def __init__(
        self,
        flds: List[Union[
            Tuple[Field[Any, Any], Any, str],
            Tuple[Field[Any, Any], Any]
        ]],
        dflt: Field[Any, Any]
    ) -> None:
        hints = {}
        pairs = []
        for entry in flds:
            # Only three-element entries carry a display hint.
            if len(entry) == 3:
                hints[entry[0]] = entry[2]
            pairs.append((entry[0], entry[1]))
        self.hints = hints
        self.flds = pairs
        self.dflt = dflt
        self.default = None  # So that we can detect changes in defaults
        self.name = self.dflt.name
        mismatch = any(fld.name != self.name for fld, _ in self.flds)
        if mismatch:
            warnings.warn(
                ("All fields should have the same name in a "
                 "MultipleTypeField (%s). Use hints.") % self.name,
                SyntaxWarning
            )

    def _iterate_fields_cond(self, pkt, val, use_val):
        # type: (Optional[Packet], Any, bool) -> Field[Any, Any]
        """Internal function used by _find_fld_pkt & _find_fld_pkt_val

        Returns the first field whose condition matches, else ``dflt``.
        """
        for candidate, condition in self.flds:
            if not isinstance(condition, tuple):
                matched = condition(pkt)
            elif use_val:
                # The value-aware predicate needs a concrete value; fall
                # back to the default field's default.
                if val is None:
                    val = self.dflt.default
                matched = condition[1](pkt, val)
            else:
                matched = condition[0](pkt)
            if matched:
                return candidate
        return self.dflt

    def _find_fld_pkt(self, pkt):
        # type: (Optional[Packet]) -> Field[Any, Any]
        """Given a Packet instance `pkt`, returns the Field subclass to be
        used. If you know the value to be set (e.g., in .addfield()), use
        ._find_fld_pkt_val() instead.
        """
        return self._iterate_fields_cond(pkt, None, False)

    def _find_fld_pkt_val(self,
                          pkt,  # type: Optional[Packet]
                          val,  # type: Any
                          ):
        # type: (...) -> Tuple[Field[Any, Any], Any]
        """Given a Packet instance `pkt` and the value `val` to be set,
        returns the Field subclass to be used, and the updated `val` if
        necessary (a ``None`` value is replaced by the selected field's
        default).
        """
        chosen = self._iterate_fields_cond(pkt, val, True)
        if val is None:
            val = chosen.default
        return chosen, val

    def _find_fld(self):
        # type: () -> Field[Any, Any]
        """Returns the Field subclass to be used, depending on the Packet
        instance, or the default subclass.

        DEV: since the Packet instance is not provided, we have to use a hack
        to guess it. It should only be used if you cannot provide the current
        Packet instance (for example, because of the current Scapy API).

        If you have the current Packet instance, use ._find_fld_pkt_val() (if
        the value to set is also known) or ._find_fld_pkt() instead.
        """
        # Hack to preserve current Scapy API
        # See https://stackoverflow.com/a/7272464/3223422
        # Start two frames up: the first is the caller of this method
        # (the ``fld`` property), the second is whoever read ``fld``.
        frame = inspect.currentframe().f_back.f_back  # type: ignore
        while frame is not None:
            try:
                candidate = frame.f_locals['self']
            except KeyError:
                # Not a method frame (no ``self``); keep walking up.
                pass
            else:
                if isinstance(candidate, tuple(self.dflt.owners)):
                    if candidate.default_fields:
                        return self._find_fld_pkt(candidate)
                    # Packet not initialized
                    return self.dflt
            frame = frame.f_back
        return self.dflt

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, Any]
        chosen = self._find_fld_pkt(pkt)
        return chosen.getfield(pkt, s)

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Any) -> bytes
        chosen, value = self._find_fld_pkt_val(pkt, val)
        return chosen.addfield(pkt, s, value)

    def any2i(self, pkt, val):
        # type: (Optional[Packet], Any) -> Any
        chosen, value = self._find_fld_pkt_val(pkt, val)
        return chosen.any2i(pkt, value)

    def h2i(self, pkt, val):
        # type: (Optional[Packet], Any) -> Any
        chosen, value = self._find_fld_pkt_val(pkt, val)
        return chosen.h2i(pkt, value)

    def i2h(self,
            pkt,  # type: Packet
            val,  # type: Any
            ):
        # type: (...) -> Any
        chosen, value = self._find_fld_pkt_val(pkt, val)
        return chosen.i2h(pkt, value)

    def i2m(self, pkt, val):
        # type: (Optional[Packet], Optional[Any]) -> Any
        chosen, value = self._find_fld_pkt_val(pkt, val)
        return chosen.i2m(pkt, value)

    def i2len(self, pkt, val):
        # type: (Packet, Any) -> int
        chosen, value = self._find_fld_pkt_val(pkt, val)
        return chosen.i2len(pkt, value)

    def i2repr(self, pkt, val):
        # type: (Optional[Packet], Any) -> str
        chosen, value = self._find_fld_pkt_val(pkt, val)
        # Resolve the optional hint suffix before rendering the value.
        if chosen in self.hints:
            suffix = " (%s)" % self.hints[chosen]
        else:
            suffix = ""
        return chosen.i2repr(pkt, value) + suffix

    def register_owner(self, cls):
        # type: (Type[Packet]) -> None
        """Record ``cls`` as owner of every candidate field and of ``dflt``."""
        for candidate, _ in self.flds:
            candidate.owners.append(cls)
        self.dflt.owners.append(cls)

    def get_fields_list(self):
        # type: () -> List[Any]
        return [self]

    @property
    def fld(self):
        # type: () -> Field[Any, Any]
        # Must call _find_fld() directly: it relies on the exact call depth.
        return self._find_fld()
class PadField(_FieldContainer):
    """Add bytes after the proxified field so that it ends at the specified
    alignment from its beginning"""
    __slots__ = ["fld", "_align", "_padwith"]

    def __init__(self, fld, align, padwith=None):
        # type: (AnyField, int, Optional[bytes]) -> None
        self.fld = fld
        self._align = align
        # A missing (or empty) pad value falls back to a NUL byte.
        self._padwith = padwith or b"\x00"

    def padlen(self, flen, pkt):
        # type: (int, Packet) -> int
        """Return how many bytes bring ``flen`` up to a multiple of the
        alignment."""
        return -flen % self._align

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, Any]
        rest, value = self.fld.getfield(pkt, s)
        # Bytes consumed by the wrapped field decide how much padding to
        # skip.
        consumed = len(s) - len(rest)
        skip = self.padlen(consumed, pkt)
        return rest[skip:], value

    def addfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 val,  # type: Any
                 ):
        # type: (...) -> bytes
        # Build the field on its own so that its length can be measured.
        built = self.fld.addfield(pkt, b"", val)
        padding = self.padlen(len(built), pkt) * self._padwith
        return s + built + padding
class ReversePadField(PadField):
    """Add bytes BEFORE the proxified field so that it starts at the specified
    alignment from its beginning"""

    def original_length(self, pkt):
        # type: (Packet) -> int
        """Return the length of ``pkt.original``."""
        return len(pkt.original)

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, Any]
        # We need to get the length that has already been dissected
        padlen = self.padlen(self.original_length(pkt) - len(s), pkt)
        return self.fld.getfield(pkt, s[padlen:])

    def addfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 val,  # type: Any
                 ):
        # type: (...) -> bytes
        sval = self.fld.addfield(pkt, b"", val)
        padlen = self.padlen(len(s), pkt)
        # Repeat the pad pattern and cut it to exactly ``padlen`` bytes.
        # struct.pack("%is") would emit ``_padwith`` once and fill the rest
        # with NULs, ignoring a custom pad byte and disagreeing with
        # PadField.addfield.
        padding = (self._padwith * padlen)[:padlen]
        return s + padding + sval
class TrailerBytes(bytes):
    """
    Reverses slice operations to take from the back of the packet,
    not the front

    ``s[:n]`` yields the last ``n`` bytes and ``s[n:]`` everything but the
    last ``n`` bytes, both in their original order. ``s[i]`` with
    ``i >= 0`` yields the byte ``i`` positions before the end.
    """

    def __getitem__(self, item):  # type: ignore
        # type: (Union[int, slice]) -> Union[int, bytes]
        if isinstance(item, int):
            if item < 0:
                item = 1 + item
            else:
                item = len(self) - 1 - item
        elif isinstance(item, slice):
            start, stop, step = item.start, item.stop, item.step
            if stop == 0:
                # "Take zero bytes from the back": negating 0 cannot
                # express this, so anchor the slice at the very end to get
                # an empty result instead of the whole buffer.
                new_start = len(self)
            else:
                new_start = -stop if stop else None
            new_stop = -start if start else None
            item = slice(new_start, new_stop, step)
        # Name the class explicitly: super(self.__class__, self) resolves
        # back to this very method in any subclass and recurses forever.
        return super(TrailerBytes, self).__getitem__(item)
class TrailerField(_FieldContainer):
    """Special Field that gets its value from the end of the *packet*
    (Note: not layer, but packet).

    Mostly used for FCS
    """
    __slots__ = ["fld"]

    def __init__(self, fld):
        # type: (Field[Any, Any]) -> None
        self.fld = fld

    # Note: this is ugly. Very ugly.
    # Do not copy this crap elsewhere, so that if one day we get
    # brave enough to refactor it, it'll be easier.

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, int]
        """Dissect the wrapped field from the END of ``s``.

        Also installs a one-shot ``post_dissect`` on ``pkt`` that clears
        ``raw_packet_cache`` and then restores and calls the previous hook.
        """
        previous_post_dissect = pkt.post_dissect

        def _post_dissect(layer, data):
            # type: (Packet, bytes) -> bytes
            # Reset packet to allow post_build
            layer.raw_packet_cache = None
            # One-shot: put the original hook back before delegating.
            layer.post_dissect = previous_post_dissect  # type: ignore
            return previous_post_dissect(data)
        pkt.post_dissect = MethodType(_post_dissect, pkt)  # type: ignore
        # TrailerBytes makes the wrapped field's slicing operate on the
        # tail of the buffer.
        tail_view = TrailerBytes(s)
        remaining, value = self.fld.getfield(pkt, tail_view)
        return bytes(remaining), value

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[int]) -> bytes
        """Return ``s`` unchanged; the built field is appended later.

        A one-shot ``post_build`` is installed on ``pkt`` that appends the
        built field to the payload, then restores and calls the previous
        hook.
        """
        previous_post_build = pkt.post_build
        trailer = self.fld.addfield(pkt, b"", val)

        def _post_build(layer, p, pay):
            # type: (Packet, bytes, bytes) -> bytes
            pay += trailer
            # One-shot: put the original hook back before delegating.
            layer.post_build = previous_post_build  # type: ignore
            return previous_post_build(p, pay)
        pkt.post_build = MethodType(_post_build, pkt)  # type: ignore
        return s
class FCSField(TrailerField):
    """
    A FCS field that gets appended at the end of the *packet* (not layer).

    Constructor arguments are forwarded to ``Field`` to build the wrapped
    field.
    """

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        inner = Field(*args, **kwargs)
        super().__init__(inner)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        # Render the human value through lhex().
        human = self.i2h(pkt, x)
        return lhex(human)
class DestField(Field[str, bytes]):
    """
    Field whose destination can be derived from the packet's payload.

    ``bindings`` maps a payload class to a list of ``(addr, condition)``
    pairs, where ``condition`` is a dict of payload field values that must
    all match. Note that ``__init__`` only records the default
    destination and does not call ``Field.__init__``.
    """
    __slots__ = ["defaultdst"]
    # Each subclass must have its own bindings attribute
    bindings = {}  # type: Dict[Type[Packet], Tuple[str, Any]]

    def __init__(self, name, default):
        # type: (str, str) -> None
        self.defaultdst = default

    def dst_from_pkt(self, pkt):
        # type: (Packet) -> str
        """Return the first bound address whose condition matches the
        payload of ``pkt``, or the default destination."""
        candidates = self.bindings.get(pkt.payload.__class__, [])
        for addr, condition in candidates:
            try:
                matched = all(
                    pkt.payload.getfieldval(field) == value
                    for field, value in condition.items()
                )
            except AttributeError:
                # An AttributeError during evaluation means "no match".
                continue
            if matched:
                return addr  # type: ignore
        return self.defaultdst

    @classmethod
    def bind_addr(cls, layer, addr, **condition):
        # type: (Type[Packet], str, **Any) -> None
        """Bind ``addr`` to payload class ``layer`` under ``condition``."""
        entries = cls.bindings.setdefault(layer, [])  # type: ignore
        entries.append((addr, condition))
class MACField(Field[Optional[str], bytes]):
    """Six-byte field (struct format ``6s``) converted with mac2str() /
    str2mac()."""

    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        Field.__init__(self, name, default, fmt="6s")

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        # Any falsy value is built as six NUL bytes.
        if not x:
            return b"\0\0\0\0\0\0"
        try:
            return mac2str(x)
        except (struct.error, OverflowError, ValueError):
            # mac2str() rejected the value: use its encoded bytes as is.
            return bytes_encode(x)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        return str2mac(x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        # Exactly six raw bytes are treated as a machine value.
        is_machine = isinstance(x, bytes) and len(x) == 6
        if is_machine:
            return self.m2i(pkt, x)
        return cast(str, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        human = self.i2h(pkt, x)
        if human is None:
            return repr(human)
        if self in conf.resolve:
            # Resolution enabled for this field: look the address up in
            # the manufacturer database.
            return conf.manufdb._resolve_MAC(human)
        return human

    def randval(self):
        # type: () -> RandMAC
        return RandMAC()
class LEMACField(MACField):
    """MACField whose machine bytes are stored in reversed order."""

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        forward = MACField.i2m(self, pkt, x)
        return forward[::-1]

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        # Undo the byte reversal before the regular conversion.
        forward = x[::-1]
        return MACField.m2i(self, pkt, forward)
class IPField(Field[Union[str, Net], bytes]):
    """Four-byte field (struct format ``4s``) converted with inet_aton() /
    inet_ntoa()."""

    def __init__(self, name, default):
        # type: (str, Optional[str]) -> None
        Field.__init__(self, name, default, fmt="4s")

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Union[AnyStr, List[AnyStr]]) -> Any
        """Convert a human value to its internal form.

        bytes are decoded first. Strings become a ``ScopedIP``, or a
        ``Net`` when inet_aton() rejects them. 2-tuples become
        ``Net(*x)``; lists are converted element-wise; anything else is
        returned unchanged.

        :raises ValueError: for a tuple that does not have two elements.
        """
        if isinstance(x, bytes):
            x = plain_str(x)  # type: ignore
        # Checked before the plain ``str`` test on purpose.
        if isinstance(x, _ScopedIP):
            return x
        if isinstance(x, str):
            scoped = ScopedIP(x)
            try:
                inet_aton(scoped)
            except socket.error:
                return Net(scoped)
            return scoped
        if isinstance(x, tuple):
            if len(x) != 2:
                raise ValueError("Invalid IP format")
            return Net(*x)
        if isinstance(x, list):
            return [self.h2i(pkt, item) for item in x]
        return x

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> str
        return cast(str, x)

    def resolve(self, x):
        # type: (str) -> str
        """Return the host name for ``x`` when resolution is enabled for
        this field and the lookup succeeds; otherwise return ``x``."""
        if self not in conf.resolve:
            return x
        try:
            hostname = socket.gethostbyaddr(x)[0]
        except Exception:
            # Best effort: any lookup failure keeps the address.
            return x
        return hostname if hostname else x

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
        if x is None:
            return b'\x00\x00\x00\x00'
        text = plain_str(x)
        return inet_aton(text)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        return inet_ntoa(x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Any
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Union[str, Net]) -> str
        # Scoped addresses are shown through their own repr().
        if isinstance(x, _ScopedIP) and x.scope:
            return repr(x)
        shown = self.resolve(self.i2h(pkt, x))
        if isinstance(shown, str):
            return shown
        return repr(shown)

    def randval(self):
        # type: () -> RandIP
        return RandIP()
class SourceIPField(IPField):
    """IPField with a ``None`` default that is filled in from routing
    information when building or displaying."""

    def __init__(self, name):
        # type: (str) -> None
        IPField.__init__(self, name, None)

    def __findaddr(self, pkt):
        # type: (Packet) -> Optional[str]
        if conf.route is None:
            # unused import, only to initialize conf.route
            import scapy.route  # noqa: F401
        # Prefer the packet's own route; fall back to conf.route.
        from_pkt = pkt.route()[1]
        return from_pkt or conf.route.route()[1]

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
        if pkt is not None and x is None:
            x = self.__findaddr(pkt)
        return super().i2m(pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> str
        if pkt is not None and x is None:
            x = self.__findaddr(pkt)
        return super().i2h(pkt, x)
class IP6Field(Field[Optional[Union[str, Net6]], bytes]):
    """Sixteen-byte field (struct format ``16s``) converted with
    inet_pton() / inet_ntop() for AF_INET6."""

    def __init__(self, name, default):
        # type: (str, Optional[str]) -> None
        Field.__init__(self, name, default, fmt="16s")

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        """Convert a human value to its internal form.

        bytes are decoded first. Strings are normalised with in6_ptop()
        into a ``ScopedIP`` that keeps the scope, or become a ``Net6``
        when in6_ptop() raises ``socket.error``. 2-tuples become
        ``Net6(*x)``; lists are converted element-wise; anything else is
        returned unchanged.

        :raises ValueError: for a tuple that does not have two elements.
        """
        if isinstance(x, bytes):
            x = plain_str(x)
        # Checked before the plain ``str`` test on purpose.
        if isinstance(x, _ScopedIP):
            return x
        if isinstance(x, str):
            scoped = ScopedIP(x)
            try:
                return ScopedIP(in6_ptop(scoped), scope=scoped.scope)  # type: ignore  # noqa: E501
            except socket.error:
                return Net6(scoped)  # type: ignore
        if isinstance(x, tuple):
            if len(x) != 2:
                raise ValueError("Invalid IPv6 format")
            return Net6(*x)  # type: ignore
        if isinstance(x, list):
            return [self.h2i(pkt, item) for item in x]  # type: ignore
        return x  # type: ignore

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        return cast(str, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
        # A missing value is built as the unspecified address.
        text = "::" if x is None else x
        return inet_pton(socket.AF_INET6, plain_str(text))

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        return inet_ntop(socket.AF_INET6, x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        if x is None:
            return self.i2h(pkt, x)
        if not isinstance(x, (Net6, list)):
            if in6_isaddrTeredo(x):  # print Teredo info
                server, _, maddr, mport = teredoAddrExtractInfo(x)
                return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr, mport)  # noqa: E501
            if in6_isaddr6to4(x):  # print encapsulated address
                vaddr = in6_6to4ExtractAddr(x)
                return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr)
            if isinstance(x, _ScopedIP) and x.scope:
                return repr(x)
        shown = self.i2h(pkt, x)  # No specific information to return
        if isinstance(shown, str):
            return shown
        return repr(shown)

    def randval(self):
        # type: () -> RandIP6
        return RandIP6()
class SourceIP6Field(IP6Field):
    """IP6Field with a ``None`` default that is filled in from the
    packet's route when building or displaying."""

    def __init__(self, name):
        # type: (str) -> None
        IP6Field.__init__(self, name, None)

    def __findaddr(self, pkt):
        # type: (Packet) -> Optional[str]
        if conf.route6 is None:
            # unused import, only to initialize conf.route6
            import scapy.route6  # noqa: F401
        route = pkt.route()
        return route[1]

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
        if pkt is not None and x is None:
            x = self.__findaddr(pkt)
        return super().i2m(pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        if pkt is not None and x is None:
            x = self.__findaddr(pkt)
        return super().i2h(pkt, x)
class DestIP6Field(IP6Field, DestField):
    """IP6Field whose missing value is resolved with
    ``DestField.dst_from_pkt``."""
    # Own bindings table, separate from the one declared on DestField.
    bindings = {}  # type: Dict[Type[Packet], Tuple[str, Any]]

    def __init__(self, name, default):
        # type: (str, str) -> None
        # The field default stays None; ``default`` is kept by DestField
        # as the fallback destination.
        IP6Field.__init__(self, name, None)
        DestField.__init__(self, name, default)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
        if pkt is not None and x is None:
            x = self.dst_from_pkt(pkt)
        return IP6Field.i2m(self, pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        if pkt is not None and x is None:
            x = self.dst_from_pkt(pkt)
        return super().i2h(pkt, x)
class ByteField(Field[int, int]):
    """Field using the struct format ``B``."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="B")
class XByteField(ByteField):
    """ByteField whose representation is produced by lhex()."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        human = self.i2h(pkt, x)
        return lhex(human)
1061# XXX Unused field: at least add some tests
class OByteField(ByteField):
    """ByteField represented as a zero-padded, three-digit octal number."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        human = self.i2h(pkt, x)
        return "%03o" % human
class ThreeBytesField(Field[int, int]):
    """Three-byte big-endian integer, packed as ``!I`` with the most
    significant byte dropped."""

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, fmt="!I")

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[int]) -> bytes
        packed = struct.pack(self.fmt, self.i2m(pkt, val))
        # Keep the three low-order bytes of the four-byte encoding.
        return s + packed[1:4]

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, int]
        rest = s[3:]
        # Prepend a zero byte so the value can be unpacked as ``!I``.
        widened = b"\x00" + s[:3]
        machine = struct.unpack(self.fmt, widened)[0]
        return rest, self.m2i(pkt, machine)
class X3BytesField(ThreeBytesField, XByteField):
    """ThreeBytesField that uses XByteField's representation."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        # Use XByteField's rendering explicitly.
        rendered = XByteField.i2repr(self, pkt, x)
        return rendered
class LEThreeBytesField(ByteField):
    """Three-byte little-endian integer, packed as ``<I`` with the most
    significant byte dropped."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        # Deliberately bypasses ByteField.__init__ to use a wider format.
        Field.__init__(self, name, default, fmt="<I")

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[int]) -> bytes
        packed = struct.pack(self.fmt, self.i2m(pkt, val))
        # Little-endian: the three low-order bytes come first.
        return s + packed[:3]

    def getfield(self, pkt, s):
        # type: (Optional[Packet], bytes) -> Tuple[bytes, int]
        rest = s[3:]
        # Append a zero byte so the value can be unpacked as ``<I``.
        widened = s[:3] + b"\x00"
        machine = struct.unpack(self.fmt, widened)[0]
        return rest, self.m2i(pkt, machine)
class XLE3BytesField(LEThreeBytesField, XByteField):
    """LEThreeBytesField that uses XByteField's representation."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        # Use XByteField's rendering explicitly.
        rendered = XByteField.i2repr(self, pkt, x)
        return rendered
def LEX3BytesField(*args, **kwargs):
    # type: (*Any, **Any) -> Any
    """Deprecated alias of XLE3BytesField.

    Emits a ``DeprecationWarning`` and returns
    ``XLE3BytesField(*args, **kwargs)``.
    """
    warnings.warn(
        "LEX3BytesField is deprecated. Use XLE3BytesField",
        DeprecationWarning,
        # Attribute the warning to the caller, which is the code that
        # actually needs updating, rather than to this shim.
        stacklevel=2,
    )
    return XLE3BytesField(*args, **kwargs)
class NBytesField(Field[int, List[int]]):
    """Integer stored on ``sz`` bytes, most significant byte first.

    The machine value is a sequence of ``sz`` individual byte values
    (struct format ``"B" * sz``).
    """

    def __init__(self, name, default, sz):
        # type: (str, Optional[int], int) -> None
        byte_fmt = "<" + "B" * sz
        Field.__init__(self, name, default, byte_fmt)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[int]) -> List[int]
        if x is None:
            return [0] * self.sz
        digits = []  # type: List[int]
        remaining = x
        for _ in range(self.sz):
            # Peel off the lowest byte and put it in front, so the list
            # ends up most-significant-first.
            digits.insert(0, remaining % 256)
            remaining = remaining // 256
        return digits

    def m2i(self, pkt, x):
        # type: (Optional[Packet], Union[List[int], int]) -> int
        if isinstance(x, int):
            return x
        # x can be a tuple when coming from struct.unpack (from getfield)
        if isinstance(x, (list, tuple)):
            total = 0
            for byte in x:
                total = total * 256 + byte
            return total
        return 0

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        if not isinstance(x, int):
            return super().i2repr(pkt, x)
        return '%i' % x

    def addfield(self, pkt, s, val):
        # type: (Optional[Packet], bytes, Optional[int]) -> bytes
        byte_values = self.i2m(pkt, val)
        return s + self.struct.pack(*byte_values)

    def getfield(self, pkt, s):
        # type: (Optional[Packet], bytes) -> Tuple[bytes, int]
        rest = s[self.sz:]
        unpacked = self.struct.unpack(s[:self.sz])
        return rest, self.m2i(pkt, unpacked)  # type: ignore

    def randval(self):
        # type: () -> RandNum
        upper = 2 ** (self.sz * 8) - 1
        return RandNum(0, upper)
class XNBytesField(NBytesField):
    """NBytesField whose representation is hexadecimal."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Format an int, or a sequence of byte values, as ``0x...``."""
        if isinstance(x, int):
            return '0x%x' % x
        # x can be a tuple when coming from struct.unpack (from getfield)
        if not isinstance(x, (list, tuple)):
            return super(XNBytesField, self).i2repr(pkt, x)
        hex_digits = "".join("%02x" % b for b in x)
        return "0x" + hex_digits
class SignedByteField(Field[int, int]):
    """Integer field using struct format code ``b`` (signed char)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="b")
class FieldValueRangeException(Scapy_Exception):
    """Raised when a configured field value lies outside its valid range."""
class MaximumItemsCount(Scapy_Exception):
    """Raised when a list field dissects more items than allowed."""
class FieldAttributeException(Scapy_Exception):
    """Raised when a field is configured with a malformed attribute."""
class YesNoByteField(ByteField):
    """
    A byte based flag field that shows representation of its number
    based on a given association

    In its default configuration the following representation is generated:
      x == 0 : 'no'
      x != 0 : 'yes'

    In more sophisticated use-cases (e.g. yes/no/invalid) one can use the
    config attribute to configure.
    Key-value, key-range and key-value-set associations that will be used to
    generate the values representation.

    - An int associates that single value with the key.
    - A range is given by a tuple (<first-val>, <last-value>) including the
      last value. The tuple must contain exactly two items; any other
      length raises FieldAttributeException.
    - A list defines a set of (probably non consecutive) values that should be
      associated to a given key.

    Every value must be in the range [0..255], otherwise
    FieldValueRangeException is raised. Config entries of any other type
    are ignored.

    All values not associated with a key will be shown as number of type
    unsigned byte.

    **For instance**::

      config = {
          'no' : 0,
          'foo' : (1,22),
          'yes' : 23,
          'bar' : [24,25, 42, 48, 87, 253]
      }

    Generates the following representations::

      x == 0 : 'no'
      x == 15: 'foo'
      x == 23: 'yes'
      x == 42: 'bar'
      x == 43: 43

    Another example, using the config attribute one could also revert
    the stock-yes-no-behavior::

      config = {
          'yes' : 0,
          'no' : (1,255)
      }

    Will generate the following value representation::

      x == 0 : 'yes'
      x != 0 : 'no'

    """
    __slots__ = ['eval_fn']

    def _build_config_representation(self, config):
        # type: (Dict[str, Any]) -> None
        """
        Build ``self.eval_fn`` from a ``{label: value-spec}`` mapping.

        :param config: mapping of label to an int, a list of ints or a
            2-tuple ``(first, last)`` describing an inclusive range
        :raises FieldValueRangeException: a value is outside [0..255]
        :raises FieldAttributeException: a tuple does not have 2 items
        """

        def _checked(value):
            # type: (int) -> int
            # Single place for the unsigned-byte range validation
            if value < 0 or value > 255:
                raise FieldValueRangeException(
                    'given field value {} invalid - '
                    'must be in range [0..255]'.format(value)
                )
            return value

        assoc_table = dict()
        for key in config:
            value_spec = config[key]

            # Exact type match on purpose: subclasses (e.g. bool) are
            # not treated as int/list/tuple.
            value_spec_type = type(value_spec)

            if value_spec_type is int:
                assoc_table[_checked(value_spec)] = key

            elif value_spec_type is list:
                for value in value_spec:
                    assoc_table[_checked(value)] = key

            elif value_spec_type is tuple:
                value_spec_len = len(value_spec)
                if value_spec_len != 2:
                    raise FieldAttributeException(
                        'invalid length {} of given config item tuple {} - must be '  # noqa: E501
                        '(<start-range>, <end-range>).'.format(value_spec_len, value_spec)  # noqa: E501
                    )

                value_range_start = _checked(value_spec[0])
                value_range_end = _checked(value_spec[1])

                for value in range(value_range_start, value_range_end + 1):
                    assoc_table[value] = key

        # Unknown values are represented by themselves
        self.eval_fn = lambda x: assoc_table[x] if x in assoc_table else x

    def __init__(self, name, default, config=None):
        # type: (str, int, Optional[Dict[str, Any]]) -> None
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param config: optional label/value association (see class doc);
            when empty or None the stock 'no'/'yes' mapping is used
        """
        if not config:
            # this represents the common use case and therefore it is kept small  # noqa: E501
            self.eval_fn = lambda x: 'no' if x == 0 else 'yes'
        else:
            self._build_config_representation(config)
        ByteField.__init__(self, name, default)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return the label associated with ``x`` (or ``x`` itself)."""
        return self.eval_fn(x)  # type: ignore
class ShortField(Field[int, int]):
    """Integer field using struct format code ``H`` (unsigned short)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="H")
class SignedShortField(Field[int, int]):
    """Integer field using struct format code ``h`` (signed short)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="h")
class LEShortField(Field[int, int]):
    """Integer field using struct format ``<H`` (little-endian unsigned short)."""  # noqa: E501

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="<H")
class LESignedShortField(Field[int, int]):
    """Integer field using struct format ``<h`` (little-endian signed short)."""  # noqa: E501

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="<h")
class XShortField(ShortField):
    """ShortField whose representation is produced by ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` of the human value of ``x``."""
        human = self.i2h(pkt, x)
        return lhex(human)
class IntField(Field[int, int]):
    """Integer field using struct format code ``I`` (unsigned int)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="I")
class SignedIntField(Field[int, int]):
    """Integer field using struct format code ``i`` (signed int)."""

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, fmt="i")
class LEIntField(Field[int, int]):
    """Integer field using struct format ``<I`` (little-endian unsigned int)."""  # noqa: E501

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="<I")
class LESignedIntField(Field[int, int]):
    """Integer field using struct format ``<i`` (little-endian signed int)."""

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, fmt="<i")
class XIntField(IntField):
    """IntField whose representation is produced by ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` of the human value of ``x``."""
        human = self.i2h(pkt, x)
        return lhex(human)
class XLEIntField(LEIntField, XIntField):
    """LEIntField rendered with XIntField's representation."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XIntField.i2repr``."""
        rendered = XIntField.i2repr(self, pkt, x)
        return rendered
class XLEShortField(LEShortField, XShortField):
    """LEShortField rendered with XShortField's representation."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XShortField.i2repr``."""
        rendered = XShortField.i2repr(self, pkt, x)
        return rendered
class LongField(Field[int, int]):
    """Integer field using struct format code ``Q`` (unsigned long long)."""

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, fmt="Q")
class SignedLongField(Field[int, int]):
    """Integer field using struct format code ``q`` (signed long long)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="q")
class LELongField(LongField):
    """Integer field using struct format ``<Q`` (little-endian unsigned long long)."""  # noqa: E501

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        # Bypasses LongField.__init__ to install the little-endian format
        Field.__init__(self, name, default, fmt="<Q")
class LESignedLongField(Field[int, int]):
    """Integer field using struct format ``<q`` (little-endian signed long long)."""  # noqa: E501

    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        Field.__init__(self, name, default, fmt="<q")
class XLongField(LongField):
    """LongField whose representation is produced by ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` of the human value of ``x``."""
        human = self.i2h(pkt, x)
        return lhex(human)
class XLELongField(LELongField, XLongField):
    """LELongField rendered with XLongField's representation."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XLongField.i2repr``."""
        rendered = XLongField.i2repr(self, pkt, x)
        return rendered
class IEEEFloatField(Field[int, int]):
    """Field using struct format code ``f`` (float)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="f")
class IEEEDoubleField(Field[int, int]):
    """Field using struct format code ``d`` (double)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, fmt="d")
class _StrField(Field[I, bytes]):
    """
    Base class of the byte-string fields.

    On dissection the field takes all the available bytes, except for the
    last ``remain`` ones which are handed back to the following fields.
    """
    __slots__ = ["remain"]

    def __init__(self, name, default, fmt="H", remain=0):
        # type: (str, Optional[I], str, int) -> None
        Field.__init__(self, name, default, fmt)
        self.remain = remain

    def i2len(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """Length of the internal value; None counts as empty."""
        return 0 if x is None else len(x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """Encode ``str`` input to bytes, then apply the parent conversion."""
        value = bytes_encode(x) if isinstance(x, str) else x
        return super(_StrField, self).any2i(pkt, value)  # type: ignore

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """``repr`` of non-empty bytes; parent representation otherwise."""
        if not (x and isinstance(x, bytes)):
            return super(_StrField, self).i2repr(pkt, x)
        return repr(x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[I]) -> bytes
        """Machine value: bytes as-is, None as empty, others encoded."""
        if x is None:
            return b""
        return x if isinstance(x, bytes) else bytes_encode(x)

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[I]) -> bytes
        """Append the machine value of ``val`` to ``s``."""
        encoded = self.i2m(pkt, val)
        return s + encoded

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, I]
        """Split ``s`` into (trailing ``remain`` bytes, decoded value)."""
        if self.remain == 0:
            return b"", self.m2i(pkt, s)
        keep = -self.remain
        return s[keep:], self.m2i(pkt, s[:keep])

    def randval(self):
        # type: () -> RandBin
        """Random binary string of 0 to 1200 bytes."""
        size = RandNum(0, 1200)
        return RandBin(size)
class StrField(_StrField[bytes]):
    """Byte-string field; all behaviour is inherited from _StrField."""
class StrFieldUtf16(StrField):
    """StrField whose internal bytes are UTF-16-LE encoded text."""

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        """Encode ``str`` input through ``h2i``; defer to the parent else."""
        if not isinstance(x, str):
            return super(StrFieldUtf16, self).any2i(pkt, x)
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Represent the value by its decoded text."""
        text = self.i2h(pkt, x)
        return plain_str(text)

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        """Text to UTF-16-LE bytes; unencodable characters are replaced."""
        text = plain_str(x)
        return text.encode('utf-16-le', errors="replace")

    def i2h(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """UTF-16-LE bytes to text; undecodable sequences are replaced."""
        data = bytes_encode(x)
        return data.decode('utf-16-le', errors="replace")
class _StrEnumField:
    """Mixin that appends an enum label to the repr of a byte string."""

    def __init__(self, **kwargs):
        # type: (**Any) -> None
        self.enum = kwargs.pop("enum", {})

    def i2repr(self, pkt, v):
        # type: (Optional[Packet], bytes) -> str
        """
        Return ``repr`` of ``v`` without trailing NULs, followed by the
        enum label in parentheses when ``v`` (or its stripped form) is
        a key of ``self.enum``.
        """
        stripped = v.rstrip(b"\0")
        text = repr(stripped)
        if not self.enum:
            return text
        # The raw value is looked up before the NUL-stripped one
        for candidate in (v, stripped):
            if candidate in self.enum:
                return "%s (%s)" % (text, self.enum[candidate])
        return text
class StrEnumField(_StrEnumField, StrField):
    """StrField whose representation is annotated with an enum label."""
    __slots__ = ["enum"]

    def __init__(self, name, default, enum=None, **kwargs):
        # type: (str, bytes, Optional[Dict[str, str]], **Any) -> None
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param enum: mapping from value to label used by ``i2repr``
        :param kwargs: forwarded to ``StrField.__init__``
        """
        StrField.__init__(self, name, default, **kwargs)  # type: ignore
        self.enum = enum
# Internal value type of the packet-holding fields: a list of packets,
# a single packet, or an optional packet.
K = TypeVar("K", List[BasePacket], BasePacket, Optional[BasePacket])
class _PacketField(_StrField[K]):
    """Base class of the fields whose value is built by a packet class."""
    __slots__ = ["cls"]
    holds_packets = 1

    def __init__(self, name, default, pkt_cls):
        # type: (str, Optional[K], Union[Callable[[bytes], Packet], Type[Packet]]) -> None  # noqa: E501
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param pkt_cls: callable (usually a Packet class) building the
            value from bytes
        """
        super(_PacketField, self).__init__(name, default)
        self.cls = pkt_cls

    def i2m(self, pkt, i):
        # type: (Optional[Packet], Any) -> bytes
        """Serialize ``i`` with ``raw``; None becomes empty bytes."""
        return b"" if i is None else raw(i)

    def m2i(self, pkt, m):  # type: ignore
        # type: (Optional[Packet], bytes) -> Packet
        """Build the value from ``m`` using ``self.cls``."""
        builder = self.cls
        try:
            # we want to set parent wherever possible
            return builder(m, _parent=pkt)  # type: ignore
        except TypeError:
            # Retry without the _parent keyword
            return builder(m)
class _PacketFieldSingle(_PacketField[K]):
    """Packet field holding exactly one packet."""

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> K
        """Register ``pkt`` as parent of ``x`` when possible, then convert."""
        if x and pkt and hasattr(x, "add_parent"):
            cast("Packet", x).add_parent(pkt)
        return super(_PacketFieldSingle, self).any2i(pkt, x)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, K]
        """
        Dissect ``s`` into a packet; bytes that ended up in a padding
        layer are detached and returned as the remaining bytes.
        """
        parsed = self.m2i(pkt, s)
        if conf.padding_layer not in parsed:
            return b"", parsed  # type: ignore
        pad = parsed[conf.padding_layer]
        # Cut the padding off the dissected packet
        del pad.underlayer.payload
        return pad.load, parsed  # type: ignore
class PacketField(_PacketFieldSingle[BasePacket]):
    """Field holding a single packet."""

    def randval(self):  # type: ignore
        # type: () -> Packet
        """Return a fuzzed instance of the field's packet class."""
        # Imported here rather than at module level
        from scapy.packet import fuzz
        template = self.cls()  # type: ignore
        return fuzz(template)
class PacketLenField(_PacketFieldSingle[Optional[BasePacket]]):
    """Single-packet field whose byte length is given by a callback."""
    __slots__ = ["length_from"]

    def __init__(self, name, default, cls, length_from=None):
        # type: (str, Packet, Union[Callable[[bytes], Packet], Type[Packet]], Optional[Callable[[Packet], int]]) -> None  # noqa: E501
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param cls: callable (usually a Packet class) building the value
        :param length_from: callback returning the number of bytes to
            dissect; when omitted the length is always 0
        """
        super(PacketLenField, self).__init__(name, default, cls)
        self.length_from = length_from or (lambda x: 0)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, Optional[BasePacket]]
        """
        Dissect the first ``length_from(pkt)`` bytes of ``s``.

        A zero length yields None. If dissection fails and
        ``conf.debug_dissector`` is not set, the bytes are wrapped in
        ``conf.raw_layer`` instead of raising.
        """
        len_pkt = self.length_from(pkt)
        value = None
        if len_pkt:
            try:
                value = self.m2i(pkt, s[:len_pkt])
            except Exception:
                if conf.debug_dissector:
                    raise
                value = conf.raw_layer(load=s[:len_pkt])
        return s[len_pkt:], value
class PacketListField(_PacketField[List[BasePacket]]):
    """Field holding a list of Packet instances, which may sit in the
    middle of another Packet's fields.

    It can also express that several Packet instances are siblings rather
    than a parent/child stack of layers. Every element of the list has the
    current packet referenced as its parent.
    """
    __slots__ = ["count_from", "length_from", "next_cls_cb", "max_count"]
    islist = 1

    def __init__(
            self,
            name,  # type: str
            default,  # type: Optional[List[BasePacket]]
            pkt_cls=None,  # type: Optional[Union[Callable[[bytes], Packet], Type[Packet]]]  # noqa: E501
            count_from=None,  # type: Optional[Callable[[Packet], int]]
            length_from=None,  # type: Optional[Callable[[Packet], int]]
            next_cls_cb=None,  # type: Optional[Callable[[Packet, List[BasePacket], Optional[Packet], bytes], Optional[Type[Packet]]]]  # noqa: E501
            max_count=None,  # type: Optional[int]
    ):
        # type: (...) -> None
        """
        How many Packet instances get dissected is controlled by one of
        three mechanisms:

            * count_from: callback returning the number of Packet
              instances to dissect. Prototype::

                count_from(pkt:Packet) -> int

            * length_from: callback returning the number of bytes this
              field must dissect. Prototype::

                length_from(pkt:Packet) -> int

            * next_cls_cb: callback letting a Scapy developer decide
              dynamically whether another Packet instance should be
              dissected. Its prototype is given below.

        Bytes left unconsumed by this field are handed to the next field
        of the current packet.

        On serialization there is no restriction: the list may contain
        heterogeneous Packets.

        The type of the dissected Packet instances is specified or
        discovered in one of these ways:

            * pkt_cls may be a callable returning an instance of the
              dissected Packet. It is either a Packet subclass
              (e.g. DNSRROPT in layers/dns.py), giving an homogeneous
              list, or a function choosing the type of the instance
              (e.g. _CDPGuessAddrRecord in contrib/cdp.py)

            * pkt_cls may be a class object defining a ``dispatch_hook``
              classmethod, which must return a Packet instance and follow
              this prototype::

                dispatch_hook(cls,
                              _pkt:Optional[Packet],
                              *args, **kargs
                ) -> Type[Packet]

              _pkt may reference the packet instance containing the
              PacketListField being dissected.

            * ``next_cls_cb`` may be a callable with this prototype::

                cbk(pkt:Packet,
                    lst:List[Packet],
                    cur:Optional[Packet],
                    remain:bytes,
                ) -> Optional[Type[Packet]]

              pkt references the Packet instance containing the
              PacketListField being dissected.
              lst lists the Packet instances parsed so far during the
              current ``PacketListField`` dissection, except the very
              last one.
              cur references that very last parsed ``Packet`` instance.
              remain holds the bytes that the current dissection may
              still consume.

              The callback returns the type of the next Packet to
              dissect, or None when no more Packet must be dissected.

              These four arguments allow many dynamic strategies for
              finding the number and the types of the Packets: typing
              based on the current Packet or its underlayers,
              continuation based on the Packets already parsed in this
              field, continuation based on a look-ahead on the bytes to
              dissect...

        pkt_cls and next_cls_cb are semantically exclusive, although both
        may be given; pkt_cls is then silently ignored. The same holds
        for count_from and next_cls_cb.

        length_from and next_cls_cb are compatible: dissection stops at
        whichever stop condition is met first.

        :param name: the name of the field
        :param default: the default value of this field; generally an empty
            Python list
        :param pkt_cls: either a callable returning a Packet instance or a
            class object defining a ``dispatch_hook`` class method
        :param count_from: a callback returning the number of Packet
            instances to dissect.
        :param length_from: a callback returning the number of bytes to dissect
        :param next_cls_cb: a callback returning either None or the type of
            the next Packet to dissect.
        :param max_count: an int containing the max amount of results. This is
            a safety mechanism, exceeding this value will raise a Scapy_Exception.
        """
        if default is None:
            default = []  # Create a new list for each instance
        super(PacketListField, self).__init__(
            name,
            default,
            pkt_cls  # type: ignore
        )
        self.count_from = count_from
        self.length_from = length_from
        self.next_cls_cb = next_cls_cb
        self.max_count = max_count

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> List[BasePacket]
        """
        Normalize ``x`` to a list (a non-list is wrapped in one) and
        register ``pkt`` as parent of every element supporting it.
        """
        if isinstance(x, list):
            if pkt:
                for item in x:
                    if item and hasattr(item, "add_parent"):
                        item.add_parent(pkt)
            return x
        if x and pkt and hasattr(x, "add_parent"):
            x.add_parent(pkt)
        return [x]

    def i2count(self, pkt, val):
        # type: (Optional[Packet], List[BasePacket]) -> int
        """Number of items: ``len`` of a list, 1 for anything else."""
        return len(val) if isinstance(val, list) else 1

    def i2len(self, pkt, val):
        # type: (Optional[Packet], List[Packet]) -> int
        """Total serialized length, in bytes, of all items of ``val``."""
        total = 0
        for item in val:
            total += len(self.i2m(pkt, item))
        return total

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, List[BasePacket]]
        """
        Dissect packets from ``s`` until a stop condition (count, length,
        next_cls_cb returning None, or exhausted input) is met.

        :returns: (remaining bytes, list of dissected packets)
        :raises MaximumItemsCount: if more than ``max_count`` (or
            ``conf.max_list_count``) items are dissected
        """
        c = len_pkt = cls = None
        if self.length_from is not None:
            len_pkt = self.length_from(pkt)
        elif self.count_from is not None:
            c = self.count_from(pkt)
        if self.next_cls_cb is not None:
            cls = self.next_cls_cb(pkt, [], None, s)
            # With a callback, the counter tracks "one more to dissect"
            c = 0 if cls is None else 1

        lst = []  # type: List[BasePacket]
        ret = b""
        remain = s
        if len_pkt is not None:
            # Restrict dissection to the first len_pkt bytes
            remain, ret = s[:len_pkt], s[len_pkt:]
        while remain:
            if c is not None:
                if c <= 0:
                    break
                c -= 1
            try:
                if cls is None:
                    p = self.m2i(pkt, remain)
                else:
                    try:
                        # we want to set parent wherever possible
                        p = cls(remain, _parent=pkt)
                    except TypeError:
                        p = cls(remain)
            except Exception:
                if conf.debug_dissector:
                    raise
                # Keep the undissectable bytes as a raw item and stop
                p = conf.raw_layer(load=remain)
                remain = b""
            else:
                if conf.padding_layer in p:
                    # Padding holds the bytes of the following items
                    pad = p[conf.padding_layer]
                    remain = pad.load
                    del pad.underlayer.payload
                    if self.next_cls_cb is not None:
                        cls = self.next_cls_cb(pkt, lst, p, remain)
                        if cls is not None:
                            c = 0 if c is None else c
                            c += 1
                else:
                    remain = b""
            lst.append(p)
            if len(lst) > (self.max_count or conf.max_list_count):
                raise MaximumItemsCount(
                    "Maximum amount of items reached in PacketListField: %s "
                    "(defaults to conf.max_list_count)"
                    % (self.max_count or conf.max_list_count)
                )

        if isinstance(remain, tuple):
            remain, nb = remain
            return (remain + ret, nb), lst
        return remain + ret, lst

    def i2m(self, pkt, i):
        # type: (Optional[Packet], Any) -> bytes
        """Serialize one item with ``bytes_encode``."""
        return bytes_encode(i)

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Any) -> bytes
        """Append the serialization of every item of ``val`` to ``s``."""
        chunks = (self.i2m(pkt, v) for v in val)
        return s + b"".join(chunks)
class StrFixedLenField(StrField):
    """Byte-string field of a fixed, or callback-provided, length."""
    __slots__ = ["length_from"]

    def __init__(self, name, default, length=None, length_from=None):
        # type: (str, Optional[bytes], Optional[int], Optional[Callable[[Packet], int]]) -> None  # noqa: E501
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param length: constant length in bytes; takes precedence over
            ``length_from`` when given
        :param length_from: callback returning the length in bytes;
            when both are omitted the length is always 0
        """
        super(StrFixedLenField, self).__init__(name, default)
        if length is not None:
            self.sz = length
            self.length_from = lambda x, length=length: length  # type: ignore
        else:
            self.length_from = length_from or (lambda x: 0)

    def i2repr(self, pkt, v):
        # type: (Optional[Packet], bytes) -> str
        """Parent representation of ``v`` without its trailing NULs."""
        shown = v.rstrip(b"\0") if isinstance(v, bytes) else v
        return super(StrFixedLenField, self).i2repr(pkt, shown)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, bytes]
        """Consume ``length_from(pkt)`` bytes; return (remaining, value)."""
        len_pkt = self.length_from(pkt)
        if len_pkt == 0:
            return s, b""
        head, tail = s[:len_pkt], s[len_pkt:]
        return tail, self.m2i(pkt, head)

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[bytes]) -> bytes
        """
        Append ``val`` to ``s``, fitted to the field length by
        ``struct.pack`` with an ``Ns`` format. With a None length the
        value is appended unchanged.
        """
        len_pkt = self.length_from(pkt)
        encoded = self.i2m(pkt, val)
        if len_pkt is None:
            return s + encoded
        return s + struct.pack("%is" % len_pkt, encoded)

    def randval(self):
        # type: () -> RandBin
        """Random bytes of the field length, or 0 to 200 bytes as fallback."""
        try:
            return RandBin(self.length_from(None))  # type: ignore
        except Exception:
            # length_from needs a real packet: pick an arbitrary size
            return RandBin(RandNum(0, 200))
class StrFixedLenFieldUtf16(StrFixedLenField, StrFieldUtf16):
    """Combination of StrFixedLenField and StrFieldUtf16."""
class StrFixedLenEnumField(_StrEnumField, StrFixedLenField):
    """StrFixedLenField whose representation is annotated with an enum label."""  # noqa: E501
    __slots__ = ["enum"]

    def __init__(self, name, default, enum=None, length=None, length_from=None):  # noqa: E501
        # type: (str, bytes, Optional[Dict[str, str]], Optional[int], Optional[Callable[[Optional[Packet]], int]]) -> None  # noqa: E501
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param enum: mapping from value to label used by ``i2repr``
        :param length: constant length in bytes
        :param length_from: callback returning the length in bytes
        """
        StrFixedLenField.__init__(
            self, name, default,
            length=length, length_from=length_from,
        )
        self.enum = enum
class NetBIOSNameField(StrFixedLenField):
    """
    Fixed-length name field in which each byte of the name is encoded as
    two bytes (one per nibble, offset by 0x41), after a leading space.
    """

    def __init__(self, name, default, length=31):
        # type: (str, bytes, int) -> None
        StrFixedLenField.__init__(self, name, default, length)

    def h2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> bytes
        """Truncate the name to at most 15 bytes."""
        if x and len(x) > 15:
            return x[:15]
        return x

    def i2m(self, pkt, y):
        # type: (Optional[Packet], Optional[bytes]) -> bytes
        """Pad or cut the name to half the field length, then encode it."""
        # Two encoded bytes per name byte; without a packet nothing is kept
        len_pkt = self.length_from(pkt) // 2 if pkt else 0
        name = bytes_encode(y or b"")  # type: bytes
        name = name.ljust(len_pkt, b" ")[:len_pkt]
        encoded = b"".join(
            bytes((0x41 + (b >> 4), 0x41 + (b & 0xf)))
            for b in name
        )
        return b" " + encoded

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> bytes
        """Decode the nibble pairs back into the name."""
        # Skip the leading byte and surrounding NULs
        body = x[1:].strip(b"\x00")
        decoded = bytes(
            (((hi - 1) & 0xf) << 4) + ((lo - 1) & 0xf)
            for hi, lo in zip(body[::2], body[1::2])
        )
        return decoded.rstrip(b" ")
class StrLenField(StrField):
    """
    StrField whose length comes from a callback

    :param length_from: a function that returns the size of the string
    :param max_length: max size to use as randval
    """
    __slots__ = ["length_from", "max_length"]
    # When false, the length returned by length_from is doubled in getfield
    ON_WIRE_SIZE_UTF16 = True

    def __init__(self, name, default, length_from=None, max_length=None):
        # type: (str, bytes, Optional[Callable[[Packet], int]], Optional[Any]) -> None  # noqa: E501
        super(StrLenField, self).__init__(name, default)
        self.length_from = length_from
        self.max_length = max_length

    def getfield(self, pkt, s):
        # type: (Any, bytes) -> Tuple[bytes, bytes]
        """Consume the computed number of bytes; return (remaining, value)."""
        length_cb = self.length_from or (lambda x: 0)
        len_pkt = length_cb(pkt)
        if not self.ON_WIRE_SIZE_UTF16:
            len_pkt *= 2
        if len_pkt == 0:
            return s, b""
        head, tail = s[:len_pkt], s[len_pkt:]
        return tail, self.m2i(pkt, head)

    def randval(self):
        # type: () -> RandBin
        """Random bytes of 0 to ``max_length`` (1200 when unset) bytes."""
        upper = self.max_length or 1200
        return RandBin(RandNum(0, upper))
class _XStrField(Field[bytes, bytes]):
    """Mixin displaying bytes values as hexadecimal."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Hex string of bytes; parent representation for anything else."""
        if not isinstance(x, bytes):
            return super(_XStrField, self).i2repr(pkt, x)
        return bytes_hex(x).decode()
class XStrField(_XStrField, StrField):
    """
    StrField whose value is displayed as a hexadecimal string.
    """
class XStrLenField(_XStrField, StrLenField):
    """
    StrLenField whose value is displayed as a hexadecimal string.
    """
class XStrFixedLenField(_XStrField, StrFixedLenField):
    """
    StrFixedLenField whose value is displayed as a hexadecimal string.
    """
class XLEStrLenField(XStrLenField):
    """XStrLenField whose bytes are reversed between internal and wire form."""  # noqa: E501

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[bytes]) -> bytes
        """Reverse the byte order; empty or None gives empty bytes."""
        return x[::-1] if x else b""

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> bytes
        """Reverse the byte order."""
        flipped = x[::-1]
        return flipped
class StrLenFieldUtf16(StrLenField, StrFieldUtf16):
    """Combination of StrLenField and StrFieldUtf16."""
class StrLenEnumField(_StrEnumField, StrLenField):
    """StrLenField whose representation is annotated with an enum label."""
    __slots__ = ["enum"]

    def __init__(self, name, default, enum=None, **kwargs):
        # type: (str, bytes, Optional[Dict[str, str]], **Any) -> None
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param enum: mapping from value to label used by ``i2repr``
        :param kwargs: forwarded to ``StrLenField.__init__``
        """
        StrLenField.__init__(self, name, default, **kwargs)
        self.enum = enum
class BoundStrLenField(StrLenField):
    """StrLenField whose random values have a bounded length."""
    __slots__ = ["minlen", "maxlen"]

    def __init__(self, name, default, minlen=0, maxlen=255, length_from=None):
        # type: (str, bytes, int, int, Optional[Callable[[Packet], int]]) -> None  # noqa: E501
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param minlen: minimum length used by ``randval``
        :param maxlen: maximum length used by ``randval``
        :param length_from: callback returning the length in bytes
        """
        StrLenField.__init__(self, name, default, length_from=length_from)
        self.minlen = minlen
        self.maxlen = maxlen

    def randval(self):
        # type: () -> RandBin
        """Random bytes of ``minlen`` to ``maxlen`` bytes."""
        size = RandNum(self.minlen, self.maxlen)
        return RandBin(size)
class FieldListField(Field[List[Any], List[Any]]):
    """List of values that are all handled by the same inner field."""
    __slots__ = ["field", "count_from", "length_from", "max_count"]
    islist = 1

    def __init__(self, name, default, field, length_from=None, count_from=None, max_count=None):  # noqa: E501
        # type: (str, Optional[List[AnyField]], AnyField, Optional[Callable[[Packet], int]], Optional[Callable[[Packet], int]], Optional[int]) -> None  # noqa: E501
        """
        :param name: the name of the field
        :param default: the default list; None means a fresh empty list
        :param field: the field used to handle each item
        :param length_from: callback returning the number of bytes to
            dissect (takes precedence over ``count_from``)
        :param count_from: callback returning the number of items to
            dissect
        :param max_count: maximum number of dissected items; defaults to
            ``conf.max_list_count``
        """
        if default is None:
            default = []  # Create a new list for each instance
        # NOTE(review): assigned before Field.__init__, presumably because
        # the parent initializer converts the default through any2i, which
        # needs self.field - keep this order.
        self.field = field
        Field.__init__(self, name, default)
        self.count_from = count_from
        self.length_from = length_from
        self.max_count = max_count

    def i2count(self, pkt, val):
        # type: (Optional[Packet], List[Any]) -> int
        """Number of items: ``len`` of a list, 1 for anything else."""
        return len(val) if isinstance(val, list) else 1

    def i2len(self, pkt, val):
        # type: (Packet, List[Any]) -> int
        """Sum of the inner field's ``i2len`` over all items."""
        lengths = (self.field.i2len(pkt, v) for v in val)
        return int(sum(lengths))

    def any2i(self, pkt, x):
        # type: (Optional[Packet], List[Any]) -> List[Any]
        """Convert every item (a non-list counts as one item) via the inner field."""  # noqa: E501
        items = x if isinstance(x, list) else [x]
        return [self.field.any2i(pkt, e) for e in items]

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], List[Any]) -> str
        """Bracketed, comma-separated inner representations."""
        parts = (self.field.i2repr(pkt, v) for v in x)
        return "[%s]" % ", ".join(parts)

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[List[Any]]) -> bytes
        """Append every item to ``s`` using the inner field's ``addfield``."""
        out = s
        for item in self.i2m(pkt, val):
            out = self.field.addfield(pkt, out, item)
        return out

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Any
        """
        Dissect items with the inner field until the count or length
        limit is reached or the input is exhausted.

        :returns: (remaining bytes, list of values)
        :raises MaximumItemsCount: if more than ``max_count`` (or
            ``conf.max_list_count``) items are dissected
        """
        c = len_pkt = None
        if self.length_from is not None:
            len_pkt = self.length_from(pkt)
        elif self.count_from is not None:
            c = self.count_from(pkt)

        val = []
        ret = b""
        if len_pkt is not None:
            # Restrict dissection to the first len_pkt bytes
            s, ret = s[:len_pkt], s[len_pkt:]

        while s:
            if c is not None:
                if c <= 0:
                    break
                c -= 1
            s, item = self.field.getfield(pkt, s)
            val.append(item)
            if len(val) > (self.max_count or conf.max_list_count):
                raise MaximumItemsCount(
                    "Maximum amount of items reached in FieldListField: %s "
                    "(defaults to conf.max_list_count)"
                    % (self.max_count or conf.max_list_count)
                )

        if isinstance(s, tuple):
            s, bn = s
            return (s + ret, bn), val
        return s + ret, val
class FieldLenField(Field[int, int]):
    """Integer field that, when left to None, is computed from the length
    or the item count of another field of the same packet."""
    __slots__ = ["length_of", "count_of", "adjust"]

    def __init__(
            self,
            name,  # type: str
            default,  # type: Optional[Any]
            length_of=None,  # type: Optional[str]
            fmt="H",  # type: str
            count_of=None,  # type: Optional[str]
            adjust=lambda pkt, x: x,  # type: Callable[[Packet, int], int]
    ):
        # type: (...) -> None
        """
        :param name: the name of the field
        :param default: the default value of the field
        :param length_of: name of the field whose length is measured
        :param fmt: struct format of this field
        :param count_of: name of the field whose items are counted
        :param adjust: callback applied to the measured value
        """
        Field.__init__(self, name, default, fmt)
        self.length_of = length_of
        self.count_of = count_of
        self.adjust = adjust

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[int]) -> int
        """
        Return ``x`` when set; otherwise compute it from the referenced
        field (0 when there is no packet to inspect).

        :raises ValueError: neither ``length_of`` nor ``count_of`` is set
        """
        if x is not None:
            return x
        if pkt is None:
            return 0
        if self.length_of is not None:
            fld, fval = pkt.getfield_and_val(self.length_of)
            measured = fld.i2len(pkt, fval)
        elif self.count_of is not None:
            fld, fval = pkt.getfield_and_val(self.count_of)
            measured = fld.i2count(pkt, fval)
        else:
            raise ValueError(
                "Field should have either length_of or count_of"
            )
        return self.adjust(pkt, measured)
class StrNullField(StrField):
    """
    Byte string terminated by ``DELIMITER`` (a single NUL byte here).

    Subclasses may override ``DELIMITER`` with a longer terminator; the
    terminator is then only recognized at offsets that are a multiple of
    its length.
    """
    DELIMITER = b"\x00"

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[bytes]) -> bytes
        """Append the value followed by the delimiter to ``s``."""
        return s + self.i2m(pkt, val) + self.DELIMITER

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, bytes]
        """
        Read up to the first aligned delimiter.

        :returns: (bytes after the delimiter, decoded value). When no
            delimiter is found, the whole of ``s`` is returned as the
            value and nothing remains.
        """
        len_str = 0
        while True:
            len_str = s.find(self.DELIMITER, len_str)
            if len_str < 0:
                # DELIMITER not found: the whole buffer is the value
                return b"", s
            if len_str % len(self.DELIMITER):
                # Misaligned match: resume the search one byte further
                len_str += 1
            else:
                break
        return s[len_str + len(self.DELIMITER):], self.m2i(pkt, s[:len_str])

    def randval(self):
        # type: () -> RandTermString
        """Random string of 0 to 1200 bytes ending with the delimiter."""
        return RandTermString(RandNum(0, 1200), self.DELIMITER)

    def i2len(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """Length of the value plus the delimiter appended by addfield."""
        # Use the real delimiter size so that subclasses with a
        # multi-byte DELIMITER report the length actually serialized.
        return super(StrNullField, self).i2len(pkt, x) + len(self.DELIMITER)
class StrNullFieldUtf16(StrNullField, StrFieldUtf16):
    """StrNullField variant whose terminator is two NUL bytes."""
    DELIMITER = b"\x00\x00"
class StrStopField(StrField):
    """String field that ends at a ``stop`` marker.

    The dissected value includes the marker itself and ``additional`` bytes
    following it.
    """
    __slots__ = ["stop", "additional"]

    def __init__(self, name, default, stop, additional=0):
        # type: (str, str, bytes, int) -> None
        Field.__init__(self, name, default)
        self.stop = stop
        self.additional = additional

    def getfield(self, pkt, s):
        # type: (Optional[Packet], bytes) -> Tuple[bytes, bytes]
        """Return ``(remaining bytes, value)``.

        When the marker is absent, the remaining bytes are empty and the
        whole of ``s`` is the value.
        """
        marker_at = s.find(self.stop)
        if marker_at < 0:
            return b"", s
        cut = marker_at + len(self.stop) + self.additional
        value, rest = s[:cut], s[cut:]
        return rest, value

    def randval(self):
        # type: () -> RandTermString
        return RandTermString(RandNum(0, 1200), self.stop)
class LenField(Field[int, int]):
    """
    If None, will be filled with the size of the payload
    (after going through ``adjust``).
    """
    __slots__ = ["adjust"]

    def __init__(self, name, default, fmt="H", adjust=lambda x: x):
        # type: (str, Optional[Any], str, Callable[[int], int]) -> None
        Field.__init__(self, name, default, fmt)
        self.adjust = adjust

    def i2m(self,
            pkt,  # type: Optional[Packet]
            x,  # type: Optional[int]
            ):
        # type: (...) -> int
        """Return ``x``, or the adjusted payload length when it is None."""
        if x is not None:
            return x
        if pkt is None:
            # No packet to measure: fall back to 0.
            return 0
        return self.adjust(len(pkt.payload))
class BCDFloatField(Field[float, int]):
    """Float field whose machine value is the float scaled by 256."""

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[float]) -> int
        """Scale by 256 and truncate to int; ``None`` becomes 0."""
        return 0 if x is None else int(256 * x)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> float
        """Divide the machine integer by 256 to get the float back."""
        scaled = x / 256.0
        return scaled
class _BitField(Field[I, int]):
    """
    Field to handle bits.

    :param name: name of the field
    :param default: default value
    :param size: size (in bits). If negative, Low endian
    :param tot_size: size of the total group of bits (in bytes) the bitfield
                     is in. If negative, Low endian.
    :param end_tot_size: same but for the BitField ending a group.

    Example - normal usage::

         0                   1                   2                   3
         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |             A             |               B               | C |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

                                 Fig. TestPacket

        class TestPacket(Packet):
            fields_desc = [
                BitField("a", 0, 14),
                BitField("b", 0, 16),
                BitField("c", 0, 2),
            ]

    Example - Low endian stored as 16 bits on the network::

        x x x x x x x x x x x x x x x x
        a [b] [   c   ] [      a      ]

        Will first get reversed during dissection:

        x x x x x x x x x x x x x x x x
        [       a       ] [b] [   c   ]

        class TestPacket(Packet):
            fields_desc = [
                BitField("a", 0, 9, tot_size=-2),
                BitField("b", 0, 2),
                BitField("c", 0, 5, end_tot_size=-2)
            ]

    """
    __slots__ = ["rev", "size", "tot_size", "end_tot_size"]

    def __init__(self, name, default, size,
                 tot_size=0, end_tot_size=0):
        # type: (str, Optional[I], int, int, int) -> None
        Field.__init__(self, name, default)
        # ``size`` may be given as a callable taking the field itself.
        if callable(size):
            size = size(self)
        # Any negative size argument flags the field as byte-reversed.
        self.rev = size < 0 or tot_size < 0 or end_tot_size < 0
        self.size = abs(size)
        # Group sizes default to the whole bytes covered by this field alone.
        if not tot_size:
            tot_size = self.size // 8
        self.tot_size = abs(tot_size)
        if not end_tot_size:
            end_tot_size = self.size // 8
        self.end_tot_size = abs(end_tot_size)
        # Fields always have a round sz except BitField
        # so to keep it simple, we'll ignore it here.
        self.sz = self.size / 8.  # type: ignore

    # We need to # type: ignore a few things because of how special
    # BitField is
    def addfield(self,  # type: ignore
                 pkt,  # type: Packet
                 s,  # type: Union[Tuple[bytes, int, int], bytes]
                 ival,  # type: I
                 ):
        # type: (...) -> Union[Tuple[bytes, int, int], bytes]
        """Append the ``size`` low bits of the machine value to ``s``.

        ``s`` is either plain bytes or a ``(bytes, bitsdone, v)`` tuple
        carrying ``bitsdone`` pending bits (held in ``v``) that do not fill
        a byte yet. The same tuple shape is returned while bits are pending;
        plain bytes are returned once the output is byte-aligned.
        """
        val = self.i2m(pkt, ival)
        if isinstance(s, tuple):
            s, bitsdone, v = s
        else:
            bitsdone = 0
            v = 0
        # Push the new bits below the pending ones.
        v <<= self.size
        v |= val & ((1 << self.size) - 1)
        bitsdone += self.size
        # Flush every complete byte, most significant first.
        while bitsdone >= 8:
            bitsdone -= 8
            s = s + struct.pack("!B", v >> bitsdone)
            v &= (1 << bitsdone) - 1
        if bitsdone:
            return s, bitsdone, v
        else:
            # Apply LE if necessary
            if self.rev and self.end_tot_size > 1:
                # Reverse the last ``end_tot_size`` bytes of the output.
                s = s[:-self.end_tot_size] + s[-self.end_tot_size:][::-1]
            return s

    def getfield(self,  # type: ignore
                 pkt,  # type: Packet
                 s,  # type: Union[Tuple[bytes, int], bytes]
                 ):
        # type: (...) -> Union[Tuple[Tuple[bytes, int], I], Tuple[bytes, I]]  # noqa: E501
        """Extract ``size`` bits from ``s``.

        ``s`` is either plain bytes or a ``(bytes, bn)`` tuple where ``bn``
        is the number of bits already consumed in the first byte. The
        remainder is returned in the tuple form when it does not end on a
        byte boundary, as plain bytes otherwise.
        """
        if isinstance(s, tuple):
            s, bn = s
        else:
            bn = 0
            # Apply LE if necessary
            if self.rev and self.tot_size > 1:
                # Reverse the first ``tot_size`` bytes before reading bits.
                s = s[:self.tot_size][::-1] + s[self.tot_size:]

        # we don't want to process all the string
        nb_bytes = (self.size + bn - 1) // 8 + 1
        w = s[:nb_bytes]

        # split the substring byte by byte
        _bytes = struct.unpack('!%dB' % nb_bytes, w)

        # Rebuild one big-endian integer out of those bytes.
        b = 0
        for c in range(nb_bytes):
            b |= int(_bytes[c]) << (nb_bytes - c - 1) * 8

        # get rid of high order bits
        b &= (1 << (nb_bytes * 8 - bn)) - 1

        # remove low order bits
        b = b >> (nb_bytes * 8 - self.size - bn)

        # Advance past the whole bytes consumed; keep the bit remainder.
        bn += self.size
        s = s[bn // 8:]
        bn = bn % 8
        b2 = self.m2i(pkt, b)
        if bn:
            return (s, bn), b2
        else:
            return s, b2

    def randval(self):
        # type: () -> RandNum
        return RandNum(0, 2**self.size - 1)

    def i2len(self, pkt, x):  # type: ignore
        # type: (Optional[Packet], Optional[float]) -> float
        # Length in bytes, possibly fractional.
        return float(self.size) / 8
class BitField(_BitField[int]):
    # Reuse the documentation of the generic implementation.
    __doc__ = _BitField.__doc__
class BitLenField(BitField):
    """BitField whose bit size is computed from the packet.

    ``length_from(pkt)`` is evaluated on every dissection and build, and its
    result is stored in ``self.size`` before delegating to BitField.
    """
    __slots__ = ["length_from"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 length_from  # type: Callable[[Packet], int]
                 ):
        # type: (...) -> None
        self.length_from = length_from
        # The real size is only known once a packet is available.
        super().__init__(name, default, 0)

    def getfield(self,  # type: ignore
                 pkt,  # type: Packet
                 s,  # type: Union[Tuple[bytes, int], bytes]
                 ):
        # type: (...) -> Union[Tuple[Tuple[bytes, int], int], Tuple[bytes, int]]  # noqa: E501
        """Refresh ``size`` from the packet, then dissect as a BitField."""
        self.size = self.length_from(pkt)
        return super().getfield(pkt, s)

    def addfield(self,  # type: ignore
                 pkt,  # type: Packet
                 s,  # type: Union[Tuple[bytes, int, int], bytes]
                 val  # type: int
                 ):
        # type: (...) -> Union[Tuple[bytes, int, int], bytes]
        """Refresh ``size`` from the packet, then build as a BitField."""
        self.size = self.length_from(pkt)
        return super().addfield(pkt, s, val)
class BitFieldLenField(BitField):
    """BitField that computes its value like a FieldLenField.

    The machine value is produced by ``FieldLenField.i2m`` from the
    ``length_of`` / ``count_of`` / ``adjust`` attributes.
    """
    __slots__ = ["length_of", "count_of", "adjust", "tot_size", "end_tot_size"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 size,  # type: int
                 length_of=None,  # type: Optional[Union[Callable[[Optional[Packet]], int], str]]  # noqa: E501
                 count_of=None,  # type: Optional[str]
                 adjust=lambda pkt, x: x,  # type: Callable[[Optional[Packet], int], int]  # noqa: E501
                 tot_size=0,  # type: int
                 end_tot_size=0,  # type: int
                 ):
        # type: (...) -> None
        super().__init__(name, default, size, tot_size, end_tot_size)
        self.length_of = length_of
        self.count_of = count_of
        self.adjust = adjust

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> int
        # Borrow the implementation: it only reads length_of, count_of
        # and adjust, which this class also defines.
        computed = FieldLenField.i2m(self, pkt, x)  # type: ignore
        return computed
class XBitField(BitField):
    """BitField whose representation goes through ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        human = self.i2h(pkt, x)
        return lhex(human)
# Type of the ``enum`` argument taken by the enum fields defined below.
_EnumType = Union[Dict[I, str], Dict[str, I], List[str], DADict[I, str], Type[Enum], Tuple[Callable[[I], str], Callable[[str], I]]]  # noqa: E501
class _EnumField(Field[Union[List[I], I], I]):
    """Field whose values can be named, through dicts or callbacks."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[I]
                 enum,  # type: _EnumType[I]
                 fmt="H",  # type: str
                 ):
        # type: (...) -> None
        """ Initializes enum fields.

        @param name:    name of this field
        @param default: default value of this field
        @param enum:    either an enum, a dict or a tuple of two callables.
                        Dict keys are the internal values, while the dict
                        values are the user-friendly representations. If the
                        tuple is provided, the first callable receives the
                        internal value as parameter and returns the
                        user-friendly representation and the second callable
                        does the converse. The first callable may return None
                        to default to a literal string (repr()) representation.
        @param fmt:     struct.pack format used to parse and serialize the
                        internal value from and to machine representation.
        """
        # Get notified of later changes to an observable mapping.
        if isinstance(enum, ObservableDict):
            cast(ObservableDict, enum).observe(self)

        if isinstance(enum, tuple):
            # Pair of callbacks: no lookup tables at all.
            self.i2s = None  # type: Optional[Dict[I, str]]
            self.s2i = None  # type: Optional[Dict[str, I]]
            self.i2s_cb = enum[0]  # type: Optional[Callable[[I], str]]
            self.s2i_cb = enum[1]  # type: Optional[Callable[[str], I]]
        elif isinstance(enum, type) and issubclass(enum, Enum):
            # Python's Enum
            self.i2s_cb = None
            self.s2i_cb = None
            by_value = self.i2s = {}
            by_name = self.s2i = {}
            for member_name in [member.name for member in enum]:
                member_value = enum[member_name].value
                by_value[member_value] = member_name
                by_name[member_name] = member_value
        else:
            self.i2s_cb = None
            self.s2i_cb = None
            forward = self.i2s = {}
            backward = self.s2i = {}
            keys = []  # type: List[I]
            if isinstance(enum, list):
                keys = list(range(len(enum)))  # type: ignore
            elif isinstance(enum, DADict):
                keys = enum.keys()
            else:
                keys = list(enum)  # type: ignore
            # String keys mean the mapping was given as name -> value:
            # fill the two tables the other way round.
            if any(isinstance(key, str) for key in keys):
                forward, backward = backward, forward  # type: ignore
            for key in keys:
                mapped = cast(str, enum[key])  # type: ignore
                forward[key] = mapped
                backward[mapped] = key
        Field.__init__(self, name, default, fmt)

    def any2i_one(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """Convert one value (Enum member, name or raw value) to internal."""
        if isinstance(x, Enum):
            return cast(I, x.value)
        if not isinstance(x, str):
            return cast(I, x)
        if self.s2i:
            return cast(I, self.s2i[x])
        if self.s2i_cb:
            return cast(I, self.s2i_cb(x))
        return cast(I, x)

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """Fallback representation used when no name is available."""
        return repr(x)

    def i2repr_one(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """Represent one value by its name, else through ``_i2repr``."""
        skip_names = self in conf.noenum or isinstance(x, VolatileValue)
        if not skip_names:
            if self.i2s:
                try:
                    return self.i2s[x]
                except KeyError:
                    pass
            elif self.i2s_cb:
                named = self.i2s_cb(x)
                if named is not None:
                    return named
        return self._i2repr(pkt, x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[I, List[I]]
        """Convert a value, or each item of a list, with ``any2i_one``."""
        if not isinstance(x, list):
            return self.any2i_one(pkt, x)
        return [self.any2i_one(pkt, item) for item in x]

    def i2repr(self, pkt, x):  # type: ignore
        # type: (Optional[Packet], Any) -> Union[List[str], str]
        """Represent a value, or each item of a list, with ``i2repr_one``."""
        if not isinstance(x, list):
            return self.i2repr_one(pkt, x)
        return [self.i2repr_one(pkt, item) for item in x]

    def notify_set(self, enum, key, value):
        # type: (ObservableDict, I, str) -> None
        """Mirror an insertion/update into both lookup tables."""
        if isinstance(key, int):
            key_fmt = "0x%x"
        else:
            key_fmt = "%s"
        log_runtime.debug(
            "At %s: Change to %s at " + key_fmt, self, value, key
        )
        if self.i2s is None or self.s2i is None:
            return
        self.i2s[key] = value
        self.s2i[value] = key

    def notify_del(self, enum, key):
        # type: (ObservableDict, I) -> None
        """Mirror a deletion into both lookup tables."""
        if isinstance(key, int):
            key_fmt = "0x%x"
        else:
            key_fmt = "%s"
        log_runtime.debug("At %s: Delete value at " + key_fmt, self, key)
        if self.i2s is None or self.s2i is None:
            return
        removed = self.i2s.pop(key)
        del self.s2i[removed]
class EnumField(_EnumField[I]):
    """_EnumField with slots for its lookup tables and callbacks."""
    __slots__ = ["i2s", "s2i", "s2i_cb", "i2s_cb"]
class CharEnumField(EnumField[str]):
    """Enum field whose internal values are one-character strings."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: str
                 enum,  # type: _EnumType[str]
                 fmt="1s",  # type: str
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, fmt)
        if self.i2s is None:
            return
        internal_keys = list(self.i2s)
        # Internal values must be single characters: when the first key is
        # not, the tables are the wrong way round, so swap them.
        if internal_keys and len(internal_keys[0]) != 1:
            self.i2s, self.s2i = self.s2i, self.i2s

    def any2i_one(self, pkt, x):
        # type: (Optional[Packet], str) -> str
        """Translate anything that is not a single character as a name."""
        if len(x) == 1:
            return x
        if self.s2i:
            return self.s2i[x]
        if self.s2i_cb:
            return self.s2i_cb(x)
        return x
class BitEnumField(_BitField[Union[List[int], int]], _EnumField[int]):
    """Bit field with the naming behaviour of ``_EnumField``."""
    __slots__ = EnumField.__slots__

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 size,  # type: int
                 enum,  # type: _EnumType[int]
                 **kwargs  # type: Any
                 ):
        # type: (...) -> None
        # Enum tables first, then the bit layout (extra keyword arguments
        # are forwarded to _BitField).
        _EnumField.__init__(self, name, default, enum)
        _BitField.__init__(self, name, default, size, **kwargs)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[List[int], int]
        """Use the enum conversion explicitly."""
        converted = _EnumField.any2i(self, pkt, x)
        return converted

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: Union[List[int], int]
               ):
        # type: (...) -> Any
        """Use the enum representation explicitly."""
        shown = _EnumField.i2repr(self, pkt, x)
        return shown
class BitLenEnumField(BitLenField, _EnumField[int]):
    """BitLenField with the naming behaviour of ``_EnumField``."""
    __slots__ = EnumField.__slots__

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 length_from,  # type: Callable[[Packet], int]
                 enum,  # type: _EnumType[int]
                 **kwargs,  # type: Any
                 ):
        # type: (...) -> None
        # Enum tables first, then the bit layout.
        _EnumField.__init__(self, name, default, enum)
        BitLenField.__init__(self, name, default, length_from, **kwargs)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """Use the enum conversion explicitly."""
        converted = _EnumField.any2i(self, pkt, x)  # type: ignore
        return converted

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: Union[List[int], int]
               ):
        # type: (...) -> Any
        """Use the enum representation explicitly."""
        shown = _EnumField.i2repr(self, pkt, x)
        return shown
class ShortEnumField(EnumField[int]):
    """EnumField created with the ``"H"`` struct format."""
    __slots__ = EnumField.__slots__

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "H")
class LEShortEnumField(EnumField[int]):
    """EnumField created with the ``"<H"`` struct format."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "<H")
class LongEnumField(EnumField[int]):
    """EnumField created with the ``"Q"`` struct format."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "Q")
class LELongEnumField(EnumField[int]):
    """EnumField created with the ``"<Q"`` struct format."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "<Q")
class ByteEnumField(EnumField[int]):
    """EnumField created with the ``"B"`` struct format."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "B")
class XByteEnumField(ByteEnumField):
    """ByteEnumField showing values without a name through ``lhex``.

    Only the fallback hook is overridden: the inherited ``i2repr_one``
    performs the name lookup and calls ``_i2repr`` when no name applies,
    which is how XShortEnumField and XLEIntEnumField are written too.
    """

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        return lhex(x)
class IntEnumField(EnumField[int]):
    """EnumField created with the ``"I"`` struct format."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "I")
class SignedIntEnumField(EnumField[int]):
    """EnumField created with the ``"i"`` struct format."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "i")
class LEIntEnumField(EnumField[int]):
    """EnumField created with the ``"<I"`` struct format."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, "<I")
class XLEIntEnumField(LEIntEnumField):
    """LEIntEnumField showing values without a name through ``lhex``."""

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        return lhex(x)
class XShortEnumField(ShortEnumField):
    """ShortEnumField showing values without a name through ``lhex``."""

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        return lhex(x)
class LE3BytesEnumField(LEThreeBytesField, _EnumField[int]):
    """LEThreeBytesField with the naming behaviour of ``_EnumField``."""
    __slots__ = EnumField.__slots__

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: _EnumType[int]
                 ):
        # type: (...) -> None
        # Enum tables first, then the three-byte field set-up.
        _EnumField.__init__(self, name, default, enum)
        LEThreeBytesField.__init__(self, name, default)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """Use the enum conversion explicitly."""
        converted = _EnumField.any2i(self, pkt, x)  # type: ignore
        return converted

    def i2repr(self, pkt, x):  # type: ignore
        # type: (Optional[Packet], Any) -> Union[List[str], str]
        """Use the enum representation explicitly."""
        shown = _EnumField.i2repr(self, pkt, x)
        return shown
class XLE3BytesEnumField(LE3BytesEnumField):
    """LE3BytesEnumField showing values without a name through ``lhex``."""

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        return lhex(x)
class _MultiEnumField(_EnumField[I]):
    """Enum field whose name table is selected by ``depends_on(pkt)``."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 enum,  # type: Dict[I, Dict[I, str]]
                 depends_on,  # type: Callable[[Optional[Packet]], I]
                 fmt="H"  # type: str
                 ):
        # type: (...) -> None

        self.depends_on = depends_on
        self.i2s_multi = enum
        self.s2i_multi = {}  # type: Dict[I, Dict[str, I]]
        self.s2i_all = {}  # type: Dict[str, I]
        for selector in enum:
            # Reverse table (name -> value) for this selector.
            reverse = {
                label: value for value, label in enum[selector].items()
            }  # type: Dict[str, I]
            self.s2i_multi[selector] = reverse
            # Global fallback table, shared by every selector.
            self.s2i_all.update(reverse)
        Field.__init__(self, name, default, fmt)

    def any2i_one(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """Resolve a name with the selected table, else with the global one.

        Values that are not strings are returned unchanged.
        """
        if not isinstance(x, str):
            return cast(I, x)
        selector = self.depends_on(pkt)
        if selector in self.s2i_multi:
            table = self.s2i_multi[selector]
            if x in table:
                return table[x]
        return self.s2i_all[x]

    def i2repr_one(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """Name ``x`` with the selected table, falling back to ``str(x)``."""
        selector = self.depends_on(pkt)
        if isinstance(selector, VolatileValue):
            return repr(selector)
        if selector not in self.i2s_multi:
            return str(x)
        return str(self.i2s_multi[selector].get(x, x))
class MultiEnumField(_MultiEnumField[int], EnumField[int]):
    """_MultiEnumField with slots for its selector and lookup tables."""
    __slots__ = ["depends_on", "i2s_multi", "s2i_multi", "s2i_all"]
class BitMultiEnumField(_BitField[Union[List[int], int]],
                        _MultiEnumField[int]):
    """Bit field with the naming behaviour of ``_MultiEnumField``."""
    __slots__ = EnumField.__slots__ + MultiEnumField.__slots__

    def __init__(
            self,
            name,  # type: str
            default,  # type: int
            size,  # type: int
            enum,  # type: Dict[int, Dict[int, str]]
            depends_on  # type: Callable[[Optional[Packet]], int]
    ):
        # type: (...) -> None
        _MultiEnumField.__init__(self, name, default, enum, depends_on)
        # Let _BitField set up the whole bit layout. It sets rev, size and
        # sz, and also tot_size / end_tot_size, which _BitField.getfield()
        # and addfield() read when ``rev`` is true; leaving those slots
        # unset made a negative ``size`` fail with AttributeError.
        # BitEnumField initialises itself the same way.
        _BitField.__init__(self, name, default, size)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[List[int], int]
        """Use the multi-enum conversion explicitly."""
        return _MultiEnumField[int].any2i(
            self,  # type: ignore
            pkt,
            x
        )

    def i2repr(  # type: ignore
            self,
            pkt,  # type: Optional[Packet]
            x  # type: Union[List[int], int]
    ):
        # type: (...) -> Union[str, List[str]]
        """Use the multi-enum representation explicitly."""
        return _MultiEnumField[int].i2repr(
            self,  # type: ignore
            pkt,
            x
        )
class ByteEnumKeysField(ByteEnumField):
    """ByteEnumField that picks valid values when fuzzed. """

    def randval(self):
        # type: () -> RandEnumKeys
        # Fall back to an empty mapping when there is no value table.
        known_values = self.i2s or {}
        return RandEnumKeys(known_values)
class ShortEnumKeysField(ShortEnumField):
    """ShortEnumField that picks valid values when fuzzed. """

    def randval(self):
        # type: () -> RandEnumKeys
        # Fall back to an empty mapping when there is no value table.
        known_values = self.i2s or {}
        return RandEnumKeys(known_values)
class IntEnumKeysField(IntEnumField):
    """IntEnumField that picks valid values when fuzzed. """

    def randval(self):
        # type: () -> RandEnumKeys
        # Fall back to an empty mapping when there is no value table.
        known_values = self.i2s or {}
        return RandEnumKeys(known_values)
# Little endian variant of FieldLenField


class LEFieldLenField(FieldLenField):
    """FieldLenField whose default struct format is ``"<H"``."""

    def __init__(
        self,
        name,  # type: str
        default,  # type: Optional[Any]
        length_of=None,  # type: Optional[str]
        fmt="<H",  # type: str
        count_of=None,  # type: Optional[str]
        adjust=lambda pkt, x: x,  # type: Callable[[Packet, int], int]
    ):
        # type: (...) -> None
        options = {
            "length_of": length_of,
            "fmt": fmt,
            "count_of": count_of,
            "adjust": adjust,
        }
        FieldLenField.__init__(self, name, default, **options)
class FlagValueIter(object):
    """Iterator over the names of the bits set in a flag value.

    ``flagvalue`` must support ``int()`` and expose ``names``; names are
    yielded starting from the least significant bit.
    """

    __slots__ = ["flagvalue", "cursor"]

    def __init__(self, flagvalue):
        # type: (FlagValue) -> None
        self.flagvalue = flagvalue
        # Index of the next bit to examine.
        self.cursor = 0

    def __iter__(self):
        # type: () -> FlagValueIter
        return self

    def __next__(self):
        # type: () -> str
        # Only keep the bits that were not examined yet.
        remaining = int(self.flagvalue) >> self.cursor
        while remaining:
            position = self.cursor
            self.cursor = position + 1
            if remaining & 1:
                return self.flagvalue.names[position]
            remaining >>= 1
        raise StopIteration

    next = __next__
class FlagValue(object):
    """Integer bit mask whose bits are named.

    ``names`` is either a list of names (flags are then written
    ``"a+b"``) or a string in which every character names one bit (flags
    are then written ``"ab"``). Index 0 of ``names`` is the least
    significant bit. Flags can be read and written as attributes.
    """
    __slots__ = ["value", "names", "multi"]

    def _fixvalue(self, value):
        # type: (Any) -> int
        """Convert an int, a flag string or a list of names to an int."""
        if not value:
            return 0
        if isinstance(value, str):
            value = value.split('+') if self.multi else list(value)
        if isinstance(value, list):
            y = 0
            for i in value:
                y |= 1 << self.names.index(i)
            value = y
        return int(value)

    def __init__(self, value, names):
        # type: (Union[List[str], int, str], Union[List[str], str]) -> None
        # ``multi`` and ``names`` must be set first: _fixvalue reads them.
        self.multi = isinstance(names, list)
        self.names = names
        self.value = self._fixvalue(value)

    def __hash__(self):
        # type: () -> int
        return hash(self.value)

    def __int__(self):
        # type: () -> int
        return self.value

    def __eq__(self, other):
        # type: (Any) -> bool
        return self.value == self._fixvalue(other)

    def __lt__(self, other):
        # type: (Any) -> bool
        return self.value < self._fixvalue(other)

    def __le__(self, other):
        # type: (Any) -> bool
        return self.value <= self._fixvalue(other)

    def __gt__(self, other):
        # type: (Any) -> bool
        return self.value > self._fixvalue(other)

    def __ge__(self, other):
        # type: (Any) -> bool
        return self.value >= self._fixvalue(other)

    def __ne__(self, other):
        # type: (Any) -> bool
        return self.value != self._fixvalue(other)

    def __and__(self, other):
        # type: (int) -> FlagValue
        return self.__class__(self.value & self._fixvalue(other), self.names)
    __rand__ = __and__

    def __or__(self, other):
        # type: (int) -> FlagValue
        return self.__class__(self.value | self._fixvalue(other), self.names)
    __ror__ = __or__
    __add__ = __or__  # + is an alias for |

    def __sub__(self, other):
        # type: (int) -> FlagValue
        """Return a copy with the flags of ``other`` cleared."""
        return self.__class__(
            self.value & (2 ** len(self.names) - 1 - self._fixvalue(other)),
            self.names
        )

    def __xor__(self, other):
        # type: (int) -> FlagValue
        return self.__class__(self.value ^ self._fixvalue(other), self.names)

    def __lshift__(self, other):
        # type: (int) -> int
        return self.value << self._fixvalue(other)

    def __rshift__(self, other):
        # type: (int) -> int
        return self.value >> self._fixvalue(other)

    def __nonzero__(self):
        # type: () -> bool
        return bool(self.value)
    __bool__ = __nonzero__

    def flagrepr(self):
        # type: () -> str
        warnings.warn(
            "obj.flagrepr() is obsolete. Use str(obj) instead.",
            DeprecationWarning
        )
        return str(self)

    def __str__(self):
        # type: () -> str
        """Names of the set bits; bits without a name are shown as ``?``."""
        i = 0
        r = []
        x = int(self)
        while x:
            if x & 1:
                try:
                    name = self.names[i]
                except IndexError:
                    name = "?"
                r.append(name)
            i += 1
            x >>= 1
        return ("+" if self.multi else "").join(r)

    def __iter__(self):
        # type: () -> FlagValueIter
        return FlagValueIter(self)

    def __repr__(self):
        # type: () -> str
        return "<Flag %d (%s)>" % (self, self)

    def __deepcopy__(self, memo):
        # type: (Dict[Any, Any]) -> FlagValue
        return self.__class__(int(self), self.names)

    def __getattr__(self, attr):
        # type: (str) -> Any
        """Read a flag as an attribute.

        With string names, a multi-character attribute is true only when
        every one of its characters is a set flag. ``_`` in the attribute
        is retried as ``-`` when the name is unknown.
        """
        if attr in self.__slots__:
            return super(FlagValue, self).__getattribute__(attr)
        try:
            if self.multi:
                return bool((2 ** self.names.index(attr)) & int(self))
            return all(bool((2 ** self.names.index(flag)) & int(self))
                       for flag in attr)
        except ValueError:
            if '_' in attr:
                try:
                    return self.__getattr__(attr.replace('_', '-'))
                except AttributeError:
                    pass
            return super(FlagValue, self).__getattribute__(attr)

    def __setattr__(self, attr, value):
        # type: (str, Union[List[str], int, str]) -> None
        """Set or clear flags through attribute assignment.

        With string names, every character of ``attr`` is a flag, so
        ``fv.SA = True`` sets both ``S`` and ``A``, mirroring what
        ``__getattr__`` reads for the same attribute.

        :raises ValueError: when ``value`` is assigned something that is
            not an int.
        """
        if attr == "value" and not isinstance(value, int):
            raise ValueError(value)
        if attr in self.__slots__:
            return super(FlagValue, self).__setattr__(attr, value)
        if isinstance(self.names, str):
            # A substring test would wrongly treat "SR" as the single
            # flag found at its position: check character by character.
            known = bool(attr) and all(flag in self.names for flag in attr)
            flags = list(attr) if known else []
        else:
            flags = [attr] if attr in self.names else []
        if not flags:
            return super(FlagValue, self).__setattr__(attr, value)
        mask = 0
        for flag in flags:
            mask |= 1 << self.names.index(flag)
        if value:
            self.value |= mask
        else:
            self.value &= ~mask

    def copy(self):
        # type: () -> FlagValue
        return self.__class__(self.value, self.names)
class FlagsField(_BitField[Optional[Union[int, FlagValue]]]):
    """ Handle Flag type field

    Make sure all your flags have a label

    Example (list):
        >>> from scapy.packet import Packet
        >>> class FlagsTest(Packet):
                fields_desc = [FlagsField("flags", 0, 8, ["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"])]  # noqa: E501
        >>> FlagsTest(flags=9).show2()
        ###[ FlagsTest ]###
          flags     = f0+f3

    Example (str):
        >>> from scapy.packet import Packet
        >>> class TCPTest(Packet):
                fields_desc = [
                    BitField("reserved", 0, 7),
                    FlagsField("flags", 0x2, 9, "FSRPAUECN")
                ]
        >>> TCPTest(flags=3).show2()
        ###[ TCPTest ]###
          reserved  = 0
          flags     = FS

    Example (dict):
        >>> from scapy.packet import Packet
        >>> class FlagsTest2(Packet):
                fields_desc = [
                    FlagsField("flags", 0x2, 16, {
                        0x0001: "A",
                        0x0008: "B",
                    })
                ]

    :param name: field's name
    :param default: default value for the field
    :param size: number of bits in the field (in bits). if negative, LE
    :param names: (list or str or dict) label for each flag
        If it's a str or a list, the least Significant Bit tag's name
        is written first.
        If it's a dict, keys are bit masks and values are labels; bits
        without a label are named ``bit_<index>``.
    """
    ismutable = True
    __slots__ = ["names"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[Union[int, FlagValue]]
                 size,  # type: int
                 names,  # type: Union[List[str], str, Dict[int, str]]
                 **kwargs  # type: Any
                 ):
        # type: (...) -> None
        # Convert the dict to a list
        if isinstance(names, dict):
            tmp = ["bit_%d" % i for i in range(abs(size))]
            for i, v in names.items():
                mask = int(i)
                if mask <= 0:
                    raise ValueError(
                        "Flag masks must be positive: %r" % (i,)
                    )
                # Index of the highest set bit, computed exactly (no
                # floating-point logarithm involved).
                tmp[mask.bit_length() - 1] = v
            names = tmp
        # Store the names as str or list
        # (must happen before the parent initialisation, which converts
        # the default value through any2i and therefore needs the names)
        self.names = names
        super(FlagsField, self).__init__(name, default, size, **kwargs)

    def _fixup_val(self, x):
        # type: (Any) -> Optional[FlagValue]
        """Returns a FlagValue instance when needed. Internal method, to be
        used in *2i() and i2*() methods.

        FlagValue and VolatileValue instances, as well as None, are
        returned unchanged.
        """
        if isinstance(x, (FlagValue, VolatileValue)):
            return x  # type: ignore
        if x is None:
            return None
        return FlagValue(x, self.names)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).any2i(pkt, x))

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).m2i(pkt, x))

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).i2h(pkt, x))

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: Any
               ):
        # type: (...) -> str
        """Represent a value, or every item of a list/tuple of values."""
        if isinstance(x, (list, tuple)):
            # Keep the container type (list or tuple) in the output.
            return repr(type(x)(
                "None" if v is None else str(self._fixup_val(v)) for v in x
            ))
        return "None" if x is None else str(self._fixup_val(x))
# One flag description: its ``short`` and ``long`` names.
MultiFlagsEntry = collections.namedtuple('MultiFlagsEntry', ['short', 'long'])
class MultiFlagsField(_BitField[Set[str]]):
    """Flags field whose flag names are selected by ``depends_on(pkt)``.

    ``names`` maps each selector value to a dict associating bit positions
    with MultiFlagsEntry items. The internal value is a set of short names;
    bits without an entry are named ``'bit <position>'``.
    """
    __slots__ = FlagsField.__slots__ + ["depends_on"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Set[str]
                 size,  # type: int
                 names,  # type: Dict[int, Dict[int, MultiFlagsEntry]]
                 depends_on,  # type: Callable[[Optional[Packet]], int]
                 ):
        # type: (...) -> None
        self.names = names
        self.depends_on = depends_on
        super().__init__(name, default, size)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Set[str]
        """Convert an int or a set of short names to the internal set.

        :raises ValueError: if ``x`` is neither a set nor an int.
        """
        if not isinstance(x, (set, int)):
            raise ValueError('set expected')

        given_int = isinstance(x, int)
        if pkt is None:
            # No packet to pick the names from.
            return set() if given_int else x
        if given_int:
            return self.m2i(pkt, x)

        selector = self.depends_on(pkt)
        if selector is None:
            return x
        assert selector in self.names, 'invalid dependency'
        known = self.names[selector]
        checked = set()
        for flag in x:
            if any(entry.short == flag for entry in known.values()):
                checked.add(flag)
            else:
                assert False, 'Unknown flag "{}" with this dependency'.format(flag)  # noqa: E501
        return checked

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Set[str]]) -> int
        """Turn a set of short names into the integer bit mask."""
        known = self.names.get(self.depends_on(pkt), {})
        if x is None:
            return 0

        mask = 0
        for flag in x:
            position = next(
                (pos for pos, entry in known.items() if entry.short == flag),
                None
            )
            if position is None:
                # Not a known short name: parse the 'bit <position>' form.
                position = int(flag[len('bit '):])
            mask |= 1 << position
        return mask

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> Set[str]
        """Turn an integer bit mask into a set of short names."""
        known = self.names.get(self.depends_on(pkt), {})

        flags = set()
        position = 0
        while x:
            if x & 1:
                if position in known:
                    flags.add(known[position].short)
                else:
                    flags.add('bit {}'.format(position))
            x >>= 1
            position += 1
        return flags

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Set[str]) -> str
        """Show known flags as ``long (short)``, the others unchanged."""
        known = self.names.get(self.depends_on(pkt), {})

        shown = set()
        for flag in x:
            entry = next(
                (item for item in known.values() if item.short == flag),
                None
            )
            if entry is None:
                shown.add(flag)
            else:
                shown.add("{} ({})".format(entry.long, entry.short))
        return repr(shown)
class FixedPointField(BitField):
    """BitField holding a fixed-point number.

    The ``frac_bits`` low bits of the internal integer hold the fractional
    part; the bits above hold the integer part.
    """
    __slots__ = ['frac_bits']

    def __init__(self, name, default, size, frac_bits=16):
        # type: (str, int, int, int) -> None
        self.frac_bits = frac_bits
        super().__init__(name, default, size)

    def any2i(self, pkt, val):
        # type: (Optional[Packet], Optional[float]) -> Optional[int]
        """Encode a number as integer part and scaled fraction."""
        if val is None:
            return val
        whole = int(val)
        scale = 2**self.frac_bits
        fraction = int((val - whole) * scale)
        return (whole << self.frac_bits) | fraction

    def i2h(self, pkt, val):
        # type: (Optional[Packet], Optional[int]) -> Optional[EDecimal]
        """Decode the internal integer into a decimal number."""
        # A bit of trickery to get precise floats
        if val is None:
            return val
        shift = self.frac_bits
        whole = val >> shift
        scale = 2.0**shift
        fraction = EDecimal(val & ((1 << shift) - 1))
        fraction /= scale  # type: ignore
        digits = int(math.log10(scale))
        return whole + fraction.normalize(digits)

    def i2repr(self, pkt, val):
        # type: (Optional[Packet], int) -> str
        human = self.i2h(pkt, val)
        return str(human)
# Base class for IPv4 and IPv6 Prefixes inspired by IPField and IP6Field.
# Machine values are encoded in a multiple of wordbytes bytes.
class _IPPrefixFieldBase(Field[Tuple[str, int], Tuple[bytes, int]]):
    """
    Common implementation of the IP prefix fields.

    The internal value is a ``(prefix, prefix_length)`` tuple and the human
    value is the ``"prefix/length"`` string. On the wire only the bytes
    needed to hold ``prefix_length`` bits are present, rounded up to a
    multiple of ``wordbytes``.

    :param name: field's name
    :param default: default value for the field
    :param wordbytes: granularity, in bytes, of the encoded prefix
    :param maxbytes: size in bytes of a complete address
    :param aton: callable turning a printable address into bytes
    :param ntoa: callable turning address bytes into a printable address
    :param length_from: callable returning the prefix length (in bits) to
        use when dissecting; when omitted the length is always 0
    """
    __slots__ = ["wordbytes", "maxbytes", "aton", "ntoa", "length_from"]

    def __init__(
            self,
            name,  # type: str
            default,  # type: Tuple[str, int]
            wordbytes,  # type: int
            maxbytes,  # type: int
            aton,  # type: Callable[..., Any]
            ntoa,  # type: Callable[..., Any]
            length_from=None  # type: Optional[Callable[[Packet], int]]
    ):
        # type: (...) -> None
        self.wordbytes = wordbytes
        self.maxbytes = maxbytes
        self.aton = aton
        self.ntoa = ntoa
        Field.__init__(self, name, default, "%is" % self.maxbytes)
        if length_from is None:
            length_from = lambda x: 0
        self.length_from = length_from

    def _numbytes(self, pfxlen):
        # type: (int) -> int
        """Return the number of bytes used to encode pfxlen bits."""
        wbits = self.wordbytes * 8
        # Round up to a whole number of words
        return ((pfxlen + (wbits - 1)) // wbits) * self.wordbytes

    def h2i(self, pkt, x):
        # type: (Optional[Packet], str) -> Tuple[str, int]
        """Split "prefix/length" into its two parts, validating the prefix."""
        # "fc00:1::1/64" -> ("fc00:1::1", 64)
        [pfx, pfxlen] = x.split('/')
        self.aton(pfx)  # check for validity
        return (pfx, int(pfxlen))

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Tuple[str, int]) -> str
        """Join the internal tuple back into "prefix/length"."""
        # ("fc00:1::1", 64) -> "fc00:1::1/64"
        (pfx, pfxlen) = x
        return "%s/%i" % (pfx, pfxlen)

    def i2m(self,
            pkt,  # type: Optional[Packet]
            x  # type: Optional[Tuple[str, int]]
            ):
        # type: (...) -> Tuple[bytes, int]
        """
        Convert the internal tuple into (truncated address bytes, length).

        None is encoded as the empty, zero-length prefix.
        """
        # ("fc00:1::1", 64) -> (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64)  # noqa: E501
        if x is None:
            # A zero-length prefix carries no address bytes. Do not go
            # through aton here: an empty string is not a valid address for
            # the socket-style parsers the subclasses pass in.
            return (b"", 0)
        (pfx, pfxlen) = x
        s = self.aton(pfx)
        return (s[:self._numbytes(pfxlen)], pfxlen)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], Tuple[bytes, int]) -> Tuple[str, int]
        """Convert (address bytes, length) into the internal tuple."""
        # (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64)  # noqa: E501
        (s, pfxlen) = x

        # Pad a truncated prefix with zeroes up to a complete address
        if len(s) < self.maxbytes:
            s = s + (b"\0" * (self.maxbytes - len(s)))
        return (self.ntoa(s), pfxlen)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> Tuple[str, int]
        """Accept None (all-zero address with length 1) or "prefix/length"."""
        if x is None:
            return (self.ntoa(b"\0" * self.maxbytes), 1)

        return self.h2i(pkt, x)

    def i2len(self, pkt, x):
        # type: (Packet, Tuple[str, int]) -> int
        """Return the prefix length stored in the internal tuple."""
        (_, pfxlen) = x
        return pfxlen

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[Tuple[str, int]]) -> bytes
        """Append the encoded prefix bytes to s."""
        (rawpfx, pfxlen) = self.i2m(pkt, val)
        fmt = "!%is" % self._numbytes(pfxlen)
        return s + struct.pack(fmt, rawpfx)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, Tuple[str, int]]
        """Dissect a prefix whose bit length is given by length_from(pkt)."""
        pfxlen = self.length_from(pkt)
        numbytes = self._numbytes(pfxlen)
        fmt = "!%is" % numbytes
        return s[numbytes:], self.m2i(pkt, (struct.unpack(fmt, s[:numbytes])[0], pfxlen))  # noqa: E501
class IPPrefixField(_IPPrefixFieldBase):
    """
    IPv4 flavour of the prefix field: complete addresses are 4 bytes long
    and are converted with ``inet_aton`` / ``inet_ntoa``.
    """

    def __init__(
            self,
            name,  # type: str
            default,  # type: Tuple[str, int]
            wordbytes=1,  # type: int
            length_from=None  # type: Optional[Callable[[Packet], int]]
    ):
        # type: (...) -> None
        _IPPrefixFieldBase.__init__(
            self,
            name,
            default,
            wordbytes,
            4,
            inet_aton,
            inet_ntoa,
            length_from
        )
class IP6PrefixField(_IPPrefixFieldBase):
    """
    IPv6 flavour of the prefix field: complete addresses are 16 bytes long
    and are converted with ``inet_pton`` / ``inet_ntop`` using
    ``socket.AF_INET6``.
    """

    def __init__(
            self,
            name,  # type: str
            default,  # type: Tuple[str, int]
            wordbytes=1,  # type: int
            length_from=None  # type: Optional[Callable[[Packet], int]]
    ):
        # type: (...) -> None
        _IPPrefixFieldBase.__init__(
            self,
            name,
            default,
            wordbytes,
            16,
            lambda a: inet_pton(socket.AF_INET6, a),
            lambda n: inet_ntop(socket.AF_INET6, n),
            length_from
        )
class UTCTimeField(Field[float, int]):
    """
    Timestamp field counted from a configurable epoch.

    The stored value may be expressed in milliseconds, microseconds,
    nanoseconds or a custom number of ticks per second; the first enabled
    option, in that order, is the one used.
    """
    __slots__ = ["epoch", "delta", "strf",
                 "use_msec", "use_micro", "use_nano", "custom_scaling"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 use_msec=False,  # type: bool
                 use_micro=False,  # type: bool
                 use_nano=False,  # type: bool
                 epoch=None,  # type: Optional[Tuple[int, int, int, int, int, int, int, int, int]]  # noqa: E501
                 strf="%a, %d %b %Y %H:%M:%S %z",  # type: str
                 custom_scaling=None,  # type: Optional[int]
                 fmt="I"  # type: str
                 ):
        # type: (...) -> None
        Field.__init__(self, name, default, fmt=fmt)
        if epoch is None:
            mk_epoch = EPOCH
        else:
            mk_epoch = calendar.timegm(epoch)
        self.epoch = mk_epoch
        # Offset, in seconds, between this field's epoch and EPOCH
        self.delta = mk_epoch - EPOCH
        self.strf = strf
        self.use_msec = use_msec
        self.use_micro = use_micro
        self.use_nano = use_nano
        self.custom_scaling = custom_scaling

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], float) -> str
        """Format the timestamp with strf, followed by the seconds count."""
        if x is None:
            # No value: show the current time
            x = time.time() - self.delta
        else:
            scalings = (
                (self.use_msec, 1e3),
                (self.use_micro, 1e6),
                (self.use_nano, 1e9),
                (self.custom_scaling, self.custom_scaling),
            )
            for enabled, ticks_per_second in scalings:
                if enabled:
                    x = x / ticks_per_second
                    break
        x += self.delta
        # Adding a timedelta to a fixed date makes negative timestamps work
        # on all platforms (e.g. Windows).
        origin = datetime.datetime(1970, 1, 1)
        moment = origin + datetime.timedelta(seconds=x)
        return "%s (%d)" % (moment.strftime(self.strf), int(x))

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[float]) -> int
        """Return the integer to send; None means the current time."""
        if x is not None:
            # Explicit values are sent as they are, without scaling
            return int(x)

        now = time.time() - self.delta
        scalings = (
            (self.use_msec, 1e3),
            (self.use_micro, 1e6),
            (self.use_nano, 1e9),
            (self.custom_scaling, self.custom_scaling),
        )
        for enabled, ticks_per_second in scalings:
            if enabled:
                now = now * ticks_per_second
                break
        return int(now)
class SecondsIntField(Field[float, int]):
    """
    Duration field displayed in seconds.

    The stored value may be counted in milliseconds, microseconds or
    nanoseconds; the first enabled option, in that order, is the one used.
    """
    __slots__ = ["use_msec", "use_micro", "use_nano"]

    def __init__(self, name, default,
                 use_msec=False,
                 use_micro=False,
                 use_nano=False):
        # type: (str, int, bool, bool, bool) -> None
        Field.__init__(self, name, default, "I")
        self.use_msec = use_msec
        self.use_micro = use_micro
        self.use_nano = use_nano

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Optional[float]) -> str
        """Return the duration as "<seconds> sec" (0 when x is None)."""
        if x is None:
            seconds = 0  # type: Union[int, float]
        else:
            seconds = x
            units = (
                (self.use_msec, 1e3),
                (self.use_micro, 1e6),
                (self.use_nano, 1e9),
            )
            for enabled, per_second in units:
                if enabled:
                    seconds = x / per_second
                    break
        return "%s sec" % seconds
3632class _ScalingField(object):
3633 def __init__(self,
3634 name, # type: str
3635 default, # type: float
3636 scaling=1, # type: Union[int, float]
3637 unit="", # type: str
3638 offset=0, # type: Union[int, float]
3639 ndigits=3, # type: int
3640 fmt="B", # type: str
3641 ):
3642 # type: (...) -> None
3643 self.scaling = scaling
3644 self.unit = unit
3645 self.offset = offset
3646 self.ndigits = ndigits
3647 Field.__init__(self, name, default, fmt) # type: ignore
3649 def i2m(self,
3650 pkt, # type: Optional[Packet]
3651 x # type: Optional[Union[int, float]]
3652 ):
3653 # type: (...) -> Union[int, float]
3654 if x is None:
3655 x = 0
3656 x = (x - self.offset) / self.scaling
3657 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore
3658 x = int(round(x))
3659 return x
3661 def m2i(self, pkt, x):
3662 # type: (Optional[Packet], Union[int, float]) -> Union[int, float]
3663 x = x * self.scaling + self.offset
3664 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore
3665 x = round(x, self.ndigits)
3666 return x
3668 def any2i(self, pkt, x):
3669 # type: (Optional[Packet], Any) -> Union[int, float]
3670 if isinstance(x, (str, bytes)):
3671 x = struct.unpack(self.fmt, bytes_encode(x))[0] # type: ignore
3672 x = self.m2i(pkt, x)
3673 if not isinstance(x, (int, float)):
3674 raise ValueError("Unknown type")
3675 return x
3677 def i2repr(self, pkt, x):
3678 # type: (Optional[Packet], Union[int, float]) -> str
3679 return "%s %s" % (
3680 self.i2h(pkt, x), # type: ignore
3681 self.unit
3682 )
3684 def randval(self):
3685 # type: () -> RandFloat
3686 value = Field.randval(self) # type: ignore
3687 if value is not None:
3688 min_val = round(value.min * self.scaling + self.offset,
3689 self.ndigits)
3690 max_val = round(value.max * self.scaling + self.offset,
3691 self.ndigits)
3693 return RandFloat(min(min_val, max_val), max(min_val, max_val))
class ScalingField(_ScalingField,
                   Field[Union[int, float], Union[int, float]]):
    """ Handle physical values which are scaled and/or offset for communication

    Example:
        >>> from scapy.packet import Packet
        >>> class ScalingFieldTest(Packet):
                fields_desc = [ScalingField('data', 0, scaling=0.1, offset=-1, unit='mV')]  # noqa: E501
        >>> ScalingFieldTest(data=10).show2()
        ###[ ScalingFieldTest ]###
          data= 10.0 mV
        >>> hexdump(ScalingFieldTest(data=10))
        0000  6E                                               n
        >>> hexdump(ScalingFieldTest(data=b"\x6D"))
        0000  6D                                               m
        >>> ScalingFieldTest(data=b"\x6D").show2()
        ###[ ScalingFieldTest ]###
          data= 9.9 mV

    bytes(ScalingFieldTest(...)) will produce 0x6E in this example.
    0x6E is 110 (decimal). This is calculated through the scaling factor
    and the offset. "data" was set to 10, which means, we want to transfer
    the physical value 10 mV. To calculate the value, which has to be
    sent on the bus, the offset has to be subtracted and the scaling has to
    be applied by division through the scaling factor.
    bytes = (data - offset) / scaling
    bytes = ( 10 - (-1) ) / 0.1
    bytes = 110 = 0x6E

    If you want to force a certain internal value, you can assign a byte-
    string to the field (data=b"\x6D"). If a string or a bytes object is
    given to the field, no internal value conversion will be applied

    :param name: field's name
    :param default: default value for the field
    :param scaling: scaling factor for the internal value conversion
    :param unit: string for the unit representation of the internal value
    :param offset: value to offset the internal value during conversion
    :param ndigits: number of fractional digits for the internal conversion
    :param fmt: struct.pack format used to parse and serialize the internal value from and to machine representation # noqa: E501
    """
class BitScalingField(_ScalingField, BitField):  # type: ignore
    """
    A ScalingField that is a BitField

    ``size`` is handed to ``BitField.__init__``; every other positional or
    keyword argument is forwarded to ``_ScalingField.__init__``.
    """

    def __init__(self, name, default, size, *args, **kwargs):
        # type: (str, int, int, *Any, **Any) -> None
        # Order matters: the scaling parameters are stored first and the
        # BitField initialiser runs last.
        # NOTE(review): presumably BitField.__init__ then replaces the
        # format set up by _ScalingField.__init__ - confirm.
        _ScalingField.__init__(self, name, default, *args, **kwargs)
        BitField.__init__(self, name, default, size)  # type: ignore
class OUIField(X3BytesField):
    """
    A 3 byte field carrying an OUI.
    """

    def i2repr(self, pkt, val):
        # type: (Optional[Packet], int) -> str
        """
        Return the OUI in colon separated form, preceded by the name found
        in conf.manufdb when that lookup yields something different.
        """
        # Keep the 3 low-order bytes of the 32 bit big-endian encoding
        packed = struct.pack("!I", val or 0)
        # Pad to 6 bytes so that str2mac can format it, keep "xx:xx:xx"
        oui = str2mac(packed[1:] + b"\0" * 3)[:8]
        if not conf.manufdb:
            return oui
        vendor = conf.manufdb._get_manuf(oui)
        if vendor == oui:
            return oui
        return "%s (%s)" % (vendor, oui)
class UUIDField(Field[UUID, bytes]):
    """Field for UUID storage, wrapping Python's uuid.UUID type.

    The internal storage format of this field is ``uuid.UUID`` from the Python
    standard library.

    There are three formats (``uuid_fmt``) for this field type:

    * ``FORMAT_BE`` (default): the UUID is six fields in big-endian byte order,
      per RFC 4122.

      This format is used by DHCPv6 (RFC 6355) and most network protocols.

    * ``FORMAT_LE``: the UUID is six fields, with ``time_low``, ``time_mid``
      and ``time_high_version`` in little-endian byte order. This *doesn't*
      change the arrangement of the fields from RFC 4122.

      This format is used by Microsoft's COM/OLE libraries.

    * ``FORMAT_REV``: the UUID is a single 128-bit integer in little-endian
      byte order. This *changes the arrangement* of the fields.

      This format is used by Bluetooth Low Energy.

    Note: You should use the constants here.

    The "human encoding" of this field supports a number of different input
    formats, and wraps Python's ``uuid.UUID`` library appropriately:

    * Given a bytearray, bytes or str of 16 bytes, this class decodes UUIDs in
      wire format.

    * Given a bytearray, bytes or str of other lengths, this delegates to
      ``uuid.UUID`` the Python standard library. This supports a number of
      different encoding options -- see the Python standard library
      documentation for more details.

    * Given an int or long, presumed to be a 128-bit integer to pass to
      ``uuid.UUID``.

    * Given a tuple:

      * Tuples of 11 integers are treated as having the last 6 integers forming
        the ``node`` field, and are merged before being passed as a tuple of 6
        integers to ``uuid.UUID``.

      * Otherwise, the tuple is passed as the ``fields`` parameter to
        ``uuid.UUID`` directly without modification.

        ``uuid.UUID`` expects a tuple of 6 integers.

    Other types (such as ``uuid.UUID``) are passed through.
    """

    __slots__ = ["uuid_fmt"]

    FORMAT_BE = 0
    FORMAT_LE = 1
    FORMAT_REV = 2

    # Change this when we get new formats
    FORMATS = (FORMAT_BE, FORMAT_LE, FORMAT_REV)

    def __init__(self, name, default, uuid_fmt=FORMAT_BE):
        # type: (str, Optional[int], int) -> None
        # The format is stored and validated before Field.__init__ runs
        self.uuid_fmt = uuid_fmt
        self._check_uuid_fmt()
        Field.__init__(self, name, default, "16s")

    def _check_uuid_fmt(self):
        # type: () -> None
        """Checks .uuid_fmt, and raises an exception if it is not valid."""
        if self.uuid_fmt in UUIDField.FORMATS:
            return
        raise FieldValueRangeException(
            "Unsupported uuid_fmt ({})".format(self.uuid_fmt))

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[UUID]) -> bytes
        """Serialize a UUID to 16 bytes; None becomes 16 zero bytes."""
        self._check_uuid_fmt()
        if x is None:
            return b'\0' * 16

        layout = self.uuid_fmt
        if layout == UUIDField.FORMAT_BE:
            return x.bytes
        if layout == UUIDField.FORMAT_LE:
            return x.bytes_le
        if layout == UUIDField.FORMAT_REV:
            return x.bytes[::-1]
        raise FieldAttributeException("Unknown fmt")

    def m2i(self,
            pkt,  # type: Optional[Packet]
            x,  # type: bytes
            ):
        # type: (...) -> UUID
        """Build a UUID from its wire bytes, following uuid_fmt."""
        self._check_uuid_fmt()

        layout = self.uuid_fmt
        if layout == UUIDField.FORMAT_BE:
            return UUID(bytes=x)
        if layout == UUIDField.FORMAT_LE:
            return UUID(bytes_le=x)
        if layout == UUIDField.FORMAT_REV:
            return UUID(bytes=x[::-1])
        raise FieldAttributeException("Unknown fmt")

    def any2i(self,
              pkt,  # type: Optional[Packet]
              x  # type: Any  # noqa: E501
              ):
        # type: (...) -> Optional[UUID]
        """Convert any supported input to a UUID; other types give None."""
        # Python's uuid doesn't handle bytearray, so convert to an immutable
        # type first.
        if isinstance(x, bytearray):
            x = bytes_encode(x)

        if isinstance(x, int):
            return UUID(int=x)

        if isinstance(x, tuple):
            if len(x) == 11:
                # For compatibility with dce_rpc: the last 6 elements are
                # the bytes of the 48-bit node ID, most significant first.
                node = 0
                for octet in x[5:]:
                    node = (node << 8) | octet
                x = x[:5] + (node,)
            return UUID(fields=x)

        if isinstance(x, (str, bytes)):
            if len(x) == 16:
                # Raw bytes
                return self.m2i(pkt, bytes_encode(x))
            return UUID(plain_str(x))

        if isinstance(x, (UUID, RandUUID)):
            return cast(UUID, x)

        return None

    @staticmethod
    def randval():
        # type: () -> RandUUID
        return RandUUID()
class UUIDEnumField(UUIDField, _EnumField[UUID]):
    """
    UUID field combined with an enumeration.

    Wire encoding comes from ``UUIDField``; value conversion (``any2i``) and
    display (``i2repr``) are delegated to ``_EnumField``.
    """
    __slots__ = EnumField.__slots__

    def __init__(self, name, default, enum, uuid_fmt=0):
        # type: (str, Optional[int], Any, int) -> None
        # Both parent initialisers are called explicitly. UUIDField.__init__
        # runs last: it stores and validates uuid_fmt (0 is
        # UUIDField.FORMAT_BE).
        _EnumField.__init__(self, name, default, enum, "16s")  # type: ignore
        UUIDField.__init__(self, name, default, uuid_fmt=uuid_fmt)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> UUID
        # Bypass UUIDField.any2i in favour of the enum conversion
        return _EnumField.any2i(self, pkt, x)  # type: ignore

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: UUID
               ):
        # type: (...) -> Any
        # Display is handled by the enum implementation
        return _EnumField.i2repr(self, pkt, x)
class BitExtendedField(Field[Optional[int], int]):
    """
    Low E Bit Extended Field

    This type of field has a variable number of bytes. Each byte is defined
    as follows:
    - 7 bits of data
    - 1 bit used as an extension bit:

      + 0 means it is last byte of the field ("stopping bit")
      + 1 means there is another byte after this one ("forwarding bit")

    To get the actual data, it is necessary to hop the binary data byte per
    byte and to check the extension bit until 0

    The 7-bit groups are stored least significant group first.

    :param name: field's name
    :param default: default value for the field
    :param extension_bit: position of the extension bit inside each byte,
        either 0 (LSB) or 7 (MSB)
    """

    __slots__ = ["extension_bit"]

    def __init__(self, name, default, extension_bit):
        # type: (str, Optional[Any], int) -> None
        Field.__init__(self, name, default, "B")
        assert extension_bit in [7, 0]
        self.extension_bit = extension_bit

    def addfield(self, pkt, s, val):
        # type: (Optional[Packet], bytes, Optional[int]) -> bytes
        """
        Append the encoded value to s.

        :raises ValueError: if the value is negative
        """
        val = self.i2m(pkt, val)
        if not val:
            return s + b"\0"
        if val < 0:
            # A negative int never reaches 0 when shifted right, so the
            # encoding loop below would never terminate.
            raise ValueError(
                "%s cannot encode a negative value" % self.name
            )
        encoded = bytearray()
        mask = 1 << self.extension_bit
        # Data sits above the extension bit when that bit is the LSB
        shift = (self.extension_bit + 1) % 8
        while val:
            bv = (val & 0x7F) << shift
            val >>= 7
            if val:
                # More groups follow: set the extension bit
                bv |= mask
            encoded.append(bv)
        return s + bytes(encoded)

    def getfield(self, pkt, s):
        # type: (Optional[Any], bytes) -> Tuple[bytes, Optional[int]]
        """
        Decode bytes from s until one has its extension bit cleared, or
        until s is exhausted.

        :returns: the remaining bytes and the decoded value
        """
        val = 0
        smask = 1 << self.extension_bit
        mask = 0xFF & ~smask
        shift = (self.extension_bit + 1) % 8
        consumed = 0
        # Walk the buffer by index and slice it once at the end, instead of
        # copying the remainder after every byte.
        for i, byte in enumerate(s):
            val |= ((byte & mask) >> shift) << (7 * i)
            consumed = i + 1
            if (byte & smask) == 0:  # extension bit is 0
                # end
                break
        return s[consumed:], self.m2i(pkt, val)
class LSBExtendedField(BitExtendedField):
    """A BitExtendedField with the extension bit on the LSB (bit 0)."""

    # This is a BitExtendedField with the extension bit on LSB
    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        BitExtendedField.__init__(self, name, default, extension_bit=0)
class MSBExtendedField(BitExtendedField):
    """A BitExtendedField with the extension bit on the MSB (bit 7)."""

    # This is a BitExtendedField with the extension bit on MSB
    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        BitExtendedField.__init__(self, name, default, extension_bit=7)