Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/base_classes.py: 59%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

269 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Generators and packet meta classes. 

8""" 

9 

10################ 

11# Generators # 

12################ 

13 

14 

15from functools import reduce 

16import abc 

17import operator 

18import os 

19import random 

20import re 

21import socket 

22import struct 

23import subprocess 

24import types 

25import warnings 

26 

27import scapy 

28from scapy.error import Scapy_Exception 

29from scapy.consts import WINDOWS 

30 

31from typing import ( 

32 Any, 

33 Dict, 

34 Generic, 

35 Iterator, 

36 List, 

37 Optional, 

38 Tuple, 

39 Type, 

40 TypeVar, 

41 Union, 

42 cast, 

43 TYPE_CHECKING, 

44) 

45 

# These imports are only needed by static type checkers: they are never
# executed at runtime because TYPE_CHECKING is False outside of analysis.
if TYPE_CHECKING:
    try:
        # pyx is optional; it is only referenced in type comments
        import pyx
    except ImportError:
        pass
    from scapy.packet import Packet

# Generic type variable shared by the generator classes of this module
_T = TypeVar("_T")

54 

55 

class Gen(Generic[_T]):
    """Base class for the generator types defined in this module.

    The default implementation iterates over nothing; subclasses override
    ``__iter__`` to produce their own values.
    """
    __slots__ = []  # type: List[str]

    def __iter__(self):
        # type: () -> Iterator[_T]
        """Return an iterator over an empty sequence."""
        return iter([])

    def __iterlen__(self):
        # type: () -> int
        """Return the number of values produced by iterating over self.

        The count is obtained by actually running through the iterator.
        """
        count = 0
        for _ in self:
            count += 1
        return count

66 

67 

def _get_values(value):
    # type: (Any) -> Any
    """Expand a ``(start, stop[, step])`` tuple into a range object.

    Any other value (including tuples of another length, or tuples
    holding an element without ``__int__``) is returned unchanged.

    :param value: the candidate value
    :returns: ``range(start, stop + 1[, step])`` for a matching tuple,
        otherwise ``value`` itself
    """
    if not isinstance(value, tuple):
        return value
    if not 2 <= len(value) <= 3:
        return value
    if not all(hasattr(item, "__int__") for item in value):
        return value
    start = int(value[0])
    # The stop bound of the tuple is inclusive, whereas range() excludes
    # its stop argument: hence the + 1.
    stop = int(value[1]) + 1
    step = [int(item) for item in value[2:]]
    return range(start, stop, *step)

81 

82 

class SetGen(Gen[_T]):
    """Generator built from a value or from a list of values.

    Each stored value is first passed through ``_get_values``.
    """

    def __init__(self, values, _iterpacket=1):
        # type: (Any, int) -> None
        """
        :param values: a single value, or a list / BasePacketList of values
        :param _iterpacket: when false, BasePacket instances are yielded
            as they are instead of being iterated over
        """
        self._iterpacket = _iterpacket
        if not isinstance(values, (list, BasePacketList)):
            # a lone value is handled as a one-element list
            values = [values]
        self.values = [_get_values(item) for item in values]

    def __iter__(self):
        # type: () -> Iterator[Any]
        """Yield the stored values, expanding the nested iterables.

        Ranges, generator objects and Gen instances are iterated over
        (BasePacket instances only when ``_iterpacket`` is true); any
        other value is yielded unchanged.
        """
        for item in self.values:
            if isinstance(item, Gen):
                expand = bool(
                    self._iterpacket or not isinstance(item, BasePacket)
                )
            else:
                expand = False
            if expand or isinstance(item, (range, types.GeneratorType)):
                for sub_item in item:
                    yield sub_item
            else:
                yield item

    def __len__(self):
        # type: () -> int
        """Return the number of values produced by a full iteration."""
        return self.__iterlen__()

    def __repr__(self):
        # type: () -> str
        return "<SetGen %r>" % self.values

110 

111 

class Net(Gen[str]):
    """Network object from an IP address or hostname and mask"""
    name = "Net"  # type: str
    # Address family handed to getaddrinfo() and compared in __eq__
    family = socket.AF_INET  # type: int
    # Number of bits of an address, i.e. the largest accepted mask
    max_mask = 32  # type: int

    @classmethod
    def name2addr(cls, name):
        # type: (str) -> str
        """Resolve a hostname or textual address to an address string.

        :param name: the hostname or address to resolve
        :returns: the first address of family ``cls.family`` returned by
            ``socket.getaddrinfo``
        :raises Scapy_Exception: if the resolution fails and ``name``
            looks like an old style range (e.g. "192.168.0.1-10")
        :raises socket.error: if the resolution fails for another reason
        """
        try:
            return next(
                addr_port[0]
                for family, _, _, _, addr_port in
                socket.getaddrinfo(name, None, cls.family)
                if family == cls.family
            )
        except socket.error:
            # Give a meaningful error to users of the removed range syntax
            if re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name) is not None:
                raise Scapy_Exception("Ranges are no longer accepted in %s()" %
                                      cls.__name__)
            raise

    @classmethod
    def ip2int(cls, addr):
        # type: (str) -> int
        """Convert a hostname or dotted IPv4 address to its integer value.

        :param addr: the hostname or address, resolved with name2addr()
        :returns: the address as an unsigned 32-bit integer
        """
        return cast(int, struct.unpack(
            "!I", socket.inet_aton(cls.name2addr(addr))
        )[0])

    @staticmethod
    def int2ip(val):
        # type: (int) -> str
        """Convert an unsigned 32-bit integer to a dotted IPv4 address."""
        return socket.inet_ntoa(struct.pack('!I', val))

    def __init__(self, net, stop=None):
        # type: (str, Union[None, str]) -> None
        """
        Build the network either from "address/mask" (``stop`` is None)
        or from an explicit first (``net``) and last (``stop``) address.

        :param net: an address or hostname, optionally followed by "/mask"
        :param stop: the last address of the range, if any
        :raises Scapy_Exception: if ``net`` contains a wildcard
        :raises ValueError: if the mask is not an integer between 0 and
            ``max_mask``
        """
        if "*" in net:
            raise Scapy_Exception("Wildcards are no longer accepted in %s()" %
                                  self.__class__.__name__)
        if stop is None:
            try:
                net, mask = net.split("/", 1)
            except ValueError:
                # No "/": this is a single address
                self.mask = self.max_mask  # type: Union[None, int]
            else:
                self.mask = int(mask)
                # An out of range mask would otherwise either fail with an
                # obscure "negative shift count" error or silently build
                # a range that extends past the address space.
                if not 0 <= self.mask <= self.max_mask:
                    raise ValueError(
                        "Invalid mask /%d in %s(): expected a value "
                        "between 0 and %d" % (
                            self.mask,
                            self.__class__.__name__,
                            self.max_mask,
                        )
                    )
            self.net = net  # type: Union[None, str]
            inv_mask = self.max_mask - self.mask
            # Clear the host bits to get the first address of the network
            self.start = self.ip2int(net) >> inv_mask << inv_mask
            self.count = 1 << inv_mask
            self.stop = self.start + self.count - 1
        else:
            self.start = self.ip2int(net)
            self.stop = self.ip2int(stop)
            self.count = self.stop - self.start + 1
            # net and mask are only meaningful for the "address/mask" form
            self.net = self.mask = None

    def __str__(self):
        # type: () -> str
        """Return the first address of the range ("" if it is empty)."""
        return next(iter(self), "")

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield every address of the range, in increasing order."""
        for i in range(self.count):
            yield self.int2ip(self.start + i)

    def __len__(self):
        # type: () -> int
        return self.count

    def __iterlen__(self):
        # type: () -> int
        # for compatibility
        return len(self)

    def choice(self):
        # type: () -> str
        """Return a random address between start and stop (inclusive)."""
        return self.int2ip(random.randint(self.start, self.stop))

    def __repr__(self):
        # type: () -> str
        if self.mask is not None:
            return '%s("%s/%d")' % (
                self.__class__.__name__,
                self.net,
                self.mask,
            )
        return '%s("%s", "%s")' % (
            self.__class__.__name__,
            self.int2ip(self.start),
            self.int2ip(self.stop),
        )

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare with another Net, or with a string parsed as one.

        Two networks are equal when they share the same family and the
        same first and last addresses.
        """
        if isinstance(other, str):
            return self == self.__class__(other)
        if not isinstance(other, Net):
            return False
        if self.family != other.family:
            return False
        return (self.start == other.start) and (self.stop == other.stop)

    def __ne__(self, other):
        # type: (Any) -> bool
        # Python 2.7 compat
        return not self == other

    def __hash__(self):
        # type: () -> int
        return hash(("scapy.Net", self.family, self.start, self.stop))

    def __contains__(self, other):
        # type: (Any) -> bool
        """Tell whether an address or a whole network lies in this range.

        :param other: an integer address, a string parsed as a network of
            the same class, or an instance of exactly the same class.
            Any other object is never contained.
        """
        if isinstance(other, int):
            return self.start <= other <= self.stop
        if isinstance(other, str):
            return self.__class__(other) in self
        if type(other) is not self.__class__:
            return False
        return self.start <= other.start <= other.stop <= self.stop

234 

235 

class OID(Gen[str]):
    """Generator of OID strings, from a dotted OID in which a component
    may be a "low-high" range (e.g. "1.2.3-5.4")."""
    name = "OID"

    def __init__(self, oid):
        # type: (str) -> None
        """
        :param oid: the dotted OID; every component holding a "-" is
            parsed as a tuple of integers
        """
        self.oid = oid
        # Integer bounds of every ranged component, in order of appearance
        self.cmpt = []
        pieces = []
        for component in oid.split("."):
            if "-" not in component:
                pieces.append(component)
                continue
            # Ranged components become a placeholder filled by __iter__
            pieces.append("%i")
            self.cmpt.append(tuple(int(x) for x in component.split("-")))
        self.fmt = ".".join(pieces)

    def __repr__(self):
        # type: () -> str
        return "OID(%r)" % self.oid

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield every OID of the set.

        The ranged components are stepped like an odometer whose fastest
        moving digit is the leftmost range.
        """
        current = [bounds[0] for bounds in self.cmpt]
        while True:
            yield self.fmt % tuple(current)
            for pos, bounds in enumerate(self.cmpt):
                if current[pos] < bounds[1]:
                    current[pos] += 1
                    break
                # This range is exhausted: reset it and carry to the next
                current[pos] = bounds[0]
            else:
                # Every range wrapped around: all values were produced
                return

    def __iterlen__(self):
        # type: () -> int
        """Return the number of OIDs produced by iterating over self."""
        total = 1
        for low, high in self.cmpt:
            # A reversed range (high < low) still produces a single value
            total *= max(high - low, 0) + 1
        return total

275 

276 

277###################################### 

278# Packet abstract and base classes # 

279###################################### 

280 

class Packet_metaclass(type):
    """Metaclass of the packet classes.

    When a class is created it resolves its ``fields_desc``, applies the
    default values overridden in the class body, builds a
    ``__signature__`` and registers the new class. It also exposes the
    fields as attributes of the class and handles ``dispatch_hook`` on
    instantiation.
    """

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type['Packet']
        """Create a new packet class.

        :param name: the name of the class
        :param bases: the base classes
        :param dct: the class namespace. It is modified in place.
        :returns: the new class
        """
        if "fields_desc" in dct:  # perform resolution of references to other packets  # noqa: E501
            current_fld = dct["fields_desc"]  # type: List[Union[scapy.fields.Field[Any, Any], Packet_metaclass]]  # noqa: E501
            resolved_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            for fld_or_pkt in current_fld:
                if isinstance(fld_or_pkt, Packet_metaclass):
                    # reference to another fields_desc
                    resolved_fld.extend(fld_or_pkt.fields_desc)
                else:
                    resolved_fld.append(fld_or_pkt)
        else:  # look for a fields_desc in parent classes
            resolved_fld = []
            for b in bases:
                if hasattr(b, "fields_desc"):
                    resolved_fld = b.fields_desc
                    break

        if resolved_fld:  # perform default value replacements
            final_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            # A set keeps the duplicate lookup cheap whatever the number
            # of fields.
            names = set()
            for f in resolved_fld:
                if f.name in names:
                    war_msg = (
                        "Packet '%s' has a duplicated '%s' field ! "
                        "If you are using several ConditionalFields, have "
                        "a look at MultipleTypeField instead ! This will "
                        "become a SyntaxError in a future version of "
                        "Scapy !" % (
                            name, f.name
                        )
                    )
                    warnings.warn(war_msg, SyntaxWarning)
                names.add(f.name)
                if f.name in dct:
                    # A class attribute named like a field overrides the
                    # default value of this field. Work on a copy so that
                    # the field of the other class is left untouched.
                    f = f.copy()
                    f.default = dct[f.name]
                    del dct[f.name]
                final_fld.append(f)

            dct["fields_desc"] = final_fld

        dct.setdefault("__slots__", [])
        # Store these attributes under a "_" prefixed name
        for attr in ["name", "overload_fields"]:
            try:
                dct["_%s" % attr] = dct.pop(attr)
            except KeyError:
                pass
        # Build and inject signature
        try:
            # Py3 only
            import inspect
            dct["__signature__"] = inspect.Signature([
                inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY),
            ] + [
                inspect.Parameter(f.name,
                                  inspect.Parameter.KEYWORD_ONLY,
                                  default=f.default)
                for f in dct["fields_desc"]
            ])
        except (ImportError, AttributeError, KeyError):
            pass
        newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct))
        # Note: below can't be typed because we use attributes
        # created dynamically..
        newcls.__all_slots__ = set(  # type: ignore
            attr
            for klass in newcls.__mro__ if hasattr(klass, "__slots__")
            for attr in klass.__slots__
        )

        newcls.aliastypes = (  # type: ignore
            [newcls] + getattr(newcls, "aliastypes", [])
        )

        if hasattr(newcls, "register_variant"):
            newcls.register_variant()
        for _f in newcls.fields_desc:
            if hasattr(_f, "register_owner"):
                _f.register_owner(newcls)
        # Classes whose name starts with "_" are not registered
        if newcls.__name__[0] != "_":
            from scapy import config
            config.conf.layers.register(newcls)
        return newcls

    def __getattr__(self, attr):
        # type: (str) -> Any
        """Return the field named ``attr`` from the class ``fields_desc``.

        Only called when the regular attribute lookup on the class fails.

        :raises AttributeError: if no field has this name, or if the
            class has no ``fields_desc`` at all
        """
        # fields_desc must be fetched without going through __getattr__
        # again: otherwise a class lacking it would recurse until a
        # RecursionError, which hasattr() does not swallow.
        try:
            fields = type.__getattribute__(self, "fields_desc")
        except AttributeError:
            raise AttributeError(attr)
        for k in fields:
            if k.name == attr:
                return k
        raise AttributeError(attr)

    def __call__(cls,
                 *args,  # type: Any
                 **kargs  # type: Any
                 ):
        # type: (...) -> 'Packet'
        """Instantiate a packet.

        If the class itself defines ``dispatch_hook``, the hook selects
        the class that is actually instantiated. If the hook fails,
        ``conf.raw_layer`` is used instead, unless
        ``conf.debug_dissector`` is set, in which case the exception is
        raised.
        """
        if "dispatch_hook" in cls.__dict__:
            try:
                cls = cls.dispatch_hook(*args, **kargs)
            except Exception:
                from scapy import config
                if config.conf.debug_dissector:
                    raise
                cls = config.conf.raw_layer
        i = cls.__new__(
            cls,  # type: ignore
            cls.__name__,
            cls.__bases__,
            cls.__dict__  # type: ignore
        )
        i.__init__(*args, **kargs)
        return i  # type: ignore

400 

401 

# Note: see compat.py for an explanation

class Field_metaclass(type):
    """Metaclass that gives an empty ``__slots__`` to every class that
    does not declare its own."""

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[_T]
        if "__slots__" not in dct:
            dct["__slots__"] = []
        return type.__new__(cls, name, bases, dct)  # type: ignore


# The packet lists use the very same metaclass
PacketList_metaclass = Field_metaclass

417 

418 

# Marker base class: SetGen relies on isinstance(x, BasePacket) to decide
# whether a value has to be iterated over.
class BasePacket(Gen['Packet']):
    __slots__ = []  # type: List[str]

421 

422 

423############################# 

424# Packet list base class # 

425############################# 

426 

# Marker base class: SetGen handles its instances like plain lists.
class BasePacketList(Gen[_T]):
    __slots__ = []  # type: List[str]

429 

430 

class _CanvasDumpExtended(object):
    """Mixin adding the psdump(), pdfdump() and svgdump() exports to any
    class that implements canvas_dump()."""

    @abc.abstractmethod
    def canvas_dump(self, layer_shift=0, rebuild=1):
        # type: (int, int) -> pyx.canvas.canvas
        """Return the canvas to export. To be implemented by subclasses."""
        pass

    def _dump_canvas(self,
                     filename,  # type: Optional[str]
                     kargs,  # type: Dict[str, Any]
                     writer,  # type: str
                     default_suffix,  # type: str
                     reader_attr  # type: str
                     ):
        # type: (...) -> None
        """Write the canvas to a file, shared by the *dump() methods.

        :param filename: the output file. If None, a temporary file is
            created and opened with the configured reader.
        :param kargs: the keyword arguments of the caller. "suffix" is
            removed from it, the rest is passed to canvas_dump().
        :param writer: the name of the canvas method writing the file
        :param default_suffix: the extension of the temporary file, when
            no "suffix" is given
        :param reader_attr: the name of the conf.prog attribute holding
            the program used to open the temporary file
        """
        from scapy.config import conf
        from scapy.utils import get_temp_file, ContextManagerSubprocess
        # "suffix" is only meant for the temporary file: canvas_dump()
        # does not accept it, so it must not be forwarded.
        suffix = kargs.pop("suffix", default_suffix)
        canvas = self.canvas_dump(**kargs)
        write_file = getattr(canvas, writer)
        if filename is None:
            fname = get_temp_file(autoext=suffix)
            write_file(fname)
            reader = getattr(conf.prog, reader_attr)
            if WINDOWS and not reader:
                # No reader configured: let Windows pick the application
                os.startfile(fname)
            else:
                with ContextManagerSubprocess(reader):
                    subprocess.Popen([reader, fname])
        else:
            write_file(filename)
        print()

    def psdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        psdump(filename=None, layer_shift=0, rebuild=1)

        Creates an EPS file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.psreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, kargs, "writeEPSfile", ".eps", "psreader")

    def pdfdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        pdfdump(filename=None, layer_shift=0, rebuild=1)

        Creates a PDF file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.pdfreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, kargs, "writePDFfile", ".pdf", "pdfreader")

    def svgdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        svgdump(filename=None, layer_shift=0, rebuild=1)

        Creates an SVG file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.svgreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, kargs, "writeSVGfile", ".svg", "svgreader")