1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Generators and packet meta classes.
8"""
9
10################
11# Generators #
12################
13
14
15from functools import reduce
16import abc
17import operator
18import os
19import random
20import re
21import socket
22import struct
23import subprocess
24import types
25import warnings
26
27import scapy
28from scapy.error import Scapy_Exception
29from scapy.consts import WINDOWS
30
31from typing import (
32 Any,
33 Dict,
34 Generic,
35 Iterator,
36 List,
37 Optional,
38 Tuple,
39 Type,
40 TypeVar,
41 Union,
42 cast,
43 TYPE_CHECKING,
44)
45
46if TYPE_CHECKING:
47 try:
48 import pyx
49 except ImportError:
50 pass
51 from scapy.packet import Packet
52
# Generic type variable shared by the Gen hierarchy and by the ``cls``
# annotations of the metaclass ``__new__`` methods in this module.
_T = TypeVar("_T")
54
55
class Gen(Generic[_T]):
    """Base class for the generators defined in this module.

    The default implementation yields nothing; subclasses such as SetGen,
    Net and OID override ``__iter__`` (and sometimes ``__iterlen__``).
    """
    __slots__ = []  # type: List[str]

    def __iter__(self):
        # type: () -> Iterator[_T]
        """Return an iterator over the generated values (empty here)."""
        return iter([])

    def __iterlen__(self):
        # type: () -> int
        """Return the number of values by walking the whole iterator."""
        total = 0
        for _ in iter(self):
            total += 1
        return total
66
67
68def _get_values(value):
69 # type: (Any) -> Any
70 """Generate a range object from (start, stop[, step]) tuples, or
71 return value.
72
73 """
74 if (isinstance(value, tuple) and (2 <= len(value) <= 3) and
75 all(hasattr(i, "__int__") for i in value)):
76 # We use values[1] + 1 as stop value for (x)range to maintain
77 # the behavior of using tuples as field `values`
78 return range(*((int(value[0]), int(value[1]) + 1) +
79 tuple(int(v) for v in value[2:])))
80 return value
81
82
class SetGen(Gen[_T]):
    """Generator over a collection of values.

    Each stored value goes through ``_get_values`` first, so
    (start, stop[, step]) tuples become ranges. While iterating, ranges,
    Python generators and Gen instances are flattened into their elements.
    """

    def __init__(self, values, _iterpacket=1):
        # type: (Any, int) -> None
        """Store ``values``; a non-list value becomes a one-item list.

        :param values: a list, a BasePacketList, or a single value
        :param _iterpacket: when false, BasePacket instances are yielded
            as-is instead of being iterated
        """
        self._iterpacket = _iterpacket
        if not isinstance(values, (list, BasePacketList)):
            values = [values]
        self.values = [_get_values(item) for item in values]

    def __iter__(self):
        # type: () -> Iterator[Any]
        """Yield every value, expanding nested generators one level."""
        for item in self.values:
            if isinstance(item, (range, types.GeneratorType)):
                expand = True
            elif isinstance(item, Gen):
                # Packets are only expanded when _iterpacket is set.
                expand = bool(
                    self._iterpacket or not isinstance(item, BasePacket)
                )
            else:
                expand = False
            if not expand:
                yield item
                continue
            for sub_item in item:
                yield sub_item

    def __len__(self):
        # type: () -> int
        """Return the number of values produced by a full iteration."""
        return self.__iterlen__()

    def __repr__(self):
        # type: () -> str
        return "<SetGen %r>" % self.values
110
111
class Net(Gen[str]):
    """Generator over the addresses of a network or an address range.

    Built either from ``"address/mask"`` (a bare address or hostname means
    a ``max_mask`` long mask) or from explicit first and last addresses.
    """
    name = "Net"  # type: str
    family = socket.AF_INET  # type: int
    max_mask = 32  # type: int

    @classmethod
    def name2addr(cls, name):
        # type: (str) -> str
        """Resolve ``name`` to the first address of family ``cls.family``.

        :raises Scapy_Exception: if resolution fails and ``name`` looks
            like the old "a-b" range syntax
        """
        try:
            return next(
                sockaddr[0]
                for fam, _, _, _, sockaddr in
                socket.getaddrinfo(name, None, cls.family)
                if fam == cls.family
            )
        except socket.error:
            legacy_range = re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name)
            if legacy_range is None:
                raise
            raise Scapy_Exception("Ranges are no longer accepted in %s()" %
                                  cls.__name__)

    @classmethod
    def ip2int(cls, addr):
        # type: (str) -> int
        """Resolve ``addr`` and return it as an unsigned 32-bit integer."""
        packed = socket.inet_aton(cls.name2addr(addr))
        (value,) = struct.unpack("!I", packed)
        return cast(int, value)

    @staticmethod
    def int2ip(val):
        # type: (int) -> str
        """Return the dotted-quad string for the 32-bit integer ``val``."""
        packed = struct.pack('!I', val)
        return socket.inet_ntoa(packed)

    def __init__(self, net, stop=None):
        # type: (str, Union[None, str]) -> None
        """Build the range from ``net`` (CIDR form) or ``net``..``stop``.

        :param net: address, hostname or "address/mask"; with ``stop``
            set, the first address of the range
        :param stop: optional last address of the range
        :raises Scapy_Exception: if ``net`` contains a "*" wildcard
        """
        if "*" in net:
            raise Scapy_Exception("Wildcards are no longer accepted in %s()" %
                                  self.__class__.__name__)
        if stop is not None:
            # Explicit first/last form: no network name nor mask to keep.
            self.start = self.ip2int(net)
            self.stop = self.ip2int(stop)
            self.count = self.stop - self.start + 1
            self.net = self.mask = None
            return
        net, slash, mask = net.partition("/")
        if slash:
            self.mask = int(mask)  # type: Union[None, int]
        else:
            self.mask = self.max_mask
        self.net = net  # type: Union[None, str]
        inv_mask = self.max_mask - self.mask
        # Clear the host bits to get the first address of the network.
        self.start = self.ip2int(net) >> inv_mask << inv_mask
        self.count = 1 << inv_mask
        self.stop = self.start + self.count - 1

    def __str__(self):
        # type: () -> str
        """Return the first address, or "" when the range is empty."""
        return next(iter(self), "")

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield each address of the range, in increasing order."""
        for value in range(self.start, self.start + self.count):
            yield self.int2ip(value)

    def __len__(self):
        # type: () -> int
        return self.count

    def __iterlen__(self):
        # type: () -> int
        # Kept for compatibility with the Gen interface.
        return len(self)

    def choice(self):
        # type: () -> str
        """Return a random address between start and stop, inclusive."""
        picked = random.randint(self.start, self.stop)
        return self.int2ip(picked)

    def __repr__(self):
        # type: () -> str
        cls_name = self.__class__.__name__
        if self.mask is None:
            return '%s("%s", "%s")' % (
                cls_name,
                self.int2ip(self.start),
                self.int2ip(self.stop),
            )
        return '%s("%s/%d")' % (cls_name, self.net, self.mask)

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare ranges; a string is first converted with this class."""
        if isinstance(other, str):
            return self == self.__class__(other)
        if not isinstance(other, Net) or self.family != other.family:
            return False
        return (self.start, self.stop) == (other.start, other.stop)

    def __ne__(self, other):
        # type: (Any) -> bool
        # Explicit definition kept for Python 2.7 compatibility.
        return not (self == other)

    def __hash__(self):
        # type: () -> int
        key = ("scapy.Net", self.family, self.start, self.stop)
        return hash(key)

    def __contains__(self, other):
        # type: (Any) -> bool
        """Tell whether an integer, string or same-class range fits in."""
        if isinstance(other, int):
            return self.start <= other <= self.stop
        if isinstance(other, str):
            return self.__class__(other) in self
        return (
            type(other) is self.__class__ and
            self.start <= other.start <= other.stop <= self.stop
        )
234
235
class OID(Gen[str]):
    """Generator expanding "low-high" components of a dotted OID string.

    Each component containing a "-" is replaced by every integer from low
    to high; the first such component varies fastest.
    """
    name = "OID"

    def __init__(self, oid):
        # type: (str) -> None
        """Split ``oid`` into a format string and its range components."""
        self.oid = oid
        self.cmpt = []
        pieces = []
        for part in oid.split("."):
            if "-" not in part:
                pieces.append(part)
                continue
            # Ranged component: a placeholder in the format string plus
            # its integer bounds.
            pieces.append("%i")
            self.cmpt.append(tuple(int(bound) for bound in part.split("-")))
        self.fmt = ".".join(pieces)

    def __repr__(self):
        # type: () -> str
        return "OID(%r)" % self.oid

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield every OID string covered by the ranges."""
        current = [bounds[0] for bounds in self.cmpt]
        while True:
            yield self.fmt % tuple(current)
            # Odometer-style increment: bump the first counter that has
            # not reached its upper bound, resetting those before it.
            for pos in range(len(current)):
                if current[pos] < self.cmpt[pos][1]:
                    current[pos] += 1
                    break
                current[pos] = self.cmpt[pos][0]
            else:
                # Every counter wrapped around: iteration is complete.
                return

    def __iterlen__(self):
        # type: () -> int
        """Return the number of OID strings ``__iter__`` yields."""
        total = 1
        for low, high in self.cmpt:
            total *= max(high - low, 0) + 1
        return total
275
276
277######################################
278# Packet abstract and base classes #
279######################################
280
class Packet_metaclass(type):
    """Metaclass building packet classes from their ``fields_desc``.

    At class creation it resolves the field list, applies per-class default
    overrides, injects an introspection signature and registers the class.
    At call time it lets an optional ``dispatch_hook`` pick the class that
    is actually instantiated.
    """

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type['Packet']
        """Create a new packet class.

        :param name: name of the class being created
        :param bases: its base classes
        :param dct: its namespace; modified in place (field default
            overrides are removed, ``name`` and ``overload_fields`` are
            renamed with a leading underscore, ``__slots__`` is ensured)
        :returns: the newly created class
        """
        if "fields_desc" in dct:  # perform resolution of references to other packets  # noqa: E501
            current_fld = dct["fields_desc"]  # type: List[Union[scapy.fields.Field[Any, Any], Packet_metaclass]]  # noqa: E501
            resolved_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            for fld_or_pkt in current_fld:
                if isinstance(fld_or_pkt, Packet_metaclass):
                    # reference to another fields_desc
                    resolved_fld.extend(fld_or_pkt.fields_desc)
                else:
                    resolved_fld.append(fld_or_pkt)
        else:  # look for a fields_desc in parent classes
            resolved_fld = []
            for b in bases:
                if hasattr(b, "fields_desc"):
                    resolved_fld = b.fields_desc
                    break

        if resolved_fld:  # perform default value replacements
            final_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            seen_names = set()
            for f in resolved_fld:
                if f.name in seen_names:
                    war_msg = (
                        "Packet '%s' has a duplicated '%s' field ! "
                        "If you are using several ConditionalFields, have "
                        "a look at MultipleTypeField instead ! This will "
                        "become a SyntaxError in a future version of "
                        "Scapy !" % (
                            name, f.name
                        )
                    )
                    warnings.warn(war_msg, SyntaxWarning)
                seen_names.add(f.name)
                if f.name in dct:
                    # A class attribute named like a field overrides that
                    # field's default; work on a copy so the field object
                    # shared with other classes is left untouched.
                    f = f.copy()
                    f.default = dct[f.name]
                    del dct[f.name]
                final_fld.append(f)

            dct["fields_desc"] = final_fld

        dct.setdefault("__slots__", [])
        for attr in ["name", "overload_fields"]:
            try:
                dct["_%s" % attr] = dct.pop(attr)
            except KeyError:
                pass
        # Build and inject signature
        try:
            import inspect
            dct["__signature__"] = inspect.Signature([
                inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY),
            ] + [
                inspect.Parameter(f.name,
                                  inspect.Parameter.KEYWORD_ONLY,
                                  default=f.default)
                for f in dct["fields_desc"]
            ])
        except (ImportError, AttributeError, KeyError, ValueError):
            # The signature is only an introspection aid, so failing to
            # build it must not prevent the class from being created.
            # ValueError is what inspect raises for duplicated or invalid
            # parameter names, e.g. the duplicated fields warned about
            # above.
            pass
        newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct))
        # Note: below can't be typed because we use attributes
        # created dynamically..
        newcls.__all_slots__ = set(  # type: ignore
            attr
            for klass in newcls.__mro__ if hasattr(klass, "__slots__")
            for attr in klass.__slots__
        )

        newcls.aliastypes = (  # type: ignore
            [newcls] + getattr(newcls, "aliastypes", [])
        )

        if hasattr(newcls, "register_variant"):
            newcls.register_variant()
        for _f in newcls.fields_desc:
            if hasattr(_f, "register_owner"):
                _f.register_owner(newcls)
        if newcls.__name__[0] != "_":
            # Classes whose name starts with an underscore are not
            # registered as layers.
            from scapy import config
            config.conf.layers.register(newcls)
        return newcls

    def __getattr__(self, attr):
        # type: (str) -> Any
        """Return the field named ``attr`` from ``fields_desc``.

        Only reached when regular class attribute lookup fails.

        :raises AttributeError: if no field has that name
        """
        for k in self.fields_desc:
            if k.name == attr:
                return k
        raise AttributeError(attr)

    def __call__(cls,
                 *args,  # type: Any
                 **kargs  # type: Any
                 ):
        # type: (...) -> 'Packet'
        """Instantiate the class, honoring an optional ``dispatch_hook``.

        When the class itself defines ``dispatch_hook``, it is called with
        the constructor arguments to select the class to instantiate. If
        the hook raises, the exception propagates when
        ``conf.debug_dissector`` is set; otherwise ``conf.raw_layer`` is
        used instead.
        """
        if "dispatch_hook" in cls.__dict__:
            try:
                cls = cls.dispatch_hook(*args, **kargs)
            except Exception:
                from scapy import config
                if config.conf.debug_dissector:
                    raise
                cls = config.conf.raw_layer
        i = cls.__new__(
            cls,  # type: ignore
            cls.__name__,
            cls.__bases__,
            cls.__dict__  # type: ignore
        )
        i.__init__(*args, **kargs)
        return i  # type: ignore
400
401
402# Note: see compat.py for an explanation
403
class Field_metaclass(type):
    """Metaclass making sure every class it creates declares __slots__."""

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[_T]
        """Create the class, adding an empty ``__slots__`` if missing."""
        if "__slots__" not in dct:
            dct["__slots__"] = []
        return type.__new__(cls, name, bases, dct)  # type: ignore
414
415
# Packet lists use the very same metaclass as fields: all it does is make
# sure a ``__slots__`` entry exists in the class namespace.
PacketList_metaclass = Field_metaclass
417
418
class BasePacket(Gen['Packet']):
    """Slot-less base class for packets.

    SetGen checks ``isinstance(value, BasePacket)`` to decide whether a
    value must be yielded as-is rather than iterated.
    """
    __slots__ = []  # type: List[str]
421
422
423#############################
424# Packet list base class #
425#############################
426
class BasePacketList(Gen[_T]):
    """Slot-less base class for packet lists.

    SetGen treats instances of this class like plain lists and stores
    each of their elements separately.
    """
    __slots__ = []  # type: List[str]
429
430
class _CanvasDumpExtended(object):
    """Mixin adding EPS, PDF and SVG export on top of ``canvas_dump``."""

    @abc.abstractmethod
    def canvas_dump(self, layer_shift=0, rebuild=1):
        # type: (int, int) -> pyx.canvas.canvas
        """Return the canvas to export; implemented by subclasses."""
        pass

    def _canvas_dump_to_file(self,
                             filename,  # type: Optional[str]
                             kargs,  # type: Dict[str, Any]
                             writer,  # type: str
                             reader,  # type: str
                             default_suffix  # type: str
                             ):
        # type: (...) -> None
        """Shared implementation of psdump, pdfdump and svgdump.

        :param filename: output file, or None to write a temporary file
            and open it in a viewer
        :param kargs: keyword arguments received by the public method
        :param writer: name of the canvas method writing the file
        :param reader: name of the ``conf.prog`` attribute holding the
            viewer program
        :param default_suffix: temporary file extension used when no
            ``suffix`` keyword was given
        """
        from scapy.config import conf
        from scapy.utils import get_temp_file, ContextManagerSubprocess
        # "suffix" only names the temporary file. It must not be forwarded
        # to canvas_dump(), whose signature does not accept it.
        suffix = kargs.pop("suffix", default_suffix)
        canvas = self.canvas_dump(**kargs)
        write_file = getattr(canvas, writer)
        if filename is None:
            fname = get_temp_file(autoext=suffix)
            write_file(fname)
            viewer = getattr(conf.prog, reader)
            if WINDOWS and not viewer:
                # No viewer configured: let Windows open the file with its
                # associated application.
                os.startfile(fname)
            else:
                with ContextManagerSubprocess(viewer):
                    subprocess.Popen([viewer, fname])
        else:
            write_file(filename)
        print()

    def psdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        psdump(filename=None, layer_shift=0, rebuild=1)

        Creates an EPS file describing a packet. If filename is not provided a
        temporary file is created and opened with the configured PostScript
        reader (``conf.prog.psreader``).

        :param filename: the file's filename
        :param kargs: passed to ``canvas_dump``, except ``suffix`` which
            sets the extension of the temporary file (default ".eps")
        """
        self._canvas_dump_to_file(
            filename, kargs, "writeEPSfile", "psreader", ".eps"
        )

    def pdfdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        pdfdump(filename=None, layer_shift=0, rebuild=1)

        Creates a PDF file describing a packet. If filename is not provided a
        temporary file is created and opened with the configured PDF reader
        (``conf.prog.pdfreader``).

        :param filename: the file's filename
        :param kargs: passed to ``canvas_dump``, except ``suffix`` which
            sets the extension of the temporary file (default ".pdf")
        """
        self._canvas_dump_to_file(
            filename, kargs, "writePDFfile", "pdfreader", ".pdf"
        )

    def svgdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        svgdump(filename=None, layer_shift=0, rebuild=1)

        Creates an SVG file describing a packet. If filename is not provided a
        temporary file is created and opened with the configured SVG reader
        (``conf.prog.svgreader``).

        :param filename: the file's filename
        :param kargs: passed to ``canvas_dump``, except ``suffix`` which
            sets the extension of the temporary file (default ".svg")
        """
        self._canvas_dump_to_file(
            filename, kargs, "writeSVGfile", "svgreader", ".svg"
        )