Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/packet.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1432 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Packet class 

8 

9Provides: 

10 - the default Packet classes 

11 - binding mechanisms 

12 - fuzz() method 

13 - exploration methods: explore() / ls() 

14""" 

15 

16from collections import defaultdict 

17 

18import json 

19import re 

20import time 

21import itertools 

22import copy 

23import types 

24import warnings 

25 

26from scapy.fields import ( 

27 AnyField, 

28 BitField, 

29 ConditionalField, 

30 Emph, 

31 EnumField, 

32 Field, 

33 FlagsField, 

34 FlagValue, 

35 MayEnd, 

36 MultiEnumField, 

37 MultipleTypeField, 

38 PadField, 

39 PacketListField, 

40 RawVal, 

41 StrField, 

42) 

43from scapy.config import conf, _version_checker 

44from scapy.compat import raw, orb, bytes_encode 

45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \ 

46 _CanvasDumpExtended 

47from scapy.interfaces import _GlobInterfaceType 

48from scapy.volatile import RandField, VolatileValue 

49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \ 

50 pretty_list, EDecimal 

51from scapy.error import Scapy_Exception, log_runtime, warning 

52from scapy.libs.test_pyx import PYX 

53 

54# Typing imports 

55from typing import ( 

56 Any, 

57 Callable, 

58 Dict, 

59 Iterator, 

60 List, 

61 NoReturn, 

62 Optional, 

63 Set, 

64 Tuple, 

65 Type, 

66 TypeVar, 

67 Union, 

68 Sequence, 

69 cast, 

70) 

71from scapy.compat import Self 

72 

73try: 

74 import pyx 

75except ImportError: 

76 pass 

77 

78 

79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]]) 

80 

81 

82class Packet( 

83 BasePacket, 

84 _CanvasDumpExtended, 

85 metaclass=Packet_metaclass 

86): 

# Per-instance attribute slots. Each name is listed exactly once (the original
# list repeated "name" a second time after "parent").
__slots__ = [
    "time", "sent_time", "name",
    "default_fields", "fields", "fieldtype",
    "overload_fields", "overloaded_fields",
    "packetfields",
    "original", "explicit", "raw_packet_cache",
    "raw_packet_cache_fields", "_pkt", "post_transforms",
    "stop_dissection_after",
    # then payload, underlayer and parent
    "payload", "underlayer", "parent",
    # used for sr()
    "_answered",
    # used when sniffing
    "direction", "sniffed_on",
    # handle snaplen Vs real length
    "wirelen",
    "comment",
    "process_information"
]
# Display name; __init__ falls back to the class name when `_name` is None.
# NOTE(review): presumably the metaclass exposes this as `_name` -- confirm.
name = None
# Ordered field descriptors of the layer.
fields_desc = []  # type: List[AnyField]
# Maps a deprecated field name to (replacement name, version of deprecation).
deprecated_fields = {}  # type: Dict[str, Tuple[str, str]]
# Field values overridden depending on the layer this one is stacked on.
overload_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
# (field values, upper layer class) pairs, listed by upper_bonds().
payload_guess = []  # type: List[Tuple[Dict[str, Any], Type[Packet]]]
show_indent = 1
show_summary = True
match_subclass = False
# Per-class caches filled by prepare_cached_fields(), keyed by packet class.
class_dont_cache = {}  # type: Dict[Type[Packet], bool]
class_packetfields = {}  # type: Dict[Type[Packet], Any]
class_default_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
class_default_fields_ref = {}  # type: Dict[Type[Packet], List[str]]
class_fieldtype = {}  # type: Dict[Type[Packet], Dict[str, AnyField]]  # noqa: E501

120 

@classmethod
def from_hexcap(cls):
    # type: (Type[Packet]) -> Packet
    """Instantiate this class from the value returned by import_hexcap()."""
    captured = import_hexcap()
    return cls(captured)

125 

@classmethod
def upper_bonds(self):
    # type: () -> None
    """
    Print one line per entry of payload_guess: the upper layer class name
    followed by the "field=value" pairs bound to it.
    """
    for conditions, upper_cls in self.payload_guess:
        bindings = ", ".join(
            "%-12s" % ("%s=%r" % pair) for pair in conditions.items()
        )
        print("%-20s %s" % (upper_cls.__name__, bindings))

136 

@classmethod
def lower_bonds(self):
    # type: () -> None
    """
    Print one line per entry of _overload_fields: the lower layer class name
    followed by the "field=value" pairs overloaded for it.
    """
    for lower_cls, overloads in self._overload_fields.items():
        bindings = ", ".join(
            "%-12s" % ("%s=%r" % pair) for pair in overloads.items()
        )
        print("%-20s %s" % (lower_cls.__name__, bindings))

147 

def __init__(self,
             _pkt=b"",  # type: Union[bytes, bytearray]
             post_transform=None,  # type: Any
             _internal=0,  # type: int
             _underlayer=None,  # type: Optional[Packet]
             _parent=None,  # type: Optional[Packet]
             stop_dissection_after=None,  # type: Optional[Type[Packet]]
             **fields  # type: Any
             ):
    # type: (...) -> None
    """
    Create a packet, either by dissecting `_pkt` or from field values.

    :param _pkt: raw bytes to dissect; when empty nothing is dissected
    :param post_transform: a callable, a list of callables, or None; stored
        in `post_transforms`
    :param _internal: when false, dissection_done() is called once the
        dissection of `_pkt` is over
    :param _underlayer: stored as `underlayer`
    :param _parent: stored as `parent`
    :param stop_dissection_after: stored as `stop_dissection_after`
    :param fields: field values; deprecated names are resolved
    :raises AttributeError: for a keyword that is neither a field nor a
        deprecated field name
    """
    self.time = time.time()  # type: Union[EDecimal, float]
    self.sent_time = None  # type: Union[EDecimal, float, None]
    if self._name is None:
        self.name = self.__class__.__name__
    else:
        self.name = self._name
    self.default_fields = {}  # type: Dict[str, Any]
    self.overload_fields = self._overload_fields
    self.overloaded_fields = {}  # type: Dict[str, Any]
    self.fields = {}  # type: Dict[str, Any]
    self.fieldtype = {}  # type: Dict[str, AnyField]
    self.packetfields = []  # type: List[AnyField]
    self.payload = NoPayload()  # type: Packet
    self.init_fields(bool(_pkt))
    self.underlayer = _underlayer
    self.parent = _parent
    if isinstance(_pkt, bytearray):
        _pkt = bytes(_pkt)
    self.original = _pkt
    self.explicit = 0
    self.raw_packet_cache = None  # type: Optional[bytes]
    self.raw_packet_cache_fields = None  # type: Optional[Dict[str, Any]]  # noqa: E501
    self.wirelen = None  # type: Optional[int]
    self.direction = None  # type: Optional[int]
    self.sniffed_on = None  # type: Optional[_GlobInterfaceType]
    self.comment = None  # type: Optional[bytes]
    self.process_information = None  # type: Optional[Dict[str, Any]]
    self.stop_dissection_after = stop_dissection_after
    if _pkt:
        self.dissect(_pkt)
        if not _internal:
            self.dissection_done(self)
    # Walk fields_desc (not the keyword dict) so that values are set in
    # declaration order; this is required to always support
    # MultipleTypeField.
    for fdesc in self.fields_desc:
        fname = fdesc.name
        if fname not in fields:
            continue
        given = fields.pop(fname)
        if isinstance(given, RawVal):
            self.fields[fname] = given
        else:
            self.fields[fname] = self.get_field(fname).any2i(self, given)
    # Whatever is left is either a deprecated alias or an unknown name
    for fname in fields:
        if fname not in self.deprecated_fields:
            raise AttributeError(fname)
        given = fields[fname]
        fname = self._resolve_alias(fname)
        if isinstance(given, RawVal):
            self.fields[fname] = given
        else:
            self.fields[fname] = self.get_field(fname).any2i(self, given)
    # Normalize post_transform into a list
    if post_transform is None:
        self.post_transforms = []
    elif isinstance(post_transform, list):
        self.post_transforms = post_transform
    else:
        self.post_transforms = [post_transform]

216 

# Shape of the state tuple produced by __reduce__ and read by __setstate__:
# (time, sent_time, direction, sniffed_on, wirelen, comment)
_PickleType = Tuple[
    Union[EDecimal, float],
    Union[EDecimal, float, None],
    Optional[int],
    Optional[_GlobInterfaceType],
    Optional[int],
    Optional[bytes],
]

225 

def __reduce__(self):
    # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType]
    """
    Pickling support.

    :return: (class, (built bytes,), state) where state holds time,
        sent_time, direction, sniffed_on, wirelen and comment
    """
    built = self.build()
    state = (
        self.time,
        self.sent_time,
        self.direction,
        self.sniffed_on,
        self.wirelen,
        self.comment,
    )
    return (self.__class__, (built,), state)

237 

def __setstate__(self, state):
    # type: (Packet._PickleType) -> Packet
    """
    Restore the attributes saved by __reduce__ from the `state` tuple.

    :return: the instance itself
    """
    restored = (
        "time", "sent_time", "direction", "sniffed_on", "wirelen", "comment",
    )
    for position, attr_name in enumerate(restored):
        setattr(self, attr_name, state[position])
    return self

248 

def __deepcopy__(self,
                 memo,  # type: Any
                 ):
    # type: (...) -> Packet
    """Hook for copy.deepcopy: delegates to copy(); `memo` is not used."""
    duplicate = self.copy()
    return duplicate

255 

def init_fields(self, for_dissect_only=False):
    # type: (bool) -> None
    """
    Initialize the field tables of the instance.

    Classes flagged in class_dont_cache are initialized from fields_desc
    directly; every other class goes through the per-class cache.

    :param for_dissect_only: forwarded to do_init_cached_fields()
    """
    uncached = self.class_dont_cache.get(self.__class__, False)
    if not uncached:
        self.do_init_cached_fields(for_dissect_only=for_dissect_only)
        return
    self.do_init_fields(self.fields_desc)

266 

def do_init_fields(self,
                   flist,  # type: Sequence[AnyField]
                   ):
    # type: (...) -> None
    """
    Fill fieldtype, packetfields and default_fields from `flist`, without
    using the per-class cache.

    :param flist: field descriptors to register
    """
    fresh_defaults = {}
    for fdesc in flist:
        # Each instance gets its own copy of the default value
        fresh_defaults[fdesc.name] = copy.deepcopy(fdesc.default)
        self.fieldtype[fdesc.name] = fdesc
        if fdesc.holds_packets:
            self.packetfields.append(fdesc)
    # We set default_fields last to avoid race issues
    self.default_fields = fresh_defaults

282 

def do_init_cached_fields(self, for_dissect_only=False):
    # type: (bool) -> None
    """
    Initialize the field tables from the per-class cache, building the cache
    first when the class has no entry yet.

    The cached default_fields, fieldtype and packetfields objects are
    assigned as-is (shared, not copied).

    :param for_dissect_only: when true, skip copying the mutable default
        values into `fields`
    """
    klass = self.__class__

    # Build the fields information on first use
    if Packet.class_default_fields.get(klass, None) is None:
        self.prepare_cached_fields(self.fields_desc)

    cached_defaults = Packet.class_default_fields.get(klass, None)
    if not cached_defaults:
        # Nothing cached (or an empty cache entry): leave the instance as is
        return

    self.default_fields = cached_defaults
    self.fieldtype = Packet.class_fieldtype[klass]
    self.packetfields = Packet.class_packetfields[klass]

    # Optimization: no need for references when only dissecting.
    if for_dissect_only:
        return

    # Give the instance its own copy of every default recorded as a reference
    for fname in Packet.class_default_fields_ref[klass]:
        shared_value = self.default_fields[fname]
        try:
            self.fields[fname] = shared_value.copy()
        except AttributeError:
            # Python 2.7 - list only
            self.fields[fname] = shared_value[:]

315 

def prepare_cached_fields(self, flist):
    # type: (Sequence[AnyField]) -> None
    """
    Build the per-class cache entries (defaults, field types, packet fields
    and names of reference-like defaults) from `flist`.

    When a MultipleTypeField is found, the class is flagged in
    class_dont_cache and the instance is initialized with do_init_fields()
    instead; nothing is cached.

    :param flist: field descriptors of the class
    """
    klass = self.__class__

    # Nothing to cache without fields
    if not flist:
        return

    defaults = {}
    reference_names = []
    field_types = {}
    packet_fields = []

    for fdesc in flist:
        if isinstance(fdesc, MultipleTypeField):
            # Abort: this class cannot use the cache
            self.class_dont_cache[klass] = True
            self.do_init_fields(self.fields_desc)
            return

        defaults[fdesc.name] = copy.deepcopy(fdesc.default)
        field_types[fdesc.name] = fdesc
        if fdesc.holds_packets:
            packet_fields.append(fdesc)

        # Remember which defaults must be copied per instance
        if isinstance(fdesc.default, (list, dict, set, RandField, Packet)):
            reference_names.append(fdesc.name)

    # Publish the cache
    Packet.class_default_fields_ref[klass] = reference_names
    Packet.class_fieldtype[klass] = field_types
    Packet.class_packetfields[klass] = packet_fields
    # Last to avoid racing issues
    Packet.class_default_fields[klass] = defaults

356 

def dissection_done(self, pkt):
    # type: (Packet) -> None
    """
    DEV: called once a dissection is completed. Runs post_dissection() on
    this layer, then forwards the call to the payload.
    """
    self.post_dissection(pkt)
    next_layer = self.payload
    next_layer.dissection_done(pkt)

362 

def post_dissection(self, pkt):
    # type: (Packet) -> None
    """DEV: hook called after the whole packet is dissected; no-op here."""
    return None

367 

def get_field(self, fld):
    # type: (str) -> AnyField
    """
    DEV: return the field instance registered under the name `fld`.

    :raises KeyError: when the name is not in `fieldtype`
    """
    known_fields = self.fieldtype
    return known_fields[fld]

372 

def add_payload(self, payload):
    # type: (Union[Packet, bytes]) -> None
    """
    Attach `payload` at the end of the layer chain.

    A Packet is attached as is; bytes, str, bytearray and memoryview values
    are wrapped in conf.raw_layer. None is ignored.

    :raises TypeError: for any other payload type
    """
    if payload is None:
        return
    if not isinstance(self.payload, NoPayload):
        # Not the last layer: let the current payload handle it
        self.payload.add_payload(payload)
        return
    if isinstance(payload, Packet):
        self.payload = payload
        payload.add_underlayer(self)
        # Use the overloads of the first alias type the payload knows about
        for alias in self.aliastypes:
            if alias in payload.overload_fields:
                self.overloaded_fields = payload.overload_fields[alias]
                break
        return
    if isinstance(payload, (bytes, str, bytearray, memoryview)):
        self.payload = conf.raw_layer(load=bytes_encode(payload))
        return
    raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload))  # noqa: E501

391 

def remove_payload(self):
    # type: () -> None
    """Detach the payload: unlink it, install a NoPayload, drop overloads."""
    detached = self.payload
    detached.remove_underlayer(self)
    self.payload = NoPayload()
    self.overloaded_fields = {}

397 

def add_underlayer(self, underlayer):
    # type: (Packet) -> None
    """Record `underlayer` as the layer this one is stacked on."""
    setattr(self, "underlayer", underlayer)

401 

def remove_underlayer(self, other):
    # type: (Packet) -> None
    """Forget the underlayer; `other` is accepted but not used."""
    setattr(self, "underlayer", None)

405 

def add_parent(self, parent):
    # type: (Packet) -> None
    """Set the packet parent.

    For a packet stored as an element of a PacketListField, the parent is
    the packet owning that list."""
    setattr(self, "parent", parent)

412 

def remove_parent(self, other):
    # type: (Packet) -> None
    """Clear the packet parent; `other` is accepted but not used.

    For a packet stored as an element of a PacketListField, the parent is
    the packet owning that list."""
    setattr(self, "parent", None)

419 

420 def copy(self) -> Self: 

421 """Returns a deep copy of the instance.""" 

422 clone = self.__class__() 

423 clone.fields = self.copy_fields_dict(self.fields) 

424 clone.default_fields = self.copy_fields_dict(self.default_fields) 

425 clone.overloaded_fields = self.overloaded_fields.copy() 

426 clone.underlayer = self.underlayer 

427 clone.parent = self.parent 

428 clone.explicit = self.explicit 

429 clone.raw_packet_cache = self.raw_packet_cache 

430 clone.raw_packet_cache_fields = self.copy_fields_dict( 

431 self.raw_packet_cache_fields 

432 ) 

433 clone.wirelen = self.wirelen 

434 clone.post_transforms = self.post_transforms[:] 

435 clone.payload = self.payload.copy() 

436 clone.payload.add_underlayer(clone) 

437 clone.time = self.time 

438 clone.comment = self.comment 

439 clone.direction = self.direction 

440 clone.sniffed_on = self.sniffed_on 

441 return clone 

442 

def _resolve_alias(self, attr):
    # type: (str) -> str
    """
    Translate a deprecated field name into its replacement and emit a
    DeprecationWarning.

    :raises KeyError: when `attr` is not in deprecated_fields
    """
    replacement, since_version = self.deprecated_fields[attr]
    message = "%s has been deprecated in favor of %s since %s !" % (
        attr, replacement, since_version
    )
    warnings.warn(message, DeprecationWarning)
    return replacement

452 

def getfieldval(self, attr):
    # type: (str) -> Any
    """
    Return the internal value of field `attr`.

    Lookup order: explicitly set fields, overloaded fields, default fields,
    then the payload. Deprecated names are resolved first.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return store[attr]
    return self.payload.getfieldval(attr)

464 

def getfield_and_val(self, attr):
    # type: (str) -> Tuple[AnyField, Any]
    """
    Return (field instance, internal value) for field `attr` of this layer.

    Lookup order: explicitly set fields, overloaded fields, default fields.
    The payload is not searched.

    :raises ValueError: when this layer has no such field
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return self.get_field(attr), store[attr]
    raise ValueError

476 

def __getattr__(self, attr):
    # type: (str) -> Any
    """
    Resolve `attr` as a field: return its human value (i2h), or the value
    itself for a RawVal. Names unknown to this layer go to the payload.
    """
    try:
        fld, v = self.getfield_and_val(attr)
    except ValueError:
        # Not a field of this layer
        return self.payload.__getattr__(attr)
    if fld is None or isinstance(v, RawVal):
        return v
    return fld.i2h(self, v)

486 

def setfieldval(self, attr, val):
    # type: (str, Any) -> None
    """
    Set field `attr` to `val`.

    A field of this layer is converted with the field's any2i (RawVal values
    are stored untouched) and the build caches are invalidated. "payload"
    replaces the payload; any other name is forwarded to the payload.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)

    if attr in self.default_fields:
        fld = self.get_field(attr)
        if fld is None:
            converter = lambda x, y: y  # type: Callable[..., Any]
        else:
            converter = fld.any2i
        if isinstance(val, RawVal):
            self.fields[attr] = val
        else:
            self.fields[attr] = converter(self, val)
        # The packet changed: drop anything derived from the old value
        self.explicit = 0
        self.raw_packet_cache = None
        self.raw_packet_cache_fields = None
        self.wirelen = None
        return

    if attr == "payload":
        self.remove_payload()
        self.add_payload(val)
        return

    self.payload.setfieldval(attr, val)

508 

def __setattr__(self, attr, val):
    # type: (str, Any) -> None
    """
    Slot names are set as plain attributes; any other name is first tried as
    a field through setfieldval(), falling back to a plain attribute when
    that raises AttributeError.
    """
    if attr not in self.__all_slots__:
        try:
            return self.setfieldval(attr, val)
        except AttributeError:
            pass
    return object.__setattr__(self, attr, val)

518 

def delfieldval(self, attr):
    # type: (str) -> None
    """
    Remove the explicit value of field `attr`.

    A field that only has a default value is left alone. "payload" removes
    the payload; any other name is forwarded to the payload.
    """
    if attr in self.fields:
        del self.fields[attr]
        self.explicit = 0  # in case a default value must be explicit
        self.raw_packet_cache = None
        self.raw_packet_cache_fields = None
        self.wirelen = None
        return
    if attr in self.default_fields:
        # Known field without explicit value: nothing to delete
        return
    if attr == "payload":
        self.remove_payload()
        return
    self.payload.delfieldval(attr)

533 

def __delattr__(self, attr):
    # type: (str) -> None
    """
    "payload" removes the payload. Slot names are deleted as plain
    attributes; any other name is first tried as a field through
    delfieldval(), falling back to a plain attribute deletion when that
    raises AttributeError.
    """
    if attr == "payload":
        return self.remove_payload()
    if attr not in self.__all_slots__:
        try:
            return self.delfieldval(attr)
        except AttributeError:
            pass
    return object.__delattr__(self, attr)

545 

def _superdir(self):
    # type: () -> Set[str]
    """
    Return the set of names made of the class' __all_slots__ (when present)
    and the __dict__ keys of every class in its MRO.
    """
    cls = self.__class__
    names = set()  # type: Set[str]
    names.update(getattr(cls, '__all_slots__', ()))
    for ancestor in cls.__mro__:
        names.update(getattr(ancestor, '__dict__', ()))
    return names

559 

def __dir__(self):
    # type: () -> List[str]
    """
    Names offered for tab completion: everything from _superdir() plus the
    field names, sorted.
    """
    candidates = [*self._superdir(), *self.default_fields]
    return sorted(candidates)

566 

def __repr__(self):
    # type: () -> str
    """
    Build the "<Layer  field=value ... |payload>" representation, colored
    with conf.color_theme.

    Only fields with an explicit or overloaded value are shown; empty
    list/dict/set values and conditional fields whose condition is false
    are skipped.
    """
    ct = conf.color_theme
    shown = []
    for f in self.fields_desc:
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        # Explicit values win over overloaded ones; defaults are not shown
        if f.name in self.fields:
            current = self.fields[f.name]
        elif f.name in self.overloaded_fields:
            current = self.overloaded_fields[f.name]
        else:
            continue
        if isinstance(current, (list, dict, set)) and len(current) == 0:
            continue
        val = f.i2repr(self, current)
        if isinstance(f, Emph) or f in conf.emph:
            ncol = ct.emph_field_name
            vcol = ct.emph_field_value
        else:
            ncol = ct.field_name
            vcol = ct.field_value

        shown.append(" %s%s%s" % (ncol(f.name),
                                  ct.punct("="),
                                  vcol(val)))
    s = "".join(shown)
    return "%s%s %s %s%s%s" % (ct.punct("<"),
                               ct.layer_name(self.__class__.__name__),
                               s,
                               ct.punct("|"),
                               repr(self.payload),
                               ct.punct(">"))

602 

def __str__(self):
    # type: () -> str
    """Return the text produced by summary()."""
    text = self.summary()
    return text

606 

def __bytes__(self):
    # type: () -> bytes
    """Return the bytes produced by build()."""
    built = self.build()
    return built

610 

def __div__(self, other):
    # type: (Any) -> Self
    """
    Stack `other` on top of this packet (the `/` operator).

    Both operands are copied when `other` is a Packet. bytes, str, bytearray
    and memoryview operands are first wrapped in conf.raw_layer. Any other
    operand is asked to handle the operation through its __rdiv__.
    """
    if isinstance(other, Packet):
        lower = self.copy()
        upper = other.copy()
        lower.add_payload(upper)
        return lower
    if isinstance(other, (bytes, str, bytearray, memoryview)):
        return self / conf.raw_layer(load=bytes_encode(other))
    return other.__rdiv__(self)  # type: ignore
__truediv__ = __div__

623 

def __rdiv__(self, other):
    # type: (Any) -> Packet
    """
    Reflected `/`: wrap a bytes, str, bytearray or memoryview left operand
    in conf.raw_layer and stack this packet on it.

    :raises TypeError: for any other left operand
    """
    if not isinstance(other, (bytes, str, bytearray, memoryview)):
        raise TypeError
    return conf.raw_layer(load=bytes_encode(other)) / self
__rtruediv__ = __rdiv__

631 

def __mul__(self, other):
    # type: (Any) -> List[Packet]
    """
    Return a list holding `other` references to this same packet.

    :raises TypeError: when `other` is not an int
    """
    if not isinstance(other, int):
        raise TypeError
    return [self] * other

638 

def __rmul__(self, other):
    # type: (Any) -> List[Packet]
    """Reflected `*`: same result as __mul__."""
    repeated = self.__mul__(other)
    return repeated

642 

def __nonzero__(self):
    # type: () -> bool
    """A packet is always truthy (so __len__ is not used for truth tests)."""
    return True
__bool__ = __nonzero__

647 

def __len__(self):
    # type: () -> int
    """Return the number of bytes of the built packet."""
    built = self.__bytes__()
    return len(built)

651 

def copy_field_value(self, fieldname, value):
    # type: (str, Any) -> Any
    """Copy `value` with the do_copy() of the field named `fieldname`."""
    fld = self.get_field(fieldname)
    return fld.do_copy(value)

655 

def copy_fields_dict(self, fields):
    # type: (_T) -> _T
    """
    Return a new dict where each value of `fields` was copied with
    copy_field_value(). None is returned unchanged.
    """
    if fields is None:
        return None
    copied = {}
    for fname, fval in fields.items():
        copied[fname] = self.copy_field_value(fname, fval)
    return copied

662 

def _raw_packet_cache_field_value(self, fld, val, copy=False):
    # type: (AnyField, Any, bool) -> Optional[Any]
    """
    Get a value representative of a mutable field, used to detect changes.

    :param fld: the field descriptor
    :param val: the current value of the field
    :param copy: when true, values are duplicated with fld.do_copy()
    :return: for packet fields, the (fields, payload raw cache) pair of the
        packet, or a list of such pairs; for list/mutable fields, the value
        itself; None for any other field
    """
    def _snapshot(item):
        # type: (Any) -> Any
        return fld.do_copy(item) if copy else item

    if not fld.holds_packets:
        if fld.islist or fld.ismutable:
            return _snapshot(val)
        return None
    # avoid copying whole packets (perf: #GH3894)
    if fld.islist:
        return [
            (_snapshot(sub.fields), sub.payload.raw_packet_cache)
            for sub in val
        ]
    return (_snapshot(val.fields), val.payload.raw_packet_cache)

678 

def clear_cache(self):
    # type: () -> None
    """
    Clear the raw packet cache of this layer, of the packets held in its
    explicitly set fields, and of its payload.
    """
    self.raw_packet_cache = None
    for fname, fval in self.fields.items():
        if not self.get_field(fname).holds_packets:
            continue
        if isinstance(fval, Packet):
            fval.clear_cache()
        elif isinstance(fval, list):
            for element in fval:
                element.clear_cache()
    self.payload.clear_cache()

692 

def self_build(self):
    # type: () -> bytes
    """
    Build the bytes of this layer only (no payload) from fields_desc.

    When a raw packet cache exists and none of the tracked mutable fields
    changed, the cached bytes are returned. Otherwise every field is
    serialized in order with its addfield() (RawVal values are appended
    as bytes).

    :return: the bytes of the layer
    :raises Exception: whatever a field's addfield() raises; when the first
        argument of the exception is a string it is prefixed with the name
        of the failing field
    """
    if self.raw_packet_cache is not None and \
            self.raw_packet_cache_fields is not None:
        for fname, fval in self.raw_packet_cache_fields.items():
            fld, val = self.getfield_and_val(fname)
            if self._raw_packet_cache_field_value(fld, val) != fval:
                # A tracked field changed: the cache is stale
                self.raw_packet_cache = None
                self.raw_packet_cache_fields = None
                self.wirelen = None
                break
        if self.raw_packet_cache is not None:
            return self.raw_packet_cache
    p = b""
    for f in self.fields_desc:
        val = self.getfieldval(f.name)
        if isinstance(val, RawVal):
            p += bytes(val)
        else:
            try:
                p = f.addfield(self, p, val)
            except Exception as ex:
                # Only a string message can be prefixed. Anything else
                # (no args, an int, bytes...) is left untouched so that the
                # original error is never replaced by a TypeError.
                # NOTE(review): the prefix says "dissecting" although this
                # is the build path; text kept as is.
                if ex.args and isinstance(ex.args[0], str):
                    try:
                        ex.args = (
                            "While dissecting field '%s': " % f.name +
                            ex.args[0],
                        ) + ex.args[1:]
                    except AttributeError:
                        pass
                raise
    return p

727 

def do_build_payload(self):
    # type: () -> bytes
    """
    Build the payload layer.

    :return: the result of the payload's do_build()
    """
    upper = self.payload
    return upper.do_build()

736 

def do_build(self):
    # type: () -> bytes
    """
    Build this layer and its payload.

    A packet that is not explicit is replaced by the first packet obtained
    by iterating over it. post_build() is only applied when the layer was
    not served from the raw packet cache.

    :return: the bytes of the layer followed by the payload
    """
    layer = self if self.explicit else next(iter(self))
    raw_layer = layer.self_build()
    for transform in layer.post_transforms:
        raw_layer = transform(raw_layer)
    raw_payload = layer.do_build_payload()
    if layer.raw_packet_cache is not None:
        return raw_layer + raw_payload
    return layer.post_build(raw_layer, raw_payload)

754 

def build_padding(self):
    # type: () -> bytes
    """Return the padding bytes, as computed by the payload."""
    upper = self.payload
    return upper.build_padding()

758 

def build(self):
    # type: () -> bytes
    """
    Build the whole packet: layers, then padding, then the build_done()
    hook.

    :return: the bytes of the packet with its payload
    """
    layers = self.do_build()
    padded = layers + self.build_padding()
    return self.build_done(padded)

770 

def post_build(self, pkt, pay):
    # type: (bytes, bytes) -> bytes
    """
    DEV: called right after the current layer is built. The default
    implementation only concatenates both parts.

    :param pkt: the current layer (built by self_build)
    :param pay: the payload (built by do_build_payload)
    :return: the layer followed by the payload
    """
    assembled = pkt + pay
    return assembled

781 

def build_done(self, p):
    # type: (bytes) -> bytes
    """Pass the built bytes `p` to the payload's build_done() hook."""
    upper = self.payload
    return upper.build_done(p)

785 

def do_build_ps(self):
    # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]]  # noqa: E501
    """
    Build the layer field by field, recording what each field produced.

    :return: (bytes of the layer and payload, list of (packet, field
        details)) where field details are (field, repr of the value, bytes
        added by the field); the entry of this layer is appended after the
        payload's entries
    """
    built = b""
    previous = b""
    details = []
    for f in self.fields_desc:
        if isinstance(f, ConditionalField) and not f._evalcond(self):
            continue
        built = f.addfield(self, built, self.getfieldval(f.name))
        if isinstance(built, bytes):
            # Bytes contributed by this field since the last bytes result
            chunk = built[len(previous):]
            previous = built
        else:
            # addfield did not return bytes: nothing to attribute yet
            chunk = b""
        details.append((f, f.i2repr(self, self.getfieldval(f.name)), chunk))

    pay_bytes, lst = self.payload.build_ps(internal=1)
    built += pay_bytes
    lst.append((self, details))

    return built, lst

807 

def build_ps(self, internal=0):
    # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]]  # noqa: E501
    """
    Return the (bytes, per-layer field details) pair of do_build_ps().

    :param internal: not used by this implementation
    """
    built, details = self.do_build_ps()
    return built, details

819 

def canvas_dump(self, layer_shift=0, rebuild=1):
    # type: (int, int) -> pyx.canvas.canvas
    """
    Draw the packet on a PyX canvas: layer and field names on the left, the
    hexadecimal dump on the right, with an arrow from each field value to
    the bytes it produced.

    :param layer_shift: added to the dump row position after each layer
    :param rebuild: when true, the packet is rebuilt from its raw bytes
        before being drawn
    :return: the PyX canvas
    :raises ImportError: when PYX is 0
    """
    if PYX == 0:
        raise ImportError("PyX and its dependencies must be installed")
    canvas = pyx.canvas.canvas()
    if rebuild:
        _, t = self.__class__(raw(self)).build_ps()
    else:
        _, t = self.build_ps()
    # One text row per layer plus one per field
    YTXTI = len(t)
    for _, l in t:
        YTXTI += len(l)
    YTXT = float(YTXTI)
    YDUMP = YTXT

    XSTART = 1
    XDSTART = 10
    y = 0.0
    yd = 0.0
    XMUL = 0.55
    YMUL = 0.4

    backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
    forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)

    def hexstr(x):
        # type: (bytes) -> str
        return " ".join("%02x" % orb(c) for c in x)

    def make_dump_txt(x, y, txt):
        # type: (int, float, bytes) -> pyx.text.text
        return pyx.text.text(
            XDSTART + x * XMUL,
            (YDUMP - y) * YMUL,
            r"\tt{%s}" % hexstr(txt),
            [pyx.text.size.Large]
        )

    def make_box(o):
        # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
        return pyx.box.rect(
            o.left(), o.bottom(), o.width(), o.height(),
            relcenter=(0.5, 0.5)
        )

    def make_frame(lst):
        # type: (List[Any]) -> pyx.path.path
        # Outline around the dump texts of one field (one or several rows)
        if len(lst) == 1:
            b = lst[0].bbox()
            b.enlarge(pyx.unit.u_pt)
            return b.path()
        else:
            fb = lst[0].bbox()
            fb.enlarge(pyx.unit.u_pt)
            lb = lst[-1].bbox()
            lb.enlarge(pyx.unit.u_pt)
            if len(lst) == 2 and fb.left() > lb.right():
                # Two rows that do not overlap: two open boxes
                return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.bottom()),  # noqa: E501
                                     pyx.path.lineto(fb.right(), fb.bottom()),  # noqa: E501
                                     pyx.path.moveto(lb.left(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()))  # noqa: E501
            else:
                # XXX
                gb = lst[1].bbox()
                if gb != lb:
                    gb.enlarge(pyx.unit.u_pt)
                kb = lst[-2].bbox()
                if kb != gb and kb != lb:
                    kb.enlarge(pyx.unit.u_pt)
                return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), gb.top()),
                                     pyx.path.lineto(fb.left(), gb.top()),
                                     pyx.path.closepath(),)

    def make_dump(s,   # type: bytes
                  shift=0,  # type: int
                  y=0.,  # type: float
                  col=None,  # type: pyx.color.color
                  bkcol=None,  # type: pyx.color.color
                  large=16  # type: int
                  ):
        # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float]  # noqa: E501
        # Lay out the bytes `s` in rows of `large` bytes, starting at column
        # `shift` of row `y`. Returns the canvas, the bbox of the last text,
        # and the column/row where the next dump continues.
        c = pyx.canvas.canvas()
        tlist = []
        while s:
            dmp, s = s[:large - shift], s[large - shift:]
            txt = make_dump_txt(shift, y, dmp)
            tlist.append(txt)
            shift += len(dmp)
            # Wrap on the row width actually used for slicing (the original
            # compared against a hard-coded 16, the default of `large`)
            if shift >= large:
                shift = 0
                y += 1
        if col is None:
            col = pyx.color.rgb.red
        if bkcol is None:
            bkcol = pyx.color.rgb.white
        c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick])  # noqa: E501
        for txt in tlist:
            c.insert(txt)
        return c, tlist[-1].bbox(), shift, y

    last_shift, last_y = 0, 0.0
    while t:
        bkcol = next(backcolor)
        proto, fields = t.pop()
        y += 0.5
        pt = pyx.text.text(
            XSTART,
            (YTXT - y) * YMUL,
            r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
                str(proto.name)
            ),
            [pyx.text.size.Large]
        )
        y += 1
        ptbb = pt.bbox()
        ptbb.enlarge(pyx.unit.u_pt * 2)
        canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])])  # noqa: E501
        canvas.insert(pt)
        for field, fval, fdump in fields:
            col = next(forecolor)
            ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name))  # noqa: E501
            # Size label: bits for BitField, bytes otherwise
            if isinstance(field, BitField):
                fsize = '%sb' % field.size
            else:
                fsize = '%sB' % len(fdump)
            # Mark fields whose class name starts with "LE"
            if (hasattr(field, 'field') and
                    'LE' in field.field.__class__.__name__[:3] or
                    'LE' in field.__class__.__name__[:3]):
                fsize = r'$\scriptstyle\langle$' + fsize
            st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright])  # noqa: E501
            if isinstance(fval, str):
                if len(fval) > 18:
                    fval = fval[:18] + "[...]"
            else:
                fval = ""
            vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))  # noqa: E501
            y += 1.0
            if fdump:
                dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol)  # noqa: E501

                dtb = target
                vtb = vt.bbox()
                bxvt = make_box(vtb)
                bxdt = make_box(dtb)
                dtb.enlarge(pyx.unit.u_pt)
                # Best effort: the arrow is skipped when the connector
                # cannot be computed
                try:
                    if yd < 0:
                        cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90)  # noqa: E501
                    else:
                        cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90)  # noqa: E501
                except Exception:
                    pass
                else:
                    canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col])  # noqa: E501

                canvas.insert(dt)

            canvas.insert(ft)
            canvas.insert(st)
            canvas.insert(vt)
        last_y += layer_shift

    return canvas

994 

def extract_padding(self, s):
    # type: (bytes) -> Tuple[bytes, Optional[bytes]]
    """
    DEV: to be overloaded to extract the padding of the current layer. The
    default implementation reports no padding.

    :param s: the current layer
    :return: a couple (actual layer, padding)
    """
    padding = None
    return s, padding

1004 

def post_dissect(self, s):
    # type: (bytes) -> bytes
    """
    DEV: hook called right after the current layer has been dissected. The
    default implementation returns `s` unchanged.
    """
    remaining = s
    return remaining

1009 

def pre_dissect(self, s):
    # type: (bytes) -> bytes
    """
    DEV: hook called right before the current layer is dissected.

    :param bytes s: the raw bytes about to be dissected
    :return: the bytes that will actually be dissected
             (unchanged in this base implementation)
    """
    to_dissect = s
    return to_dissect

1014 

def do_dissect(self, s):
    # type: (bytes) -> bytes
    """
    Read every field of this layer out of ``s``, in ``fields_desc`` order.

    Side effects: fills ``self.fields``, records list / packet-holding /
    mutable values in ``self.raw_packet_cache_fields``, stores the consumed
    bytes in ``self.raw_packet_cache`` and sets ``self.explicit`` to 1.

    :param bytes s: the raw bytes starting at this layer
    :return: the bytes left once the fields have been read
    """
    original = s
    self.raw_packet_cache_fields = {}
    for fdesc in self.fields_desc:
        s, value = fdesc.getfield(self, s)
        is_conditional = isinstance(fdesc, ConditionalField)
        # Skip unused ConditionalField
        if is_conditional and value is None:
            continue
        # Fields with mutable values are tracked so that .raw_packet_cache
        # can be discarded when one of them is modified later on.
        if (fdesc.islist or fdesc.holds_packets or fdesc.ismutable) \
                and value is not None:
            self.raw_packet_cache_fields[fdesc.name] = \
                self._raw_packet_cache_field_value(fdesc, value, copy=True)
        self.fields[fdesc.name] = value
        if s:
            continue
        # Nothing left to dissect: stop here if this field may end the layer
        if isinstance(fdesc, MayEnd):
            break
        if value is not None and is_conditional and \
                isinstance(fdesc.fld, MayEnd):
            break
    # Only the bytes actually consumed by this layer are cached
    consumed = original[:-len(s)] if s else original
    self.raw_packet_cache = consumed
    self.explicit = 1
    return s

1038 

def do_dissect_payload(self, s):
    # type: (bytes) -> None
    """
    Perform the dissection of the layer's payload.

    Nothing happens when ``s`` is empty. Otherwise a payload is built from
    ``s`` and attached with ``add_payload()``: a raw layer if dissection
    must stop at this layer, else an instance of the class returned by
    ``guess_payload_class()``, falling back to a raw layer when that
    dissection fails.

    :param bytes s: the raw bytes of the payload
    """
    if not s:
        return
    stop_cls = self.stop_dissection_after
    if stop_cls and isinstance(self, stop_cls):
        # stop dissection here
        self.add_payload(conf.raw_layer(s, _internal=1, _underlayer=self))
        return
    cls = self.guess_payload_class(s)
    try:
        payload = cls(
            s,
            stop_dissection_after=self.stop_dissection_after,
            _internal=1,
            _underlayer=self,
        )
    except KeyboardInterrupt:
        raise
    except Exception:
        if conf.debug_dissector:
            if issubtype(cls, Packet):
                log_runtime.error("%s dissector failed", cls.__name__)
            else:
                log_runtime.error("%s.guess_payload_class() returned "
                                  "[%s]",
                                  self.__class__.__name__, repr(cls))
            # In debug mode the failure is propagated to the caller
            if cls is not None:
                raise
        payload = conf.raw_layer(s, _internal=1, _underlayer=self)
    self.add_payload(payload)

1077 

def dissect(self, s):
    # type: (bytes) -> None
    """
    Dissect ``s`` into this layer, its payload and its padding.

    Runs, in order: ``pre_dissect()``, ``do_dissect()``, ``post_dissect()``,
    ``extract_padding()`` and ``do_dissect_payload()``. Padding is appended
    as an extra payload only when there is some and ``conf.padding`` is set.

    :param bytes s: the raw bytes starting at this layer
    """
    remaining = self.post_dissect(self.do_dissect(self.pre_dissect(s)))

    payl, pad = self.extract_padding(remaining)
    self.do_dissect_payload(payl)
    if pad and conf.padding:
        self.add_payload(conf.padding_layer(pad))

1090 

def guess_payload_class(self, payload):
    # type: (bytes) -> Type[Packet]
    """
    DEV: Guesses the next payload class from layer bonds.
    Can be overloaded to use a different mechanism.

    Each ``(field values, class)`` entry of ``payload_guess`` is tried in
    order for every alias type; the first entry whose values all equal the
    current field values wins.

    :param bytes payload: the layer's payload
    :return: the payload class
    """
    for alias in self.aliastypes:
        for expected, cls in alias.payload_guess:
            try:
                for fname, wanted in expected.items():
                    if not wanted == self.getfieldval(fname):
                        break
                else:
                    return cls  # type: ignore
            except AttributeError:
                # This bond refers to a field we do not have: try the next
                continue
    return self.default_payload_class(payload)

1109 

def default_payload_class(self, payload):
    # type: (bytes) -> Type[Packet]
    """
    DEV: Returns the default payload class, used when nothing has been
    found by the guess_payload_class() method.

    :param bytes payload: the layer's payload
    :return: the default payload class defined in the configuration
             (``conf.raw_layer``)
    """
    fallback_cls = conf.raw_layer
    return fallback_cls

1120 

def hide_defaults(self):
    # type: () -> None
    """
    Remove the fields whose value equals their default value, in this
    layer and then in its payload.
    """
    # Iterate over a snapshot of the keys: self.fields shrinks in the loop
    for fname in list(self.fields):
        current = self.fields[fname]
        if fname in self.default_fields and \
                self.default_fields[fname] == current:
            del self.fields[fname]
    self.payload.hide_defaults()

1131 

def clone_with(self, payload=None, **kargs):
    # type: (Optional[Any], **Any) -> Any
    """
    Build a new packet of the same class whose fields are ``kargs``.

    Default fields and cached field values are copied through
    ``copy_fields_dict()``, overloaded fields through ``dict.copy()``;
    the remaining metadata is assigned as is.

    :param payload: optional payload to attach to the clone
    :param kargs: the field values of the clone
    :return: the new packet
    """
    pkt = self.__class__()
    pkt.explicit = 1
    pkt.fields = kargs
    pkt.default_fields = self.copy_fields_dict(self.default_fields)
    pkt.overloaded_fields = self.overloaded_fields.copy()
    for attr in ("time", "underlayer", "parent", "post_transforms",
                 "raw_packet_cache"):
        setattr(pkt, attr, getattr(self, attr))
    pkt.raw_packet_cache_fields = self.copy_fields_dict(
        self.raw_packet_cache_fields
    )
    for attr in ("wirelen", "comment", "sniffed_on", "direction"):
        setattr(pkt, attr, getattr(self, attr))
    if payload is not None:
        pkt.add_payload(payload)
    return pkt

1154 

def __iter__(self):
    # type: () -> Iterator[Packet]
    """
    Iterates through all sub-packets generated by this Packet.

    Every field listed for expansion is walked through a generator; for each
    combination of values, and each payload, a clone is yielded.
    """
    def loop(todo, done, self=self):
        # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
        if not todo:
            # Every field now has one value: walk through the payloads
            if isinstance(self.payload, NoPayload):
                payloads = SetGen([None])  # type: SetGen[Packet]
            else:
                payloads = self.payload
            for payl in payloads:
                # Let's make sure subpackets are consistent
                frozen = done.copy()
                for key in frozen:
                    if isinstance(frozen[key], VolatileValue):
                        frozen[key] = frozen[key]._fix()
                yield self.clone_with(payload=payl, **frozen)
            return
        eltname = todo.pop()
        elt = self.getfieldval(eltname)
        if not isinstance(elt, Gen):
            # A list field holds its list as ONE value, not as alternatives
            if self.get_field(eltname).islist:
                elt = SetGen([elt])
            else:
                elt = SetGen(elt)
        for value in elt:
            done[eltname] = value
            for pkt in loop(todo[:], done):
                yield pkt

    if self.explicit or self.raw_packet_cache is not None:
        # Nothing to expand: fields are used as they are
        todo = []
        done = self.fields
    else:
        defaults = itertools.chain(self.default_fields.items(),
                                   self.overloaded_fields.items())
        volatile = [k for (k, v) in defaults if isinstance(v, VolatileValue)]
        todo = volatile + list(self.fields)
        done = {}
    return loop(todo, done)

1195 

def iterpayloads(self):
    # type: () -> Iterator[Packet]
    """Used to iter through the payloads of a Packet.
    Useful for DNS or 802.11 for instance.

    Yields this packet first, then each successive payload for as long as
    the payload is truthy.
    """
    layer = self
    while True:
        yield layer
        upper = layer.payload
        if not upper:
            return
        layer = upper

1206 

def __gt__(self, other):
    # type: (Packet) -> int
    """
    True if other is an answer from self (self ==> other).

    :raises TypeError: if other is neither a Packet nor bytes
    """
    if isinstance(other, Packet):
        return other < self
    if isinstance(other, bytes):
        return 1
    raise TypeError((self, other))

1216 

def __lt__(self, other):
    # type: (Packet) -> int
    """
    True if self is an answer from other (other ==> self).

    :raises TypeError: if other is neither a Packet nor bytes
    """
    if isinstance(other, Packet):
        return self.answers(other)
    if isinstance(other, bytes):
        return 1
    raise TypeError((self, other))

1226 

1227 def __eq__(self, other): 

1228 # type: (Any) -> bool 

1229 if not isinstance(other, self.__class__): 

1230 return False 

1231 for f in self.fields_desc: 

1232 if f not in other.fields_desc: 

1233 return False 

1234 if self.getfieldval(f.name) != other.getfieldval(f.name): 

1235 return False 

1236 return self.payload == other.payload 

1237 

1238 def __ne__(self, other): 

1239 # type: (Any) -> bool 

1240 return not self.__eq__(other) 

1241 

1242 # Note: setting __hash__ to None is the standard way 

1243 # of making an object un-hashable. mypy doesn't know that 

1244 __hash__ = None # type: ignore 

1245 

def hashret(self):
    # type: () -> bytes
    """
    DEV: returns a string that has the same value for a request
    and its answer.

    This base implementation simply delegates to the payload.
    """
    upper = self.payload
    return upper.hashret()

1251 

def answers(self, other):
    # type: (Packet) -> int
    """
    DEV: true if self is an answer from other.

    Layers of different classes never answer each other; for identical
    classes the decision is left to the payloads.
    """
    if other.__class__ != self.__class__:
        return 0
    return self.payload.answers(other.payload)

1258 

def layers(self):
    # type: () -> List[Type[Packet]]
    """
    Returns a list of layer classes (including subclasses) in this packet,
    starting with this layer's own class.
    """
    classes = []
    current = self  # type: Optional[Packet]
    while current:
        classes.append(current.__class__)
        # Index 0 selects the first layer found from the payload onwards
        current = current.payload.getlayer(0, _subclass=True)
    return classes

1268 

def haslayer(self, cls, _subclass=None):
    # type: (Union[Type[Packet], str], Optional[bool]) -> int
    """
    True if self has a layer that is an instance of cls.
    Superseded by "cls in self" syntax.

    The search covers this layer, the packets held by its packet fields,
    then the payload.

    :param cls: a packet class, or a class name / layer name
    :param _subclass: match subclasses too; defaults to ``match_subclass``
    """
    if _subclass is None:
        _subclass = self.match_subclass or None
    if _subclass:
        match = issubtype
    else:
        def match(x, t):
            return bool(x == t)
    if cls is None or match(self.__class__, cls) \
            or cls in [self.__class__.__name__, self._name]:
        return True
    for fdesc in self.packetfields:
        held = self.getfieldval(fdesc.name)
        if held is None:
            continue
        if not fdesc.islist:
            # Wrap a single value so that it can be iterated like a list
            held = SetGen(held, _iterpacket=0)
        for item in held:
            if not isinstance(item, Packet):
                continue
            ret = item.haslayer(cls, _subclass=_subclass)
            if ret:
                return ret
    return self.payload.haslayer(cls, _subclass=_subclass)

1296 

def getlayer(self,
             cls,  # type: Union[int, Type[Packet], str]
             nb=1,  # type: int
             _track=None,  # type: Optional[List[int]]
             _subclass=None,  # type: Optional[bool]
             **flt  # type: Any
             ):
    # type: (...) -> Optional[Packet]
    """Return the nb^th layer that is an instance of cls, matching flt
    values.

    :param cls: a packet class, a class/layer name, a ``"Layer.field"``
        string (the value of ``field`` in the matching layer is returned
        instead of the layer), or an int (0-based index of the layer)
    :param nb: 1-based rank of the wanted match
    :param _track: internal list, handed over to the payload's getlayer()
    :param _subclass: match subclasses too; defaults to ``match_subclass``
    :param flt: field values the layer must have to match
    :return: the layer (or field value), or whatever the payload's
        getlayer() returns when this layer is not the wanted one
    """
    if _subclass is None:
        _subclass = self.match_subclass or None
    if _subclass:
        match = issubtype
    else:
        match = lambda x, t: bool(x == t)
    # Note:
    # cls can be int, packet, str
    # string_class_name can be packet, str (packet or packet+field)
    # class_name can be packet, str (packet only)
    if isinstance(cls, int):
        nb = cls + 1
        string_class_name = ""  # type: Union[Type[Packet], str]
    else:
        string_class_name = cls
    class_name = ""  # type: Union[Type[Packet], str]
    fld = None  # type: Optional[str]
    if isinstance(string_class_name, str) and "." in string_class_name:
        class_name, fld = string_class_name.split(".", 1)
    else:
        class_name, fld = string_class_name, None
    if not class_name or match(self.__class__, class_name) \
            or class_name in [self.__class__.__name__, self._name]:
        if all(self.getfieldval(fldname) == fldvalue
               for fldname, fldvalue in flt.items()):
            if nb == 1:
                if fld is None:
                    return self
                else:
                    return self.getfieldval(fld)  # type: ignore
            else:
                nb -= 1
    # The recursive calls below receive string_class_name rather than
    # class_name: it still carries the optional ".field" suffix, which
    # would otherwise be lost as soon as the match is not on this layer.
    # For an int cls it is "", so the already adjusted nb is not reset.
    for f in self.packetfields:
        fvalue_gen = self.getfieldval(f.name)
        if fvalue_gen is None:
            continue
        if not f.islist:
            fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
        for fvalue in fvalue_gen:
            if isinstance(fvalue, Packet):
                track = []  # type: List[int]
                ret = fvalue.getlayer(string_class_name, nb=nb,
                                      _track=track,
                                      _subclass=_subclass, **flt)
                if ret is not None:
                    return ret
                # Matches seen inside the sub-packet count towards nb
                nb = track[0]
    return self.payload.getlayer(string_class_name, nb=nb, _track=_track,
                                 _subclass=_subclass, **flt)

1356 

def firstlayer(self):
    # type: () -> Packet
    """
    Follow the ``underlayer`` links down to the layer that has none and
    return it.
    """
    bottom = self
    while True:
        below = bottom.underlayer
        if below is None:
            return bottom
        bottom = below

1363 

1364 def __getitem__(self, cls): 

1365 # type: (Union[Type[Packet], str]) -> Any 

1366 if isinstance(cls, slice): 

1367 lname = cls.start 

1368 if cls.stop: 

1369 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {})) 

1370 else: 

1371 ret = self.getlayer(cls.start, **(cls.step or {})) 

1372 else: 

1373 lname = cls 

1374 ret = self.getlayer(cls) 

1375 if ret is None: 

1376 if isinstance(lname, type): 

1377 name = lname.__name__ 

1378 elif not isinstance(lname, bytes): 

1379 name = repr(lname) 

1380 else: 

1381 name = cast(str, lname) 

1382 raise IndexError("Layer [%s] not found" % name) 

1383 return ret 

1384 

1385 def __delitem__(self, cls): 

1386 # type: (Type[Packet]) -> None 

1387 del self[cls].underlayer.payload 

1388 

1389 def __setitem__(self, cls, val): 

1390 # type: (Type[Packet], Packet) -> None 

1391 self[cls].underlayer.payload = val 

1392 

1393 def __contains__(self, cls): 

1394 # type: (Union[Type[Packet], str]) -> int 

1395 """ 

1396 "cls in self" returns true if self has a layer which is an 

1397 instance of cls. 

1398 """ 

1399 return self.haslayer(cls) 

1400 

def route(self):
    # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
    """Return the route as computed by the payload."""
    upper = self.payload
    return upper.route()

1404 

def fragment(self, *args, **kargs):
    # type: (*Any, **Any) -> List[Packet]
    """Delegate fragmentation to the payload, forwarding every argument."""
    upper = self.payload
    return upper.fragment(*args, **kargs)

1408 

def display(self, *args, **kargs):  # Deprecated. Use show()
    # type: (*Any, **Any) -> None
    """
    Deprecated. Use show() method.

    Forwards every argument to ``show()`` and discards its result.
    """
    self.show(*args, **kargs)
    return None

1413 

def _show_or_dump(self,
                  dump=False,  # type: bool
                  indent=3,  # type: int
                  lvl="",  # type: str
                  label_lvl="",  # type: str
                  first_call=True  # type: bool
                  ):
    # type: (...) -> Optional[str]
    """
    Internal method that shows or dumps a hierarchical view of a packet.
    Called by show.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :param first_call: determine if the current function is the first
    :return: return a hierarchical view if dump, else print it
    """

    if dump:
        from scapy.themes import ColorTheme, AnsiColorTheme
        ct: ColorTheme = AnsiColorTheme()  # No color for dump output
    else:
        ct = conf.color_theme
    out = "%s%s %s %s\n" % (label_lvl,
                            ct.punct("###["),
                            ct.layer_name(self.name),
                            ct.punct("]###"))
    # Work on a copy: sub-fields get inserted at the front while iterating
    pending = self.fields_desc.copy()
    while pending:
        fdesc = pending.pop(0)
        if isinstance(fdesc, ConditionalField) and not fdesc._evalcond(self):
            continue
        if hasattr(fdesc, "fields"):  # Field has subfields
            out += "%s %s =\n" % (
                label_lvl + lvl,
                ct.depreciate_field_name(fdesc.name),
            )
            lvl += " " * indent * self.show_indent
            subfields = (x for x in fdesc.fields if hasattr(self, x.name))
            for idx, sub in enumerate(subfields):
                pending.insert(idx, sub)
            continue
        if isinstance(fdesc, Emph) or fdesc in conf.emph:
            ncol, vcol = ct.emph_field_name, ct.emph_field_value
        else:
            ncol, vcol = ct.field_name, ct.field_value
        pad = max(0, 10 - len(fdesc.name)) * " "
        fvalue = self.getfieldval(fdesc.name)
        holds_packets = isinstance(fvalue, Packet) or (
            fdesc.islist and fdesc.holds_packets and isinstance(fvalue, list)
        )
        if holds_packets:
            out += "%s %s%s%s%s\n" % (label_lvl + lvl,
                                      ct.punct("\\"),
                                      ncol(fdesc.name),
                                      pad,
                                      ct.punct("\\"))
            sub_packets = SetGen(
                fvalue,
                _iterpacket=0
            )  # type: SetGen[Packet]
            for sub_pkt in sub_packets:
                out += sub_pkt._show_or_dump(
                    dump=dump,
                    indent=indent,
                    label_lvl=label_lvl + lvl + " |",
                    first_call=False
                )
        else:
            begn = "%s %s%s%s " % (label_lvl + lvl,
                                   ncol(fdesc.name),
                                   pad,
                                   ct.punct("="),)
            reprval = fdesc.i2repr(self, fvalue)
            if isinstance(reprval, str):
                # Indent the continuation lines of a multi-line value
                shift = len(label_lvl) + len(lvl) + len(fdesc.name) + 4
                reprval = reprval.replace("\n", "\n" + " " * shift)
            out += "%s%s\n" % (begn, vcol(reprval))
    if self.payload:
        out += self.payload._show_or_dump(  # type: ignore
            dump=dump,
            indent=indent,
            lvl=lvl + (" " * indent * self.show_indent),
            label_lvl=label_lvl,
            first_call=False
        )

    if first_call and not dump:
        print(out)
        return None
    return out

1503 

def show(self, dump=False, indent=3, lvl="", label_lvl=""):
    # type: (bool, int, str, str) -> Optional[Any]
    """
    Prints or returns (when "dump" is true) a hierarchical view of the
    packet.

    Thin wrapper around ``_show_or_dump()``.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :return: return a hierarchical view if dump, else print it
    """
    view = self._show_or_dump(dump, indent, lvl, label_lvl)
    return view

1517 

def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
    # type: (bool, int, str, str) -> Optional[Any]
    """
    Prints or returns (when "dump" is true) a hierarchical view of an
    assembled version of the packet, so that automatic fields are
    calculated (checksums, etc.)

    The packet is built to bytes and dissected again with the same class
    before being shown.

    :param dump: determine if it prints or returns the string value
    :param int indent: the size of indentation for each layer
    :param str lvl: additional information about the layer lvl
    :param str label_lvl: additional information about the layer fields
    :return: return a hierarchical view if dump, else print it
    """
    assembled = self.__class__(raw(self))
    return assembled.show(dump, indent, lvl, label_lvl)

1532 

def sprintf(self, fmt, relax=1):
    # type: (str, int) -> str
    """
    sprintf(format, [relax=1]) -> str

    Where format is a string that can include directives. A directive
    begins and ends by % and has the following format:
    ``%[fmt[r],][cls[:nb].]field%``

    :param fmt: is a classic printf directive, "r" can be appended for raw
      substitution:
      (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
      (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
      Special case : "%.time%" is the creation time.
      Ex::

        p.sprintf(
          "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
          "%03xr,IP.proto% %r,TCP.flags%"
        )

    :param relax: forwarded as is to the payload's sprintf() when a
      directive is not resolved by this layer

    Moreover, the format string can include conditional statements. A
    conditional statement looks like : {layer:string} where layer is a
    layer name, and string is the string to insert in place of the
    condition if it is true, i.e. if layer is present. If layer is
    preceded by a "!", the result is inverted. Conditions can be
    imbricated. A valid statement can be::

      p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
      p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")

    A side effect is that, to obtain "{" and "}" characters, you must use
    "%(" and "%)".
    """

    escape = {"%": "%",
              "(": "{",
              ")": "}"}

    # Evaluate conditions, innermost (rightmost "{") first
    while "{" in fmt:
        start = fmt.rindex("{")
        length = fmt[start + 1:].index("}")
        cond = fmt[start + 1:start + length + 1]
        colon = cond.find(":")
        if colon < 0:
            raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond)  # noqa: E501
        cond, format_ = cond[:colon], cond[colon + 1:]
        negated = cond[0] == "!"
        if negated:
            cond = cond[1:]
        # Keep the text when the layer is present, unless negated
        keep = bool(self.haslayer(cond)) != negated
        if not keep:
            format_ = ""
        fmt = fmt[:start] + format_ + fmt[start + length + 2:]

    # Evaluate directives
    out = ""
    while "%" in fmt:
        pos = fmt.index("%")
        out += fmt[:pos]
        fmt = fmt[pos + 1:]
        if fmt and fmt[0] in escape:
            out += escape[fmt[0]]
            fmt = fmt[1:]
            continue
        try:
            pos = fmt.index("%")
            sfclsfld = fmt[:pos]
            parts = sfclsfld.split(",")
            if len(parts) == 1:
                conv = "s"
                clsfld = parts[0]
            elif len(parts) == 2:
                conv, clsfld = parts
            else:
                raise Scapy_Exception
            if "." in clsfld:
                cls, fld = clsfld.split(".")
            else:
                cls = self.__class__.__name__
                fld = clsfld
            num = 1
            if ":" in cls:
                cls, snum = cls.split(":")
                num = int(snum)
            fmt = fmt[pos + 1:]
        except Exception:
            raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))  # noqa: E501

        if fld == "time":
            val = time.strftime(
                "%H:%M:%S.%%06i",
                time.localtime(float(self.time))
            ) % int((self.time - int(self.time)) * 1000000)
        elif cls == self.__class__.__name__ and hasattr(self, fld):
            if num > 1:
                # The wanted layer is a further one of the same class
                val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (conv, cls, num - 1, fld), relax)  # noqa: E501
                conv = "s"
            else:
                try:
                    val = self.getfieldval(fld)
                except AttributeError:
                    val = getattr(self, fld)
                if conv[-1] == "r":  # Raw field value
                    conv = conv[:-1]
                    if not conv:
                        conv = "s"
                elif fld in self.fieldtype:
                    val = self.fieldtype[fld].i2repr(self, val)
        else:
            # Not for this layer: let the payload resolve the directive
            val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
            conv = "s"
        out += ("%" + conv) % val

    out += fmt
    return out

1653 

def mysummary(self):
    # type: () -> str
    """
    DEV: can be overloaded to return a string that summarizes the layer.

    Only one mysummary() is used in a whole packet summary: the one of the
    upper layer, except if a mysummary() also returns (as a couple) a list
    of layers whose mysummary() must be called if they are present.

    This base implementation returns an empty string.
    """
    no_summary = ""
    return no_summary

1661 

def _do_summary(self):
    # type: () -> Tuple[int, str, List[Any]]
    """
    Build the summary of this layer on top of the payload's one.

    :return: a tuple (found, summary, needed) where ``found`` tells whether
        a summary has been produced so far and ``needed`` lists the layer
        classes whose mysummary() was requested by upper layers
    """
    found, upper, needed = self.payload._do_summary()
    text = ""
    if not found or self.__class__ in needed:
        text = self.mysummary()
        if isinstance(text, tuple):
            # mysummary() may also return the layers it wants summarized
            text, extra = text
            needed += extra
    if text or needed:
        found = 1
    if not text:
        text = self.__class__.__name__ if self.show_summary else ""
    if self.__class__ in conf.emph:
        emphasized = [
            "%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))
            for f in self.fields_desc
            if f in conf.emph
        ]
        text = "%s [%s]" % (text, " ".join(emphasized))
    separator = " / " if (text and upper) else ""
    text = "%s%s%s" % (text, separator, upper)
    return found, text, needed

1686 

def summary(self, intern=0):
    # type: (int) -> str
    """
    Returns a one line summary of a packet.

    :param intern: not used by this implementation
    """
    _found, text, _needed = self._do_summary()[:3]
    return text

1691 

def lastlayer(self, layer=None):
    # type: (Optional[Packet]) -> Packet
    """
    Returns the uppermost layer of the packet.

    :param layer: not used here; this layer hands itself over to its
        payload's lastlayer()
    """
    upper = self.payload
    return upper.lastlayer(self)

1696 

def decode_payload_as(self, cls):
    # type: (Type[Packet]) -> None
    """
    Reassembles the payload and decode it using another packet class.

    The new payload is then notified, through ``dissection_done()``, with
    the bottom-most layer of the packet.

    :param cls: the class used to dissect the payload bytes again
    """
    payload_bytes = raw(self.payload)
    self.payload = cls(payload_bytes, _internal=1, _underlayer=self)
    bottom = self
    while bottom.underlayer is not None:
        bottom = bottom.underlayer
    self.payload.dissection_done(bottom)

1706 

1707 def _command(self, json=False): 

1708 # type: (bool) -> List[Tuple[str, Any]] 

1709 """ 

1710 Internal method used to generate command() and json() 

1711 """ 

1712 f = [] 

1713 iterator: Iterator[Tuple[str, Any]] 

1714 if json: 

1715 iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc) 

1716 else: 

1717 iterator = iter(self.fields.items()) 

1718 for fn, fv in iterator: 

1719 fld = self.get_field(fn) 

1720 if isinstance(fv, (list, dict, set)) and not fv and not fld.default: 

1721 continue 

1722 if isinstance(fv, Packet): 

1723 if json: 

1724 fv = {k: v for (k, v) in fv._command(json=True)} 

1725 else: 

1726 fv = fv.command() 

1727 elif fld.islist and fld.holds_packets and isinstance(fv, list): 

1728 if json: 

1729 fv = [ 

1730 {k: v for (k, v) in x} 

1731 for x in map(lambda y: Packet._command(y, json=True), fv) 

1732 ] 

1733 else: 

1734 fv = "[%s]" % ",".join(map(Packet.command, fv)) 

1735 elif fld.islist and isinstance(fv, list): 

1736 if json: 

1737 fv = [ 

1738 getattr(x, 'command', lambda: repr(x))() 

1739 for x in fv 

1740 ] 

1741 else: 

1742 fv = "[%s]" % ",".join( 

1743 getattr(x, 'command', lambda: repr(x))() 

1744 for x in fv 

1745 ) 

1746 elif isinstance(fv, FlagValue): 

1747 fv = int(fv) 

1748 elif callable(getattr(fv, 'command', None)): 

1749 fv = fv.command(json=json) 

1750 else: 

1751 if json: 

1752 if isinstance(fv, bytes): 

1753 fv = fv.decode("utf-8", errors="backslashreplace") 

1754 else: 

1755 fv = fld.i2h(self, fv) 

1756 else: 

1757 fv = repr(fld.i2h(self, fv)) 

1758 f.append((fn, fv)) 

1759 return f 

1760 

def command(self):
    # type: () -> str
    """
    Returns a string representing the command you have to type to
    obtain the same packet.

    Layers are joined with "/"; each one is rendered as
    ``ClassName(field=value, ...)``.
    """
    args = ", ".join("%s=%s" % pair for pair in self._command())
    text = "%s(%s)" % (self.__class__.__name__, args)
    upper = self.payload.command()
    if not upper:
        return text
    return text + "/" + upper

1775 

def json(self):
    # type: () -> str
    """
    Returns a JSON representing the packet.

    Please note that this cannot be used for bijective usage: data loss WILL
    occur, so it will not make sense to try to rebuild the packet from the
    output. This must only be used for a grepping/displaying purpose.

    :return: a JSON object holding the fields of this layer, plus a
        "payload" key holding the payload's JSON when the payload yields
        a non-empty one
    """
    fields = dict(self._command(json=True))
    dump = json.dumps(fields)
    pc = self.payload.json()
    if pc:
        if fields:
            # Re-open the object and append the payload as its last key
            dump = dump[:-1] + ", \"payload\": %s}" % pc
        else:
            # No field of our own: "{}" must not become "{, ...}"
            dump = "{\"payload\": %s}" % pc
    return dump

1790 

1791 

class NoPayload(Packet):
    """
    Placeholder used where a layer has no payload.

    One instance is created per class and returned by every instantiation.
    It is falsy, builds to nothing, and turns most Packet operations into
    no-ops or neutral results.
    """

    def __new__(cls, *args, **kargs):
        # type: (Type[Packet], *Any, **Any) -> NoPayload
        """Return the instance cached on the class, creating it once."""
        instance = cls.__dict__.get("__singl__")
        if instance is None:
            instance = Packet.__new__(cls)
            cls.__singl__ = instance
            Packet.__init__(instance)
        return cast(NoPayload, instance)

    def __init__(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Nothing to do: initialisation happens once, in __new__()."""
        pass

    def dissection_done(self, pkt):
        # type: (Packet) -> None
        """No-op."""
        pass

    def add_payload(self, payload):
        # type: (Union[Packet, bytes]) -> NoReturn
        """Always fails: nothing can be stacked on a NoPayload."""
        raise Scapy_Exception("Can't add payload to NoPayload instance")

    def remove_payload(self):
        # type: () -> None
        """No-op."""
        pass

    def add_underlayer(self, underlayer):
        # type: (Any) -> None
        """No-op."""
        pass

    def remove_underlayer(self, other):
        # type: (Packet) -> None
        """No-op."""
        pass

    def add_parent(self, parent):
        # type: (Any) -> None
        """No-op."""
        pass

    def remove_parent(self, other):
        # type: (Packet) -> None
        """No-op."""
        pass

    def copy(self):
        # type: () -> NoPayload
        """Return the instance itself rather than a copy."""
        return self

    def clear_cache(self):
        # type: () -> None
        """No-op."""
        pass

    def __repr__(self):
        # type: () -> str
        return ""

    def __str__(self):
        # type: () -> str
        return ""

    def __bytes__(self):
        # type: () -> bytes
        return b""

    def __nonzero__(self):
        # type: () -> bool
        """A NoPayload is always falsy."""
        return False
    __bool__ = __nonzero__

    def do_build(self):
        # type: () -> bytes
        return b""

    def build(self):
        # type: () -> bytes
        return b""

    def build_padding(self):
        # type: () -> bytes
        return b""

    def build_done(self, p):
        # type: (bytes) -> bytes
        """Return the built bytes untouched."""
        return p

    def build_ps(self, internal=0):
        # type: (int) -> Tuple[bytes, List[Any]]
        return b"", []

    def getfieldval(self, attr):
        # type: (str) -> NoReturn
        """There is no field here: always raises AttributeError."""
        raise AttributeError(attr)

    def getfield_and_val(self, attr):
        # type: (str) -> NoReturn
        """There is no field here: always raises AttributeError."""
        raise AttributeError(attr)

    def setfieldval(self, attr, val):
        # type: (str, Any) -> NoReturn
        """There is no field here: always raises AttributeError."""
        raise AttributeError(attr)

    def delfieldval(self, attr):
        # type: (str) -> NoReturn
        """There is no field here: always raises AttributeError."""
        raise AttributeError(attr)

    def hide_defaults(self):
        # type: () -> None
        """No-op."""
        pass

    def __iter__(self):
        # type: () -> Iterator[Packet]
        """Iterating yields nothing."""
        return iter([])

    def __eq__(self, other):
        # type: (Any) -> bool
        """Equal to any other NoPayload, and to nothing else."""
        return True if isinstance(other, NoPayload) else False

    def hashret(self):
        # type: () -> bytes
        return b""

    def answers(self, other):
        # type: (Packet) -> bool
        """True when other is a NoPayload or a padding layer."""
        accepted = (NoPayload, conf.padding_layer)
        return isinstance(other, accepted)

    def haslayer(self, cls, _subclass=None):
        # type: (Union[Type[Packet], str], Optional[bool]) -> int
        """No layer is ever found here."""
        return 0

    def getlayer(self,
                 cls,  # type: Union[int, Type[Packet], str]
                 nb=1,  # type: int
                 _track=None,  # type: Optional[List[int]]
                 _subclass=None,  # type: Optional[bool]
                 **flt  # type: Any
                 ):
        # type: (...) -> Optional[Packet]
        """
        No layer is ever found here. The remaining rank ``nb`` is appended
        to ``_track``, when given, for the caller to carry on its search.
        """
        if _track is not None:
            _track.append(nb)
        return None

    def fragment(self, *args, **kargs):
        # type: (*Any, **Any) -> List[Packet]
        """Always fails: reached when no layer handled the fragmentation."""
        raise Scapy_Exception("cannot fragment this packet")

    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
        # type: (bool, int, str, str) -> None
        """No-op."""
        pass

    def sprintf(self, fmt, relax=1):
        # type: (str, int) -> str
        """
        Reached when no layer resolved the directive: return "??" when
        ``relax`` is true, raise otherwise.
        """
        if not relax:
            raise Scapy_Exception("Format not found [%s]" % fmt)
        return "??"

    def _do_summary(self):
        # type: () -> Tuple[int, str, List[Any]]
        """Neutral starting point of a summary: nothing found yet."""
        return 0, "", []

    def layers(self):
        # type: () -> List[Type[Packet]]
        return []

    def lastlayer(self, layer=None):
        # type: (Optional[Packet]) -> Packet
        """Return the layer handed over by the caller, or self if falsy."""
        return layer or self

    def command(self):
        # type: () -> str
        return ""

    def json(self):
        # type: () -> str
        return ""

    def route(self):
        # type: () -> Tuple[None, None, None]
        return (None, None, None)

1970 

1971 

1972#################### 

1973# packet classes # 

1974#################### 

1975 

1976 

class Raw(Packet):
    """Layer holding undissected bytes in its single ``load`` field."""
    name = "Raw"
    fields_desc = [StrField("load", b"")]

    def __init__(self, _pkt=b"", *args, **kwargs):
        # type: (bytes, *Any, **Any) -> None
        """
        Encode a non-bytes ``_pkt`` to bytes before the regular
        initialisation. When ``_pkt`` is a couple, only its first element
        is encoded and the second one is kept as is.
        """
        needs_encoding = _pkt and not isinstance(_pkt, bytes)
        if needs_encoding:
            if isinstance(_pkt, tuple):
                data, bn = _pkt
                _pkt = bytes_encode(data), bn
            else:
                _pkt = bytes_encode(_pkt)
        super(Raw, self).__init__(_pkt, *args, **kwargs)

    def answers(self, other):
        # type: (Packet) -> int
        """Raw data is considered an answer to anything."""
        return 1

    def mysummary(self):
        # type: () -> str
        """
        Summarize the load when ``conf.raw_summary`` is set: through it if
        it is callable, with its repr otherwise.
        """
        cs = conf.raw_summary
        if not cs:
            return Packet.mysummary(self)
        if callable(cs):
            return "Raw %s" % cs(self.load)
        return "Raw %r" % self.load

2004 

2005 

class Padding(Raw):
    """Raw bytes that are emitted as padding rather than as layer data."""
    name = "Padding"

    def self_build(self):
        # type: () -> bytes
        """A padding layer contributes no bytes of its own when built."""
        return b""

    def build_padding(self):
        # type: () -> bytes
        """
        Return this layer's bytes (the cached raw bytes when available,
        else the encoded load) followed by the payload's padding.
        """
        if self.raw_packet_cache is None:
            own = bytes_encode(self.load)
        else:
            own = self.raw_packet_cache
        return own + self.payload.build_padding()

2019 

2020 

# Publish the Raw and Padding classes through the global configuration.
conf.raw_layer = Raw
conf.padding_layer = Padding
# Use Raw as the default layer-2 class only when none has been configured.
if conf.default_l2 is None:
    conf.default_l2 = Raw

2025 

2026################# 

2027# Bind layers # 

2028################# 

2029 

2030 

def bind_bottom_up(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    r"""Bind 2 layers for dissection.
    The upper layer will be chosen for dissection on top of the lower layer, if
    ALL the passed arguments are validated. If multiple calls are made with
    the same layers, the last one will be used as default.

    The guess list of ``lower`` is replaced by an extended copy, so the
    previous list object is left untouched.

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values that must match on the lower layer

    ex:
        >>> bind_bottom_up(Ether, SNAP, type=0x1234)
        >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00')  # noqa: E501
        <Ether  dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP  OUI=0x0 code=0x0 |>>  # noqa: E501
    """
    if __fval is not None:
        fval.update(__fval)
    guesses = lower.payload_guess[:]
    guesses.append((fval, upper))
    lower.payload_guess = guesses

2051 

2052 

def bind_top_down(lower,  # type: Type[Packet]
                  upper,  # type: Type[Packet]
                  __fval=None,  # type: Optional[Any]
                  **fval  # type: Any
                  ):
    # type: (...) -> None
    """Bind 2 layers for building.
    When the upper layer is added as a payload of the lower layer, all the
    arguments will be applied to them.

    The overload mapping of ``upper`` is replaced by an updated copy, so
    the previous mapping object is left untouched.

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values registered for ``lower``

    ex:
        >>> bind_top_down(Ether, SNAP, type=0x1234)
        >>> Ether()/SNAP()
        <Ether  type=0x1234 |<SNAP  |>>
    """
    if __fval is not None:
        fval.update(__fval)
    overloads = upper._overload_fields.copy()
    overloads[lower] = fval
    upper._overload_fields = overloads  # type: ignore

2072 

2073 

@conf.commands.register
def bind_layers(lower,  # type: Type[Packet]
                upper,  # type: Type[Packet]
                __fval=None,  # type: Optional[Dict[str, int]]
                **fval  # type: Any
                ):
    # type: (...) -> None
    """Bind 2 layers on some specific fields' values.

    It makes the packet being built and dissected when the arguments
    are present.

    This function calls both bind_top_down and bind_bottom_up (in that
    order), with all passed arguments.

    Please have a look at their docs:
     - help(bind_bottom_up)
     - help(bind_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    # Each binder receives its own copy of the keyword mapping.
    for binder in (bind_top_down, bind_bottom_up):
        binder(lower, upper, **fval)

2097 

2098 

def split_bottom_up(lower,  # type: Type[Packet]
                    upper,  # type: Type[Packet]
                    __fval=None,  # type: Optional[Any]
                    **fval  # type: Any
                    ):
    # type: (...) -> None
    """This call un-links an association that was made using bind_bottom_up.
    Have a look at help(bind_bottom_up)

    Every guess of ``lower`` that targets ``upper`` and whose parameters
    contain all the given field values is dropped; the others are kept.

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values identifying the binding(s) to remove
    """
    if __fval is not None:
        fval.update(__fval)

    kept = []
    for entry in lower.payload_guess:
        params, cls = entry
        # True as soon as one requested value is absent or different
        mismatch = any(
            k not in params or params[k] != v for k, v in fval.items()
        )
        if cls != upper or mismatch:
            kept.append(entry)
    lower.payload_guess = kept

2118 

2119 

def split_top_down(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    """This call un-links an association that was made using bind_top_down.
    Have a look at help(bind_top_down)

    Nothing happens when ``lower`` has no overload entry on ``upper``, or
    when that entry does not contain all the given field values.

    :param lower: the lower layer class
    :param upper: the upper layer class
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values identifying the binding to remove
    """
    if __fval is not None:
        fval.update(__fval)
    if lower not in upper._overload_fields:
        return
    ofval = upper._overload_fields[lower]
    for k, v in fval.items():
        if k not in ofval or ofval[k] != v:
            return
    # Work on a copy so the previous mapping object is left untouched
    remaining = upper._overload_fields.copy()
    del remaining[lower]
    upper._overload_fields = remaining  # type: ignore

2137 

2138 

@conf.commands.register
def split_layers(lower,  # type: Type[Packet]
                 upper,  # type: Type[Packet]
                 __fval=None,  # type: Optional[Any]
                 **fval  # type: Any
                 ):
    # type: (...) -> None
    """Split 2 layers previously bound.
    This call undoes what bind_bottom_up and bind_top_down did. It is the
    opposite of bind_layers.

    Please have a look at their docs:
     - help(split_bottom_up)
     - help(split_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    # Each splitter receives its own copy of the keyword mapping.
    for splitter in (split_bottom_up, split_top_down):
        splitter(lower, upper, **fval)

2158 

2159 

@conf.commands.register
def explore(layer=None):
    # type: (Optional[str]) -> None
    """Function used to discover the Scapy layers and protocols.
    It helps to see which packets exist in contrib or layer files.

    params:
     - layer: If specified, the function will explore the layer. If not,
              the GUI mode will be activated, to browse the available layers.
              It may be a short name, a full module name or the module
              object itself.

    raises:
     - Scapy_Exception: in GUI mode when the session is not interactive,
                        or when the requested module is unknown
     - ImportError: in GUI mode when prompt_toolkit is missing or too old

    examples:
      >>> explore()  # Launches the GUI
      >>> explore("dns")  # Explore scapy.layers.dns
      >>> explore("http2")  # Explore scapy.contrib.http2
      >>> explore(scapy.layers.bluetooth4LE)

    Note: to search a packet by name, use ls("name") rather than explore.
    """
    if layer is None:  # GUI MODE
        if not conf.interactive:
            raise Scapy_Exception("explore() GUI-mode cannot be run in "
                                  "non-interactive mode. Please provide a "
                                  "'layer' parameter !")
        # 0 - Imports
        try:
            import prompt_toolkit
        except ImportError:
            raise ImportError("prompt_toolkit is not installed ! "
                              "You may install IPython, which contains it, via"
                              " `pip install ipython`")
        if not _version_checker(prompt_toolkit, (2, 0)):
            raise ImportError("prompt_toolkit >= 2.0.0 is required !")
        # Only available with prompt_toolkit > 2.0, not released on PyPi yet
        from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
            button_dialog
        from prompt_toolkit.formatted_text import HTML
        # Check for prompt_toolkit >= 3.0.0: from that version on, the
        # dialog helpers return an object whose run() gives the answer
        call_ptk = lambda x: cast(str, x)  # type: Callable[[Any], str]
        if _version_checker(prompt_toolkit, (3, 0)):
            call_ptk = lambda x: x.run()
        # 1 - Ask for layer or contrib
        btn_diag = button_dialog(
            title="Scapy v%s" % conf.version,
            text=HTML(
                '<style bg="white" fg="red">Choose the type of packets'
                ' you want to explore:</style>'
            ),
            buttons=[
                ("Layers", "layers"),
                ("Contribs", "contribs"),
                ("Cancel", "cancel")
            ])
        action = call_ptk(btn_diag)
        # 2 - Retrieve list of Packets
        if action == "layers":
            # Get all loaded layers
            lvalues = conf.layers.layers()
            # Restrict to layers-only (not contribs) + packet.py and asn1*.py
            values = [x for x in lvalues if ("layers" in x[0] or
                                             "packet" in x[0] or
                                             "asn1" in x[0])]
        elif action == "contribs":
            # Get all existing contribs
            from scapy.main import list_contrib
            cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
            values = [(x['name'], x['description'])
                      for x in cvalues]
            # Remove very specific modules
            values = [x for x in values if "can" not in x[0]]
        else:
            # Escape/Cancel was pressed
            return
        # Build tree
        if action == "contribs":
            # A tree is a dictionary. Each layer contains a keyword
            # _l which contains the files in the layer, and a _name
            # argument which is its name. The other keys are the subfolders,
            # which are similar dictionaries
            tree = defaultdict(list)  # type: Dict[str, Union[List[Any], Dict[str, Any]]]  # noqa: E501
            for name, desc in values:
                if "." in name:  # Folder detected
                    parts = name.split(".")
                    subtree = tree
                    for pa in parts[:-1]:
                        if pa not in subtree:
                            subtree[pa] = {}
                        # one layer deeper
                        subtree = subtree[pa]  # type: ignore
                        subtree["_name"] = pa  # type: ignore
                    if "_l" not in subtree:
                        subtree["_l"] = []
                    subtree["_l"].append((parts[-1], desc))  # type: ignore
                else:
                    tree["_l"].append((name, desc))  # type: ignore
        elif action == "layers":
            tree = {"_l": values}
        # 3 - Ask for the layer/contrib module to explore
        current = tree  # type: Any
        previous = []  # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]]  # noqa: E501
        while True:
            # Generate tests & form
            folders = list(current.keys())
            # Sub-folders are prefixed with "$" so that they can be told
            # apart from files once the user has made a choice
            _radio_values = [
                ("$" + name, str('[+] ' + name.capitalize()))
                for name in folders if not name.startswith("_")
            ] + current.get("_l", [])  # type: List[str]
            cur_path = ""
            if previous:
                cur_path = ".".join(
                    itertools.chain(
                        (x["_name"] for x in previous[1:]),  # type: ignore
                        (current["_name"],)
                    )
                )
            extra_text = (
                '\n<style bg="white" fg="green">> scapy.%s</style>'
            ) % (action + ("." + cur_path if cur_path else ""))
            # Show popup
            rd_diag = radiolist_dialog(
                values=_radio_values,
                title="Scapy v%s" % conf.version,
                text=HTML(
                    (
                        '<style bg="white" fg="red">Please select a file'
                        ' among the following, to see all layers contained in'
                        ' it:</style>'
                    ) + extra_text
                ),
                cancel_text="Back" if previous else "Cancel"
            )
            result = call_ptk(rd_diag)
            if result is None:
                # User pressed "Cancel/Back"
                if previous:  # Back
                    current = previous.pop()
                    continue
                else:  # Cancel
                    return
            if result.startswith("$"):
                previous.append(current)
                current = current[result[1:]]
            else:
                # Enter on layer
                if previous:  # In subfolder
                    result = cur_path + "." + result
                break
        # 4 - (Contrib only): load contrib
        if action == "contribs":
            from scapy.main import load_contrib
            load_contrib(result)
            result = "scapy.contrib." + result
    else:  # NON-GUI MODE
        # We handle layer as a short layer name, full layer name
        # or the module itself
        if isinstance(layer, types.ModuleType):
            layer = layer.__name__
        if isinstance(layer, str):
            if layer.startswith("scapy.layers."):
                result = layer
            else:
                if layer.startswith("scapy.contrib."):
                    layer = layer.replace("scapy.contrib.", "")
                from scapy.main import load_contrib
                load_contrib(layer)
                result_layer, result_contrib = (("scapy.layers.%s" % layer),
                                                ("scapy.contrib.%s" % layer))
                if result_layer in conf.layers.ldict:
                    result = result_layer
                elif result_contrib in conf.layers.ldict:
                    result = result_contrib
                else:
                    raise Scapy_Exception("Unknown scapy module '%s'" % layer)
        else:
            warning("Wrong usage ! Check out help(explore)")
            return

    # COMMON PART
    # Get the list of all Packets contained in that module
    try:
        all_layers = conf.layers.ldict[result]
    except KeyError:
        # Report the module that was looked up: in GUI mode `layer` is None
        raise Scapy_Exception("Unknown scapy module '%s'" % result)
    # Print
    print(conf.color_theme.layer_name("Packets contained in %s:" % result))
    rtlst = [
        (lay.__name__ or "", cast(str, lay._name) or "")
        for lay in all_layers
    ]  # type: List[Tuple[Union[str, List[str]], ...]]
    print(pretty_list(rtlst, [("Class", "Name")], borders=True))

2347 

2348 

def _pkt_ls(obj,  # type: Union[Packet, Type[Packet]]
            verbose=False,  # type: bool
            ):
    # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]]  # noqa: E501
    """Internal function used to resolve `fields_desc` to display it.

    :param obj: a packet object or class
    :param verbose: also collect the value names of enum and flags fields
    :returns: a list containing tuples [(name, cls, clsname_extras,
        repr(default), long_attrs)]
    :raises ValueError: if obj is neither a Packet instance nor subclass
    """
    def _enum_lines(i2s):
        # type: (Any) -> List[str]
        # One "label: number" line per entry, ordered by number
        return ["%s: %d" % (strval, numval)
                for numval, strval in sorted(i2s.items())]

    is_pkt = isinstance(obj, Packet)
    if not (issubtype(obj, Packet) or is_pkt):
        raise ValueError
    rows = []
    for desc in obj.fields_desc:
        fld = desc
        tags = []  # type: List[str]
        details = []  # type: List[str]
        # Unwrap the Emph / ConditionalField wrappers down to the real field
        while isinstance(fld, (Emph, ConditionalField)):
            if isinstance(fld, ConditionalField):
                tags.append(fld.__class__.__name__[:4])
            fld = fld.fld
        name = fld.name
        default = fld.default
        if verbose and isinstance(fld, EnumField) \
                and hasattr(fld, "i2s") and fld.i2s:
            # Large enumerations are not expanded
            if len(fld.i2s) < 50:
                details.extend(_enum_lines(fld.i2s))
        elif isinstance(fld, MultiEnumField):
            # depends_on() needs a packet instance
            obj_pkt = obj if is_pkt else obj()
            tags.append("Depends on %s" % fld.depends_on(obj_pkt))
            if verbose:
                cur_i2s = fld.i2s_multi.get(fld.depends_on(obj_pkt), {})
                if len(cur_i2s) < 50:
                    details.extend(_enum_lines(cur_i2s))
        elif verbose and isinstance(fld, FlagsField):
            details.append(", ".join(fld.names))
        elif isinstance(fld, MultipleTypeField):
            default = fld.dflt.default
            candidates = itertools.chain(fld.flds, [(fld.dflt,)])
            tags.append(", ".join(
                x[0].__class__.__name__ for x in candidates
            ))

        extras = ""
        if tags:
            extras = "(%s)" % ", ".join(tags)
        if isinstance(fld, BitField):
            plural = "s" if fld.size > 1 else ""
            extras += " (%d bit%s)" % (fld.size, plural)
        rows.append((name, fld.__class__, extras, repr(default), details))
    return rows

2425 

2426 

@conf.commands.register
def ls(obj=None,  # type: Optional[Union[str, Packet, Type[Packet]]]
       case_sensitive=False,  # type: bool
       verbose=False  # type: bool
       ):
    # type: (...) -> None
    """List available layers, or infos on a given layer class or name.

    :param obj: Packet / packet name to use. When it is a string, it is
        used as a regular expression searched in both the class name and
        the display name of every layer.
    :param case_sensitive: if obj is a string, is it case sensitive?
    :param verbose: also print the value names of enum and flags fields
    """
    if obj is None or isinstance(obj, str):
        tip = False
        if obj is None:
            tip = True
            all_layers = sorted(conf.layers, key=lambda x: x.__name__)
        else:
            pattern = re.compile(
                obj,
                0 if case_sensitive else re.I
            )

            # We first order by accuracy, then length
            def sorter(x):
                # type: (Type[Packet]) -> Tuple[int, int]
                # A layer may have been selected through its display name
                # only (or through a real regular expression), in which
                # case a plain substring lookup in the class name would
                # fail: use the position of the match instead, and sort
                # the layers whose class name does not match last.
                clsname = x.__name__
                match = pattern.search(clsname)
                position = match.start() if match else len(clsname)
                return position, len(clsname)
            all_layers = sorted((layer for layer in conf.layers
                                 if (isinstance(layer.__name__, str) and
                                     pattern.search(layer.__name__)) or
                                 (isinstance(layer.name, str) and
                                     pattern.search(layer.name))),
                                key=sorter)
        for layer in all_layers:
            print("%-10s : %s" % (layer.__name__, layer._name))
        if tip and conf.interactive:
            print("\nTIP: You may use explore() to navigate through all "
                  "layers using a clear GUI")
    else:
        try:
            fields = _pkt_ls(
                obj,
                verbose=verbose
            )
            is_pkt = isinstance(obj, Packet)
            # Print
            for fname, cls, clsne, dflt, long_attrs in fields:
                clsinfo = cls.__name__ + " " + clsne
                print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
                if is_pkt:
                    # Current value, shown before the default one
                    print("%-15r" % (getattr(obj, fname),), end=' ')
                print("(%r)" % (dflt,))
                for attr in long_attrs:
                    print("%-15s%s" % ("", attr))
            # Restart for payload if any
            if is_pkt:
                obj = cast(Packet, obj)
                if isinstance(obj.payload, NoPayload):
                    return
                print("--")
                # Keep the same level of detail for the payload
                ls(obj.payload, verbose=verbose)
        except ValueError:
            print("Not a packet class or name. Type 'ls()' to list packet classes.")  # noqa: E501

2492 

2493 

@conf.commands.register
def rfc(cls, ret=False, legend=True):
    # type: (Type[Packet], bool, bool) -> Optional[str]
    """
    Generate an RFC-like representation of a packet def.

    The fields of the class are laid out on rows of 32 bits; a field that
    does not fit on the current row is continued on the next one(s).

    :param cls: the Packet class
    :param ret: return the result instead of printing (def. False)
    :param legend: show text under the diagram (default True)
    :returns: the diagram as a string if ``ret`` is set, None otherwise
    :raises TypeError: if ``cls`` is not a subclass of Packet

    Ex::

        >>> rfc(Ether)

    """
    if not issubclass(cls, Packet):
        raise TypeError("Packet class expected")
    # Number of bits already used on the row being filled
    cur_len = 0
    # Entries (text, width, ident) of the row being filled
    cur_line = []
    # Completed rows
    lines = []
    # Get the size (width) that a field will take
    # when formatted, from its length in bits
    clsize = lambda x: 2 * x - 1  # type: Callable[[int], int]
    ident = 0  # Fields UUID

    # Generate packet groups
    def _iterfields() -> Iterator[Tuple[str, int]]:
        # NOTE: this generator reads `cur_len` from the enclosing scope.
        # As it is consumed lazily by the loop below, the padding size is
        # computed from the value `cur_len` has at that point of the loop.
        for f in cls.fields_desc:
            # Fancy field name
            fname = f.name.upper().replace("_", " ")
            fsize = int(f.sz * 8)
            yield fname, fsize
            # Add padding optionally
            if isinstance(f, PadField):
                if isinstance(f._align, tuple):
                    pad = - cur_len % (f._align[0] * 8)
                else:
                    pad = - cur_len % (f._align * 8)
                if pad:
                    yield "padding", pad
    for fname, flen in _iterfields():
        cur_len += flen
        ident += 1
        # The field might exceed the current line or
        # take more than one line. Copy it as required
        while True:
            over = max(0, cur_len - 32)  # Exceed
            len1 = clsize(flen - over)  # What fits
            # The name is truncated to the width available on this row
            cur_line.append((fname[:len1], len1, ident))
            if cur_len >= 32:
                # Current line is full. start a new line
                lines.append(cur_line)
                # The overflowing bits become the start of the next row
                cur_len = flen = over
                fname = ""  # do not repeat the field
                cur_line = []
                if not over:
                    # there is no data left
                    break
            else:
                # End of the field
                break
    # Add the last line if un-finished
    if cur_line:
        lines.append(cur_line)
    # Calculate separations between lines
    seps = []
    # The top border is always a full line
    seps.append("+-" * 32 + "+\n")
    for i in range(len(lines) - 1):
        # Start with a full line
        sep = "+-" * 32 + "+\n"
        # Get the line above and below the current
        # separation
        above, below = lines[i], lines[i + 1]
        # The last field of above is shared with below
        if above[-1][2] == below[0][2]:
            # where the field in "above" starts
            pos_above = sum(x[1] for x in above[:-1]) + len(above[:-1]) - 1
            # where the field in "below" ends
            pos_below = below[0][1]
            if pos_above < pos_below:
                # they are overlapping.
                # Now crop the space between those pos
                # and fill it with " "
                pos_above = pos_above + pos_above % 2
                sep = (
                    sep[:1 + pos_above] +
                    " " * (pos_below - pos_above) +
                    sep[1 + pos_below:]
                )
        # line is complete
        seps.append(sep)
    # Graph
    result = ""
    # Bytes markers
    result += " " + (" " * 19).join(
        str(x) for x in range(4)
    ) + "\n"
    # Bits markers
    result += " " + " ".join(
        str(x % 10) for x in range(32)
    ) + "\n"
    # Add fields and their separations
    for line, sep in zip(lines, seps):
        result += sep
        for elt, flen, _ in line:
            result += "|" + elt.center(flen, " ")
        result += "|\n"
    # Bottom border: as wide as the last row, or a full row when the last
    # row was complete (cur_len is then 0)
    result += "+-" * (cur_len or 32) + "+\n"
    # Annotate with the figure name
    if legend:
        result += "\n" + ("Fig. " + cls.__name__).center(66, " ")
    # return if asked for, else print
    if ret:
        return result
    print(result)
    return None

2610 

2611 

2612############# 

2613# Fuzzing # 

2614############# 

2615 

# Type variable restricted to Packet and its subclasses.
_P = TypeVar('_P', bound=Packet)

2617 

2618 

@conf.commands.register
def fuzz(p,  # type: _P
         _inplace=0,  # type: int
         ):
    # type: (...) -> _P
    """
    Transform a layer into a fuzzy layer by replacing some default values
    by random objects.

    Every layer of the packet is processed in turn, as well as the packets
    held by its PacketListField fields.

    :param p: the Packet instance to fuzz
    :param _inplace: when truthy, ``p`` itself is modified instead of a copy
    :return: the fuzzed packet.
    """
    target = p if _inplace else p.copy()
    layer = cast(Packet, target)
    while not isinstance(layer, NoPayload):
        randomized = {}
        deferred = []  # type: List[str]
        for fld in layer.fields_desc:
            if isinstance(fld, PacketListField):
                # Fuzz the sub-packets themselves, in place
                for sub in getattr(layer, fld.name):
                    fuzz(sub, _inplace=1)
                continue
            if isinstance(fld, MultipleTypeField):
                # the type of the field will depend on others
                deferred.append(fld.name)
                continue
            if fld.default is None:
                continue
            if isinstance(fld, ConditionalField) and not fld._evalcond(layer):
                continue
            rnd = fld.randval()
            if rnd is not None:
                randomized[fld.name] = rnd
        # Process packets with MultipleTypeFields
        if deferred:
            # freeze the other random values
            frozen = {
                key: (val._fix() if isinstance(val, VolatileValue) else val)
                for key, val in randomized.items()
            }
            layer.default_fields.update(frozen)
            randomized = {}
            # add the random values of the MultipleTypeFields
            for name in deferred:
                mtf = cast(MultipleTypeField, layer.get_field(name))
                rnd = mtf._find_fld_pkt(layer).randval()
                if rnd is not None:
                    randomized[name] = rnd
        layer.default_fields.update(randomized)
        layer = layer.payload
    return target
2666 return p