Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/packet.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1442 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Packet class 

8 

9Provides: 

10 - the default Packet classes 

11 - binding mechanisms 

12 - fuzz() method 

13 - exploration methods: explore() / ls() 

14""" 

15 

16from collections import defaultdict 

17 

18import json 

19import re 

20import time 

21import itertools 

22import copy 

23import types 

24import warnings 

25 

26from scapy.fields import ( 

27 AnyField, 

28 BitField, 

29 ConditionalField, 

30 Emph, 

31 EnumField, 

32 Field, 

33 FlagsField, 

34 FlagValue, 

35 MayEnd, 

36 MultiEnumField, 

37 MultipleTypeField, 

38 PadField, 

39 PacketListField, 

40 RawVal, 

41 StrField, 

42) 

43from scapy.config import conf, _version_checker 

44from scapy.compat import raw, orb, bytes_encode 

45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \ 

46 _CanvasDumpExtended 

47from scapy.interfaces import _GlobInterfaceType 

48from scapy.volatile import RandField, VolatileValue 

49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \ 

50 pretty_list, EDecimal 

51from scapy.error import Scapy_Exception, log_runtime, warning 

52from scapy.libs.test_pyx import PYX 

53 

54# Typing imports 

55from typing import ( 

56 Any, 

57 Callable, 

58 Dict, 

59 Iterator, 

60 List, 

61 NoReturn, 

62 Optional, 

63 Set, 

64 Tuple, 

65 Type, 

66 TypeVar, 

67 Union, 

68 Sequence, 

69 cast, 

70) 

71from scapy.compat import Self 

72 

73try: 

74 import pyx 

75except ImportError: 

76 pass 

77 

78 

79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]]) 

80 

81 

82class Packet( 

83 BasePacket, 

84 _CanvasDumpExtended, 

85 metaclass=Packet_metaclass 

86): 

# Per-instance attribute slots of a packet layer.
# NOTE(review): "name" is listed twice below (first group and after
# "parent") -- confirm whether the duplicate is intentional.
__slots__ = [
    "time", "sent_time", "name",
    "default_fields", "fields", "fieldtype",
    "overload_fields", "overloaded_fields",
    "packetfields",
    "original", "explicit", "raw_packet_cache",
    "raw_packet_cache_fields", "_pkt", "post_transforms",
    "stop_dissection_after",
    # then payload, underlayer and parent
    "payload", "underlayer", "parent",
    "name",
    # used for sr()
    "_answered",
    # used when sniffing
    "direction", "sniffed_on",
    # handle snaplen Vs real length
    "wirelen",
    "comments",
    "process_information"
]
# Class-level defaults. __init__ reads self._name and self._overload_fields,
# so the metaclass presumably re-exposes the slot-colliding names below with
# a leading underscore -- TODO confirm against Packet_metaclass.
name = None
fields_desc = []  # type: List[AnyField]
deprecated_fields = {}  # type: Dict[str, Tuple[str, str]]
overload_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
payload_guess = []  # type: List[Tuple[Dict[str, Any], Type[Packet]]]
show_indent = 1
show_summary = True
match_subclass = False
# Per-class caches filled by prepare_cached_fields() and read by
# do_init_cached_fields(); all are keyed by the packet class.
class_dont_cache = {}  # type: Dict[Type[Packet], bool]
class_packetfields = {}  # type: Dict[Type[Packet], Any]
class_default_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
class_default_fields_ref = {}  # type: Dict[Type[Packet], List[str]]
class_fieldtype = {}  # type: Dict[Type[Packet], Dict[str, AnyField]]  # noqa: E501

120 

@classmethod
def from_hexcap(cls):
    # type: (Type[Packet]) -> Packet
    """Instantiate this class from the bytes returned by import_hexcap()."""
    hexcap_bytes = import_hexcap()
    return cls(hexcap_bytes)

125 

@classmethod
def upper_bonds(self):
    # type: () -> None
    """
    Print one line per entry of payload_guess: the upper class name
    followed by the comma-separated field=value pairs of that entry.
    """
    for fval, upper in self.payload_guess:
        conditions = ", ".join(
            "%-12s" % ("%s=%r" % item) for item in fval.items()
        )
        print("%-20s %s" % (upper.__name__, conditions))

136 

@classmethod
def lower_bonds(self):
    # type: () -> None
    """
    Print one line per entry of _overload_fields: the lower class name
    followed by the comma-separated field=value pairs of that entry.
    """
    for lower, fval in self._overload_fields.items():
        overloads = ", ".join(
            "%-12s" % ("%s=%r" % item) for item in fval.items()
        )
        print("%-20s %s" % (lower.__name__, overloads))

147 

def __init__(self,
             _pkt=b"",  # type: Union[bytes, bytearray]
             post_transform=None,  # type: Any
             _internal=0,  # type: int
             _underlayer=None,  # type: Optional[Packet]
             _parent=None,  # type: Optional[Packet]
             stop_dissection_after=None,  # type: Optional[Type[Packet]]
             **fields  # type: Any
             ):
    # type: (...) -> None
    """
    Create a packet layer.

    :param _pkt: raw bytes; when non-empty they are dissected
    :param post_transform: a callable, a list of callables, or None;
        stored in ``post_transforms``
    :param _internal: when falsy, ``dissection_done`` is called after
        dissecting ``_pkt``
    :param _underlayer: stored in ``underlayer``
    :param _parent: stored in ``parent``
    :param stop_dissection_after: stored in ``stop_dissection_after``
    :param fields: field values, converted with the field's ``any2i``
        unless they are ``RawVal`` instances
    :raises AttributeError: for a keyword that is neither a field of
        ``fields_desc`` nor a deprecated alias
    """
    self.time = time.time()  # type: Union[EDecimal, float]
    self.sent_time = None  # type: Union[EDecimal, float, None]
    if self._name is None:
        self.name = self.__class__.__name__
    else:
        self.name = self._name
    self.default_fields = {}  # type: Dict[str, Any]
    self.overload_fields = self._overload_fields
    self.overloaded_fields = {}  # type: Dict[str, Any]
    self.fields = {}  # type: Dict[str, Any]
    self.fieldtype = {}  # type: Dict[str, AnyField]
    self.packetfields = []  # type: List[AnyField]
    self.payload = NoPayload()  # type: Packet
    self.init_fields(bool(_pkt))
    self.underlayer = _underlayer
    self.parent = _parent
    if isinstance(_pkt, bytearray):
        _pkt = bytes(_pkt)
    self.original = _pkt
    self.explicit = 0
    self.raw_packet_cache = None  # type: Optional[bytes]
    self.raw_packet_cache_fields = None  # type: Optional[Dict[str, Any]]  # noqa: E501
    self.wirelen = None  # type: Optional[int]
    self.direction = None  # type: Optional[int]
    self.sniffed_on = None  # type: Optional[_GlobInterfaceType]
    self.comments = None  # type: Optional[List[bytes]]
    self.process_information = None  # type: Optional[Dict[str, Any]]
    self.stop_dissection_after = stop_dissection_after
    if _pkt:
        self.dissect(_pkt)
        if not _internal:
            self.dissection_done(self)
    # Walk fields_desc (not the kwargs) so that values are set in
    # declaration order, which MultipleTypeField relies on.
    for fdesc in self.fields_desc:
        fname = fdesc.name
        if fname not in fields:
            continue
        value = fields.pop(fname)
        if isinstance(value, RawVal):
            self.fields[fname] = value
        else:
            self.fields[fname] = self.get_field(fname).any2i(self, value)
    # Whatever is left is either a deprecated alias or an error
    for fname in fields:
        if fname not in self.deprecated_fields:
            raise AttributeError(fname)
        value = fields[fname]
        fname = self._resolve_alias(fname)
        if isinstance(value, RawVal):
            self.fields[fname] = value
        else:
            self.fields[fname] = self.get_field(fname).any2i(self, value)
    if post_transform is None:
        self.post_transforms = []
    elif isinstance(post_transform, list):
        self.post_transforms = post_transform
    else:
        self.post_transforms = [post_transform]

216 

# Type of the state tuple produced by __reduce__ and consumed by
# __setstate__, in order: time, sent_time, direction, sniffed_on,
# wirelen, comment.
_PickleType = Tuple[
    Union[EDecimal, float],
    Optional[Union[EDecimal, float, None]],
    Optional[int],
    Optional[_GlobInterfaceType],
    Optional[int],
    Optional[bytes],
]

225 

@property
def comment(self):
    # type: () -> Optional[bytes]
    """Return the first entry of ``comments``, or None when there is none."""
    entries = self.comments
    if not entries or not len(entries):
        return None
    return entries[0]

@comment.setter
def comment(self, value):
    # type: (Optional[bytes]) -> None
    """
    Replace ``comments`` with a one-element list holding ``value``.
    Passing None resets ``comments`` to None.
    """
    self.comments = None if value is None else [value]

245 

246 def __reduce__(self): 

247 # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType] 

248 """Used by pickling methods""" 

249 return (self.__class__, (self.build(),), ( 

250 self.time, 

251 self.sent_time, 

252 self.direction, 

253 self.sniffed_on, 

254 self.wirelen, 

255 self.comment 

256 )) 

257 

258 def __setstate__(self, state): 

259 # type: (Packet._PickleType) -> Packet 

260 """Rebuild state using pickable methods""" 

261 self.time = state[0] 

262 self.sent_time = state[1] 

263 self.direction = state[2] 

264 self.sniffed_on = state[3] 

265 self.wirelen = state[4] 

266 self.comment = state[5] 

267 return self 

268 

269 def __deepcopy__(self, 

270 memo, # type: Any 

271 ): 

272 # type: (...) -> Packet 

273 """Used by copy.deepcopy""" 

274 return self.copy() 

275 

def init_fields(self, for_dissect_only=False):
    # type: (bool) -> None
    """
    Initialize the fields of this instance, through the per-class cache
    unless the class is flagged in ``class_dont_cache``.
    """
    uncached = self.class_dont_cache.get(self.__class__, False)
    if not uncached:
        self.do_init_cached_fields(for_dissect_only=for_dissect_only)
        return
    self.do_init_fields(self.fields_desc)

286 

def do_init_fields(self,
                   flist,  # type: Sequence[AnyField]
                   ):
    # type: (...) -> None
    """
    Register every field of ``flist`` on this instance: deep-copy its
    default, record it in ``fieldtype`` and, if it holds packets, in
    ``packetfields``.
    """
    defaults = {}
    for fld in flist:
        fname = fld.name
        defaults[fname] = copy.deepcopy(fld.default)
        self.fieldtype[fname] = fld
        if fld.holds_packets:
            self.packetfields.append(fld)
    # default_fields is assigned in one go at the end to avoid race issues
    self.default_fields = defaults

302 

def do_init_cached_fields(self, for_dissect_only=False):
    # type: (bool) -> None
    """
    Initialize the fields from the per-class cache, building that cache
    first when it has no entry for this class.

    :param for_dissect_only: when true, skip copying the mutable
        defaults into ``fields``
    """
    klass = self.__class__

    # Populate the cache on first use
    if Packet.class_default_fields.get(klass, None) is None:
        self.prepare_cached_fields(self.fields_desc)

    cached_defaults = Packet.class_default_fields.get(klass, None)
    if not cached_defaults:
        # Nothing cached (no fields, or caching was aborted)
        return

    self.default_fields = cached_defaults
    self.fieldtype = Packet.class_fieldtype[klass]
    self.packetfields = Packet.class_packetfields[klass]

    # Optimization: no need for references when only dissecting.
    if for_dissect_only:
        return

    # Give the instance its own copy of every reference-type default
    for fname in Packet.class_default_fields_ref[klass]:
        shared_default = self.default_fields[fname]
        try:
            self.fields[fname] = shared_default.copy()
        except AttributeError:
            # Python 2.7 - list only
            self.fields[fname] = shared_default[:]

335 

def prepare_cached_fields(self, flist):
    # type: (Sequence[AnyField]) -> None
    """
    Build the per-class field cache from ``flist``.

    If a MultipleTypeField is met, the class is flagged in
    ``class_dont_cache`` and the fields are initialized without cache.
    """
    klass = self.__class__

    # Nothing to cache
    if not flist:
        return

    defaults = {}
    ref_names = []
    fieldtypes = {}
    packet_fields = []

    for fld in flist:
        if isinstance(fld, MultipleTypeField):
            # Abort
            self.class_dont_cache[klass] = True
            self.do_init_fields(self.fields_desc)
            return

        defaults[fld.name] = copy.deepcopy(fld.default)
        fieldtypes[fld.name] = fld
        if fld.holds_packets:
            packet_fields.append(fld)

        # Remember which defaults need a per-instance copy
        if isinstance(fld.default, (list, dict, set, RandField, Packet)):
            ref_names.append(fld.name)

    # Publish
    Packet.class_default_fields_ref[klass] = ref_names
    Packet.class_fieldtype[klass] = fieldtypes
    Packet.class_packetfields[klass] = packet_fields
    # Last to avoid racing issues
    Packet.class_default_fields[klass] = defaults

376 

def dissection_done(self, pkt):
    # type: (Packet) -> None
    """DEV: run post_dissection(pkt) here, then forward to the payload"""
    self.post_dissection(pkt)
    payload = self.payload
    payload.dissection_done(pkt)

382 

def post_dissection(self, pkt):
    # type: (Packet) -> None
    """DEV: hook called by dissection_done(); does nothing by default"""
    return None

387 

def get_field(self, fld):
    # type: (str) -> AnyField
    """DEV: look up the field instance registered under the name ``fld``"""
    known_fields = self.fieldtype
    return known_fields[fld]

392 

def add_payload(self, payload):
    # type: (Union[Packet, bytes]) -> None
    """
    Attach ``payload`` below the last layer of this packet.

    A Packet is attached as is; bytes-like or str data is wrapped in
    ``conf.raw_layer``. None is ignored.

    :raises TypeError: for any other payload type
    """
    if payload is None:
        return
    if not isinstance(self.payload, NoPayload):
        # Already has a payload: let it handle the new one
        self.payload.add_payload(payload)
        return
    if isinstance(payload, Packet):
        self.payload = payload
        payload.add_underlayer(self)
        # Use the first overload set declared for one of our alias types
        for alias in self.aliastypes:
            if alias in payload.overload_fields:
                self.overloaded_fields = payload.overload_fields[alias]
                break
    elif isinstance(payload, (bytes, str, bytearray, memoryview)):
        self.payload = conf.raw_layer(load=bytes_encode(payload))
    else:
        raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload))  # noqa: E501

411 

def remove_payload(self):
    # type: () -> None
    """Detach the payload: unlink it, reset it to NoPayload, drop overloads."""
    detached = self.payload
    detached.remove_underlayer(self)
    self.payload = NoPayload()
    self.overloaded_fields = {}

417 

def add_underlayer(self, underlayer):
    # type: (Packet) -> None
    """Record ``underlayer`` as the ``underlayer`` of this packet."""
    self.underlayer = underlayer

421 

def remove_underlayer(self, other):
    # type: (Packet) -> None
    """Reset ``underlayer`` to None; ``other`` is not used."""
    self.underlayer = None

425 

def add_parent(self, parent):
    # type: (Packet) -> None
    """Set the parent of this packet.

    For a packet that is an element of a PacketListField, the parent
    is the packet owning that list."""
    self.parent = parent

432 

def remove_parent(self, other):
    # type: (Packet) -> None
    """Clear the parent of this packet; ``other`` is not used.

    For a packet that is an element of a PacketListField, the parent
    is the packet owning that list."""
    self.parent = None

439 

440 def copy(self) -> Self: 

441 """Returns a deep copy of the instance.""" 

442 clone = self.__class__() 

443 clone.fields = self.copy_fields_dict(self.fields) 

444 clone.default_fields = self.copy_fields_dict(self.default_fields) 

445 clone.overloaded_fields = self.overloaded_fields.copy() 

446 clone.underlayer = self.underlayer 

447 clone.parent = self.parent 

448 clone.explicit = self.explicit 

449 clone.raw_packet_cache = self.raw_packet_cache 

450 clone.raw_packet_cache_fields = self.copy_fields_dict( 

451 self.raw_packet_cache_fields 

452 ) 

453 clone.wirelen = self.wirelen 

454 clone.post_transforms = self.post_transforms[:] 

455 clone.payload = self.payload.copy() 

456 clone.payload.add_underlayer(clone) 

457 clone.time = self.time 

458 clone.comments = self.comments 

459 clone.direction = self.direction 

460 clone.sniffed_on = self.sniffed_on 

461 return clone 

462 

463 def _resolve_alias(self, attr): 

464 # type: (str) -> str 

465 new_attr, version = self.deprecated_fields[attr] 

466 warnings.warn( 

467 "%s has been deprecated in favor of %s since %s !" % ( 

468 attr, new_attr, version 

469 ), DeprecationWarning 

470 ) 

471 return new_attr 

472 

def getfieldval(self, attr):
    # type: (str) -> Any
    """
    Return the internal value of ``attr``: explicit fields first, then
    overloaded fields, then defaults, and finally ask the payload.
    Deprecated names are resolved first.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    # Each store is fetched lazily, in priority order
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return store[attr]
    return self.payload.getfieldval(attr)

484 

def getfield_and_val(self, attr):
    # type: (str) -> Tuple[AnyField, Any]
    """
    Return ``(field, internal value)`` for ``attr``, looking at explicit
    fields, then overloaded fields, then defaults. Deprecated names are
    resolved first.

    :raises ValueError: when ``attr`` is in none of the three stores
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    # Each store is fetched lazily, in priority order
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return self.get_field(attr), store[attr]
    raise ValueError

496 

def __getattr__(self, attr):
    # type: (str) -> Any
    """
    Resolve ``attr`` as a field: return its human value (``i2h``), or
    the value untouched for a RawVal or when no field object exists.
    Names unknown to this layer are looked up on the payload.
    """
    try:
        fld, v = self.getfield_and_val(attr)
    except ValueError:
        return self.payload.__getattr__(attr)
    if fld is None:
        return v
    if isinstance(v, RawVal):
        return v
    return fld.i2h(self, v)

506 

def setfieldval(self, attr, val):
    # type: (str, Any) -> None
    """
    Set field ``attr`` to ``val``.

    A field of this layer is stored after ``any2i`` conversion (RawVal
    values are stored untouched) and the build caches are invalidated.
    ``"payload"`` replaces the payload; any other name is forwarded to
    the payload. Deprecated names are resolved first.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    if attr in self.default_fields:
        field = self.get_field(attr)
        if field is None:
            convert = lambda x, y: y  # type: Callable[..., Any]
        else:
            convert = field.any2i
        if isinstance(val, RawVal):
            self.fields[attr] = val
        else:
            self.fields[attr] = convert(self, val)
        # The cached raw bytes no longer match the fields
        self.explicit = 0
        self.raw_packet_cache = None
        self.raw_packet_cache_fields = None
        self.wirelen = None
        return
    if attr == "payload":
        self.remove_payload()
        self.add_payload(val)
        return
    self.payload.setfieldval(attr, val)

528 

529 def __setattr__(self, attr, val): 

530 # type: (str, Any) -> None 

531 if attr in self.__all_slots__: 

532 return object.__setattr__(self, attr, val) 

533 try: 

534 return self.setfieldval(attr, val) 

535 except AttributeError: 

536 pass 

537 return object.__setattr__(self, attr, val) 

538 

def delfieldval(self, attr):
    # type: (str) -> None
    """
    Remove the explicit value of field ``attr``.

    An explicitly set field is deleted and the build caches are
    invalidated. A field that only has a default is left alone.
    ``"payload"`` removes the payload; any other name is forwarded to
    the payload.
    """
    if attr in self.fields:
        del self.fields[attr]
        self.explicit = 0  # in case a default value must be explicit
        self.raw_packet_cache = None
        self.raw_packet_cache_fields = None
        self.wirelen = None
        return
    if attr in self.default_fields:
        # Known field without explicit value: nothing to delete
        return
    if attr == "payload":
        self.remove_payload()
        return
    self.payload.delfieldval(attr)

553 

554 def __delattr__(self, attr): 

555 # type: (str) -> None 

556 if attr == "payload": 

557 return self.remove_payload() 

558 if attr in self.__all_slots__: 

559 return object.__delattr__(self, attr) 

560 try: 

561 return self.delfieldval(attr) 

562 except AttributeError: 

563 pass 

564 return object.__delattr__(self, attr) 

565 

566 def _superdir(self): 

567 # type: () -> Set[str] 

568 """ 

569 Return a list of slots and methods, including those from subclasses. 

570 """ 

571 attrs = set() # type: Set[str] 

572 cls = self.__class__ 

573 if hasattr(cls, '__all_slots__'): 

574 attrs.update(cls.__all_slots__) 

575 for bcls in cls.__mro__: 

576 if hasattr(bcls, '__dict__'): 

577 attrs.update(bcls.__dict__) 

578 return attrs 

579 

580 def __dir__(self): 

581 # type: () -> List[str] 

582 """ 

583 Add fields to tab completion list. 

584 """ 

585 return sorted(itertools.chain(self._superdir(), self.default_fields)) 

586 

def __repr__(self):
    # type: () -> str
    """
    Render the layer as ``<ClassName  field=value ... |payload>`` using
    the configured color theme.

    Only explicitly set or overloaded fields are shown; conditional
    fields whose condition is false and empty list/dict/set values are
    skipped.
    """
    ct = conf.color_theme
    chunks = []
    for f in self.fields_desc:
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        if f.name in self.fields:
            stored = self.fields[f.name]
        elif f.name in self.overloaded_fields:
            stored = self.overloaded_fields[f.name]
        else:
            continue
        if isinstance(stored, (list, dict, set)) and len(stored) == 0:
            continue
        val = f.i2repr(self, stored)
        if isinstance(f, Emph) or f in conf.emph:
            ncol = ct.emph_field_name
            vcol = ct.emph_field_value
        else:
            ncol = ct.field_name
            vcol = ct.field_value

        chunks.append(" %s%s%s" % (ncol(f.name),
                                   ct.punct("="),
                                   vcol(val)))
    s = "".join(chunks)
    return "%s%s %s %s%s%s" % (ct.punct("<"),
                               ct.layer_name(self.__class__.__name__),
                               s,
                               ct.punct("|"),
                               repr(self.payload),
                               ct.punct(">"))

622 

623 def __str__(self): 

624 # type: () -> str 

625 return self.summary() 

626 

627 def __bytes__(self): 

628 # type: () -> bytes 

629 return self.build() 

630 

def __div__(self, other):
    # type: (Any) -> Self
    """
    ``self / other``: stack ``other`` on top of a copy of this packet.

    Packets are copied before stacking; bytes-like or str data is first
    wrapped in ``conf.raw_layer``; anything else is delegated to
    ``other.__rdiv__``.
    """
    if isinstance(other, Packet):
        lower = self.copy()
        upper = other.copy()
        lower.add_payload(upper)
        return lower
    if isinstance(other, (bytes, str, bytearray, memoryview)):
        return self / conf.raw_layer(load=bytes_encode(other))
    return other.__rdiv__(self)  # type: ignore
__truediv__ = __div__

643 

def __rdiv__(self, other):
    # type: (Any) -> Packet
    """
    ``other / self`` for bytes-like or str ``other``: wrap it in
    ``conf.raw_layer`` and stack this packet on top.

    :raises TypeError: for any other left operand
    """
    if not isinstance(other, (bytes, str, bytearray, memoryview)):
        raise TypeError
    return conf.raw_layer(load=bytes_encode(other)) / self
__rtruediv__ = __rdiv__

651 

652 def __mul__(self, other): 

653 # type: (Any) -> List[Packet] 

654 if isinstance(other, int): 

655 return [self] * other 

656 else: 

657 raise TypeError 

658 

659 def __rmul__(self, other): 

660 # type: (Any) -> List[Packet] 

661 return self.__mul__(other) 

662 

663 def __nonzero__(self): 

664 # type: () -> bool 

665 return True 

666 __bool__ = __nonzero__ 

667 

668 def __len__(self): 

669 # type: () -> int 

670 return len(self.__bytes__()) 

671 

def copy_field_value(self, fieldname, value):
    # type: (str, Any) -> Any
    """Copy ``value`` using the ``do_copy`` of the field named ``fieldname``."""
    field = self.get_field(fieldname)
    return field.do_copy(value)

675 

def copy_fields_dict(self, fields):
    # type: (_T) -> _T
    """
    Return a new dict whose values are copied with copy_field_value();
    None is returned unchanged.
    """
    if fields is None:
        return None
    copied = {}
    for fname, fval in fields.items():
        copied[fname] = self.copy_field_value(fname, fval)
    return copied

682 

683 def _raw_packet_cache_field_value(self, fld, val, copy=False): 

684 # type: (AnyField, Any, bool) -> Optional[Any] 

685 """Get a value representative of a mutable field to detect changes""" 

686 _cpy = lambda x: fld.do_copy(x) if copy else x # type: Callable[[Any], Any] 

687 if fld.holds_packets: 

688 # avoid copying whole packets (perf: #GH3894) 

689 if fld.islist: 

690 return [ 

691 (_cpy(x.fields), x.payload.raw_packet_cache) for x in val 

692 ] 

693 else: 

694 return (_cpy(val.fields), val.payload.raw_packet_cache) 

695 elif fld.islist or fld.ismutable: 

696 return _cpy(val) 

697 return None 

698 

def clear_cache(self):
    # type: () -> None
    """
    Drop ``raw_packet_cache`` on this layer, on every packet held by
    its explicitly set packet fields, and on the payload.
    """
    self.raw_packet_cache = None
    for fname, fval in self.fields.items():
        if not self.get_field(fname).holds_packets:
            continue
        if isinstance(fval, Packet):
            fval.clear_cache()
        elif isinstance(fval, list):
            for sub_pkt in fval:
                sub_pkt.clear_cache()
    self.payload.clear_cache()

712 

def self_build(self):
    # type: () -> bytes
    """
    Create the default layer regarding fields_desc dict

    If the layer still has the raw bytes of a previous dissection and
    none of the tracked mutable fields changed, those bytes are
    returned. Otherwise the bytes are assembled field by field with
    ``addfield`` (RawVal values are appended as is).

    :return: the bytes of this layer, without payload
    :raises Exception: whatever ``addfield`` raised, with
        "While building field '<name>': " prepended to its first
        argument when that argument is a string
    """
    if self.raw_packet_cache is not None and \
            self.raw_packet_cache_fields is not None:
        # Invalidate the cache as soon as one tracked field differs
        for fname, fval in self.raw_packet_cache_fields.items():
            fld, val = self.getfield_and_val(fname)
            if self._raw_packet_cache_field_value(fld, val) != fval:
                self.raw_packet_cache = None
                self.raw_packet_cache_fields = None
                self.wirelen = None
                break
        if self.raw_packet_cache is not None:
            return self.raw_packet_cache
    p = b""
    for f in self.fields_desc:
        val = self.getfieldval(f.name)
        if isinstance(val, RawVal):
            p += bytes(val)
        else:
            try:
                p = f.addfield(self, p, val)
            except Exception as ex:
                try:
                    ex.args = (
                        "While building field '%s': " % f.name +
                        ex.args[0],
                    ) + ex.args[1:]
                except (AttributeError, IndexError, TypeError):
                    # No args, or a first arg that is not a string
                    # (e.g. KeyError(5)): keep the exception untouched
                    # rather than masking it with a TypeError.
                    pass
                raise
    return p

747 

def do_build_payload(self):
    # type: () -> bytes
    """
    Build the payload layer by calling its ``do_build()``.

    :return: the bytes of the payload layer
    """
    payload = self.payload
    return payload.do_build()

756 

def do_build(self):
    # type: () -> bytes
    """
    Build this layer followed by its payload.

    When the packet is not explicit, the first packet yielded by
    iterating over it is built instead. ``post_transforms`` are applied
    to the layer bytes; ``post_build`` is only called when no raw
    packet cache is set.

    :return: the bytes of the packet with the payload
    """
    layer = self if self.explicit else next(iter(self))
    pkt = layer.self_build()
    for transform in layer.post_transforms:
        pkt = transform(pkt)
    pay = layer.do_build_payload()
    if layer.raw_packet_cache is not None:
        return pkt + pay
    return layer.post_build(pkt, pay)

774 

def build_padding(self):
    # type: () -> bytes
    """Return whatever the payload's ``build_padding()`` returns."""
    payload = self.payload
    return payload.build_padding()

778 

def build(self):
    # type: () -> bytes
    """
    Build the whole packet: ``do_build()`` followed by
    ``build_padding()``, passed through ``build_done()``.

    :return: the bytes of the packet with the payload
    """
    assembled = self.do_build() + self.build_padding()
    return self.build_done(assembled)

790 

def post_build(self, pkt, pay):
    # type: (bytes, bytes) -> bytes
    """
    DEV: called right after the current layer is built. The default
    implementation concatenates both parts.

    :param str pkt: the current layer (built by self_build)
    :param str pay: the payload (built by do_build_payload)
    :return: ``pkt`` followed by ``pay``
    """
    joined = pkt + pay
    return joined

801 

def build_done(self, p):
    # type: (bytes) -> bytes
    """Pass ``p`` to the payload's ``build_done()`` and return its result."""
    payload = self.payload
    return payload.build_done(p)

805 

def do_build_ps(self):
    # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]]  # noqa: E501
    """
    Build this layer field by field, recording for each field its
    ``i2repr`` text and the bytes it added.

    :return: ``(bytes of layer + payload, list)`` where the payload's
        own list has ``(self, per-field records)`` appended
    """
    built = b""
    previous = b""
    records = []
    for f in self.fields_desc:
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        built = f.addfield(self, built, self.getfieldval(f.name))
        if isinstance(built, bytes):
            # Bytes contributed by this field alone
            added = built[len(previous):]
            previous = built
        else:
            added = b""
        records.append((f, f.i2repr(self, self.getfieldval(f.name)), added))

    pay_bytes, layers = self.payload.build_ps(internal=1)
    built += pay_bytes
    layers.append((self, records))

    return built, layers

827 

def build_ps(self, internal=0):
    # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]]  # noqa: E501
    """
    Return the result of ``do_build_ps()`` unchanged.

    ``internal`` is accepted but not used by this implementation.
    """
    built, layers = self.do_build_ps()
    return built, layers

839 

def canvas_dump(self, layer_shift=0, rebuild=1):
    # type: (int, int) -> pyx.canvas.canvas
    """
    Draw the packet on a PyX canvas: layer and field names on the left,
    the hex dump of the bytes on the right, with an arrow from each
    field value to its bytes.

    :param layer_shift: added to the dump row counter after each layer
    :param rebuild: when truthy, the drawing is made from a packet
        re-created from ``raw(self)`` instead of from self
    :return: the canvas holding the drawing
    :raises ImportError: when ``PYX`` is 0
    """
    if PYX == 0:
        raise ImportError("PyX and its dependencies must be installed")
    canvas = pyx.canvas.canvas()
    if rebuild:
        _, t = self.__class__(raw(self)).build_ps()
    else:
        _, t = self.build_ps()
    # Row budget of the text column: one per layer entry plus one per
    # field record
    YTXTI = len(t)
    for _, l in t:
        YTXTI += len(l)
    YTXT = float(YTXTI)
    YDUMP = YTXT

    # Layout constants: X origin of the text column and of the dump
    # column, running row counters, and X/Y scale factors
    XSTART = 1
    XDSTART = 10
    y = 0.0
    yd = 0.0
    XMUL = 0.55
    YMUL = 0.4

    backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
    forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
#        backcolor=makecol(0.376, 0.729, 0.525, 1.0)

    def hexstr(x):
        # type: (bytes) -> str
        # Space-separated two-digit hex of every byte of x
        return " ".join("%02x" % orb(c) for c in x)

    def make_dump_txt(x, y, txt):
        # type: (int, float, bytes) -> pyx.text.text
        # Text object for the hex of txt, placed at column x / row y of
        # the dump area
        return pyx.text.text(
            XDSTART + x * XMUL,
            (YDUMP - y) * YMUL,
            r"\tt{%s}" % hexstr(txt),
            [pyx.text.size.Large]
        )

    def make_box(o):
        # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
        # Rectangle with the geometry of bounding box o
        return pyx.box.rect(
            o.left(), o.bottom(), o.width(), o.height(),
            relcenter=(0.5, 0.5)
        )

    def make_frame(lst):
        # type: (List[Any]) -> pyx.path.path
        # Outline path around the text objects of lst (the dump rows of
        # one field); lst must not be empty
        if len(lst) == 1:
            b = lst[0].bbox()
            b.enlarge(pyx.unit.u_pt)
            return b.path()
        else:
            fb = lst[0].bbox()
            fb.enlarge(pyx.unit.u_pt)
            lb = lst[-1].bbox()
            lb.enlarge(pyx.unit.u_pt)
            if len(lst) == 2 and fb.left() > lb.right():
                # Two rows that do not overlap horizontally: two
                # separate open outlines
                return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.bottom()),  # noqa: E501
                                     pyx.path.lineto(fb.right(), fb.bottom()),  # noqa: E501
                                     pyx.path.moveto(lb.left(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()))  # noqa: E501
            else:
                # XXX
                # One closed outline built from the first, second,
                # second-to-last and last rows
                gb = lst[1].bbox()
                if gb != lb:
                    gb.enlarge(pyx.unit.u_pt)
                kb = lst[-2].bbox()
                if kb != gb and kb != lb:
                    kb.enlarge(pyx.unit.u_pt)
                return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), gb.top()),
                                     pyx.path.lineto(fb.left(), gb.top()),
                                     pyx.path.closepath(),)

    def make_dump(s,   # type: bytes
                  shift=0,  # type: int
                  y=0.,  # type: float
                  col=None,  # type: pyx.color.color
                  bkcol=None,  # type: pyx.color.color
                  large=16  # type: int
                  ):
        # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float]  # noqa: E501
        # Draw the hex dump of s on its own canvas, starting at column
        # shift of row y. Returns (canvas, bbox of the last row, next
        # column, next row).
        # NOTE(review): the wrap test below uses the literal 16 while
        # the slicing uses `large`; they only agree for the default.
        # s must not be empty (tlist[-1] is read at the end).
        c = pyx.canvas.canvas()
        tlist = []
        while s:
            dmp, s = s[:large - shift], s[large - shift:]
            txt = make_dump_txt(shift, y, dmp)
            tlist.append(txt)
            shift += len(dmp)
            if shift >= 16:
                shift = 0
                y += 1
        if col is None:
            col = pyx.color.rgb.red
        if bkcol is None:
            bkcol = pyx.color.rgb.white
        c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick])  # noqa: E501
        for txt in tlist:
            c.insert(txt)
        return c, tlist[-1].bbox(), shift, y

    last_shift, last_y = 0, 0.0
    # Entries are consumed from the end of t with pop()
    while t:
        bkcol = next(backcolor)
        proto, fields = t.pop()
        y += 0.5
        # Layer title
        pt = pyx.text.text(
            XSTART,
            (YTXT - y) * YMUL,
            r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
                str(proto.name)
            ),
            [pyx.text.size.Large]
        )
        y += 1
        ptbb = pt.bbox()
        ptbb.enlarge(pyx.unit.u_pt * 2)
        canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])])  # noqa: E501
        canvas.insert(pt)
        for field, fval, fdump in fields:
            col = next(forecolor)
            # Field name
            ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name))  # noqa: E501
            # Field size: in bits for a BitField, otherwise the number
            # of dumped bytes
            if isinstance(field, BitField):
                fsize = '%sb' % field.size
            else:
                fsize = '%sB' % len(fdump)
            # Prefix the size with a marker when the class name of the
            # field (or of its wrapped .field) has "LE" in its first
            # three characters
            if (hasattr(field, 'field') and
                    'LE' in field.field.__class__.__name__[:3] or
                    'LE' in field.__class__.__name__[:3]):
                fsize = r'$\scriptstyle\langle$' + fsize
            st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright])  # noqa: E501
            # Only str values are displayed, cut to 18 characters
            if isinstance(fval, str):
                if len(fval) > 18:
                    fval = fval[:18] + "[...]"
            else:
                fval = ""
            vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))  # noqa: E501
            y += 1.0
            if fdump:
                dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol)  # noqa: E501

                dtb = target
                vtb = vt.bbox()
                bxvt = make_box(vtb)
                bxdt = make_box(dtb)
                dtb.enlarge(pyx.unit.u_pt)
                # Arrow from the value text to its dump; a failure to
                # build the connector is ignored and no arrow is drawn.
                # yd is never modified after its initialization to 0.0.
                try:
                    if yd < 0:
                        cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90)  # noqa: E501
                    else:
                        cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90)  # noqa: E501
                except Exception:
                    pass
                else:
                    canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col])  # noqa: E501

                canvas.insert(dt)

            canvas.insert(ft)
            canvas.insert(st)
            canvas.insert(vt)
        last_y += layer_shift

    return canvas

1014 

def extract_padding(self, s):
    # type: (bytes) -> Tuple[bytes, Optional[bytes]]
    """
    DEV: hook to be overloaded by layers that are followed by padding.

    The default implementation considers that everything in ``s`` belongs
    to the payload and that there is no padding.

    :param bytes s: the bytes left once this layer has been dissected
    :return: a couple (payload bytes, padding bytes or None)
    """
    payload, padding = s, None
    return payload, padding

1024 

def post_dissect(self, s):
    # type: (bytes) -> bytes
    """
    DEV: hook called right after the current layer has been dissected.

    :param bytes s: the bytes left once this layer's fields were read
    :return: the bytes to keep processing; the default implementation
        hands ``s`` back unchanged
    """
    return s

1029 

def pre_dissect(self, s):
    # type: (bytes) -> bytes
    """
    DEV: hook called right before the current layer is dissected.

    :param bytes s: the raw bytes about to be dissected
    :return: the bytes that will actually be dissected; the default
        implementation hands ``s`` back unchanged
    """
    return s

1034 

def do_dissect(self, s):
    # type: (bytes) -> bytes
    """
    Dissect this layer's own fields out of ``s``.

    Each descriptor of ``fields_desc`` consumes bytes through its
    ``getfield()`` and the resulting value is stored in ``self.fields``.
    The bytes consumed by the layer are kept in ``raw_packet_cache``.

    :param bytes s: the raw bytes starting at this layer
    :return: the bytes that were not consumed by this layer's fields
    """
    # Keep the untouched input so the consumed prefix can be cached below
    _raw = s
    self.raw_packet_cache_fields = {}
    for f in self.fields_desc:
        s, fval = f.getfield(self, s)
        # Skip unused ConditionalField
        if isinstance(f, ConditionalField) and fval is None:
            continue
        # We need to track fields with mutable values to discard
        # .raw_packet_cache when needed.
        if (f.islist or f.holds_packets or f.ismutable) and fval is not None:
            self.raw_packet_cache_fields[f.name] = \
                self._raw_packet_cache_field_value(f, fval, copy=True)
        self.fields[f.name] = fval
        # Nothing left to dissect
        # (only stop early on a MayEnd field, or on a ConditionalField
        # wrapping a MayEnd that actually produced a value)
        if not s and (isinstance(f, MayEnd) or
                      (fval is not None and isinstance(f, ConditionalField) and
                       isinstance(f.fld, MayEnd))):
            break
    # Cache exactly the bytes this layer consumed: everything but the
    # remaining tail (or the whole input when nothing remains).
    self.raw_packet_cache = _raw[:-len(s)] if s else _raw
    self.explicit = 1
    return s

1058 

def do_dissect_payload(self, s):
    # type: (bytes) -> None
    """
    Dissect the bytes following this layer and attach them as payload.

    Nothing is attached when ``s`` is empty. When this layer is an
    instance of ``stop_dissection_after``, the bytes are wrapped in
    ``conf.raw_layer`` without further dissection. Otherwise the class
    returned by ``guess_payload_class()`` is used; should it fail, the
    bytes fall back to ``conf.raw_layer`` unless ``conf.debug_dissector``
    is set, in which case the failure is logged and re-raised.

    :param bytes s: the raw bytes of the payload
    """
    if not s:
        return

    stop_after = self.stop_dissection_after
    if stop_after and isinstance(self, stop_after):
        # stop dissection here
        self.add_payload(conf.raw_layer(s, _internal=1, _underlayer=self))
        return

    cls = self.guess_payload_class(s)
    try:
        upper = cls(
            s,
            stop_dissection_after=self.stop_dissection_after,
            _internal=1,
            _underlayer=self,
        )
    except KeyboardInterrupt:
        raise
    except Exception:
        if conf.debug_dissector:
            if issubtype(cls, Packet):
                log_runtime.error("%s dissector failed", cls.__name__)
            else:
                log_runtime.error("%s.guess_payload_class() returned "
                                  "[%s]",
                                  self.__class__.__name__, repr(cls))
            if cls is not None:
                raise
        # Best effort: keep the undissected bytes as a raw layer
        upper = conf.raw_layer(s, _internal=1, _underlayer=self)
    self.add_payload(upper)

1097 

def dissect(self, s):
    # type: (bytes) -> None
    """
    Run the whole dissection pipeline on ``s``.

    The bytes go through ``pre_dissect()``, ``do_dissect()`` and
    ``post_dissect()``; what is left is split by ``extract_padding()``
    into payload and padding. The payload is dissected, and the padding
    is attached as ``conf.padding_layer`` when ``conf.padding`` is set.

    :param bytes s: the raw bytes starting at this layer
    """
    remaining = self.post_dissect(self.do_dissect(self.pre_dissect(s)))

    payl, pad = self.extract_padding(remaining)
    self.do_dissect_payload(payl)
    if pad and conf.padding:
        self.add_payload(conf.padding_layer(pad))

1110 

def guess_payload_class(self, payload):
    # type: (bytes) -> Type[Packet]
    """
    DEV: Guesses the next payload class from layer bonds.
    Can be overloaded to use a different mechanism.

    Every ``(conditions, class)`` entry of ``payload_guess`` is tried, in
    order, for each type of ``aliastypes``. The first entry whose
    conditions all equal this layer's field values wins.

    :param bytes payload: the layer's payload
    :return: the payload class, or the result of
        ``default_payload_class()`` when no entry matches
    """
    for alias in self.aliastypes:
        for conditions, upper in alias.payload_guess:
            try:
                for key, expected in conditions.items():
                    if not (expected == self.getfieldval(key)):
                        break
                else:
                    return upper  # type: ignore
            except AttributeError:
                # This entry refers to a field this layer lacks
                continue
    return self.default_payload_class(payload)

1129 

def default_payload_class(self, payload):
    # type: (bytes) -> Type[Packet]
    """
    DEV: Returns the payload class to use when guess_payload_class()
    found no matching layer bond.

    :param bytes payload: the layer's payload
    :return: the default payload class, as defined by ``conf.raw_layer``
    """
    fallback = conf.raw_layer
    return fallback

1140 

def hide_defaults(self):
    # type: () -> None
    """Removes fields' values that are the same as default values.

    The payload is then asked to do the same.
    """
    defaults = self.default_fields
    # Collect first, delete afterwards: self.fields cannot be modified
    # while it is being iterated over.
    redundant = [
        name for name, value in self.fields.items()
        if name in defaults and defaults[name] == value
    ]
    for name in redundant:
        del self.fields[name]
    self.payload.hide_defaults()

1151 

def clone_with(self, payload=None, **kargs):
    # type: (Optional[Any], **Any) -> Any
    """
    Build a new instance of this layer's class using ``kargs`` as its
    explicit fields.

    Default and cached-field dictionaries are copied with
    ``copy_fields_dict()``, overloaded fields are shallow-copied, and the
    remaining metadata is shared with the original.

    :param payload: optional payload to attach to the clone
    :param kargs: the field values of the clone
    :return: the new packet
    """
    pkt = self.__class__()
    pkt.explicit = 1
    pkt.fields = kargs
    pkt.default_fields = self.copy_fields_dict(self.default_fields)
    pkt.overloaded_fields = self.overloaded_fields.copy()
    # Metadata copied by reference
    for attr in ("time", "underlayer", "parent", "post_transforms",
                 "raw_packet_cache"):
        setattr(pkt, attr, getattr(self, attr))
    pkt.raw_packet_cache_fields = self.copy_fields_dict(
        self.raw_packet_cache_fields
    )
    for attr in ("wirelen", "comments", "sniffed_on", "direction"):
        setattr(pkt, attr, getattr(self, attr))
    if payload is not None:
        pkt.add_payload(payload)
    return pkt

1174 

def __iter__(self):
    # type: () -> Iterator[Packet]
    """Iterates through all sub-packets generated by this Packet.

    Field values that are generators are expanded, so that every
    combination of field values, and every packet generated by the
    payload, yields one packet built with ``clone_with()``.
    """
    def loop(todo, done, self=self):
        # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
        # ``todo``: names of the fields still to expand
        # ``done``: values already chosen for the expanded fields
        if todo:
            eltname = todo.pop()
            elt = self.getfieldval(eltname)
            if not isinstance(elt, Gen):
                # Wrap plain values so they can be iterated like a Gen;
                # a list field's value is wrapped as a single element.
                if self.get_field(eltname).islist:
                    elt = SetGen([elt])
                else:
                    elt = SetGen(elt)
            for e in elt:
                done[eltname] = e
                # Recurse on a copy of todo: each branch pops from it
                for x in loop(todo[:], done):
                    yield x
        else:
            # All the fields have a value: iterate over the payloads
            if isinstance(self.payload, NoPayload):
                payloads = SetGen([None])  # type: SetGen[Packet]
            else:
                payloads = self.payload
            for payl in payloads:
                # Let's make sure subpackets are consistent
                done2 = done.copy()
                for k in done2:
                    if isinstance(done2[k], VolatileValue):
                        done2[k] = done2[k]._fix()
                pkt = self.clone_with(payload=payl, **done2)
                yield pkt

    if self.explicit or self.raw_packet_cache is not None:
        # Nothing to expand: the current field values are used as is
        todo = []
        done = self.fields
    else:
        # Expand the explicitly set fields, plus the default/overloaded
        # fields whose value is a VolatileValue
        todo = [k for (k, v) in itertools.chain(self.default_fields.items(),
                                                self.overloaded_fields.items())
                if isinstance(v, VolatileValue)] + list(self.fields)
        done = {}
    return loop(todo, done)

1215 

def iterpayloads(self):
    # type: () -> Iterator[Packet]
    """Used to iter through the payloads of a Packet.
    Useful for DNS or 802.11 for instance.

    Yields this layer first, then each successive payload until a falsy
    payload is reached.
    """
    layer = self
    while True:
        yield layer
        upper = layer.payload
        if not upper:
            return
        layer = upper

1226 

def __gt__(self, other):
    # type: (Packet) -> int
    """True if other is an answer from self (self ==> other).

    Comparing against bytes always returns 1; any other type raises
    TypeError.
    """
    if isinstance(other, Packet):
        return other < self
    if isinstance(other, bytes):
        return 1
    raise TypeError((self, other))

1236 

def __lt__(self, other):
    # type: (Packet) -> int
    """True if self is an answer from other (other ==> self).

    Comparing against bytes always returns 1; any other type raises
    TypeError.
    """
    if isinstance(other, Packet):
        return self.answers(other)
    if isinstance(other, bytes):
        return 1
    raise TypeError((self, other))

1246 

def __eq__(self, other):
    # type: (Any) -> bool
    """
    Two packets are equal when ``other`` is an instance of this packet's
    class, every field descriptor of ``self`` also belongs to ``other``
    with the same value, and their payloads are equal.
    """
    if not isinstance(other, self.__class__):
        return False
    differs = any(
        f not in other.fields_desc or
        self.getfieldval(f.name) != other.getfieldval(f.name)
        for f in self.fields_desc
    )
    if differs:
        return False
    return self.payload == other.payload

1257 

def __ne__(self, other):
    # type: (Any) -> bool
    """Logical negation of ``__eq__()``."""
    equal = self.__eq__(other)
    return not equal

1261 

# Note: setting __hash__ to None is the standard way
# of making an object un-hashable. mypy doesn't know that,
# hence the "type: ignore" below.
# Equality is value-based here (see __eq__), so instances must not be
# used as dict keys or set members.
__hash__ = None  # type: ignore

1265 

def hashret(self):
    # type: () -> bytes
    """DEV: returns a string that has the same value for a request
    and its answer.

    The default implementation delegates to the payload.
    """
    upper = self.payload
    return upper.hashret()

1271 

def answers(self, other):
    # type: (Packet) -> int
    """DEV: true if self is an answer from other

    The default implementation requires both layers to be of the exact
    same class, then lets the payloads decide.
    """
    same_class = other.__class__ == self.__class__
    return self.payload.answers(other.payload) if same_class else 0

1278 

def layers(self):
    # type: () -> List[Type[Packet]]
    """returns a list of layer classes (including subclasses) in this packet"""  # noqa: E501
    classes = []
    current = self  # type: Optional[Packet]
    while current:
        classes.append(current.__class__)
        # Layer 0 of the payload is the next layer of the chain, or None
        # at the end of the packet
        current = current.payload.getlayer(0, _subclass=True)
    return classes

1288 

def haslayer(self, cls, _subclass=None):
    # type: (Union[Type[Packet], str], Optional[bool]) -> int
    """
    true if self has a layer that is an instance of cls.
    Superseded by "cls in self" syntax.

    ``cls`` may be a class or a layer name. Packets held by this layer's
    packet fields are searched before the payload.
    """
    if _subclass is None:
        _subclass = self.match_subclass or None
    match = issubtype if _subclass else (lambda x, t: bool(x == t))

    own_class = self.__class__
    if cls is None or match(own_class, cls) \
            or cls in [own_class.__name__, self._name]:
        return True

    for f in self.packetfields:
        candidates = self.getfieldval(f.name)
        if candidates is None:
            continue
        if not f.islist:
            candidates = SetGen(candidates, _iterpacket=0)
        for candidate in candidates:
            if not isinstance(candidate, Packet):
                continue
            ret = candidate.haslayer(cls, _subclass=_subclass)
            if ret:
                return ret
    return self.payload.haslayer(cls, _subclass=_subclass)

1316 

def getlayer(self,
             cls,  # type: Union[int, Type[Packet], str]
             nb=1,  # type: int
             _track=None,  # type: Optional[List[int]]
             _subclass=None,  # type: Optional[bool]
             **flt  # type: Any
             ):
    # type: (...) -> Optional[Packet]
    """Return the nb^th layer that is an instance of cls, matching flt
    values.

    :param cls: a layer class, a layer name, a "layer.field" string (the
        value of that field is then returned), or an int (the index of
        the layer to return, regardless of its class)
    :param nb: the rank of the matching layer to return, starting at 1
    :param _track: internal; passed down to the payload so that the
        number of matches still needed can be reported back
    :param _subclass: whether subclasses of cls match; defaults to
        ``self.match_subclass``
    :param flt: field values that the layer must have
    :return: the layer (or field value), or None when nothing matches
    """
    if _subclass is None:
        _subclass = self.match_subclass or None
    if _subclass:
        match = issubtype
    else:
        match = lambda x, t: bool(x == t)
    # Note:
    # cls can be int, packet, str
    # string_class_name can be packet, str (packet or packet+field)
    # class_name can be packet, str (packet only)
    if isinstance(cls, int):
        # An index selects the (cls + 1)-th layer, whatever its class
        nb = cls + 1
        string_class_name = ""  # type: Union[Type[Packet], str]
    else:
        string_class_name = cls
    class_name = ""  # type: Union[Type[Packet], str]
    fld = None  # type: Optional[str]
    if isinstance(string_class_name, str) and "." in string_class_name:
        class_name, fld = string_class_name.split(".", 1)
    else:
        class_name, fld = string_class_name, None
    # An empty class_name matches every layer
    if not class_name or match(self.__class__, class_name) \
            or class_name in [self.__class__.__name__, self._name]:
        if all(self.getfieldval(fldname) == fldvalue
               for fldname, fldvalue in flt.items()):
            if nb == 1:
                if fld is None:
                    return self
                else:
                    return self.getfieldval(fld)  # type: ignore
            else:
                # This layer matches but is not the requested one yet
                nb -= 1
    # Search the packets held by this layer's fields before the payload
    for f in self.packetfields:
        fvalue_gen = self.getfieldval(f.name)
        if fvalue_gen is None:
            continue
        if not f.islist:
            fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
        for fvalue in fvalue_gen:
            if isinstance(fvalue, Packet):
                track = []  # type: List[int]
                ret = fvalue.getlayer(class_name, nb=nb, _track=track,
                                      _subclass=_subclass, **flt)
                if ret is not None:
                    return ret
                # Not found: resume with the count of matches still needed
                nb = track[0]
    return self.payload.getlayer(class_name, nb=nb, _track=_track,
                                 _subclass=_subclass, **flt)

1376 

def firstlayer(self):
    # type: () -> Packet
    """Return the bottom-most layer, i.e. the one with no underlayer."""
    bottom = self
    while True:
        below = bottom.underlayer
        if below is None:
            return bottom
        bottom = below

1383 

def __getitem__(self, cls):
    # type: (Union[Type[Packet], str]) -> Any
    """
    ``pkt[cls]`` returns the first layer matching cls, as getlayer()
    does. ``pkt[cls:nb:filters]`` forwards the slice's stop as ``nb`` and
    its step as field filters.

    :raises IndexError: when no layer matches
    """
    if isinstance(cls, slice):
        lname = cls.start
        filters = cls.step or {}
        if cls.stop:
            ret = self.getlayer(cls.start, nb=cls.stop, **filters)
        else:
            ret = self.getlayer(cls.start, **filters)
    else:
        lname = cls
        ret = self.getlayer(cls)
    if ret is not None:
        return ret

    # Build a readable name for the error message
    if isinstance(lname, type):
        name = lname.__name__
    elif isinstance(lname, bytes):
        name = cast(str, lname)
    else:
        name = repr(lname)
    raise IndexError("Layer [%s] not found" % name)

1404 

def __delitem__(self, cls):
    # type: (Type[Packet]) -> None
    """``del pkt[cls]``: delete the ``payload`` attribute of the layer
    below the first layer matching cls."""
    target = self[cls]
    del target.underlayer.payload

1408 

def __setitem__(self, cls, val):
    # type: (Type[Packet], Packet) -> None
    """``pkt[cls] = val``: set ``val`` as the payload of the layer below
    the first layer matching cls."""
    target = self[cls]
    target.underlayer.payload = val

1412 

def __contains__(self, cls):
    # type: (Union[Type[Packet], str]) -> int
    """
    "cls in self" returns true if self has a layer which is an
    instance of cls.

    This is a thin wrapper around haslayer().
    """
    present = self.haslayer(cls)
    return present

1420 

def route(self):
    # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
    """Return the route of this packet, as computed by its payload."""
    upper = self.payload
    return upper.route()

1424 

def fragment(self, *args, **kargs):
    # type: (*Any, **Any) -> List[Packet]
    """Fragment this packet; all arguments are forwarded to the
    payload's fragment()."""
    upper = self.payload
    return upper.fragment(*args, **kargs)

1428 

def display(self, *args, **kargs):  # Deprecated. Use show()
    # type: (*Any, **Any) -> None
    """Deprecated. Use show() method.

    All arguments are forwarded to show(); nothing is returned.
    """
    self.show(*args, **kargs)

1433 

def _show_or_dump(self,
                  dump=False,  # type: bool
                  indent=3,  # type: int
                  lvl="",  # type: str
                  label_lvl="",  # type: str
                  first_call=True  # type: bool
                  ):
    # type: (...) -> Optional[str]
    """
    Internal method that shows or dumps a hierarchical view of a packet.
    Called by show.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :param first_call: determine if the current function is the first
        call, as opposed to a recursive call made for a packet field or
        a payload
    :return: return a hierarchical view if dump, else print it
    """

    if dump:
        from scapy.themes import ColorTheme, AnsiColorTheme
        ct: ColorTheme = AnsiColorTheme()  # No color for dump output
    else:
        ct = conf.color_theme
    # Header line holding the layer name
    s = "%s%s %s %s\n" % (label_lvl,
                          ct.punct("###["),
                          ct.layer_name(self.name),
                          ct.punct("]###"))
    # Work on a copy: subfields get inserted in the queue below
    fields = self.fields_desc.copy()
    while fields:
        f = fields.pop(0)
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        if hasattr(f, "fields"):  # Field has subfields
            s += "%s %s =\n" % (
                label_lvl + lvl,
                ct.depreciate_field_name(f.name),
            )
            # NOTE(review): lvl is not restored after the subfields, so
            # the fields that follow keep the extra indentation - confirm
            # this is intended.
            lvl += " " * indent * self.show_indent
            # Queue the subfields, in order, right after their parent
            for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)):
                fields.insert(i, fld)
            continue
        if isinstance(f, Emph) or f in conf.emph:
            ncol = ct.emph_field_name
            vcol = ct.emph_field_value
        else:
            ncol = ct.field_name
            vcol = ct.field_value
        # Align the values of short field names
        pad = max(0, 10 - len(f.name)) * " "
        fvalue = self.getfieldval(f.name)
        if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)):  # noqa: E501
            # Packet (or list of packets): show each one as a sub-tree
            s += "%s %s%s%s%s\n" % (label_lvl + lvl,
                                    ct.punct("\\"),
                                    ncol(f.name),
                                    pad,
                                    ct.punct("\\"))
            fvalue_gen = SetGen(
                fvalue,
                _iterpacket=0
            )  # type: SetGen[Packet]
            for fvalue in fvalue_gen:
                s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False)  # noqa: E501
        else:
            begn = "%s %s%s%s " % (label_lvl + lvl,
                                   ncol(f.name),
                                   pad,
                                   ct.punct("="),)
            reprval = f.i2repr(self, fvalue)
            if isinstance(reprval, str):
                # Indent the continuation lines of multi-line values
                reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) +  # noqa: E501
                                                              len(lvl) +
                                                              len(f.name) +
                                                              4))
            s += "%s%s\n" % (begn, vcol(reprval))
    if self.payload:
        s += self.payload._show_or_dump(  # type: ignore
            dump=dump,
            indent=indent,
            lvl=lvl + (" " * indent * self.show_indent),
            label_lvl=label_lvl,
            first_call=False
        )

    # Only the outermost call prints; recursive calls return their text
    if first_call and not dump:
        print(s)
        return None
    else:
        return s

1523 

def show(self, dump=False, indent=3, lvl="", label_lvl=""):
    # type: (bool, int, str, str) -> Optional[Any]
    """
    Prints or returns (when "dump" is true) a hierarchical view of the
    packet.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :return: return a hierarchical view if dump, else print it
    """
    view = self._show_or_dump(dump, indent, lvl, label_lvl)
    return view

1537 

def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
    # type: (bool, int, str, str) -> Optional[Any]
    """
    Prints or returns (when "dump" is true) a hierarchical view of an
    assembled version of the packet, so that automatic fields are
    calculated (checksums, etc.)

    The packet is built into bytes, then dissected again with its own
    class, and the resulting packet is shown.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :return: return a hierarchical view if dump, else print it
    """
    assembled = self.__class__(raw(self))
    return assembled.show(dump, indent, lvl, label_lvl)

1552 

def sprintf(self, fmt, relax=1):
    # type: (str, int) -> str
    """
    sprintf(format, [relax=1]) -> str

    Where format is a string that can include directives. A directive
    begins and ends by % and has the following format:
    ``%[fmt[r],][cls[:nb].]field%``

    :param fmt: is a classic printf directive, "r" can be appended for raw
      substitution:
      (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
      (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
      Special case : "%.time%" is the creation time.
      Ex::

        p.sprintf(
          "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
          "%03xr,IP.proto% %r,TCP.flags%"
        )

      Moreover, the format string can include conditional statements. A
      conditional statement looks like : {layer:string} where layer is a
      layer name, and string is the string to insert in place of the
      condition if it is true, i.e. if layer is present. If layer is
      preceded by a "!", the result is inverted. Conditions can be
      imbricated. A valid statement can be::

        p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
        p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")

      A side effect is that, to obtain "{" and "}" characters, you must use
      "%(" and "%)".
    :param relax: forwarded to the payload's sprintf() when a directive
      cannot be resolved by this layer
    :return: the formatted string
    """

    # Escape sequences: the character following a "%" -> its output
    escape = {"%": "%",
              "(": "{",
              ")": "}"}

    # Evaluate conditions
    # The innermost condition is handled first: the last "{" and the
    # first "}" that follows it.
    # NOTE(review): a "{" without a closing "}" raises ValueError from
    # index(), and an empty layer name (e.g. "{:x}") raises IndexError on
    # cond[0] - neither is turned into a Scapy_Exception.
    while "{" in fmt:
        i = fmt.rindex("{")
        j = fmt[i + 1:].index("}")
        cond = fmt[i + 1:i + j + 1]
        k = cond.find(":")
        if k < 0:
            raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond)  # noqa: E501
        cond, format_ = cond[:k], cond[k + 1:]
        # res ends up True when the text must be kept: layer present
        # without "!", or layer absent with "!"
        res = False
        if cond[0] == "!":
            res = True
            cond = cond[1:]
        if self.haslayer(cond):
            res = not res
        if not res:
            format_ = ""
        fmt = fmt[:i] + format_ + fmt[i + j + 2:]

    # Evaluate directives
    s = ""
    while "%" in fmt:
        i = fmt.index("%")
        s += fmt[:i]
        fmt = fmt[i + 1:]
        if fmt and fmt[0] in escape:
            s += escape[fmt[0]]
            fmt = fmt[1:]
            continue
        try:
            # Parse "[fmt,][cls[:nb].]field" up to the closing "%"
            i = fmt.index("%")
            sfclsfld = fmt[:i]
            fclsfld = sfclsfld.split(",")
            if len(fclsfld) == 1:
                f = "s"
                clsfld = fclsfld[0]
            elif len(fclsfld) == 2:
                f, clsfld = fclsfld
            else:
                raise Scapy_Exception
            if "." in clsfld:
                cls, fld = clsfld.split(".")
            else:
                # No class given: the field is looked up on this layer
                cls = self.__class__.__name__
                fld = clsfld
            num = 1
            if ":" in cls:
                cls, snum = cls.split(":")
                num = int(snum)
            fmt = fmt[i + 1:]
        except Exception:
            raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))  # noqa: E501
        else:
            if fld == "time":
                # strftime gives H:M:S and leaves a "%06i" placeholder
                # that is filled with the microseconds
                val = time.strftime(
                    "%H:%M:%S.%%06i",
                    time.localtime(float(self.time))
                ) % int((self.time - int(self.time)) * 1000000)
            elif cls == self.__class__.__name__ and hasattr(self, fld):
                if num > 1:
                    # Not this occurrence of the layer: ask the payload
                    # for the (num - 1)-th one
                    val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax)  # noqa: E501
                    f = "s"
                else:
                    try:
                        val = self.getfieldval(fld)
                    except AttributeError:
                        val = getattr(self, fld)
                    if f[-1] == "r":  # Raw field value
                        f = f[:-1]
                        if not f:
                            f = "s"
                    else:
                        if fld in self.fieldtype:
                            val = self.fieldtype[fld].i2repr(self, val)
            else:
                # Not resolved by this layer: delegate to the payload
                val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
                f = "s"
            s += ("%" + f) % val

    s += fmt
    return s

1673 

def mysummary(self):
    # type: () -> str
    """DEV: can be overloaded to return a string that summarizes the layer.

    Only one mysummary() is used in a whole packet summary: the one of
    the upper layer, except if a mysummary() also returns (as a couple) a
    list of layers whose mysummary() must be called if they are present.

    The default implementation returns an empty string.
    """
    no_summary = ""
    return no_summary

1681 

def _do_summary(self):
    # type: () -> Tuple[int, str, List[Any]]
    """
    Build the summary of this layer and of the layers above it.

    :return: a tuple (found, summary, needed) where ``found`` tells
        whether a summary was produced by this layer or one above,
        ``summary`` is the text built so far and ``needed`` lists the
        layer classes whose mysummary() was requested by an upper layer
    """
    found, upper_summary, needed = self.payload._do_summary()
    ret = ""
    if not found or self.__class__ in needed:
        ret = self.mysummary()
        if isinstance(ret, tuple):
            # mysummary() also asked for the summary of other layers
            ret, wanted = ret
            needed += wanted
    if ret or needed:
        found = 1
    if not ret:
        ret = self.__class__.__name__ if self.show_summary else ""
    if self.__class__ in conf.emph:
        emphasized = " ".join(
            "%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))
            for f in self.fields_desc
            if f in conf.emph
        )
        ret = "%s [%s]" % (ret, emphasized)
    # Only put a separator between two non-empty summaries
    separator = " / " if ret and upper_summary else ""
    ret = "%s%s%s" % (ret, separator, upper_summary)
    return found, ret, needed

1706 

def summary(self, intern=0):
    # type: (int) -> str
    """Returns a one line summary of a packet.

    :param intern: not used by this implementation
    """
    _found, text, _needed = self._do_summary()
    return text

1711 

def lastlayer(self, layer=None):
    # type: (Optional[Packet]) -> Packet
    """Returns the uppermost layer of the packet

    :param layer: not used by this implementation, which hands ``self``
        over to its payload
    """
    upper = self.payload
    return upper.lastlayer(self)

1716 

def decode_payload_as(self, cls):
    # type: (Type[Packet]) -> None
    """Reassembles the payload and decode it using another packet class

    :param cls: the packet class used to dissect the payload bytes
    """
    payload_bytes = raw(self.payload)
    self.payload = cls(payload_bytes, _internal=1, _underlayer=self)
    # Walk down to the bottom-most layer of the packet
    bottom = self
    while bottom.underlayer is not None:
        bottom = bottom.underlayer
    self.payload.dissection_done(bottom)

1726 

def _command(self, json=False):
    # type: (bool) -> List[Tuple[str, Any]]
    """
    Internal method used to generate command() and json()

    :param json: when true, every field of ``fields_desc`` is listed and
        values are kept as JSON-friendly objects; otherwise only the
        fields present in ``self.fields`` are listed, as source code
        strings
    :return: a list of (field name, value) couples
    """
    # NOTE: within this method, the ``json`` parameter hides the json
    # module.
    f = []
    iterator: Iterator[Tuple[str, Any]]
    if json:
        iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc)
    else:
        iterator = iter(self.fields.items())
    for fn, fv in iterator:
        fld = self.get_field(fn)
        # Skip empty containers when the field's default is falsy too
        if isinstance(fv, (list, dict, set)) and not fv and not fld.default:
            continue
        if isinstance(fv, Packet):
            # Single nested packet
            if json:
                fv = {k: v for (k, v) in fv._command(json=True)}
            else:
                fv = fv.command()
        elif fld.islist and fld.holds_packets and isinstance(fv, list):
            # List of nested packets
            if json:
                fv = [
                    {k: v for (k, v) in x}
                    for x in map(lambda y: Packet._command(y, json=True), fv)
                ]
            else:
                fv = "[%s]" % ",".join(map(Packet.command, fv))
        elif fld.islist and isinstance(fv, list):
            # List of other values: use their command() when they have
            # one, their repr() otherwise
            if json:
                fv = [
                    getattr(x, 'command', lambda: repr(x))()
                    for x in fv
                ]
            else:
                fv = "[%s]" % ",".join(
                    getattr(x, 'command', lambda: repr(x))()
                    for x in fv
                )
        elif isinstance(fv, FlagValue):
            fv = int(fv)
        elif callable(getattr(fv, 'command', None)):
            fv = fv.command(json=json)
        else:
            if json:
                if isinstance(fv, bytes):
                    # Bytes that are not valid UTF-8 become backslash
                    # escapes
                    fv = fv.decode("utf-8", errors="backslashreplace")
                else:
                    fv = fld.i2h(self, fv)
            else:
                fv = repr(fld.i2h(self, fv))
        f.append((fn, fv))
    return f

1780 

def command(self):
    # type: () -> str
    """
    Returns a string representing the command you have to type to
    obtain the same packet

    The layers are joined with "/", this layer coming first.
    """
    arguments = ", ".join("%s=%s" % x for x in self._command())
    own = "%s(%s)" % (self.__class__.__name__, arguments)
    upper = self.payload.command()
    if not upper:
        return own
    return own + "/" + upper

1795 

def json(self):
    # type: () -> str
    """
    Returns a JSON representing the packet.

    Please note that this cannot be used for bijective usage: data loss WILL occur,
    so it will not make sense to try to rebuild the packet from the output.
    This must only be used for a grepping/displaying purpose.

    The payload, when it has a JSON representation, is nested under the
    "payload" key of this layer's object.

    :return: the JSON document, as a string
    """
    # Imported under an alias so that the module cannot be confused with
    # this method's own name.
    import json as json_module

    dump = json_module.dumps({k: v for (k, v) in self._command(json=True)})
    pc = self.payload.json()
    if pc:
        # The payload is spliced in before the closing brace. A layer
        # without any field is dumped as "{}": no separating comma must
        # be written then, or the result would not be valid JSON.
        separator = ", " if dump != "{}" else ""
        dump = "%s%s\"payload\": %s}" % (dump[:-1], separator, pc)
    return dump

1810 

1811 

class NoPayload(Packet):
    """
    Terminator of a chain of layers.

    A single instance exists per class (see ``__new__``). It is falsy,
    builds to empty bytes, holds no field, and most of the operations
    that walk the layers end here.
    """

    def __new__(cls, *args, **kargs):
        # type: (Type[Packet], *Any, **Any) -> NoPayload
        """Return the unique instance of cls, creating it on first use."""
        instance = cls.__dict__.get("__singl__")
        if instance is None:
            instance = Packet.__new__(cls)
            cls.__singl__ = instance
            Packet.__init__(instance)
        return cast(NoPayload, instance)

    def __init__(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        # The shared instance is initialized once, in __new__
        pass

    def dissection_done(self, pkt):
        # type: (Packet) -> None
        """Nothing to do at the end of the chain."""
        pass

    def add_payload(self, payload):
        # type: (Union[Packet, bytes]) -> NoReturn
        """Always fails: the terminator cannot carry a payload."""
        raise Scapy_Exception("Can't add payload to NoPayload instance")

    def remove_payload(self):
        # type: () -> None
        pass

    def add_underlayer(self, underlayer):
        # type: (Any) -> None
        pass

    def remove_underlayer(self, other):
        # type: (Packet) -> None
        pass

    def add_parent(self, parent):
        # type: (Any) -> None
        pass

    def remove_parent(self, other):
        # type: (Packet) -> None
        pass

    def copy(self):
        # type: () -> NoPayload
        """Return the instance itself: no copy is made."""
        return self

    def clear_cache(self):
        # type: () -> None
        pass

    def __repr__(self):
        # type: () -> str
        return ""

    def __str__(self):
        # type: () -> str
        return ""

    def __bytes__(self):
        # type: () -> bytes
        return b""

    def __nonzero__(self):
        # type: () -> bool
        """Always false, so that ``while pkt.payload`` loops stop here."""
        return False
    __bool__ = __nonzero__

    def do_build(self):
        # type: () -> bytes
        return b""

    def build(self):
        # type: () -> bytes
        return b""

    def build_padding(self):
        # type: () -> bytes
        return b""

    def build_done(self, p):
        # type: (bytes) -> bytes
        """Hand the built bytes back unchanged."""
        return p

    def build_ps(self, internal=0):
        # type: (int) -> Tuple[bytes, List[Any]]
        return b"", []

    # There is no field here: every field access fails

    def getfieldval(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def getfield_and_val(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def setfieldval(self, attr, val):
        # type: (str, Any) -> NoReturn
        raise AttributeError(attr)

    def delfieldval(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def hide_defaults(self):
        # type: () -> None
        pass

    def __iter__(self):
        # type: () -> Iterator[Packet]
        """Return an iterator that yields nothing."""
        return iter([])

    def __eq__(self, other):
        # type: (Any) -> bool
        """Equal to any other NoPayload, and to nothing else."""
        return isinstance(other, NoPayload)

    def hashret(self):
        # type: () -> bytes
        return b""

    def answers(self, other):
        # type: (Packet) -> bool
        """True when other is a NoPayload or a padding layer."""
        accepted = (NoPayload, conf.padding_layer)
        return isinstance(other, accepted)  # noqa: E501

    def haslayer(self, cls, _subclass=None):
        # type: (Union[Type[Packet], str], Optional[bool]) -> int
        return 0

    def getlayer(self,
                 cls,  # type: Union[int, Type[Packet], str]
                 nb=1,  # type: int
                 _track=None,  # type: Optional[List[int]]
                 _subclass=None,  # type: Optional[bool]
                 **flt  # type: Any
                 ):
        # type: (...) -> Optional[Packet]
        """
        End of the search: no layer is found.

        The number of matches still needed is appended to ``_track``,
        when given, so that the caller can resume its search elsewhere.
        """
        if _track is not None:
            _track.append(nb)
        return None

    def fragment(self, *args, **kargs):
        # type: (*Any, **Any) -> List[Packet]
        raise Scapy_Exception("cannot fragment this packet")

    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
        # type: (bool, int, str, str) -> None
        pass

    def sprintf(self, fmt, relax=1):
        # type: (str, int) -> str
        """
        Reached when no layer could resolve the directive: return "??"
        when relax is true, raise otherwise.
        """
        if not relax:
            raise Scapy_Exception("Format not found [%s]" % fmt)
        return "??"

    def _do_summary(self):
        # type: () -> Tuple[int, str, List[Any]]
        return 0, "", []

    def layers(self):
        # type: () -> List[Type[Packet]]
        return []

    def lastlayer(self, layer=None):
        # type: (Optional[Packet]) -> Packet
        """Return the layer handed over by the caller, if any."""
        return layer or self

    def command(self):
        # type: () -> str
        return ""

    def json(self):
        # type: () -> str
        return ""

    def route(self):
        # type: () -> Tuple[None, None, None]
        return (None, None, None)

1990 

1991 

1992#################### 

1993# packet classes # 

1994#################### 

1995 

1996 

class Raw(Packet):
    """Layer made of a single string field, ``load``."""
    name = "Raw"
    fields_desc = [StrField("load", b"")]

    def __init__(self, _pkt=b"", *args, **kwargs):
        # type: (bytes, *Any, **Any) -> None
        """
        :param _pkt: the data of the layer. Anything that is not bytes is
            converted with ``bytes_encode()``; for a couple, only the
            first element is converted.
        """
        if _pkt and not isinstance(_pkt, bytes):
            if isinstance(_pkt, tuple):
                data, bn = _pkt
                _pkt = bytes_encode(data), bn
            else:
                _pkt = bytes_encode(_pkt)
        super(Raw, self).__init__(_pkt, *args, **kwargs)

    def answers(self, other):
        # type: (Packet) -> int
        """Raw data is considered an answer to anything."""
        return 1

    def mysummary(self):
        # type: () -> str
        """
        Summarize the load as configured by ``conf.raw_summary``: through
        that callable when it is one, as a repr when it is otherwise
        true, and not at all when it is false.
        """
        cs = conf.raw_summary
        if not cs:
            return Packet.mysummary(self)
        if callable(cs):
            return "Raw %s" % cs(self.load)
        return "Raw %r" % self.load

2024 

2025 

class Padding(Raw):
    """Raw variant whose bytes are emitted by ``build_padding`` only."""
    name = "Padding"

    def self_build(self):
        # type: () -> bytes
        """Contribute no bytes of its own to the regular build."""
        return b""

    def build_padding(self):
        # type: () -> bytes
        """Return this layer's bytes followed by the payload's padding.

        The cached raw bytes are used when present; otherwise ``load`` is
        encoded with ``bytes_encode``.
        """
        cached = self.raw_packet_cache
        if cached is None:
            own = bytes_encode(self.load)
        else:
            own = cached
        return own + self.payload.build_padding()

2039 

2040 

# Register the classes defined above as the configured raw and padding layers.
conf.raw_layer = Raw
conf.padding_layer = Padding
# Only provide a default layer-2 class when none has been configured yet.
if conf.default_l2 is None:
    conf.default_l2 = Raw

2045 

2046################# 

2047# Bind layers # 

2048################# 

2049 

2050 

def bind_bottom_up(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    r"""Bind 2 layers for dissection.

    ``upper`` is chosen for dissection on top of ``lower`` if ALL the
    passed field values are validated. If multiple calls are made with the
    same layers, the last one will be used as default.

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values that select ``upper``

    ex:
        >>> bind_bottom_up(Ether, SNAP, type=0x1234)
        >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00')  # noqa: E501
        <Ether  dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP  OUI=0x0 code=0x0 |>>  # noqa: E501
    """
    if __fval is not None:
        fval.update(__fval)
    # Work on a copy so that a list shared with other classes is not mutated.
    guesses = lower.payload_guess[:]
    guesses.append((fval, upper))
    lower.payload_guess = guesses

2071 

2072 

def bind_top_down(lower,  # type: Type[Packet]
                  upper,  # type: Type[Packet]
                  __fval=None,  # type: Optional[Any]
                  **fval  # type: Any
                  ):
    # type: (...) -> None
    """Bind 2 layers for building.

    When ``upper`` is added as a payload of ``lower``, the given field
    values are applied to the lower layer.

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values recorded for ``lower``

    ex:
        >>> bind_top_down(Ether, SNAP, type=0x1234)
        >>> Ether()/SNAP()
        <Ether  type=0x1234 |<SNAP  |>>
    """
    if __fval is not None:
        fval.update(__fval)
    # Copy first so that a mapping shared with other classes is not mutated.
    overloads = upper._overload_fields.copy()  # type: ignore
    overloads[lower] = fval
    upper._overload_fields = overloads

2092 

2093 

@conf.commands.register
def bind_layers(lower,  # type: Type[Packet]
                upper,  # type: Type[Packet]
                __fval=None,  # type: Optional[Dict[str, int]]
                **fval  # type: Any
                ):
    # type: (...) -> None
    """Bind 2 layers on some specific fields' values.

    The binding is used both when building and when dissecting: this
    function calls bind_top_down then bind_bottom_up with all the passed
    arguments.

    Please have a look at their docs:
     - help(bind_bottom_up)
     - help(bind_top_down)

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values describing the binding
    """
    if __fval is not None:
        fval.update(__fval)
    for binder in (bind_top_down, bind_bottom_up):
        binder(lower, upper, **fval)

2117 

2118 

def split_bottom_up(lower,  # type: Type[Packet]
                    upper,  # type: Type[Packet]
                    __fval=None,  # type: Optional[Any]
                    **fval  # type: Any
                    ):
    # type: (...) -> None
    """Un-link an association that was made using bind_bottom_up.

    Every entry of ``lower.payload_guess`` that targets ``upper`` and
    whose parameters contain all the given field values is dropped.
    Have a look at help(bind_bottom_up)

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values the binding must carry to be removed
    """
    if __fval is not None:
        fval.update(__fval)

    remaining = []
    for guess in lower.payload_guess:
        params, cls = guess
        # True as soon as one requested value is absent or different
        mismatch = any(
            k not in params or params[k] != v for k, v in fval.items()
        )
        if cls != upper or mismatch:
            remaining.append(guess)
    lower.payload_guess = remaining

2138 

2139 

def split_top_down(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    """Un-link an association that was made using bind_top_down.

    The entry recorded for ``lower`` in ``upper._overload_fields`` is
    removed, provided it contains all the given field values. Nothing
    happens otherwise.
    Have a look at help(bind_top_down)

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values the binding must carry to be removed
    """
    if __fval is not None:
        fval.update(__fval)
    overloads = upper._overload_fields
    if lower not in overloads:
        return
    ofval = overloads[lower]
    for k, v in fval.items():
        if k not in ofval or ofval[k] != v:
            # The recorded binding does not match the request: keep it
            return
    # Copy first so that a mapping shared with other classes is not mutated.
    trimmed = overloads.copy()  # type: ignore
    del trimmed[lower]
    upper._overload_fields = trimmed

2157 

2158 

@conf.commands.register
def split_layers(lower,  # type: Type[Packet]
                 upper,  # type: Type[Packet]
                 __fval=None,  # type: Optional[Any]
                 **fval  # type: Any
                 ):
    # type: (...) -> None
    """Split 2 layers previously bound.

    This is the opposite of bind_layers: it calls split_bottom_up then
    split_top_down with all the passed arguments.

    Please have a look at their docs:
     - help(split_bottom_up)
     - help(split_top_down)

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values describing the binding to remove
    """
    if __fval is not None:
        fval.update(__fval)
    for splitter in (split_bottom_up, split_top_down):
        splitter(lower, upper, **fval)

2178 

2179 

@conf.commands.register
def explore(layer=None):
    # type: (Optional[str]) -> None
    """Function used to discover the Scapy layers and protocols.
    It helps to see which packets exist in contrib or layer files.

    :param layer: If specified, the function will explore the layer. It may
        be a short module name, a full module name or the module itself.
        If not, the GUI mode will be activated, to browse the available
        layers.
    :raises Scapy_Exception: when the GUI mode is requested outside of an
        interactive session, or when the module is unknown
    :raises ImportError: when the GUI mode is requested and prompt_toolkit
        is missing or older than 2.0

    examples:
      >>> explore()  # Launches the GUI
      >>> explore("dns")  # Explore scapy.layers.dns
      >>> explore("http2")  # Explore scapy.contrib.http2
      >>> explore(scapy.layers.bluetooth4LE)

    Note: to search a packet by name, use ls("name") rather than explore.
    """
    if layer is None:  # GUI MODE
        if not conf.interactive:
            # The dialogs need an interactive session
            raise Scapy_Exception("explore() GUI-mode cannot be run in "
                                  "non-interactive mode. Please provide a "
                                  "'layer' parameter !")
        # 0 - Imports
        try:
            import prompt_toolkit
        except ImportError:
            raise ImportError("prompt_toolkit is not installed ! "
                              "You may install IPython, which contains it, via"
                              " `pip install ipython`")
        if not _version_checker(prompt_toolkit, (2, 0)):
            raise ImportError("prompt_toolkit >= 2.0.0 is required !")
        # Only available with prompt_toolkit > 2.0, not released on PyPi yet
        from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
            button_dialog
        from prompt_toolkit.formatted_text import HTML
        # Check for prompt_toolkit >= 3.0.0: from that version on, the
        # dialog object has to be run explicitly to get the answer
        call_ptk = lambda x: cast(str, x)  # type: Callable[[Any], str]
        if _version_checker(prompt_toolkit, (3, 0)):
            call_ptk = lambda x: x.run()
        # 1 - Ask for layer or contrib
        btn_diag = button_dialog(
            title="Scapy v%s" % conf.version,
            text=HTML(
                '<style bg="white" fg="red">Chose the type of packets'
                ' you want to explore:</style>'
            ),
            buttons=[
                ("Layers", "layers"),
                ("Contribs", "contribs"),
                ("Cancel", "cancel")
            ])
        action = call_ptk(btn_diag)
        # 2 - Retrieve list of Packets
        if action == "layers":
            # Get all loaded layers
            lvalues = conf.layers.layers()
            # Restrict to layers-only (not contribs) + packet.py and asn1*.py
            values = [x for x in lvalues if ("layers" in x[0] or
                                             "packet" in x[0] or
                                             "asn1" in x[0])]
        elif action == "contribs":
            # Get all existing contribs
            from scapy.main import list_contrib
            cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
            values = [(x['name'], x['description'])
                      for x in cvalues]
            # Remove very specific modules
            values = [x for x in values if "can" not in x[0]]
        else:
            # Escape/Cancel was pressed
            return
        # Build tree
        if action == "contribs":
            # A tree is a dictionary. Each layer contains a keyword
            # _l which contains the files in the layer, and a _name
            # argument which is its name. The other keys are the subfolders,
            # which are similar dictionaries
            tree = defaultdict(list)  # type: Dict[str, Union[List[Any], Dict[str, Any]]]  # noqa: E501
            for name, desc in values:
                if "." in name:  # Folder detected
                    parts = name.split(".")
                    subtree = tree
                    for pa in parts[:-1]:
                        if pa not in subtree:
                            subtree[pa] = {}
                        # one layer deeper
                        subtree = subtree[pa]  # type: ignore
                        subtree["_name"] = pa  # type: ignore
                    if "_l" not in subtree:
                        subtree["_l"] = []
                    subtree["_l"].append((parts[-1], desc))  # type: ignore
                else:
                    tree["_l"].append((name, desc))  # type: ignore
        elif action == "layers":
            tree = {"_l": values}
        # 3 - Ask for the layer/contrib module to explore
        current = tree  # type: Any
        previous = []  # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]]  # noqa: E501
        while True:
            # Generate tests & form
            folders = list(current.keys())
            # Folders are prefixed with "$" to tell them apart from files
            _radio_values = [
                ("$" + name, str('[+] ' + name.capitalize()))
                for name in folders if not name.startswith("_")
            ] + current.get("_l", [])  # type: List[str]
            cur_path = ""
            if previous:
                # previous[0] is the root of the tree, which has no name
                cur_path = ".".join(
                    itertools.chain(
                        (x["_name"] for x in previous[1:]),  # type: ignore
                        (current["_name"],)
                    )
                )
            extra_text = (
                '\n<style bg="white" fg="green">> scapy.%s</style>'
            ) % (action + ("." + cur_path if cur_path else ""))
            # Show popup
            rd_diag = radiolist_dialog(
                values=_radio_values,
                title="Scapy v%s" % conf.version,
                text=HTML(
                    (
                        '<style bg="white" fg="red">Please select a file '
                        'among the following, to see all layers contained in'
                        ' it:</style>'
                    ) + extra_text
                ),
                cancel_text="Back" if previous else "Cancel"
            )
            result = call_ptk(rd_diag)
            if result is None:
                # User pressed "Cancel/Back"
                if previous:  # Back
                    current = previous.pop()
                    continue
                else:  # Cancel
                    return
            if result.startswith("$"):
                previous.append(current)
                current = current[result[1:]]
            else:
                # Enter on layer
                if previous:  # In subfolder
                    result = cur_path + "." + result
                break
        # 4 - (Contrib only): load contrib
        if action == "contribs":
            from scapy.main import load_contrib
            load_contrib(result)
            result = "scapy.contrib." + result
    else:  # NON-GUI MODE
        # We handle layer as a short layer name, full layer name
        # or the module itself
        if isinstance(layer, types.ModuleType):
            layer = layer.__name__
        if isinstance(layer, str):
            if layer.startswith("scapy.layers."):
                result = layer
            else:
                if layer.startswith("scapy.contrib."):
                    layer = layer.replace("scapy.contrib.", "")
                from scapy.main import load_contrib
                load_contrib(layer)
                result_layer, result_contrib = (("scapy.layers.%s" % layer),
                                                ("scapy.contrib.%s" % layer))
                if result_layer in conf.layers.ldict:
                    result = result_layer
                elif result_contrib in conf.layers.ldict:
                    result = result_contrib
                else:
                    raise Scapy_Exception("Unknown scapy module '%s'" % layer)
        else:
            warning("Wrong usage ! Check out help(explore)")
            return

    # COMMON PART
    # Get the list of all Packets contained in that module
    try:
        all_layers = conf.layers.ldict[result]
    except KeyError:
        # Report the module that was looked up: 'layer' is None in GUI mode
        raise Scapy_Exception("Unknown scapy module '%s'" % result)
    # Print
    print(conf.color_theme.layer_name("Packets contained in %s:" % result))
    rtlst = [
        (lay.__name__ or "", cast(str, lay._name) or "")
        for lay in all_layers
    ]  # type: List[Tuple[Union[str, List[str]], ...]]
    print(pretty_list(rtlst, [("Class", "Name")], borders=True))

2367 

2368 

def _pkt_ls(obj,  # type: Union[Packet, Type[Packet]]
            verbose=False,  # type: bool
            ):
    # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]]  # noqa: E501
    """Internal function used to resolve `fields_desc` to display it.

    :param obj: a packet object or class
    :param verbose: also collect the value names of enumeration and flags
        fields
    :returns: a list containing tuples [(name, cls, clsname_extras,
        default, long_attrs)]
    :raises ValueError: if obj is neither a packet nor a packet class
    """
    if not (issubtype(obj, Packet) or isinstance(obj, Packet)):
        raise ValueError
    rows = []
    for f in obj.fields_desc:
        attrs = []  # type: List[str]
        long_attrs = []  # type: List[str]
        # Unwrap the decorating fields to reach the actual one
        cur_fld = f
        while isinstance(cur_fld, (Emph, ConditionalField)):
            if isinstance(cur_fld, ConditionalField):
                attrs.append(cur_fld.__class__.__name__[:4])
            cur_fld = cur_fld.fld
        default = cur_fld.default
        if verbose and isinstance(cur_fld, EnumField) \
           and hasattr(cur_fld, "i2s") and cur_fld.i2s:
            # Long enumerations are not listed
            if len(cur_fld.i2s or []) < 50:
                for numval, strval in sorted(cur_fld.i2s.items()):
                    long_attrs.append("%s: %d" % (strval, numval))
        elif isinstance(cur_fld, MultiEnumField):
            # An instance is required to evaluate the dependency
            obj_pkt = obj if isinstance(obj, Packet) else obj()
            fld_depend = cur_fld.depends_on(obj_pkt)
            attrs.append("Depends on %s" % fld_depend)
            if verbose:
                cur_i2s = cur_fld.i2s_multi.get(
                    cur_fld.depends_on(obj_pkt), {}
                )
                if len(cur_i2s) < 50:
                    for numval, strval in sorted(cur_i2s.items()):
                        long_attrs.append("%s: %d" % (strval, numval))
        elif verbose and isinstance(cur_fld, FlagsField):
            long_attrs.append(", ".join(cur_fld.names))
        elif isinstance(cur_fld, MultipleTypeField):
            default = cur_fld.dflt.default
            candidates = itertools.chain(cur_fld.flds, [(cur_fld.dflt,)])
            attrs.append(", ".join(
                x[0].__class__.__name__ for x in candidates
            ))

        if attrs:
            class_name_extras = "(%s)" % ", ".join(attrs)
        else:
            class_name_extras = ""
        if isinstance(cur_fld, BitField):
            plural = "s" if cur_fld.size > 1 else ""
            class_name_extras += " (%d bit%s)" % (cur_fld.size, plural)
        rows.append((
            cur_fld.name,
            cur_fld.__class__,
            class_name_extras,
            repr(default),
            long_attrs,
        ))
    return rows

2445 

2446 

@conf.commands.register
def ls(obj=None,  # type: Optional[Union[str, Packet, Type[Packet]]]
       case_sensitive=False,  # type: bool
       verbose=False  # type: bool
       ):
    # type: (...) -> None
    """List available layers, or infos on a given layer class or name.

    :param obj: Packet / packet name to use. A string is used as a regular
        expression searched in both the class name and the layer name.
        With None, all the layers are listed.
    :param case_sensitive: if obj is a string, is it case sensitive?
    :param verbose: also display the value names of enumeration and flags
        fields
    """
    if obj is None or isinstance(obj, str):
        tip = False
        if obj is None:
            tip = True
            all_layers = sorted(conf.layers, key=lambda x: x.__name__)
        else:
            pattern = re.compile(
                obj,
                0 if case_sensitive else re.I
            )
            needle = obj if case_sensitive else obj.lower()

            # We first order by accuracy, then length
            def sorter(x):
                # type: (Type[Packet]) -> Tuple[bool, int, int]
                cls_name = x.__name__
                if not case_sensitive:
                    cls_name = cls_name.lower()
                # str.find rather than str.index: a layer may have been
                # selected through its 'name' attribute or through a regular
                # expression, in which case obj is not a substring of the
                # class name. Those layers are listed last.
                pos = cls_name.find(needle)
                return (pos < 0, pos, len(x.__name__))

            all_layers = sorted((layer for layer in conf.layers
                                 if (isinstance(layer.__name__, str) and
                                     pattern.search(layer.__name__)) or
                                 (isinstance(layer.name, str) and
                                     pattern.search(layer.name))),
                                key=sorter)
        for layer in all_layers:
            print("%-10s : %s" % (layer.__name__, layer._name))
        if tip and conf.interactive:
            print("\nTIP: You may use explore() to navigate through all "
                  "layers using a clear GUI")
    else:
        try:
            fields = _pkt_ls(
                obj,
                verbose=verbose
            )
            is_pkt = isinstance(obj, Packet)
            # Print
            for fname, cls, clsne, dflt, long_attrs in fields:
                clsinfo = cls.__name__ + " " + clsne
                print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
                if is_pkt:
                    # Current value, only available on an instance
                    print("%-15r" % (getattr(obj, fname),), end=' ')
                print("(%r)" % (dflt,))
                for attr in long_attrs:
                    print("%-15s%s" % ("", attr))
            # Restart for payload if any
            if is_pkt:
                obj = cast(Packet, obj)
                if isinstance(obj.payload, NoPayload):
                    return
                print("--")
                # Keep the same options for the whole packet
                ls(obj.payload,
                   case_sensitive=case_sensitive,
                   verbose=verbose)
        except ValueError:
            print("Not a packet class or name. Type 'ls()' to list packet classes.")  # noqa: E501

2512 

2513 

@conf.commands.register
def rfc(cls, ret=False, legend=True):
    # type: (Type[Packet], bool, bool) -> Optional[str]
    """
    Generate an RFC-like representation of a packet def.

    The fields of the class are laid out on rows of 32 bits. A field that
    does not fit on the current row is continued on the following one(s).

    :param cls: the Packet class
    :param ret: return the result instead of printing (def. False)
    :param legend: show text under the diagram (default True)
    :returns: the diagram if ``ret`` is set, None otherwise
    :raises TypeError: if ``cls`` is not a Packet class

    Ex::

        >>> rfc(Ether)

    """
    if not issubclass(cls, Packet):
        raise TypeError("Packet class expected")
    bits_used = 0  # bits already placed on the row being filled
    row = []
    rows = []
    field_id = 0  # Fields UUID

    def _cell_width(bits):
        # type: (int) -> int
        # Width in characters that a field takes when formatted,
        # from its length in bits
        return 2 * bits - 1

    # Generate packet groups
    def _iterfields() -> Iterator[Tuple[str, int]]:
        for fld in cls.fields_desc:
            # Fancy field name
            yield fld.name.upper().replace("_", " "), int(fld.sz * 8)
            # Add padding optionally
            if not isinstance(fld, PadField):
                continue
            align = fld._align
            if isinstance(align, tuple):
                align = align[0]
            # bits_used already accounts for the field yielded just above
            pad = - bits_used % (align * 8)
            if pad:
                yield "padding", pad

    for label, width in _iterfields():
        bits_used += width
        field_id += 1
        # The field might exceed the current row or
        # take more than one row. Copy it as required
        while True:
            overflow = max(0, bits_used - 32)  # Exceed
            cell = _cell_width(width - overflow)  # What fits
            row.append((label[:cell], cell, field_id))
            if bits_used < 32:
                # End of the field
                break
            # Current row is full. Start a new one
            rows.append(row)
            row = []
            bits_used = width = overflow
            label = ""  # do not repeat the field
            if not overflow:
                # there is no data left
                break
    # Add the last row if un-finished
    if row:
        rows.append(row)

    # Calculate separations between rows
    full_rule = "+-" * 32 + "+\n"
    rules = [full_rule]
    for above, below in zip(rows, rows[1:]):
        rule = full_rule
        # The last field of above is shared with below
        if above[-1][2] == below[0][2]:
            # where the field in "above" starts
            start = sum(c[1] for c in above[:-1]) + len(above[:-1]) - 1
            # where the field in "below" ends
            end = below[0][1]
            if start < end:
                # they are overlapping.
                # Now crop the space between those positions
                # and fill it with " "
                start += start % 2
                rule = (
                    rule[:1 + start] +
                    " " * (end - start) +
                    rule[1 + end:]
                )
        rules.append(rule)

    # Graph
    chunks = [
        # Bytes markers
        " " + (" " * 19).join(str(x) for x in range(4)) + "\n",
        # Bits markers
        " " + " ".join(str(x % 10) for x in range(32)) + "\n",
    ]
    # Add fields and their separations
    for cells, rule in zip(rows, rules):
        chunks.append(rule)
        chunks.extend(
            "|" + text.center(size, " ") for text, size, _ in cells
        )
        chunks.append("|\n")
    chunks.append("+-" * (bits_used or 32) + "+\n")
    # Annotate with the figure name
    if legend:
        chunks.append("\n" + ("Fig. " + cls.__name__).center(66, " "))
    result = "".join(chunks)
    # return if asked for, else print
    if ret:
        return result
    print(result)
    return None

2630 

2631 

2632############# 

2633# Fuzzing # 

2634############# 

2635 

# Type variable bound to Packet: fuzz() uses it to declare that it returns
# the same packet type as the one it receives.
_P = TypeVar('_P', bound=Packet)

2637 

2638 

@conf.commands.register
def fuzz(p,  # type: _P
         _inplace=0,  # type: int
         ):
    # type: (...) -> _P
    """
    Transform a layer into a fuzzy layer by replacing some default values
    by random objects.

    Every layer of the packet is processed, as well as the packets held in
    its PacketListField fields. Fields whose default value is None are left
    untouched.

    :param p: the Packet instance to fuzz
    :param _inplace: when falsy (default), work on a copy of ``p``
    :return: the fuzzed packet.
    """
    if not _inplace:
        p = p.copy()
    layer = cast(Packet, p)
    while not isinstance(layer, NoPayload):
        randomized = {}
        deferred = []  # type: List[str]
        for fld in layer.fields_desc:
            if isinstance(fld, PacketListField):
                for sub in getattr(layer, fld.name):
                    fuzz(sub, _inplace=1)
                continue
            if isinstance(fld, MultipleTypeField):
                # the type of the field will depend on others
                deferred.append(fld.name)
                continue
            if fld.default is None:
                continue
            if isinstance(fld, ConditionalField) and not fld._evalcond(layer):
                continue
            rnd = fld.randval()
            if rnd is not None:
                randomized[fld.name] = rnd
        if not deferred:
            layer.default_fields.update(randomized)
        else:
            # Process packets with MultipleTypeFields:
            # freeze the other random values
            frozen = {}
            for key, val in randomized.items():
                if isinstance(val, VolatileValue):
                    val = val._fix()
                frozen[key] = val
            layer.default_fields.update(frozen)
            # add the random values of the MultipleTypeFields
            late_values = {}
            for name in deferred:
                mtf = cast(MultipleTypeField, layer.get_field(name))
                rnd = mtf._find_fld_pkt(layer).randval()
                if rnd is not None:
                    late_values[name] = rnd
            layer.default_fields.update(late_values)
        layer = layer.payload
    return p