Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/base_classes.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Generators and packet meta classes.
8"""
10################
11# Generators #
12################
15from functools import reduce
16import abc
17import operator
18import os
19import random
20import re
21import socket
22import struct
23import subprocess
24import types
25import warnings
27import scapy
28from scapy.error import Scapy_Exception
29from scapy.consts import WINDOWS
31from typing import (
32 Any,
33 Dict,
34 Generic,
35 Iterator,
36 List,
37 Optional,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 cast,
43 TYPE_CHECKING,
44)
46if TYPE_CHECKING:
47 try:
48 import pyx
49 except ImportError:
50 pass
51 from scapy.packet import Packet
_T = TypeVar("_T")


class Gen(Generic[_T]):
    """
    Base class of the generators defined in this module.

    The base implementation generates nothing: iterating over it yields
    no value. Subclasses override ``__iter__`` to produce their values.
    """
    __slots__ = []  # type: List[str]

    def __iter__(self):
        # type: () -> Iterator[_T]
        """Return an iterator over the generated values (empty here)."""
        return iter([])

    def __iterlen__(self):
        # type: () -> int
        """Count the generated values by walking through ``iter(self)``."""
        count = 0
        for _ in iter(self):
            count += 1
        return count
68def _get_values(value):
69 # type: (Any) -> Any
70 """Generate a range object from (start, stop[, step]) tuples, or
71 return value.
73 """
74 if (isinstance(value, tuple) and (2 <= len(value) <= 3) and
75 all(hasattr(i, "__int__") for i in value)):
76 # We use values[1] + 1 as stop value for (x)range to maintain
77 # the behavior of using tuples as field `values`
78 return range(*((int(value[0]), int(value[1]) + 1) +
79 tuple(int(v) for v in value[2:])))
80 return value
class SetGen(Gen[_T]):
    """
    Generator over a set of values.

    Each value is first passed through ``_get_values``. On iteration,
    ranges, Python generators and ``Gen`` objects are expanded into their
    items; ``BasePacket`` objects are only expanded when ``_iterpacket`` is
    true. Everything else is yielded as is.
    """

    def __init__(self, values, _iterpacket=1):
        # type: (Any, int) -> None
        self._iterpacket = _iterpacket
        # A lone value is handled as a one-element collection
        if not isinstance(values, (list, BasePacketList)):
            values = [values]
        self.values = [_get_values(item) for item in values]

    def __iter__(self):
        # type: () -> Iterator[Any]
        for item in self.values:
            if isinstance(item, (range, types.GeneratorType)):
                expand = True
            elif isinstance(item, Gen):
                # Packets are generators too: only iterate over them
                # when asked to
                expand = bool(self._iterpacket) or (
                    not isinstance(item, BasePacket)
                )
            else:
                expand = False
            if not expand:
                yield item
                continue
            for element in item:
                yield element

    def __len__(self):
        # type: () -> int
        return self.__iterlen__()

    def __repr__(self):
        # type: () -> str
        return "<SetGen %r>" % self.values
112class _ScopedIP(str):
113 """
114 A str that also holds extra attributes.
115 """
116 __slots__ = ["scope"]
118 def __init__(self, _: str) -> None:
119 self.scope = None
121 def __repr__(self) -> str:
122 val = super(_ScopedIP, self).__repr__()
123 if self.scope is not None:
124 return "ScopedIP(%s, scope=%s)" % (val, repr(self.scope))
125 return val
def ScopedIP(net: str, scope: Optional[Any] = None) -> _ScopedIP:
    """
    Build a str that also holds a ``scope`` attribute.

    Examples::

        >>> ScopedIP("224.0.0.1%eth0")  # interface 'eth0'
        >>> ScopedIP("224.0.0.1%1")  # interface index 1
        >>> ScopedIP("224.0.0.1", scope=conf.iface)

    :param net: the address, optionally followed by ``%<scope>``. When a
        ``%<scope>`` suffix is present it takes precedence over ``scope``.
    :param scope: the scope: an interface index, or anything accepted by
        ``resolve_iface``.
    :returns: a ``_ScopedIP`` whose ``scope`` is the network name of the
        resolved interface, or None when no scope was provided.
    :raises Scapy_Exception: if ``net`` holds more than one ``%``, or if the
        scope cannot be resolved to a valid interface.
    """
    if "%" in net:
        # Split on every '%': a maxsplit of 1 would always give two parts
        # and silently accept several scope identifiers.
        parts = net.split("%")
        if len(parts) != 2:
            raise Scapy_Exception("Scope identifier can only be present once !")
        net, scope = parts
    if scope is not None:
        from scapy.interfaces import resolve_iface, network_name, dev_from_index
        try:
            # First try to handle the scope as an interface index
            iface = dev_from_index(int(scope))
        except (ValueError, TypeError):
            iface = resolve_iface(scope)
        if not iface.is_valid():
            raise Scapy_Exception(
                "RFC6874 scope identifier '%s' could not be resolved to a "
                "valid interface !" % scope
            )
        scope = network_name(iface)
    x = _ScopedIP(net)
    x.scope = scope
    return x
class Net(Gen[str]):
    """
    Network object from an IP address or hostname and mask

    Examples:

        - With mask::

            >>> list(Net("192.168.0.1/24"))
            ['192.168.0.0', '192.168.0.1', ..., '192.168.0.255']

        - With 'end'::

            >>> list(Net("192.168.0.100", "192.168.0.200"))
            ['192.168.0.100', '192.168.0.101', ..., '192.168.0.200']

        - With 'scope' (for multicast)::

            >>> Net("224.0.0.1%lo")
            >>> Net("224.0.0.1", scope=conf.iface)
    """
    name = "Net"  # type: str
    family = socket.AF_INET  # type: int
    max_mask = 32  # type: int

    @classmethod
    def name2addr(cls, name):
        # type: (str) -> str
        """
        Resolve a hostname or address into an address of family
        ``cls.family``, using ``socket.getaddrinfo``.

        :raises Scapy_Exception: if the resolution fails and ``name`` looks
            like the old "a-b" range syntax. Other resolution errors are
            re-raised untouched.
        """
        try:
            return next(
                addr_port[0]
                for family, _, _, _, addr_port in
                socket.getaddrinfo(name, None, cls.family)
                if family == cls.family
            )
        except socket.error:
            if re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name) is not None:
                raise Scapy_Exception("Ranges are no longer accepted in %s()" %
                                      cls.__name__)
            raise

    @classmethod
    def ip2int(cls, addr):
        # type: (str) -> int
        """Resolve ``addr`` and return it as an unsigned 32-bit integer."""
        return cast(int, struct.unpack(
            "!I", socket.inet_aton(cls.name2addr(addr))
        )[0])

    @staticmethod
    def int2ip(val):
        # type: (int) -> str
        """Return the dotted representation of a 32-bit integer address."""
        return socket.inet_ntoa(struct.pack('!I', val))

    def __init__(self, net, stop=None, scope=None):
        # type: (str, Optional[str], Optional[str]) -> None
        """
        :param net: an address or hostname, optionally followed by
            ``/<mask>``, or a ``%<scope>`` identifier.
        :param stop: when provided, the last address of the range that
            starts at ``net`` (no mask is used in that case).
        :param scope: the scope to attach to the generated addresses. It is
            resolved through ``ScopedIP``.
        :raises Scapy_Exception: if ``net`` contains a wildcard.
        """
        if "*" in net:
            raise Scapy_Exception("Wildcards are no longer accepted in %s()" %
                                  self.__class__.__name__)
        self.scope = None
        # An explicit 'scope' argument must be honored as well as the
        # '%scope' notation: both are resolved by ScopedIP.
        if scope is not None or "%" in net:
            net = ScopedIP(net, scope=scope)
        if isinstance(net, _ScopedIP):
            self.scope = net.scope
        if stop is None:
            try:
                net, mask = net.split("/", 1)
            except ValueError:
                # No mask provided: a single address
                self.mask = self.max_mask  # type: Union[None, int]
            else:
                self.mask = int(mask)
            self.net = net  # type: Union[None, str]
            inv_mask = self.max_mask - self.mask
            # Clear the host bits to get the first address of the network
            self.start = self.ip2int(net) >> inv_mask << inv_mask
            self.count = 1 << inv_mask
            self.stop = self.start + self.count - 1
        else:
            self.start = self.ip2int(net)
            self.stop = self.ip2int(stop)
            self.count = self.stop - self.start + 1
            self.net = self.mask = None

    def __str__(self):
        # type: () -> str
        """Return the first address of the range ("" if it is empty)."""
        return next(iter(self), "")

    def __iter__(self):
        # type: () -> Iterator[str]
        for i in range(self.count):
            yield ScopedIP(
                self.int2ip(self.start + i),
                scope=self.scope,
            )

    def __len__(self):
        # type: () -> int
        return self.count

    def __iterlen__(self):
        # type: () -> int
        # for compatibility
        return len(self)

    def choice(self):
        # type: () -> str
        """Return a random address of the range (bounds included)."""
        return ScopedIP(
            self.int2ip(random.randint(self.start, self.stop)),
            scope=self.scope,
        )

    def __repr__(self):
        # type: () -> str
        scope_id_repr = ""
        if self.scope:
            scope_id_repr = ", scope=%s" % repr(self.scope)
        if self.mask is not None:
            return '%s("%s/%d"%s)' % (
                self.__class__.__name__,
                self.net,
                self.mask,
                scope_id_repr,
            )
        return '%s("%s", "%s"%s)' % (
            self.__class__.__name__,
            self.int2ip(self.start),
            self.int2ip(self.stop),
            scope_id_repr,
        )

    def __eq__(self, other):
        # type: (Any) -> bool
        """
        Two networks are equal when they share the same family, start and
        stop. A str is first converted using this class. The scope is not
        part of the comparison.
        """
        if isinstance(other, str):
            return self == self.__class__(other)
        if not isinstance(other, Net):
            return False
        if self.family != other.family:
            return False
        return (self.start == other.start) and (self.stop == other.stop)

    def __ne__(self, other):
        # type: (Any) -> bool
        # Python 2.7 compat
        return not self == other

    def __hash__(self):
        # type: () -> int
        # Only hash what __eq__ compares: objects that compare equal must
        # have the same hash, and the scope is ignored by __eq__.
        return hash(("scapy.Net", self.family, self.start, self.stop))

    def __contains__(self, other):
        # type: (Any) -> bool
        """
        Tell whether an integer address, a str (converted using this class)
        or a network of the very same class is fully included in this one.
        """
        if isinstance(other, int):
            return self.start <= other <= self.stop
        if isinstance(other, str):
            return self.__class__(other) in self
        if type(other) is not self.__class__:
            return False
        return self.start <= other.start <= other.stop <= self.stop
class OID(Gen[str]):
    """
    Generator of OIDs. Each dot-separated component written as "a-b" is
    iterated from a to b (included), the leftmost range changing first.
    """
    name = "OID"

    def __init__(self, oid):
        # type: (str) -> None
        self.oid = oid
        self.cmpt = []
        pieces = []
        for component in oid.split("."):
            if "-" not in component:
                pieces.append(component)
                continue
            # A range: keep its bounds and leave a placeholder in the format
            bounds = tuple(int(bound) for bound in component.split("-"))
            self.cmpt.append(bounds)
            pieces.append("%i")
        self.fmt = ".".join(pieces)

    def __repr__(self):
        # type: () -> str
        return "OID(%r)" % self.oid

    def __iter__(self):
        # type: () -> Iterator[str]
        current = [bounds[0] for bounds in self.cmpt]
        while True:
            yield self.fmt % tuple(current)
            # Odometer-like increment: bump the first counter that has not
            # reached its upper bound, resetting those that have.
            for pos in range(len(current)):
                if current[pos] < self.cmpt[pos][1]:
                    current[pos] += 1
                    break
                current[pos] = self.cmpt[pos][0]
            else:
                # Every counter wrapped around: we are done
                return

    def __iterlen__(self):
        # type: () -> int
        sizes = [max(high - low, 0) + 1 for (low, high) in self.cmpt]
        return reduce(operator.mul, sizes, 1)
360######################################
361# Packet abstract and base classes #
362######################################
class Packet_metaclass(type):
    """
    Metaclass of the packet classes.

    On class creation, it resolves ``fields_desc`` (references to other
    packet classes, inheritance from the bases, default values overridden
    by class attributes), renames the ``name`` and ``overload_fields``
    attributes, injects a ``__signature__`` and registers the new class.
    """

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type['Packet']
        if "fields_desc" in dct:  # perform resolution of references to other packets  # noqa: E501
            current_fld = dct["fields_desc"]  # type: List[Union[scapy.fields.Field[Any, Any], Packet_metaclass]]  # noqa: E501
            resolved_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            for fld_or_pkt in current_fld:
                if isinstance(fld_or_pkt, Packet_metaclass):
                    # reference to another fields_desc
                    for pkt_fld in fld_or_pkt.fields_desc:
                        resolved_fld.append(pkt_fld)
                else:
                    resolved_fld.append(fld_or_pkt)
        else:  # look for a fields_desc in parent classes
            resolved_fld = []
            for b in bases:
                if hasattr(b, "fields_desc"):
                    resolved_fld = b.fields_desc
                    break

        if resolved_fld:  # perform default value replacements
            final_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            seen_names = set()
            for f in resolved_fld:
                if f.name in seen_names:
                    war_msg = (
                        "Packet '%s' has a duplicated '%s' field ! "
                        "If you are using several ConditionalFields, have "
                        "a look at MultipleTypeField instead ! This will "
                        "become a SyntaxError in a future version of "
                        "Scapy !" % (
                            name, f.name
                        )
                    )
                    warnings.warn(war_msg, SyntaxWarning)
                seen_names.add(f.name)
                if f.name in dct:
                    # A class attribute named after a field overrides the
                    # default value of this field. Work on a copy so that
                    # the original field is left untouched.
                    f = f.copy()
                    f.default = dct[f.name]
                    del dct[f.name]
                final_fld.append(f)

            dct["fields_desc"] = final_fld

        dct.setdefault("__slots__", [])
        for attr in ["name", "overload_fields"]:
            try:
                dct["_%s" % attr] = dct.pop(attr)
            except KeyError:
                pass
        # Build and inject signature
        try:
            # Py3 only
            import inspect
            dct["__signature__"] = inspect.Signature([
                inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY),
            ] + [
                inspect.Parameter(f.name,
                                  inspect.Parameter.KEYWORD_ONLY,
                                  default=f.default)
                for f in dct["fields_desc"]
            ])
        except (ImportError, AttributeError, KeyError, ValueError):
            # The signature is only a best-effort nicety. ValueError is
            # raised by inspect on duplicated or invalid parameter names:
            # duplicated fields must only trigger the warning above, not
            # make the class creation fail.
            pass
        newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct))
        # Note: below can't be typed because we use attributes
        # created dynamically..
        newcls.__all_slots__ = set(  # type: ignore
            attr
            for cls in newcls.__mro__ if hasattr(cls, "__slots__")
            for attr in cls.__slots__
        )

        newcls.aliastypes = (  # type: ignore
            [newcls] + getattr(newcls, "aliastypes", [])
        )

        if hasattr(newcls, "register_variant"):
            newcls.register_variant()
        for _f in newcls.fields_desc:
            if hasattr(_f, "register_owner"):
                _f.register_owner(newcls)
        if newcls.__name__[0] != "_":
            # Classes whose name starts with an underscore are not
            # registered as layers
            from scapy import config
            config.conf.layers.register(newcls)
        return newcls

    def __getattr__(self, attr):
        # type: (str) -> Any
        """
        Fallback for class attributes: return the field of ``fields_desc``
        named ``attr``, or raise AttributeError.
        """
        for k in self.fields_desc:
            if k.name == attr:
                return k
        raise AttributeError(attr)

    def __call__(cls,
                 *args,  # type: Any
                 **kargs  # type: Any
                 ):
        # type: (...) -> 'Packet'
        """
        Instantiate a packet. If the class defines ``dispatch_hook`` in its
        own ``__dict__``, the hook picks the class to instantiate; when it
        fails, ``conf.raw_layer`` is used instead, unless
        ``conf.debug_dissector`` is set, in which case the error is raised.
        """
        if "dispatch_hook" in cls.__dict__:
            try:
                cls = cls.dispatch_hook(*args, **kargs)
            except Exception:
                from scapy import config
                if config.conf.debug_dissector:
                    raise
                cls = config.conf.raw_layer
        i = cls.__new__(
            cls,  # type: ignore
            cls.__name__,
            cls.__bases__,
            cls.__dict__  # type: ignore
        )
        i.__init__(*args, **kargs)
        return i  # type: ignore
485# Note: see compat.py for an explanation
class Field_metaclass(type):
    """
    Metaclass giving an empty ``__slots__`` to every class that does not
    declare one.
    """

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[_T]
        if "__slots__" not in dct:
            dct["__slots__"] = []
        return type.__new__(cls, name, bases, dct)  # type: ignore


# The packet lists use the very same metaclass
PacketList_metaclass = Field_metaclass
class BasePacket(Gen['Packet']):
    """Slot-less base class: a generator of 'Packet' objects."""
    __slots__ = []  # type: List[str]
506#############################
507# Packet list base class #
508#############################
class BasePacketList(Gen[_T]):
    """Slot-less base class of the packet lists."""
    __slots__ = []  # type: List[str]
class _CanvasDumpExtended(object):
    """
    Mixin adding the psdump(), pdfdump() and svgdump() helpers to classes
    that implement canvas_dump().
    """

    @abc.abstractmethod
    def canvas_dump(self, layer_shift=0, rebuild=1):
        # type: (int, int) -> pyx.canvas.canvas
        pass

    def _dump_canvas(self, filename, kargs, default_suffix, writer, reader):
        # type: (Optional[str], Dict[str, Any], str, str, str) -> None
        """
        Common implementation of psdump(), pdfdump() and svgdump().

        :param filename: the file to write to. If None, a temporary file is
            created then opened with the configured reader.
        :param kargs: the keyword arguments received by the caller. 'suffix'
            is consumed here, the others are passed to canvas_dump().
        :param default_suffix: extension of the temporary file when no
            'suffix' is provided
        :param writer: name of the canvas method that writes the file
        :param reader: name of the conf.prog attribute holding the reader
        """
        from scapy.config import conf
        from scapy.utils import get_temp_file, ContextManagerSubprocess
        # 'suffix' is meant for the temporary file only: canvas_dump() only
        # declares layer_shift and rebuild.
        suffix = kargs.pop("suffix", default_suffix)
        canvas = self.canvas_dump(**kargs)
        write_file = getattr(canvas, writer)
        if filename is None:
            fname = get_temp_file(autoext=suffix)
            write_file(fname)
            reader_prog = getattr(conf.prog, reader)
            if WINDOWS and not reader_prog:
                os.startfile(fname)
            else:
                with ContextManagerSubprocess(reader_prog):
                    subprocess.Popen([reader_prog, fname])
        else:
            write_file(filename)
        print()

    def psdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        psdump(filename=None, layer_shift=0, rebuild=1)

        Creates an EPS file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.psreader (on
        Windows, with os.startfile() when no reader is configured).

        :param filename: the file's filename
        :param suffix: extension of the temporary file (default: ".eps")
        """
        self._dump_canvas(filename, kargs, ".eps", "writeEPSfile", "psreader")

    def pdfdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        pdfdump(filename=None, layer_shift=0, rebuild=1)

        Creates a PDF file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.pdfreader (on
        Windows, with os.startfile() when no reader is configured).

        :param filename: the file's filename
        :param suffix: extension of the temporary file (default: ".pdf")
        """
        self._dump_canvas(filename, kargs, ".pdf", "writePDFfile", "pdfreader")

    def svgdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        svgdump(filename=None, layer_shift=0, rebuild=1)

        Creates an SVG file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.svgreader (on
        Windows, with os.startfile() when no reader is configured).

        :param filename: the file's filename
        :param suffix: extension of the temporary file (default: ".svg")
        """
        self._dump_canvas(filename, kargs, ".svg", "writeSVGfile", "svgreader")