1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Generators and packet meta classes.
8"""
9
10################
11# Generators #
12################
13
14
15from functools import reduce
16import abc
17import operator
18import os
19import random
20import re
21import socket
22import struct
23import subprocess
24import types
25import warnings
26
27import scapy
28from scapy.error import Scapy_Exception
29from scapy.consts import WINDOWS
30
31from typing import (
32 Any,
33 Dict,
34 Generic,
35 Iterator,
36 List,
37 Optional,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 cast,
43 TYPE_CHECKING,
44)
45
46if TYPE_CHECKING:
47 try:
48 import pyx
49 except ImportError:
50 pass
51 from scapy.packet import Packet
52
53_T = TypeVar("_T")
54
55
class Gen(Generic[_T]):
    """Base generator type: an iterable whose elements can be counted."""
    __slots__ = []  # type: List[str]

    def __iter__(self):
        # type: () -> Iterator[_T]
        """Return an iterator over an empty list (no element is produced)."""
        nothing = []  # type: List[_T]
        return iter(nothing)

    def __iterlen__(self):
        # type: () -> int
        """Return the number of elements, obtained by iterating over self."""
        total = 0
        for _ in iter(self):
            total += 1
        return total
66
67
68def _get_values(value):
69 # type: (Any) -> Any
70 """Generate a range object from (start, stop[, step]) tuples, or
71 return value.
72
73 """
74 if (isinstance(value, tuple) and (2 <= len(value) <= 3) and
75 all(hasattr(i, "__int__") for i in value)):
76 # We use values[1] + 1 as stop value for (x)range to maintain
77 # the behavior of using tuples as field `values`
78 return range(*((int(value[0]), int(value[1]) + 1) +
79 tuple(int(v) for v in value[2:])))
80 return value
81
82
class SetGen(Gen[_T]):
    """Generator over a set of values, some of which may expand themselves."""

    def __init__(self, values, _iterpacket=1):
        # type: (Any, int) -> None
        """Store the values, converting range-like tuples via _get_values.

        :param values: a list / BasePacketList of values, or a single value
        :param _iterpacket: when false, BasePacket elements are yielded as
            is instead of being iterated
        """
        self._iterpacket = _iterpacket
        if not isinstance(values, (list, BasePacketList)):
            # A lone value is handled as a one-element collection
            values = [values]
        self.values = [_get_values(item) for item in values]

    def __iter__(self):
        # type: () -> Iterator[Any]
        """Yield every value, expanding ranges, generators and Gen objects."""
        for item in self.values:
            if isinstance(item, (range, types.GeneratorType)):
                expand = True
            elif isinstance(item, Gen):
                # Packets are only expanded when _iterpacket is set
                expand = self._iterpacket or not isinstance(item, BasePacket)
            else:
                expand = False
            if not expand:
                yield item
                continue
            for sub in item:
                yield sub

    def __len__(self):
        # type: () -> int
        """Return the number of yielded elements (see __iterlen__)."""
        return self.__iterlen__()

    def __repr__(self):
        # type: () -> str
        return "<SetGen %r>" % self.values
110
111
112class _ScopedIP(str):
113 """
114 A str that also holds extra attributes.
115 """
116 __slots__ = ["scope"]
117
118 def __init__(self, _: str) -> None:
119 self.scope = None
120
121 def __repr__(self) -> str:
122 val = super(_ScopedIP, self).__repr__()
123 if self.scope is not None:
124 return "ScopedIP(%s, scope=%s)" % (val, repr(self.scope))
125 return val
126
127
def ScopedIP(net: str, scope: Optional[Any] = None) -> _ScopedIP:
    """
    Build a str that also holds a ``scope`` attribute.

    The scope is either given through the ``scope`` argument or embedded in
    ``net`` after a single ``%`` separator (in which case the embedded value
    takes precedence). When a scope is present it is resolved to an
    interface, first as an interface index, then through ``resolve_iface``,
    and stored as that interface's network name.

    Examples::

        >>> ScopedIP("224.0.0.1%eth0")  # interface 'eth0'
        >>> ScopedIP("224.0.0.1%1")  # interface index 1
        >>> ScopedIP("224.0.0.1", scope=conf.iface)

    :param net: the address, optionally suffixed with ``%<scope>``
    :param scope: the scope (interface, name or index), or None
    :returns: the address as a ``_ScopedIP`` with its ``scope`` set
    :raises Scapy_Exception: if ``net`` holds more than one ``%`` or if the
        scope does not resolve to a valid interface
    """
    if "%" in net:
        # Split on every '%': with maxsplit=1 the result always has exactly
        # two parts, so a repeated scope identifier could never be detected.
        parts = net.split("%")
        if len(parts) != 2:
            raise Scapy_Exception("Scope identifier can only be present once !")
        net, scope = parts
    if scope is not None:
        from scapy.interfaces import resolve_iface, network_name, dev_from_index
        try:
            # Numeric scopes are interface indexes
            iface = dev_from_index(int(scope))
        except (ValueError, TypeError):
            iface = resolve_iface(scope)
        if not iface.is_valid():
            raise Scapy_Exception(
                "RFC6874 scope identifier '%s' could not be resolved to a "
                "valid interface !" % scope
            )
        scope = network_name(iface)
    x = _ScopedIP(net)
    x.scope = scope
    return x
158
159
class Net(Gen[str]):
    """
    Network object from an IP address or hostname and mask

    Examples:

        - With mask::

            >>> list(Net("192.168.0.1/24"))
            ['192.168.0.0', '192.168.0.1', ..., '192.168.0.255']

        - With 'end'::

            >>> list(Net("192.168.0.100", "192.168.0.200"))
            ['192.168.0.100', '192.168.0.101', ..., '192.168.0.200']

        - With 'scope' (for multicast)::

            >>> Net("224.0.0.1%lo")
            >>> Net("224.0.0.1", scope=conf.iface)
    """
    name = "Net"  # type: str
    family = socket.AF_INET  # type: int
    max_mask = 32  # type: int

    @classmethod
    def name2addr(cls, name):
        # type: (str) -> str
        """Resolve ``name`` to an address of the class' address family.

        The first ``socket.getaddrinfo`` result whose family equals
        ``cls.family`` is returned.

        :raises Scapy_Exception: if resolution fails and ``name`` looks like
            a legacy "a-b" range
        :raises socket.error: if resolution fails for any other name
        """
        try:
            return next(
                addr_port[0]
                for family, _, _, _, addr_port in
                socket.getaddrinfo(name, None, cls.family)
                if family == cls.family
            )
        except socket.error:
            if re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name) is not None:
                raise Scapy_Exception("Ranges are no longer accepted in %s()" %
                                      cls.__name__)
            raise

    @classmethod
    def ip2int(cls, addr):
        # type: (str) -> int
        """Return the resolved address of ``addr`` as an unsigned integer."""
        return cast(int, struct.unpack(
            "!I", socket.inet_aton(cls.name2addr(addr))
        )[0])

    @staticmethod
    def int2ip(val):
        # type: (int) -> str
        """Return the dotted representation of the 32-bit integer ``val``."""
        return socket.inet_ntoa(struct.pack('!I', val))

    def __init__(self, net, stop=None, scope=None):
        # type: (str, Optional[str], Optional[str]) -> None
        """
        :param net: address or hostname, optionally followed by "/mask"
            and/or carrying a "%scope" identifier
        :param stop: last address of the range; when given, no mask is used
        :param scope: scope to attach to the generated addresses
        :raises Scapy_Exception: if ``net`` contains a wildcard
        """
        if "*" in net:
            raise Scapy_Exception("Wildcards are no longer accepted in %s()" %
                                  self.__class__.__name__)
        self.scope = None
        # Honor the 'scope' argument as well as an inline "%scope": it used
        # to be accepted but silently dropped. ScopedIP gives precedence to
        # an identifier embedded in 'net'.
        if "%" in net or scope is not None:
            net = ScopedIP(net, scope=scope)
        if isinstance(net, _ScopedIP):
            self.scope = net.scope
        if stop is None:
            try:
                net, mask = net.split("/", 1)
            except ValueError:
                # No "/mask" part: single address
                self.mask = self.max_mask  # type: Union[None, int]
            else:
                self.mask = int(mask)
            self.net = net  # type: Union[None, str]
            inv_mask = self.max_mask - self.mask
            # Clear the host bits to get the first address of the network
            self.start = self.ip2int(net) >> inv_mask << inv_mask
            self.count = 1 << inv_mask
            self.stop = self.start + self.count - 1
        else:
            self.start = self.ip2int(net)
            self.stop = self.ip2int(stop)
            self.count = self.stop - self.start + 1
            self.net = self.mask = None

    def __str__(self):
        # type: () -> str
        """Return the first address of the range, or "" if it is empty."""
        return next(iter(self), "")

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield each address of the range, carrying the scope."""
        for i in range(self.count):
            yield ScopedIP(
                self.int2ip(self.start + i),
                scope=self.scope,
            )

    def __len__(self):
        # type: () -> int
        return self.count

    def __iterlen__(self):
        # type: () -> int
        # for compatibility
        return len(self)

    def choice(self):
        # type: () -> str
        """Return a random address of the range, carrying the scope."""
        return ScopedIP(
            self.int2ip(random.randint(self.start, self.stop)),
            scope=self.scope,
        )

    def __repr__(self):
        # type: () -> str
        scope_id_repr = ""
        if self.scope:
            scope_id_repr = ", scope=%s" % repr(self.scope)
        if self.mask is not None:
            return '%s("%s/%d"%s)' % (
                self.__class__.__name__,
                self.net,
                self.mask,
                scope_id_repr,
            )
        return '%s("%s", "%s"%s)' % (
            self.__class__.__name__,
            self.int2ip(self.start),
            self.int2ip(self.stop),
            scope_id_repr,
        )

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare family and range bounds; a str is parsed first."""
        # NOTE(review): 'scope' is ignored here but is part of __hash__, so
        # two equal objects with different scopes hash differently - confirm
        # which of the two is intended.
        if isinstance(other, str):
            return self == self.__class__(other)
        if not isinstance(other, Net):
            return False
        if self.family != other.family:
            return False
        return (self.start == other.start) and (self.stop == other.stop)

    def __ne__(self, other):
        # type: (Any) -> bool
        # Python 2.7 compat
        return not self == other

    def __hash__(self):
        # type: () -> int
        return hash(("scapy.Net", self.family, self.start, self.stop, self.scope))

    def __contains__(self, other):
        # type: (Any) -> bool
        """Tell whether an integer, a str or a same-class range is included."""
        if isinstance(other, int):
            return self.start <= other <= self.stop
        if isinstance(other, str):
            return self.__class__(other) in self
        if type(other) is not self.__class__:
            return False
        return self.start <= other.start <= other.stop <= self.stop
317
318
class OID(Gen[str]):
    """Generator of OID strings where "a-b" components span a range."""
    name = "OID"

    def __init__(self, oid):
        # type: (str) -> None
        """Parse ``oid``: every "a-b" component becomes a varying slot."""
        self.oid = oid
        self.cmpt = []
        pieces = []
        for part in oid.split("."):
            if "-" not in part:
                pieces.append(part)
                continue
            # Ranged component: placeholder in the format, bounds in cmpt
            pieces.append("%i")
            self.cmpt.append(tuple(map(int, part.split("-"))))
        self.fmt = ".".join(pieces)

    def __repr__(self):
        # type: () -> str
        return "OID(%r)" % self.oid

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield every OID, the first ranged component varying fastest."""
        counters = [bounds[0] for bounds in self.cmpt]
        slots = len(counters)
        while True:
            yield self.fmt % tuple(counters)
            pos = 0
            # Reset every slot that reached its upper bound and carry over
            # to the next one
            while pos < slots and not counters[pos] < self.cmpt[pos][1]:
                counters[pos] = self.cmpt[pos][0]
                pos += 1
            if pos >= slots:
                # Carried past the last slot: all combinations were produced
                return
            counters[pos] += 1

    def __iterlen__(self):
        # type: () -> int
        """Return the product of the sizes of the ranged components."""
        total = 1
        for (low, high) in self.cmpt:
            total = total * (max(high - low, 0) + 1)
        return total
358
359
360######################################
361# Packet abstract and base classes #
362######################################
363
class Packet_metaclass(type):
    """Metaclass building packet classes.

    On class creation it resolves ``fields_desc``, applies per-class default
    overrides, injects a signature, computes ``__all_slots__`` and
    ``aliastypes``, and registers the class.
    """

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type['Packet']
        if "fields_desc" in dct:
            # Resolve references to other packets: a packet class listed in
            # fields_desc stands for all of its own fields
            resolved_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            for entry in dct["fields_desc"]:
                if isinstance(entry, Packet_metaclass):
                    resolved_fld.extend(entry.fields_desc)
                else:
                    resolved_fld.append(entry)
        else:
            # Otherwise use the fields_desc of the first parent having one
            resolved_fld = next(
                (
                    base.fields_desc
                    for base in bases
                    if hasattr(base, "fields_desc")
                ),
                [],
            )

        if resolved_fld:
            # Apply default value replacements: a class attribute named
            # like a field becomes the default of a copy of that field
            final_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            seen = []
            for fld in resolved_fld:
                if fld.name in seen:
                    war_msg = (
                        "Packet '%s' has a duplicated '%s' field ! "
                        "If you are using several ConditionalFields, have "
                        "a look at MultipleTypeField instead ! This will "
                        "become a SyntaxError in a future version of "
                        "Scapy !" % (
                            name, fld.name
                        )
                    )
                    warnings.warn(war_msg, SyntaxWarning)
                seen.append(fld.name)
                if fld.name in dct:
                    fld = fld.copy()
                    fld.default = dct[fld.name]
                    del dct[fld.name]
                final_fld.append(fld)

            dct["fields_desc"] = final_fld

        if "__slots__" not in dct:
            dct["__slots__"] = []
        # "name" and "overload_fields" are stored under a "_" prefix
        for attr in ("name", "overload_fields"):
            if attr in dct:
                dct["_" + attr] = dct.pop(attr)
        # Build and inject signature
        try:
            import inspect
            params = [
                inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY),
            ]
            params.extend(
                inspect.Parameter(fld.name,
                                  inspect.Parameter.KEYWORD_ONLY,
                                  default=fld.default)
                for fld in dct["fields_desc"]
            )
            dct["__signature__"] = inspect.Signature(params)
        except (ImportError, AttributeError, KeyError):
            pass
        newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct))
        # Note: below can't be typed because we use attributes
        # created dynamically..
        newcls.__all_slots__ = {  # type: ignore
            slot
            for klass in newcls.__mro__ if hasattr(klass, "__slots__")
            for slot in klass.__slots__
        }

        newcls.aliastypes = (  # type: ignore
            [newcls] + getattr(newcls, "aliastypes", [])
        )

        if hasattr(newcls, "register_variant"):
            newcls.register_variant()
        for owned in newcls.fields_desc:
            if hasattr(owned, "register_owner"):
                owned.register_owner(newcls)
        # Classes whose name starts with "_" are not registered as layers
        if newcls.__name__[0] != "_":
            from scapy import config
            config.conf.layers.register(newcls)
        return newcls

    def __getattr__(self, attr):
        # type: (str) -> Any
        """Return the field of fields_desc named ``attr``.

        :raises AttributeError: if no field has that name
        """
        for fld in self.fields_desc:
            if fld.name == attr:
                return fld
        raise AttributeError(attr)

    def __call__(cls,
                 *args,  # type: Any
                 **kargs  # type: Any
                 ):
        # type: (...) -> 'Packet'
        """Instantiate the class, or the one chosen by its dispatch_hook."""
        if "dispatch_hook" in cls.__dict__:
            try:
                cls = cls.dispatch_hook(*args, **kargs)
            except Exception:
                from scapy import config
                if config.conf.debug_dissector:
                    raise
                # Outside of debug mode, fall back to the raw layer
                cls = config.conf.raw_layer
        pkt = cls.__new__(
            cls,  # type: ignore
            cls.__name__,
            cls.__bases__,
            cls.__dict__  # type: ignore
        )
        pkt.__init__(*args, **kargs)
        return pkt  # type: ignore
483
484
485# Note: see compat.py for an explanation
486
class Field_metaclass(type):
    """Metaclass giving an empty ``__slots__`` to classes not declaring one."""

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[_T]
        if "__slots__" not in dct:
            dct["__slots__"] = []
        return type.__new__(cls, name, bases, dct)  # type: ignore


# Packet lists are built with the very same metaclass
PacketList_metaclass = Field_metaclass
500
501
class BasePacket(Gen['Packet']):
    """Marker base class: SetGen checks for it when deciding whether an
    element is iterated or yielded as is."""
    __slots__ = []  # type: List[str]
504
505
506#############################
507# Packet list base class #
508#############################
509
class BasePacketList(Gen[_T]):
    """Marker base class: SetGen accepts its instances like plain lists."""
    __slots__ = []  # type: List[str]
512
513
class _CanvasDumpExtended(object):
    """Mixin adding EPS / PDF / SVG dumps on top of ``canvas_dump``."""

    @abc.abstractmethod
    def canvas_dump(self, layer_shift=0, rebuild=1):
        # type: (int, int) -> pyx.canvas.canvas
        """Return the canvas to dump; left to the concrete class."""
        pass

    def _dump_canvas(self, filename, writer_name, default_ext, reader_name, kargs):
        # type: (Optional[str], str, str, str, Dict[str, Any]) -> None
        """Shared implementation of psdump, pdfdump and svgdump.

        :param filename: destination file, or None to write a temporary
            file and open it with a viewer
        :param writer_name: name of the canvas method writing the file
        :param default_ext: temporary file extension when kargs has no
            "suffix" entry
        :param reader_name: name of the conf.prog attribute holding the
            viewer program
        :param kargs: keyword arguments forwarded to canvas_dump
        """
        from scapy.config import conf
        from scapy.utils import get_temp_file, ContextManagerSubprocess
        canvas = self.canvas_dump(**kargs)
        if filename is not None:
            getattr(canvas, writer_name)(filename)
        else:
            fname = get_temp_file(autoext=kargs.get("suffix", default_ext))
            getattr(canvas, writer_name)(fname)
            reader = getattr(conf.prog, reader_name)
            if WINDOWS and not reader:
                # No viewer configured on Windows: use the file association
                os.startfile(fname)
            else:
                with ContextManagerSubprocess(reader):
                    subprocess.Popen([reader, fname])
        print()

    def psdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        psdump(filename=None, layer_shift=0, rebuild=1)

        Creates an EPS file describing a packet. If filename is not provided
        a temporary file is created and opened with conf.prog.psreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, "writeEPSfile", ".eps", "psreader", kargs)

    def pdfdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        pdfdump(filename=None, layer_shift=0, rebuild=1)

        Creates a PDF file describing a packet. If filename is not provided
        a temporary file is created and opened with conf.prog.pdfreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, "writePDFfile", ".pdf", "pdfreader", kargs)

    def svgdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        svgdump(filename=None, layer_shift=0, rebuild=1)

        Creates an SVG file describing a packet. If filename is not provided
        a temporary file is created and opened with conf.prog.svgreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, "writeSVGfile", ".svg", "svgreader", kargs)