Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/base_classes.py: 59%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Generators and packet meta classes.
8"""
10################
11# Generators #
12################
15from functools import reduce
16import abc
17import operator
18import os
19import random
20import re
21import socket
22import struct
23import subprocess
24import types
25import warnings
27import scapy
28from scapy.error import Scapy_Exception
29from scapy.consts import WINDOWS
31from typing import (
32 Any,
33 Dict,
34 Generic,
35 Iterator,
36 List,
37 Optional,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 cast,
43 TYPE_CHECKING,
44)
46if TYPE_CHECKING:
47 try:
48 import pyx
49 except ImportError:
50 pass
51 from scapy.packet import Packet
53_T = TypeVar("_T")
56class Gen(Generic[_T]):
57 __slots__ = [] # type: List[str]
59 def __iter__(self):
60 # type: () -> Iterator[_T]
61 return iter([])
63 def __iterlen__(self):
64 # type: () -> int
65 return sum(1 for _ in iter(self))
68def _get_values(value):
69 # type: (Any) -> Any
70 """Generate a range object from (start, stop[, step]) tuples, or
71 return value.
73 """
74 if (isinstance(value, tuple) and (2 <= len(value) <= 3) and
75 all(hasattr(i, "__int__") for i in value)):
76 # We use values[1] + 1 as stop value for (x)range to maintain
77 # the behavior of using tuples as field `values`
78 return range(*((int(value[0]), int(value[1]) + 1) +
79 tuple(int(v) for v in value[2:])))
80 return value
83class SetGen(Gen[_T]):
84 def __init__(self, values, _iterpacket=1):
85 # type: (Any, int) -> None
86 self._iterpacket = _iterpacket
87 if isinstance(values, (list, BasePacketList)):
88 self.values = [_get_values(val) for val in values]
89 else:
90 self.values = [_get_values(values)]
92 def __iter__(self):
93 # type: () -> Iterator[Any]
94 for i in self.values:
95 if (isinstance(i, Gen) and
96 (self._iterpacket or not isinstance(i, BasePacket))) or (
97 isinstance(i, (range, types.GeneratorType))):
98 for j in i:
99 yield j
100 else:
101 yield i
103 def __len__(self):
104 # type: () -> int
105 return self.__iterlen__()
107 def __repr__(self):
108 # type: () -> str
109 return "<SetGen %r>" % self.values
112class Net(Gen[str]):
113 """Network object from an IP address or hostname and mask"""
114 name = "Net" # type: str
115 family = socket.AF_INET # type: int
116 max_mask = 32 # type: int
118 @classmethod
119 def name2addr(cls, name):
120 # type: (str) -> str
121 try:
122 return next(
123 addr_port[0]
124 for family, _, _, _, addr_port in
125 socket.getaddrinfo(name, None, cls.family)
126 if family == cls.family
127 )
128 except socket.error:
129 if re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name) is not None:
130 raise Scapy_Exception("Ranges are no longer accepted in %s()" %
131 cls.__name__)
132 raise
134 @classmethod
135 def ip2int(cls, addr):
136 # type: (str) -> int
137 return cast(int, struct.unpack(
138 "!I", socket.inet_aton(cls.name2addr(addr))
139 )[0])
141 @staticmethod
142 def int2ip(val):
143 # type: (int) -> str
144 return socket.inet_ntoa(struct.pack('!I', val))
146 def __init__(self, net, stop=None):
147 # type: (str, Union[None, str]) -> None
148 if "*" in net:
149 raise Scapy_Exception("Wildcards are no longer accepted in %s()" %
150 self.__class__.__name__)
151 if stop is None:
152 try:
153 net, mask = net.split("/", 1)
154 except ValueError:
155 self.mask = self.max_mask # type: Union[None, int]
156 else:
157 self.mask = int(mask)
158 self.net = net # type: Union[None, str]
159 inv_mask = self.max_mask - self.mask
160 self.start = self.ip2int(net) >> inv_mask << inv_mask
161 self.count = 1 << inv_mask
162 self.stop = self.start + self.count - 1
163 else:
164 self.start = self.ip2int(net)
165 self.stop = self.ip2int(stop)
166 self.count = self.stop - self.start + 1
167 self.net = self.mask = None
169 def __str__(self):
170 # type: () -> str
171 return next(iter(self), "")
173 def __iter__(self):
174 # type: () -> Iterator[str]
175 # Python 2 won't handle huge (> sys.maxint) values in range()
176 for i in range(self.count):
177 yield self.int2ip(self.start + i)
179 def __len__(self):
180 # type: () -> int
181 return self.count
183 def __iterlen__(self):
184 # type: () -> int
185 # for compatibility
186 return len(self)
188 def choice(self):
189 # type: () -> str
190 return self.int2ip(random.randint(self.start, self.stop))
192 def __repr__(self):
193 # type: () -> str
194 if self.mask is not None:
195 return '%s("%s/%d")' % (
196 self.__class__.__name__,
197 self.net,
198 self.mask,
199 )
200 return '%s("%s", "%s")' % (
201 self.__class__.__name__,
202 self.int2ip(self.start),
203 self.int2ip(self.stop),
204 )
206 def __eq__(self, other):
207 # type: (Any) -> bool
208 if isinstance(other, str):
209 return self == self.__class__(other)
210 if not isinstance(other, Net):
211 return False
212 if self.family != other.family:
213 return False
214 return (self.start == other.start) and (self.stop == other.stop)
216 def __ne__(self, other):
217 # type: (Any) -> bool
218 # Python 2.7 compat
219 return not self == other
221 def __hash__(self):
222 # type: () -> int
223 return hash(("scapy.Net", self.family, self.start, self.stop))
225 def __contains__(self, other):
226 # type: (Any) -> bool
227 if isinstance(other, int):
228 return self.start <= other <= self.stop
229 if isinstance(other, str):
230 return self.__class__(other) in self
231 if type(other) is not self.__class__:
232 return False
233 return self.start <= other.start <= other.stop <= self.stop
236class OID(Gen[str]):
237 name = "OID"
239 def __init__(self, oid):
240 # type: (str) -> None
241 self.oid = oid
242 self.cmpt = []
243 fmt = []
244 for i in oid.split("."):
245 if "-" in i:
246 fmt.append("%i")
247 self.cmpt.append(tuple(map(int, i.split("-"))))
248 else:
249 fmt.append(i)
250 self.fmt = ".".join(fmt)
252 def __repr__(self):
253 # type: () -> str
254 return "OID(%r)" % self.oid
256 def __iter__(self):
257 # type: () -> Iterator[str]
258 ii = [k[0] for k in self.cmpt]
259 while True:
260 yield self.fmt % tuple(ii)
261 i = 0
262 while True:
263 if i >= len(ii):
264 return
265 if ii[i] < self.cmpt[i][1]:
266 ii[i] += 1
267 break
268 else:
269 ii[i] = self.cmpt[i][0]
270 i += 1
272 def __iterlen__(self):
273 # type: () -> int
274 return reduce(operator.mul, (max(y - x, 0) + 1 for (x, y) in self.cmpt), 1) # noqa: E501
277######################################
278# Packet abstract and base classes #
279######################################
281class Packet_metaclass(type):
282 def __new__(cls: Type[_T],
283 name, # type: str
284 bases, # type: Tuple[type, ...]
285 dct # type: Dict[str, Any]
286 ):
287 # type: (...) -> Type['Packet']
288 if "fields_desc" in dct: # perform resolution of references to other packets # noqa: E501
289 current_fld = dct["fields_desc"] # type: List[Union[scapy.fields.Field[Any, Any], Packet_metaclass]] # noqa: E501
290 resolved_fld = [] # type: List[scapy.fields.Field[Any, Any]]
291 for fld_or_pkt in current_fld:
292 if isinstance(fld_or_pkt, Packet_metaclass):
293 # reference to another fields_desc
294 for pkt_fld in fld_or_pkt.fields_desc:
295 resolved_fld.append(pkt_fld)
296 else:
297 resolved_fld.append(fld_or_pkt)
298 else: # look for a fields_desc in parent classes
299 resolved_fld = []
300 for b in bases:
301 if hasattr(b, "fields_desc"):
302 resolved_fld = b.fields_desc
303 break
305 if resolved_fld: # perform default value replacements
306 final_fld = [] # type: List[scapy.fields.Field[Any, Any]]
307 names = []
308 for f in resolved_fld:
309 if f.name in names:
310 war_msg = (
311 "Packet '%s' has a duplicated '%s' field ! "
312 "If you are using several ConditionalFields, have "
313 "a look at MultipleTypeField instead ! This will "
314 "become a SyntaxError in a future version of "
315 "Scapy !" % (
316 name, f.name
317 )
318 )
319 warnings.warn(war_msg, SyntaxWarning)
320 names.append(f.name)
321 if f.name in dct:
322 f = f.copy()
323 f.default = dct[f.name]
324 del dct[f.name]
325 final_fld.append(f)
327 dct["fields_desc"] = final_fld
329 dct.setdefault("__slots__", [])
330 for attr in ["name", "overload_fields"]:
331 try:
332 dct["_%s" % attr] = dct.pop(attr)
333 except KeyError:
334 pass
335 # Build and inject signature
336 try:
337 # Py3 only
338 import inspect
339 dct["__signature__"] = inspect.Signature([
340 inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY),
341 ] + [
342 inspect.Parameter(f.name,
343 inspect.Parameter.KEYWORD_ONLY,
344 default=f.default)
345 for f in dct["fields_desc"]
346 ])
347 except (ImportError, AttributeError, KeyError):
348 pass
349 newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct))
350 # Note: below can't be typed because we use attributes
351 # created dynamically..
352 newcls.__all_slots__ = set( # type: ignore
353 attr
354 for cls in newcls.__mro__ if hasattr(cls, "__slots__")
355 for attr in cls.__slots__
356 )
358 newcls.aliastypes = ( # type: ignore
359 [newcls] + getattr(newcls, "aliastypes", [])
360 )
362 if hasattr(newcls, "register_variant"):
363 newcls.register_variant()
364 for _f in newcls.fields_desc:
365 if hasattr(_f, "register_owner"):
366 _f.register_owner(newcls)
367 if newcls.__name__[0] != "_":
368 from scapy import config
369 config.conf.layers.register(newcls)
370 return newcls
372 def __getattr__(self, attr):
373 # type: (str) -> Any
374 for k in self.fields_desc:
375 if k.name == attr:
376 return k
377 raise AttributeError(attr)
379 def __call__(cls,
380 *args, # type: Any
381 **kargs # type: Any
382 ):
383 # type: (...) -> 'Packet'
384 if "dispatch_hook" in cls.__dict__:
385 try:
386 cls = cls.dispatch_hook(*args, **kargs)
387 except Exception:
388 from scapy import config
389 if config.conf.debug_dissector:
390 raise
391 cls = config.conf.raw_layer
392 i = cls.__new__(
393 cls, # type: ignore
394 cls.__name__,
395 cls.__bases__,
396 cls.__dict__ # type: ignore
397 )
398 i.__init__(*args, **kargs)
399 return i # type: ignore
402# Note: see compat.py for an explanation
404class Field_metaclass(type):
405 def __new__(cls: Type[_T],
406 name, # type: str
407 bases, # type: Tuple[type, ...]
408 dct # type: Dict[str, Any]
409 ):
410 # type: (...) -> Type[_T]
411 dct.setdefault("__slots__", [])
412 newcls = type.__new__(cls, name, bases, dct)
413 return newcls # type: ignore
416PacketList_metaclass = Field_metaclass
419class BasePacket(Gen['Packet']):
420 __slots__ = [] # type: List[str]
423#############################
424# Packet list base class #
425#############################
427class BasePacketList(Gen[_T]):
428 __slots__ = [] # type: List[str]
431class _CanvasDumpExtended(object):
432 @abc.abstractmethod
433 def canvas_dump(self, layer_shift=0, rebuild=1):
434 # type: (int, int) -> pyx.canvas.canvas
435 pass
437 def psdump(self, filename=None, **kargs):
438 # type: (Optional[str], **Any) -> None
439 """
440 psdump(filename=None, layer_shift=0, rebuild=1)
442 Creates an EPS file describing a packet. If filename is not provided a
443 temporary file is created and gs is called.
445 :param filename: the file's filename
446 """
447 from scapy.config import conf
448 from scapy.utils import get_temp_file, ContextManagerSubprocess
449 canvas = self.canvas_dump(**kargs)
450 if filename is None:
451 fname = get_temp_file(autoext=kargs.get("suffix", ".eps"))
452 canvas.writeEPSfile(fname)
453 if WINDOWS and not conf.prog.psreader:
454 os.startfile(fname)
455 else:
456 with ContextManagerSubprocess(conf.prog.psreader):
457 subprocess.Popen([conf.prog.psreader, fname])
458 else:
459 canvas.writeEPSfile(filename)
460 print()
462 def pdfdump(self, filename=None, **kargs):
463 # type: (Optional[str], **Any) -> None
464 """
465 pdfdump(filename=None, layer_shift=0, rebuild=1)
467 Creates a PDF file describing a packet. If filename is not provided a
468 temporary file is created and xpdf is called.
470 :param filename: the file's filename
471 """
472 from scapy.config import conf
473 from scapy.utils import get_temp_file, ContextManagerSubprocess
474 canvas = self.canvas_dump(**kargs)
475 if filename is None:
476 fname = get_temp_file(autoext=kargs.get("suffix", ".pdf"))
477 canvas.writePDFfile(fname)
478 if WINDOWS and not conf.prog.pdfreader:
479 os.startfile(fname)
480 else:
481 with ContextManagerSubprocess(conf.prog.pdfreader):
482 subprocess.Popen([conf.prog.pdfreader, fname])
483 else:
484 canvas.writePDFfile(filename)
485 print()
487 def svgdump(self, filename=None, **kargs):
488 # type: (Optional[str], **Any) -> None
489 """
490 svgdump(filename=None, layer_shift=0, rebuild=1)
492 Creates an SVG file describing a packet. If filename is not provided a
493 temporary file is created and gs is called.
495 :param filename: the file's filename
496 """
497 from scapy.config import conf
498 from scapy.utils import get_temp_file, ContextManagerSubprocess
499 canvas = self.canvas_dump(**kargs)
500 if filename is None:
501 fname = get_temp_file(autoext=kargs.get("suffix", ".svg"))
502 canvas.writeSVGfile(fname)
503 if WINDOWS and not conf.prog.svgreader:
504 os.startfile(fname)
505 else:
506 with ContextManagerSubprocess(conf.prog.svgreader):
507 subprocess.Popen([conf.prog.svgreader, fname])
508 else:
509 canvas.writeSVGfile(filename)
510 print()