Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/base_classes.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

307 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Generators and packet meta classes. 

8""" 

9 

10################ 

11# Generators # 

12################ 

13 

14 

15from functools import reduce 

16import abc 

17import operator 

18import os 

19import random 

20import re 

21import socket 

22import struct 

23import subprocess 

24import types 

25import warnings 

26 

27import scapy 

28from scapy.error import Scapy_Exception 

29from scapy.consts import WINDOWS 

30 

31from typing import ( 

32 Any, 

33 Dict, 

34 Generic, 

35 Iterator, 

36 List, 

37 Optional, 

38 Tuple, 

39 Type, 

40 TypeVar, 

41 Union, 

42 cast, 

43 TYPE_CHECKING, 

44) 

45 

46if TYPE_CHECKING: 

47 try: 

48 import pyx 

49 except ImportError: 

50 pass 

51 from scapy.packet import Packet 

52 

53_T = TypeVar("_T") 

54 

55 

56class Gen(Generic[_T]): 

57 __slots__ = [] # type: List[str] 

58 

59 def __iter__(self): 

60 # type: () -> Iterator[_T] 

61 return iter([]) 

62 

63 def __iterlen__(self): 

64 # type: () -> int 

65 return sum(1 for _ in iter(self)) 

66 

67 

68def _get_values(value): 

69 # type: (Any) -> Any 

70 """Generate a range object from (start, stop[, step]) tuples, or 

71 return value. 

72 

73 """ 

74 if (isinstance(value, tuple) and (2 <= len(value) <= 3) and 

75 all(hasattr(i, "__int__") for i in value)): 

76 # We use values[1] + 1 as stop value for (x)range to maintain 

77 # the behavior of using tuples as field `values` 

78 return range(*((int(value[0]), int(value[1]) + 1) + 

79 tuple(int(v) for v in value[2:]))) 

80 return value 

81 

82 

83class SetGen(Gen[_T]): 

84 def __init__(self, values, _iterpacket=1): 

85 # type: (Any, int) -> None 

86 self._iterpacket = _iterpacket 

87 if isinstance(values, (list, BasePacketList)): 

88 self.values = [_get_values(val) for val in values] 

89 else: 

90 self.values = [_get_values(values)] 

91 

92 def __iter__(self): 

93 # type: () -> Iterator[Any] 

94 for i in self.values: 

95 if (isinstance(i, Gen) and 

96 (self._iterpacket or not isinstance(i, BasePacket))) or ( 

97 isinstance(i, (range, types.GeneratorType))): 

98 for j in i: 

99 yield j 

100 else: 

101 yield i 

102 

103 def __len__(self): 

104 # type: () -> int 

105 return self.__iterlen__() 

106 

107 def __repr__(self): 

108 # type: () -> str 

109 return "<SetGen %r>" % self.values 

110 

111 

112class _ScopedIP(str): 

113 """ 

114 A str that also holds extra attributes. 

115 """ 

116 __slots__ = ["scope"] 

117 

118 def __init__(self, _: str) -> None: 

119 self.scope = None 

120 

121 def __repr__(self) -> str: 

122 val = super(_ScopedIP, self).__repr__() 

123 if self.scope is not None: 

124 return "ScopedIP(%s, scope=%s)" % (val, repr(self.scope)) 

125 return val 

126 

127 

128def ScopedIP(net: str, scope: Optional[Any] = None) -> _ScopedIP: 

129 """ 

130 An str that also holds extra attributes. 

131 

132 Examples:: 

133 

134 >>> ScopedIP("224.0.0.1%eth0") # interface 'eth0' 

135 >>> ScopedIP("224.0.0.1%1") # interface index 1 

136 >>> ScopedIP("224.0.0.1", scope=conf.iface) 

137 """ 

138 if "%" in net: 

139 try: 

140 net, scope = net.split("%", 1) 

141 except ValueError: 

142 raise Scapy_Exception("Scope identifier can only be present once !") 

143 if scope is not None: 

144 from scapy.interfaces import resolve_iface, network_name, dev_from_index 

145 try: 

146 iface = dev_from_index(int(scope)) 

147 except (ValueError, TypeError): 

148 iface = resolve_iface(scope) 

149 if not iface.is_valid(): 

150 raise Scapy_Exception( 

151 "RFC6874 scope identifier '%s' could not be resolved to a " 

152 "valid interface !" % scope 

153 ) 

154 scope = network_name(iface) 

155 x = _ScopedIP(net) 

156 x.scope = scope 

157 return x 

158 

159 

160class Net(Gen[str]): 

161 """ 

162 Network object from an IP address or hostname and mask 

163 

164 Examples: 

165 

166 - With mask:: 

167 

168 >>> list(Net("192.168.0.1/24")) 

169 ['192.168.0.0', '192.168.0.1', ..., '192.168.0.255'] 

170 

171 - With 'end':: 

172 

173 >>> list(Net("192.168.0.100", "192.168.0.200")) 

174 ['192.168.0.100', '192.168.0.101', ..., '192.168.0.200'] 

175 

176 - With 'scope' (for multicast):: 

177 

178 >>> Net("224.0.0.1%lo") 

179 >>> Net("224.0.0.1", scope=conf.iface) 

180 """ 

181 name = "Net" # type: str 

182 family = socket.AF_INET # type: int 

183 max_mask = 32 # type: int 

184 

185 @classmethod 

186 def name2addr(cls, name): 

187 # type: (str) -> str 

188 try: 

189 return next( 

190 addr_port[0] 

191 for family, _, _, _, addr_port in 

192 socket.getaddrinfo(name, None, cls.family) 

193 if family == cls.family 

194 ) 

195 except socket.error: 

196 if re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name) is not None: 

197 raise Scapy_Exception("Ranges are no longer accepted in %s()" % 

198 cls.__name__) 

199 raise 

200 

201 @classmethod 

202 def ip2int(cls, addr): 

203 # type: (str) -> int 

204 return cast(int, struct.unpack( 

205 "!I", socket.inet_aton(cls.name2addr(addr)) 

206 )[0]) 

207 

208 @staticmethod 

209 def int2ip(val): 

210 # type: (int) -> str 

211 return socket.inet_ntoa(struct.pack('!I', val)) 

212 

213 def __init__(self, net, stop=None, scope=None): 

214 # type: (str, Optional[str], Optional[str]) -> None 

215 if "*" in net: 

216 raise Scapy_Exception("Wildcards are no longer accepted in %s()" % 

217 self.__class__.__name__) 

218 self.scope = None 

219 if "%" in net: 

220 net = ScopedIP(net) 

221 if isinstance(net, _ScopedIP): 

222 self.scope = net.scope 

223 if stop is None: 

224 try: 

225 net, mask = net.split("/", 1) 

226 except ValueError: 

227 self.mask = self.max_mask # type: Union[None, int] 

228 else: 

229 self.mask = int(mask) 

230 self.net = net # type: Union[None, str] 

231 inv_mask = self.max_mask - self.mask 

232 self.start = self.ip2int(net) >> inv_mask << inv_mask 

233 self.count = 1 << inv_mask 

234 self.stop = self.start + self.count - 1 

235 else: 

236 self.start = self.ip2int(net) 

237 self.stop = self.ip2int(stop) 

238 self.count = self.stop - self.start + 1 

239 self.net = self.mask = None 

240 

241 def __str__(self): 

242 # type: () -> str 

243 return next(iter(self), "") 

244 

245 def __iter__(self): 

246 # type: () -> Iterator[str] 

247 # Python 2 won't handle huge (> sys.maxint) values in range() 

248 for i in range(self.count): 

249 yield ScopedIP( 

250 self.int2ip(self.start + i), 

251 scope=self.scope, 

252 ) 

253 

254 def __len__(self): 

255 # type: () -> int 

256 return self.count 

257 

258 def __iterlen__(self): 

259 # type: () -> int 

260 # for compatibility 

261 return len(self) 

262 

263 def choice(self): 

264 # type: () -> str 

265 return ScopedIP( 

266 self.int2ip(random.randint(self.start, self.stop)), 

267 scope=self.scope, 

268 ) 

269 

270 def __repr__(self): 

271 # type: () -> str 

272 scope_id_repr = "" 

273 if self.scope: 

274 scope_id_repr = ", scope=%s" % repr(self.scope) 

275 if self.mask is not None: 

276 return '%s("%s/%d"%s)' % ( 

277 self.__class__.__name__, 

278 self.net, 

279 self.mask, 

280 scope_id_repr, 

281 ) 

282 return '%s("%s", "%s"%s)' % ( 

283 self.__class__.__name__, 

284 self.int2ip(self.start), 

285 self.int2ip(self.stop), 

286 scope_id_repr, 

287 ) 

288 

289 def __eq__(self, other): 

290 # type: (Any) -> bool 

291 if isinstance(other, str): 

292 return self == self.__class__(other) 

293 if not isinstance(other, Net): 

294 return False 

295 if self.family != other.family: 

296 return False 

297 return (self.start == other.start) and (self.stop == other.stop) 

298 

299 def __ne__(self, other): 

300 # type: (Any) -> bool 

301 # Python 2.7 compat 

302 return not self == other 

303 

304 def __hash__(self): 

305 # type: () -> int 

306 return hash(("scapy.Net", self.family, self.start, self.stop, self.scope)) 

307 

308 def __contains__(self, other): 

309 # type: (Any) -> bool 

310 if isinstance(other, int): 

311 return self.start <= other <= self.stop 

312 if isinstance(other, str): 

313 return self.__class__(other) in self 

314 if type(other) is not self.__class__: 

315 return False 

316 return self.start <= other.start <= other.stop <= self.stop 

317 

318 

319class OID(Gen[str]): 

320 name = "OID" 

321 

322 def __init__(self, oid): 

323 # type: (str) -> None 

324 self.oid = oid 

325 self.cmpt = [] 

326 fmt = [] 

327 for i in oid.split("."): 

328 if "-" in i: 

329 fmt.append("%i") 

330 self.cmpt.append(tuple(map(int, i.split("-")))) 

331 else: 

332 fmt.append(i) 

333 self.fmt = ".".join(fmt) 

334 

335 def __repr__(self): 

336 # type: () -> str 

337 return "OID(%r)" % self.oid 

338 

339 def __iter__(self): 

340 # type: () -> Iterator[str] 

341 ii = [k[0] for k in self.cmpt] 

342 while True: 

343 yield self.fmt % tuple(ii) 

344 i = 0 

345 while True: 

346 if i >= len(ii): 

347 return 

348 if ii[i] < self.cmpt[i][1]: 

349 ii[i] += 1 

350 break 

351 else: 

352 ii[i] = self.cmpt[i][0] 

353 i += 1 

354 

355 def __iterlen__(self): 

356 # type: () -> int 

357 return reduce(operator.mul, (max(y - x, 0) + 1 for (x, y) in self.cmpt), 1) # noqa: E501 

358 

359 

360###################################### 

361# Packet abstract and base classes # 

362###################################### 

363 

364class Packet_metaclass(type): 

365 def __new__(cls: Type[_T], 

366 name, # type: str 

367 bases, # type: Tuple[type, ...] 

368 dct # type: Dict[str, Any] 

369 ): 

370 # type: (...) -> Type['Packet'] 

371 if "fields_desc" in dct: # perform resolution of references to other packets # noqa: E501 

372 current_fld = dct["fields_desc"] # type: List[Union[scapy.fields.Field[Any, Any], Packet_metaclass]] # noqa: E501 

373 resolved_fld = [] # type: List[scapy.fields.Field[Any, Any]] 

374 for fld_or_pkt in current_fld: 

375 if isinstance(fld_or_pkt, Packet_metaclass): 

376 # reference to another fields_desc 

377 for pkt_fld in fld_or_pkt.fields_desc: 

378 resolved_fld.append(pkt_fld) 

379 else: 

380 resolved_fld.append(fld_or_pkt) 

381 else: # look for a fields_desc in parent classes 

382 resolved_fld = [] 

383 for b in bases: 

384 if hasattr(b, "fields_desc"): 

385 resolved_fld = b.fields_desc 

386 break 

387 

388 if resolved_fld: # perform default value replacements 

389 final_fld = [] # type: List[scapy.fields.Field[Any, Any]] 

390 names = [] 

391 for f in resolved_fld: 

392 if f.name in names: 

393 war_msg = ( 

394 "Packet '%s' has a duplicated '%s' field ! " 

395 "If you are using several ConditionalFields, have " 

396 "a look at MultipleTypeField instead ! This will " 

397 "become a SyntaxError in a future version of " 

398 "Scapy !" % ( 

399 name, f.name 

400 ) 

401 ) 

402 warnings.warn(war_msg, SyntaxWarning) 

403 names.append(f.name) 

404 if f.name in dct: 

405 f = f.copy() 

406 f.default = dct[f.name] 

407 del dct[f.name] 

408 final_fld.append(f) 

409 

410 dct["fields_desc"] = final_fld 

411 

412 dct.setdefault("__slots__", []) 

413 for attr in ["name", "overload_fields"]: 

414 try: 

415 dct["_%s" % attr] = dct.pop(attr) 

416 except KeyError: 

417 pass 

418 # Build and inject signature 

419 try: 

420 # Py3 only 

421 import inspect 

422 dct["__signature__"] = inspect.Signature([ 

423 inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY), 

424 ] + [ 

425 inspect.Parameter(f.name, 

426 inspect.Parameter.KEYWORD_ONLY, 

427 default=f.default) 

428 for f in dct["fields_desc"] 

429 ]) 

430 except (ImportError, AttributeError, KeyError): 

431 pass 

432 newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct)) 

433 # Note: below can't be typed because we use attributes 

434 # created dynamically.. 

435 newcls.__all_slots__ = set( # type: ignore 

436 attr 

437 for cls in newcls.__mro__ if hasattr(cls, "__slots__") 

438 for attr in cls.__slots__ 

439 ) 

440 

441 newcls.aliastypes = ( # type: ignore 

442 [newcls] + getattr(newcls, "aliastypes", []) 

443 ) 

444 

445 if hasattr(newcls, "register_variant"): 

446 newcls.register_variant() 

447 for _f in newcls.fields_desc: 

448 if hasattr(_f, "register_owner"): 

449 _f.register_owner(newcls) 

450 if newcls.__name__[0] != "_": 

451 from scapy import config 

452 config.conf.layers.register(newcls) 

453 return newcls 

454 

455 def __getattr__(self, attr): 

456 # type: (str) -> Any 

457 for k in self.fields_desc: 

458 if k.name == attr: 

459 return k 

460 raise AttributeError(attr) 

461 

462 def __call__(cls, 

463 *args, # type: Any 

464 **kargs # type: Any 

465 ): 

466 # type: (...) -> 'Packet' 

467 if "dispatch_hook" in cls.__dict__: 

468 try: 

469 cls = cls.dispatch_hook(*args, **kargs) 

470 except Exception: 

471 from scapy import config 

472 if config.conf.debug_dissector: 

473 raise 

474 cls = config.conf.raw_layer 

475 i = cls.__new__( 

476 cls, # type: ignore 

477 cls.__name__, 

478 cls.__bases__, 

479 cls.__dict__ # type: ignore 

480 ) 

481 i.__init__(*args, **kargs) 

482 return i # type: ignore 

483 

484 

485# Note: see compat.py for an explanation 

486 

487class Field_metaclass(type): 

488 def __new__(cls: Type[_T], 

489 name, # type: str 

490 bases, # type: Tuple[type, ...] 

491 dct # type: Dict[str, Any] 

492 ): 

493 # type: (...) -> Type[_T] 

494 dct.setdefault("__slots__", []) 

495 newcls = type.__new__(cls, name, bases, dct) 

496 return newcls # type: ignore 

497 

498 

499PacketList_metaclass = Field_metaclass 

500 

501 

502class BasePacket(Gen['Packet']): 

503 __slots__ = [] # type: List[str] 

504 

505 

506############################# 

507# Packet list base class # 

508############################# 

509 

510class BasePacketList(Gen[_T]): 

511 __slots__ = [] # type: List[str] 

512 

513 

514class _CanvasDumpExtended(object): 

515 @abc.abstractmethod 

516 def canvas_dump(self, layer_shift=0, rebuild=1): 

517 # type: (int, int) -> pyx.canvas.canvas 

518 pass 

519 

520 def psdump(self, filename=None, **kargs): 

521 # type: (Optional[str], **Any) -> None 

522 """ 

523 psdump(filename=None, layer_shift=0, rebuild=1) 

524 

525 Creates an EPS file describing a packet. If filename is not provided a 

526 temporary file is created and gs is called. 

527 

528 :param filename: the file's filename 

529 """ 

530 from scapy.config import conf 

531 from scapy.utils import get_temp_file, ContextManagerSubprocess 

532 canvas = self.canvas_dump(**kargs) 

533 if filename is None: 

534 fname = get_temp_file(autoext=kargs.get("suffix", ".eps")) 

535 canvas.writeEPSfile(fname) 

536 if WINDOWS and not conf.prog.psreader: 

537 os.startfile(fname) 

538 else: 

539 with ContextManagerSubprocess(conf.prog.psreader): 

540 subprocess.Popen([conf.prog.psreader, fname]) 

541 else: 

542 canvas.writeEPSfile(filename) 

543 print() 

544 

545 def pdfdump(self, filename=None, **kargs): 

546 # type: (Optional[str], **Any) -> None 

547 """ 

548 pdfdump(filename=None, layer_shift=0, rebuild=1) 

549 

550 Creates a PDF file describing a packet. If filename is not provided a 

551 temporary file is created and xpdf is called. 

552 

553 :param filename: the file's filename 

554 """ 

555 from scapy.config import conf 

556 from scapy.utils import get_temp_file, ContextManagerSubprocess 

557 canvas = self.canvas_dump(**kargs) 

558 if filename is None: 

559 fname = get_temp_file(autoext=kargs.get("suffix", ".pdf")) 

560 canvas.writePDFfile(fname) 

561 if WINDOWS and not conf.prog.pdfreader: 

562 os.startfile(fname) 

563 else: 

564 with ContextManagerSubprocess(conf.prog.pdfreader): 

565 subprocess.Popen([conf.prog.pdfreader, fname]) 

566 else: 

567 canvas.writePDFfile(filename) 

568 print() 

569 

570 def svgdump(self, filename=None, **kargs): 

571 # type: (Optional[str], **Any) -> None 

572 """ 

573 svgdump(filename=None, layer_shift=0, rebuild=1) 

574 

575 Creates an SVG file describing a packet. If filename is not provided a 

576 temporary file is created and gs is called. 

577 

578 :param filename: the file's filename 

579 """ 

580 from scapy.config import conf 

581 from scapy.utils import get_temp_file, ContextManagerSubprocess 

582 canvas = self.canvas_dump(**kargs) 

583 if filename is None: 

584 fname = get_temp_file(autoext=kargs.get("suffix", ".svg")) 

585 canvas.writeSVGfile(fname) 

586 if WINDOWS and not conf.prog.svgreader: 

587 os.startfile(fname) 

588 else: 

589 with ContextManagerSubprocess(conf.prog.svgreader): 

590 subprocess.Popen([conf.prog.svgreader, fname]) 

591 else: 

592 canvas.writeSVGfile(filename) 

593 print()