Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/packet.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1418 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Packet class 

8 

9Provides: 

10 - the default Packet classes 

11 - binding mechanisms 

12 - fuzz() method 

13 - exploration methods: explore() / ls() 

14""" 

15 

16from collections import defaultdict 

17 

18import json 

19import re 

20import time 

21import itertools 

22import copy 

23import types 

24import warnings 

25 

26from scapy.fields import ( 

27 AnyField, 

28 BitField, 

29 ConditionalField, 

30 Emph, 

31 EnumField, 

32 Field, 

33 FlagsField, 

34 FlagValue, 

35 MayEnd, 

36 MultiEnumField, 

37 MultipleTypeField, 

38 PacketListField, 

39 RawVal, 

40 StrField, 

41) 

42from scapy.config import conf, _version_checker 

43from scapy.compat import raw, orb, bytes_encode 

44from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \ 

45 _CanvasDumpExtended 

46from scapy.interfaces import _GlobInterfaceType 

47from scapy.volatile import RandField, VolatileValue 

48from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \ 

49 pretty_list, EDecimal 

50from scapy.error import Scapy_Exception, log_runtime, warning 

51from scapy.libs.test_pyx import PYX 

52 

53# Typing imports 

54from typing import ( 

55 Any, 

56 Callable, 

57 Dict, 

58 Iterator, 

59 List, 

60 NoReturn, 

61 Optional, 

62 Set, 

63 Tuple, 

64 Type, 

65 TypeVar, 

66 Union, 

67 Sequence, 

68 cast, 

69) 

70from scapy.compat import Self 

71 

72try: 

73 import pyx 

74except ImportError: 

75 pass 

76 

77 

78_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]]) 

79 

80 

class Packet(
    BasePacket,
    _CanvasDumpExtended,
    metaclass=Packet_metaclass
):
    # Per-instance storage. "name" used to be listed twice in this list;
    # the second occurrence was redundant and has been dropped.
    __slots__ = [
        "time", "sent_time", "name",
        "default_fields", "fields", "fieldtype",
        "overload_fields", "overloaded_fields",
        "packetfields",
        "original", "explicit", "raw_packet_cache",
        "raw_packet_cache_fields", "_pkt", "post_transforms",
        "stop_dissection_after",
        # then payload, underlayer and parent
        "payload", "underlayer", "parent",
        # used for sr()
        "_answered",
        # used when sniffing
        "direction", "sniffed_on",
        # handle snaplen Vs real length
        "wirelen",
        "comment"
    ]
    # NOTE(review): "name" and "overload_fields" are also slot names; the
    # methods below read them back as self._name / self._overload_fields,
    # presumably renamed by Packet_metaclass -- confirm against base_classes.
    name = None
    fields_desc = []  # type: List[AnyField]
    deprecated_fields = {}  # type: Dict[str, Tuple[str, str]]
    overload_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
    payload_guess = []  # type: List[Tuple[Dict[str, Any], Type[Packet]]]
    show_indent = 1
    show_summary = True
    match_subclass = False
    # Per-class caches filled by prepare_cached_fields(), keyed by class.
    class_dont_cache = {}  # type: Dict[Type[Packet], bool]
    class_packetfields = {}  # type: Dict[Type[Packet], Any]
    class_default_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
    class_default_fields_ref = {}  # type: Dict[Type[Packet], List[str]]
    class_fieldtype = {}  # type: Dict[Type[Packet], Dict[str, AnyField]]  # noqa: E501

118 

@classmethod
def from_hexcap(cls):
    # type: (Type[Packet]) -> Packet
    """Instantiate ``cls`` from the value returned by ``import_hexcap()``."""
    captured = import_hexcap()
    return cls(captured)

123 

@classmethod
def upper_bonds(self):
    # type: () -> None
    """Print one line per ``payload_guess`` entry.

    Each line shows the upper class name followed by its ``key=value``
    conditions.
    """
    for conditions, upper_cls in self.payload_guess:
        rendered = [
            "%-12s" % ("%s=%r" % item) for item in conditions.items()
        ]
        print("%-20s %s" % (upper_cls.__name__, ", ".join(rendered)))

134 

@classmethod
def lower_bonds(self):
    # type: () -> None
    """Print one line per ``_overload_fields`` entry.

    Each line shows the lower class name followed by its ``key=value``
    overloads.
    """
    for lower_cls, overloads in self._overload_fields.items():
        rendered = [
            "%-12s" % ("%s=%r" % item) for item in overloads.items()
        ]
        print("%-20s %s" % (lower_cls.__name__, ", ".join(rendered)))

145 

def __init__(self,
             _pkt=b"",  # type: Union[bytes, bytearray]
             post_transform=None,  # type: Any
             _internal=0,  # type: int
             _underlayer=None,  # type: Optional[Packet]
             _parent=None,  # type: Optional[Packet]
             stop_dissection_after=None,  # type: Optional[Type[Packet]]
             **fields  # type: Any
             ):
    # type: (...) -> None
    """Initialise the instance slots, optionally dissect ``_pkt`` and apply
    the field values given as keyword arguments.

    :param _pkt: raw bytes to dissect; nothing is dissected when empty
    :param post_transform: a callable, a list of callables or None; stored
        as the ``post_transforms`` list
    :param _internal: when false, ``dissection_done`` is called after a
        dissection
    :param _underlayer: stored as ``underlayer``
    :param _parent: stored as ``parent``
    :param stop_dissection_after: stored as-is; read by
        ``do_dissect_payload``
    :param fields: field values by name; deprecated names are resolved
        through ``_resolve_alias``
    :raises AttributeError: for a keyword that is neither a field of
        ``fields_desc`` nor a deprecated field name
    """
    self.time = time.time()  # type: Union[EDecimal, float]
    self.sent_time = None  # type: Union[EDecimal, float, None]
    # Fall back on the class name when no explicit name was declared.
    self.name = (self.__class__.__name__
                 if self._name is None else
                 self._name)
    self.default_fields = {}  # type: Dict[str, Any]
    self.overload_fields = self._overload_fields
    self.overloaded_fields = {}  # type: Dict[str, Any]
    self.fields = {}  # type: Dict[str, Any]
    self.fieldtype = {}  # type: Dict[str, AnyField]
    self.packetfields = []  # type: List[AnyField]
    self.payload = NoPayload()  # type: Packet
    # The containers above must exist before init_fields() fills them.
    self.init_fields()
    self.underlayer = _underlayer
    self.parent = _parent
    if isinstance(_pkt, bytearray):
        _pkt = bytes(_pkt)
    self.original = _pkt
    self.explicit = 0
    self.raw_packet_cache = None  # type: Optional[bytes]
    self.raw_packet_cache_fields = None  # type: Optional[Dict[str, Any]]  # noqa: E501
    self.wirelen = None  # type: Optional[int]
    self.direction = None  # type: Optional[int]
    self.sniffed_on = None  # type: Optional[_GlobInterfaceType]
    self.comment = None  # type: Optional[bytes]
    self.stop_dissection_after = stop_dissection_after
    if _pkt:
        self.dissect(_pkt)
        if not _internal:
            self.dissection_done(self)
    # We use this strange initialization so that the fields
    # are initialized in their declaration order.
    # It is required to always support MultipleTypeField
    for field in self.fields_desc:
        fname = field.name
        try:
            value = fields.pop(fname)
        except KeyError:
            continue
        # RawVal values are stored untouched; anything else goes through
        # the field's any2i conversion.
        self.fields[fname] = value if isinstance(value, RawVal) else \
            self.get_field(fname).any2i(self, value)
    # The remaining fields are unknown
    for fname in fields:
        if fname in self.deprecated_fields:
            # Resolve deprecated fields
            value = fields[fname]
            fname = self._resolve_alias(fname)
            self.fields[fname] = value if isinstance(value, RawVal) else \
                self.get_field(fname).any2i(self, value)
            continue
        raise AttributeError(fname)
    # post_transforms is always a list, whatever form was passed in.
    if isinstance(post_transform, list):
        self.post_transforms = post_transform
    elif post_transform is None:
        self.post_transforms = []
    else:
        self.post_transforms = [post_transform]

213 

# Type of the extra state tuple returned by __reduce__ and read back by
# __setstate__, in this order:
# (time, sent_time, direction, sniffed_on, wirelen, comment)
_PickleType = Tuple[
    Union[EDecimal, float],
    Optional[Union[EDecimal, float, None]],
    Optional[int],
    Optional[_GlobInterfaceType],
    Optional[int],
    Optional[bytes],
]

222 

def __reduce__(self):
    # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType]
    """Pickle support: return (class, (built bytes,), extra state tuple)."""
    klass = self.__class__
    # Build first: the extra state is read only after build() has run.
    ctor_args = (self.build(),)
    extra_state = (
        self.time,
        self.sent_time,
        self.direction,
        self.sniffed_on,
        self.wirelen,
        self.comment,
    )
    return (klass, ctor_args, extra_state)

234 

def __setstate__(self, state):
    # type: (Packet._PickleType) -> Packet
    """Restore the extra state tuple produced by ``__reduce__``.

    Positions 0..5 of ``state`` are assigned, in order, to ``time``,
    ``sent_time``, ``direction``, ``sniffed_on``, ``wirelen`` and
    ``comment``. Returns ``self``.
    """
    slot_order = (
        "time", "sent_time", "direction",
        "sniffed_on", "wirelen", "comment",
    )
    for position, slot in enumerate(slot_order):
        setattr(self, slot, state[position])
    return self

245 

def __deepcopy__(self,
                 memo,  # type: Any
                 ):
    # type: (...) -> Packet
    """Hook for ``copy.deepcopy``: delegate to ``self.copy()``; ``memo`` is ignored."""
    duplicate = self.copy()
    return duplicate

252 

def init_fields(self):
    # type: () -> None
    """
    Initialise the fields, using the per-class cache unless
    ``class_dont_cache`` is true for this class.
    """
    caching_disabled = self.class_dont_cache.get(self.__class__, False)
    if not caching_disabled:
        self.do_init_cached_fields()
        return
    self.do_init_fields(self.fields_desc)

263 

def do_init_fields(self,
                   flist,  # type: Sequence[AnyField]
                   ):
    # type: (...) -> None
    """
    Register every field of ``flist`` on this instance, without caching.

    Each field is recorded in ``fieldtype`` (and in ``packetfields`` when
    it holds packets); a deep copy of its default goes to ``default_fields``.
    """
    collected = {}
    for fld in flist:
        collected[fld.name] = copy.deepcopy(fld.default)
        self.fieldtype[fld.name] = fld
        if fld.holds_packets:
            self.packetfields.append(fld)
    # Published last, once complete, to avoid race issues
    self.default_fields = collected

279 

def do_init_cached_fields(self):
    # type: () -> None
    """
    Initialise the fields from the per-class cache, filling the cache
    first when this class has no entry yet.
    """
    klass = self.__class__

    # No cache entry yet: build it
    if Packet.class_default_fields.get(klass, None) is None:
        self.prepare_cached_fields(self.fields_desc)

    cached_defaults = Packet.class_default_fields.get(klass, None)
    if not cached_defaults:
        # Nothing cached (or an empty mapping): nothing more to do here
        return

    self.default_fields = cached_defaults
    self.fieldtype = Packet.class_fieldtype[klass]
    self.packetfields = Packet.class_packetfields[klass]

    # Defaults listed in class_default_fields_ref get a per-instance copy
    for fname in Packet.class_default_fields_ref[klass]:
        shared = self.default_fields[fname]
        try:
            self.fields[fname] = shared.copy()
        except AttributeError:
            # No .copy() method: fall back on a full slice
            self.fields[fname] = shared[:]

308 

def prepare_cached_fields(self, flist):
    # type: (Sequence[AnyField]) -> None
    """
    Fill the per-class field caches from ``flist``.

    Does nothing for an empty ``flist``. If a MultipleTypeField is found,
    the class is marked in ``class_dont_cache`` and the instance is
    initialised through ``do_init_fields`` instead.
    """
    klass = self.__class__

    if not flist:
        return

    defaults = {}
    ref_names = []
    field_types = {}
    packet_fields = []

    for fld in flist:
        if isinstance(fld, MultipleTypeField):
            # Give up on caching for this class
            self.class_dont_cache[klass] = True
            self.do_init_fields(self.fields_desc)
            return

        defaults[fld.name] = copy.deepcopy(fld.default)
        field_types[fld.name] = fld
        if fld.holds_packets:
            packet_fields.append(fld)

        # Defaults of these types are recorded by name
        if isinstance(fld.default, (list, dict, set, RandField, Packet)):
            ref_names.append(fld.name)

    Packet.class_default_fields_ref[klass] = ref_names
    Packet.class_fieldtype[klass] = field_types
    Packet.class_packetfields[klass] = packet_fields
    # Last to avoid racing issues
    Packet.class_default_fields[klass] = defaults

350 

def dissection_done(self, pkt):
    # type: (Packet) -> None
    """DEV: called once a dissection is complete; runs ``post_dissection``
    on this layer, then forwards the call to the payload."""
    self.post_dissection(pkt)
    next_layer = self.payload
    next_layer.dissection_done(pkt)

356 

def post_dissection(self, pkt):
    # type: (Packet) -> None
    """DEV: hook called after the dissection of the whole packet.

    The default implementation does nothing.
    """
    return None

361 

def get_field(self, fld):
    # type: (str) -> AnyField
    """DEV: look up the field instance named ``fld`` in ``self.fieldtype``."""
    registry = self.fieldtype
    return registry[fld]

366 

def add_payload(self, payload):
    # type: (Union[Packet, bytes]) -> None
    """Attach ``payload`` at the end of the payload chain.

    ``None`` is ignored. A Packet is attached as-is; bytes, str, bytearray
    and memoryview values are wrapped in ``conf.raw_layer``. Any other
    type raises TypeError.
    """
    if payload is None:
        return
    if not isinstance(self.payload, NoPayload):
        # Already has a payload: let it handle the addition
        self.payload.add_payload(payload)
        return
    if isinstance(payload, Packet):
        self.payload = payload
        payload.add_underlayer(self)
        # Use the first overload table declared for one of our alias types
        for alias in self.aliastypes:
            if alias in payload.overload_fields:
                self.overloaded_fields = payload.overload_fields[alias]
                break
        return
    if isinstance(payload, (bytes, str, bytearray, memoryview)):
        self.payload = conf.raw_layer(load=bytes_encode(payload))
        return
    raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload))  # noqa: E501

385 

def remove_payload(self):
    # type: () -> None
    """Detach the payload: notify it, install a fresh ``NoPayload`` and
    reset ``overloaded_fields``."""
    detached = self.payload
    detached.remove_underlayer(self)
    self.payload = NoPayload()
    self.overloaded_fields = {}

391 

def add_underlayer(self, underlayer):
    # type: (Packet) -> None
    """Store ``underlayer`` in ``self.underlayer``."""
    setattr(self, "underlayer", underlayer)

395 

def remove_underlayer(self, other):
    # type: (Packet) -> None
    """Reset ``self.underlayer`` to None; ``other`` is not used."""
    setattr(self, "underlayer", None)

399 

def add_parent(self, parent):
    # type: (Packet) -> None
    """Set the packet parent.

    For a packet stored in a PacketListField, the parent is meant to be
    the packet owning the list.
    """
    setattr(self, "parent", parent)

406 

def remove_parent(self, other):
    # type: (Packet) -> None
    """Clear the packet parent (set it to None); ``other`` is not used.

    For a packet stored in a PacketListField, the parent is meant to be
    the packet owning the list.
    """
    setattr(self, "parent", None)

413 

def copy(self) -> Self:
    """Return a new instance of the same class carrying copies of the
    field dictionaries and of the payload chain."""
    clone = self.__class__()

    # Field dictionaries are copied value by value
    clone.fields = self.copy_fields_dict(self.fields)
    clone.default_fields = self.copy_fields_dict(self.default_fields)
    clone.overloaded_fields = self.overloaded_fields.copy()

    # underlayer and parent are shared with the original, not copied
    clone.underlayer = self.underlayer
    clone.parent = self.parent

    clone.explicit = self.explicit
    clone.raw_packet_cache = self.raw_packet_cache
    cached_fields = self.raw_packet_cache_fields
    clone.raw_packet_cache_fields = self.copy_fields_dict(cached_fields)
    clone.wirelen = self.wirelen
    clone.post_transforms = self.post_transforms[:]

    # Copy the payload and attach it under the clone
    clone.payload = self.payload.copy()
    clone.payload.add_underlayer(clone)

    # Metadata
    clone.time = self.time
    clone.comment = self.comment
    clone.direction = self.direction
    clone.sniffed_on = self.sniffed_on
    return clone

436 

def _resolve_alias(self, attr):
    # type: (str) -> str
    """Return the replacement name of deprecated field ``attr`` and emit
    a DeprecationWarning."""
    replacement, since_version = self.deprecated_fields[attr]
    message = "%s has been deprecated in favor of %s since %s !" % (
        attr, replacement, since_version
    )
    warnings.warn(message, DeprecationWarning)
    return replacement

446 

def getfieldval(self, attr):
    # type: (str) -> Any
    """Return the stored value of field ``attr``.

    Lookup order: ``fields``, ``overloaded_fields``, ``default_fields``,
    then the payload. Deprecated names are resolved first.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    # Stores are fetched lazily, one at a time, in priority order
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return store[attr]
    return self.payload.getfieldval(attr)

458 

def getfield_and_val(self, attr):
    # type: (str) -> Tuple[AnyField, Any]
    """Return ``(field instance, stored value)`` for field ``attr``.

    Lookup order: ``fields``, ``overloaded_fields``, ``default_fields``.
    Deprecated names are resolved first. Unlike ``getfieldval``, the
    payload is not searched.

    :raises ValueError: when ``attr`` is in none of the three stores
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)
    for store_name in ("fields", "overloaded_fields", "default_fields"):
        store = getattr(self, store_name)
        if attr in store:
            return self.get_field(attr), store[attr]
    raise ValueError

470 

def __getattr__(self, attr):
    # type: (str) -> Any
    """Attribute fallback: return field ``attr`` converted with ``i2h``.

    RawVal values, and values whose field instance is None, are returned
    unconverted. Names unknown to this layer are looked up on the payload.
    """
    try:
        fld, value = self.getfield_and_val(attr)
    except ValueError:
        return self.payload.__getattr__(attr)
    if fld is None or isinstance(value, RawVal):
        return value
    return fld.i2h(self, value)

480 

def setfieldval(self, attr, val):
    # type: (str, Any) -> None
    """Set field ``attr`` to ``val``.

    A field of this layer is stored after ``any2i`` conversion (RawVal
    values are stored untouched) and the build cache is reset.
    ``"payload"`` replaces the payload. Any other name is forwarded to
    the payload.
    """
    if self.deprecated_fields and attr in self.deprecated_fields:
        attr = self._resolve_alias(attr)

    if attr == "payload" and attr not in self.default_fields:
        self.remove_payload()
        self.add_payload(val)
        return

    if attr not in self.default_fields:
        self.payload.setfieldval(attr, val)
        return

    fld = self.get_field(attr)
    # Identity conversion when there is no field instance
    converter = (lambda x, y: y) if fld is None else fld.any2i  # type: Callable[..., Any]
    self.fields[attr] = val if isinstance(val, RawVal) else \
        converter(self, val)
    # The value changed: drop everything derived from the old bytes
    self.explicit = 0
    self.raw_packet_cache = None
    self.raw_packet_cache_fields = None
    self.wirelen = None

502 

def __setattr__(self, attr, val):
    # type: (str, Any) -> None
    """Route attribute assignment: slot names are set directly, other
    names go through ``setfieldval`` and fall back on a plain attribute
    assignment when that raises AttributeError."""
    if attr not in self.__all_slots__:
        try:
            return self.setfieldval(attr, val)
        except AttributeError:
            pass
    return object.__setattr__(self, attr, val)

512 

def delfieldval(self, attr):
    # type: (str) -> None
    """Delete the explicit value of field ``attr``.

    A value present in ``fields`` is removed and the build cache reset.
    A field that only has a default is left alone. ``"payload"`` removes
    the payload. Any other name is forwarded to the payload.
    """
    if attr in self.fields:
        del self.fields[attr]
        self.explicit = 0  # in case a default value must be explicit
        self.raw_packet_cache = None
        self.raw_packet_cache_fields = None
        self.wirelen = None
        return
    if attr in self.default_fields:
        return
    if attr == "payload":
        self.remove_payload()
        return
    self.payload.delfieldval(attr)

527 

def __delattr__(self, attr):
    # type: (str) -> None
    """Route attribute deletion: ``payload`` removes the payload, slot
    names are deleted directly, other names go through ``delfieldval``
    and fall back on a plain deletion when that raises AttributeError."""
    if attr == "payload":
        return self.remove_payload()
    if attr not in self.__all_slots__:
        try:
            return self.delfieldval(attr)
        except AttributeError:
            pass
    return object.__delattr__(self, attr)

539 

def _superdir(self):
    # type: () -> Set[str]
    """
    Collect the names in the class ``__all_slots__`` (when present) and
    in the ``__dict__`` of every class of the MRO.
    """
    klass = self.__class__
    names = set()  # type: Set[str]
    if hasattr(klass, '__all_slots__'):
        names.update(klass.__all_slots__)
    for base in klass.__mro__:
        if not hasattr(base, '__dict__'):
            continue
        names.update(base.__dict__)
    return names

553 

def __dir__(self):
    # type: () -> List[str]
    """
    Return the sorted names from ``_superdir()`` followed by the field
    names of ``default_fields`` (used for tab completion).
    """
    names = list(self._superdir())
    names.extend(self.default_fields)
    names.sort()
    return names

560 

def __repr__(self):
    # type: () -> str
    """Return ``<ClassName  field=value ... |payload repr>``.

    Only fields with an explicit or overloaded value are shown; empty
    list/dict/set values and conditional fields whose condition is false
    are skipped. Colours come from ``conf.color_theme``.
    """
    theme = conf.color_theme
    pieces = []
    for fld in self.fields_desc:
        if isinstance(fld, ConditionalField) and not fld._evalcond(self):
            continue
        # Explicit values win over overloaded ones; defaults are not shown
        if fld.name in self.fields:
            current = self.fields[fld.name]
        elif fld.name in self.overloaded_fields:
            current = self.overloaded_fields[fld.name]
        else:
            continue
        if isinstance(current, (list, dict, set)) and len(current) == 0:
            continue
        shown = fld.i2repr(self, current)
        if isinstance(fld, Emph) or fld in conf.emph:
            name_color = theme.emph_field_name
            value_color = theme.emph_field_value
        else:
            name_color = theme.field_name
            value_color = theme.field_value
        pieces.append(" %s%s%s" % (name_color(fld.name),
                                   theme.punct("="),
                                   value_color(shown)))
    return "%s%s %s %s%s%s" % (theme.punct("<"),
                               theme.layer_name(self.__class__.__name__),
                               "".join(pieces),
                               theme.punct("|"),
                               repr(self.payload),
                               theme.punct(">"))

596 

def __str__(self):
    # type: () -> str
    """Return ``self.summary()``."""
    text = self.summary()
    return text

600 

def __bytes__(self):
    # type: () -> bytes
    """Return ``self.build()``."""
    built = self.build()
    return built

604 

def __div__(self, other):
    # type: (Any) -> Self
    """Stack ``other`` on top of a copy of this packet (``self / other``).

    Packets are copied before being attached; bytes, str, bytearray and
    memoryview values are wrapped in ``conf.raw_layer`` first. Any other
    object is asked through its ``__rdiv__``.
    """
    if isinstance(other, Packet):
        lower = self.copy()
        upper = other.copy()
        lower.add_payload(upper)
        return lower
    if isinstance(other, (bytes, str, bytearray, memoryview)):
        return self / conf.raw_layer(load=bytes_encode(other))
    return other.__rdiv__(self)  # type: ignore
__truediv__ = __div__

617 

def __rdiv__(self, other):
    # type: (Any) -> Packet
    """Reflected division (``other / self``).

    bytes, str, bytearray and memoryview values are wrapped in
    ``conf.raw_layer`` and stacked under this packet.

    :raises TypeError: for any other type
    """
    if not isinstance(other, (bytes, str, bytearray, memoryview)):
        raise TypeError
    return conf.raw_layer(load=bytes_encode(other)) / self
__rtruediv__ = __rdiv__

625 

def __mul__(self, other):
    # type: (Any) -> List[Packet]
    """Return a list holding ``other`` references to this same packet.

    :raises TypeError: when ``other`` is not an int
    """
    if not isinstance(other, int):
        raise TypeError
    return [self] * other

632 

def __rmul__(self, other):
    # type: (Any) -> List[Packet]
    """Reflected multiplication: same result as ``self.__mul__(other)``."""
    repeated = self.__mul__(other)
    return repeated

636 

def __nonzero__(self):
    # type: () -> bool
    """Always return True."""
    truth = True
    return truth
__bool__ = __nonzero__

641 

def __len__(self):
    # type: () -> int
    """Return the length of ``self.__bytes__()``."""
    built = self.__bytes__()
    return len(built)

645 

def copy_field_value(self, fieldname, value):
    # type: (str, Any) -> Any
    """Copy ``value`` with the ``do_copy`` method of field ``fieldname``."""
    fld = self.get_field(fieldname)
    return fld.do_copy(value)

649 

def copy_fields_dict(self, fields):
    # type: (_T) -> _T
    """Return a new dict with every value of ``fields`` copied through
    ``copy_field_value``; None is passed through unchanged."""
    if fields is None:
        return None
    duplicated = {}
    for fname, fval in fields.items():
        duplicated[fname] = self.copy_field_value(fname, fval)
    return duplicated

656 

def _raw_packet_cache_field_value(self, fld, val, copy=False):
    # type: (AnyField, Any, bool) -> Optional[Any]
    """Get a value representative of a mutable field to detect changes.

    Packet-holding fields are represented by the ``fields`` dict of each
    packet; list or mutable fields by the value itself. With ``copy``
    set, ``fld.do_copy`` is applied to each representative. Other fields
    yield None.
    """
    def _snapshot(item):
        # type: (Any) -> Any
        return fld.do_copy(item) if copy else item

    if fld.holds_packets:
        # avoid copying whole packets (perf: #GH3894)
        if fld.islist:
            return [_snapshot(pkt.fields) for pkt in val]
        return _snapshot(val.fields)
    if fld.islist or fld.ismutable:
        return _snapshot(val)
    return None

672 

def clear_cache(self):
    # type: () -> None
    """Reset ``raw_packet_cache`` on this layer, on the packets held by
    its packet fields, and on the payload chain."""
    self.raw_packet_cache = None
    for fname, fval in self.fields.items():
        if not self.get_field(fname).holds_packets:
            continue
        if isinstance(fval, Packet):
            fval.clear_cache()
        elif isinstance(fval, list):
            for item in fval:
                item.clear_cache()
    self.payload.clear_cache()

686 

def self_build(self):
    # type: () -> bytes
    """
    Build the bytes of this layer only (no payload) from ``fields_desc``.

    When ``raw_packet_cache`` and ``raw_packet_cache_fields`` are both
    set, the cached bytes are returned unless one of the tracked values
    changed; in that case the cache, the tracked values and ``wirelen``
    are reset and the layer is rebuilt field by field.

    :return: the raw bytes of the current layer
    :raises Exception: whatever a field's ``addfield`` raised. When the
        first argument of the exception is a string, it is prefixed with
        the name of the failing field.
    """
    if self.raw_packet_cache is not None and \
            self.raw_packet_cache_fields is not None:
        for fname, fval in self.raw_packet_cache_fields.items():
            fld, val = self.getfield_and_val(fname)
            if self._raw_packet_cache_field_value(fld, val) != fval:
                # A tracked value changed: the cached bytes are stale
                self.raw_packet_cache = None
                self.raw_packet_cache_fields = None
                self.wirelen = None
                break
        if self.raw_packet_cache is not None:
            return self.raw_packet_cache
    p = b""
    for f in self.fields_desc:
        val = self.getfieldval(f.name)
        if isinstance(val, RawVal):
            p += bytes(val)
        else:
            try:
                p = f.addfield(self, p, val)
            except Exception as ex:
                # NOTE(review): the prefix says "dissecting" although this
                # is the build path; the text is kept unchanged in case
                # callers match on it.
                try:
                    ex.args = (
                        "While dissecting field '%s': " % f.name +
                        ex.args[0],
                    ) + ex.args[1:]
                except (AttributeError, IndexError, TypeError):
                    # TypeError: args[0] is not a string. Leave the
                    # exception untouched rather than mask it.
                    pass
                raise
    return p

723 

def do_build_payload(self):
    # type: () -> bytes
    """
    Build the payload layer by calling its ``do_build``.

    :return: the value returned by the payload's ``do_build``
    """
    upper = self.payload
    return upper.do_build()

732 

def do_build(self):
    # type: () -> bytes
    """
    Build this layer followed by its payload.

    When the packet is not explicit, the first element yielded by
    iterating over it is built instead. ``post_build`` is only applied
    when no raw packet cache is set after building the layer.

    :return: the bytes of the layer with its payload
    """
    layer = self if self.explicit else next(iter(self))
    raw_layer = layer.self_build()
    for transform in layer.post_transforms:
        raw_layer = transform(raw_layer)
    pay = layer.do_build_payload()
    if layer.raw_packet_cache is not None:
        return raw_layer + pay
    return layer.post_build(raw_layer, pay)

750 

def build_padding(self):
    # type: () -> bytes
    """Return the result of the payload's ``build_padding``."""
    upper = self.payload
    return upper.build_padding()

754 

def build(self):
    # type: () -> bytes
    """
    Build the packet: ``do_build`` output, then ``build_padding`` output,
    the whole passed through ``build_done``.

    :return: the bytes of the packet with the payload
    """
    assembled = self.do_build()
    assembled += self.build_padding()
    return self.build_done(assembled)

766 

def post_build(self, pkt, pay):
    # type: (bytes, bytes) -> bytes
    """
    DEV: hook called right after the current layer is built.

    The default implementation concatenates both parts unchanged.

    :param pkt: the current layer (built by self_build)
    :param pay: the payload (built by do_build_payload)
    :return: ``pkt`` followed by ``pay``
    """
    combined = pkt + pay
    return combined

777 

def build_done(self, p):
    # type: (bytes) -> bytes
    """Pass ``p`` to the payload's ``build_done`` and return its result."""
    upper = self.payload
    return upper.build_done(p)

781 

def do_build_ps(self):
    # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]]  # noqa: E501
    """Build the layer while recording, per field, its repr and the bytes
    it appended.

    Returns ``(bytes, layers)`` where ``layers`` is the list obtained from
    the payload's ``build_ps`` with ``(self, field rows)`` appended.
    """
    built = b""
    previous = b""
    rows = []
    for fld in self.fields_desc:
        if isinstance(fld, ConditionalField) and not fld._evalcond(self):
            continue
        built = fld.addfield(self, built, self.getfieldval(fld.name))
        if isinstance(built, bytes):
            # Bytes added by this field since the last bytes result
            chunk = built[len(previous):]
            previous = built
        else:
            chunk = b""
        shown = fld.i2repr(self, self.getfieldval(fld.name))
        rows.append((fld, shown, chunk))

    pay_bytes, layers = self.payload.build_ps(internal=1)
    built += pay_bytes
    layers.append((self, rows))

    return built, layers

803 

def build_ps(self, internal=0):
    # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]]  # noqa: E501
    """Return the ``(bytes, layers)`` pair produced by ``do_build_ps``.

    ``internal`` is accepted but not used by this implementation.
    """
    built, layers = self.do_build_ps()
    return built, layers

815 

def canvas_dump(self, layer_shift=0, rebuild=1):
    # type: (int, int) -> pyx.canvas.canvas
    """Draw the packet on a PyX canvas and return that canvas.

    For every layer a title is drawn, then one row per field (name, size,
    value). Each field that produced bytes also gets a framed hex dump,
    linked to its value by an arrow.

    :param layer_shift: amount added to the hex dump row position after
        each layer
    :param rebuild: when true, the packet is rebuilt from ``raw(self)``
        before being drawn
    :raises ImportError: when ``PYX`` is 0
    """
    if PYX == 0:
        raise ImportError("PyX and its dependencies must be installed")
    canvas = pyx.canvas.canvas()
    if rebuild:
        _, t = self.__class__(raw(self)).build_ps()
    else:
        _, t = self.build_ps()
    # Total number of text rows: one per layer plus one per field
    YTXTI = len(t)
    for _, l in t:
        YTXTI += len(l)
    YTXT = float(YTXTI)
    YDUMP = YTXT

    # Layout constants: column origins and per-unit scale factors
    XSTART = 1
    XDSTART = 10
    y = 0.0
    yd = 0.0
    XMUL = 0.55
    YMUL = 0.4

    backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
    forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
#        backcolor=makecol(0.376, 0.729, 0.525, 1.0)

    def hexstr(x):
        # type: (bytes) -> str
        # Space-separated two-digit hex of every byte of x
        return " ".join("%02x" % orb(c) for c in x)

    def make_dump_txt(x, y, txt):
        # type: (int, float, bytes) -> pyx.text.text
        # Text element for one hex run, at column x and row y of the dump
        return pyx.text.text(
            XDSTART + x * XMUL,
            (YDUMP - y) * YMUL,
            r"\tt{%s}" % hexstr(txt),
            [pyx.text.size.Large]
        )

    def make_box(o):
        # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
        # Rectangle box matching bounding box o
        return pyx.box.rect(
            o.left(), o.bottom(), o.width(), o.height(),
            relcenter=(0.5, 0.5)
        )

    def make_frame(lst):
        # type: (List[Any]) -> pyx.path.path
        # Outline path around the text elements of one field's hex dump
        if len(lst) == 1:
            b = lst[0].bbox()
            b.enlarge(pyx.unit.u_pt)
            return b.path()
        else:
            fb = lst[0].bbox()
            fb.enlarge(pyx.unit.u_pt)
            lb = lst[-1].bbox()
            lb.enlarge(pyx.unit.u_pt)
            if len(lst) == 2 and fb.left() > lb.right():
                # Two runs that do not overlap horizontally: two open
                # outlines, one per run
                return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.left(), fb.bottom()),  # noqa: E501
                                     pyx.path.lineto(fb.right(), fb.bottom()),  # noqa: E501
                                     pyx.path.moveto(lb.left(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.top()),
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()))  # noqa: E501
            else:
                # XXX
                gb = lst[1].bbox()
                if gb != lb:
                    gb.enlarge(pyx.unit.u_pt)
                kb = lst[-2].bbox()
                if kb != gb and kb != lb:
                    kb.enlarge(pyx.unit.u_pt)
                # Single closed outline spanning the first to last run
                return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
                                     pyx.path.lineto(fb.right(), fb.top()),
                                     pyx.path.lineto(fb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), kb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), lb.bottom()),  # noqa: E501
                                     pyx.path.lineto(lb.left(), gb.top()),
                                     pyx.path.lineto(fb.left(), gb.top()),
                                     pyx.path.closepath(),)

    def make_dump(s,   # type: bytes
                  shift=0,  # type: int
                  y=0.,  # type: float
                  col=None,  # type: pyx.color.color
                  bkcol=None,  # type: pyx.color.color
                  large=16  # type: int
                  ):
        # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float]  # noqa: E501
        # Draw the hex dump of s starting at column shift of row y.
        # Returns (canvas, bbox of the last run, next column, next row).
        c = pyx.canvas.canvas()
        tlist = []
        while s:
            dmp, s = s[:large - shift], s[large - shift:]
            txt = make_dump_txt(shift, y, dmp)
            tlist.append(txt)
            shift += len(dmp)
            # NOTE(review): wraps at a literal 16 while the slice width
            # uses `large`; both agree only for the default large=16.
            if shift >= 16:
                shift = 0
                y += 1
        if col is None:
            col = pyx.color.rgb.red
        if bkcol is None:
            bkcol = pyx.color.rgb.white
        c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick])  # noqa: E501
        for txt in tlist:
            c.insert(txt)
        return c, tlist[-1].bbox(), shift, y

    last_shift, last_y = 0, 0.0
    # Layers are consumed from the end of the list returned by build_ps
    while t:
        bkcol = next(backcolor)
        proto, fields = t.pop()
        y += 0.5
        # Layer title
        pt = pyx.text.text(
            XSTART,
            (YTXT - y) * YMUL,
            r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
                str(proto.name)
            ),
            [pyx.text.size.Large]
        )
        y += 1
        ptbb = pt.bbox()
        ptbb.enlarge(pyx.unit.u_pt * 2)
        canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])])  # noqa: E501
        canvas.insert(pt)
        for field, fval, fdump in fields:
            col = next(forecolor)
            # Field name
            ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name))  # noqa: E501
            # Field size: in bits ("b") for a BitField, else in bytes ("B")
            if isinstance(field, BitField):
                fsize = '%sb' % field.size
            else:
                fsize = '%sB' % len(fdump)
            # Class names with "LE" in their first three characters get a
            # marker in front of the size
            if (hasattr(field, 'field') and
                    'LE' in field.field.__class__.__name__[:3] or
                    'LE' in field.__class__.__name__[:3]):
                fsize = r'$\scriptstyle\langle$' + fsize
            st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright])  # noqa: E501
            # Field value: str values are cut after 18 characters, any
            # other type is shown as empty
            if isinstance(fval, str):
                if len(fval) > 18:
                    fval = fval[:18] + "[...]"
            else:
                fval = ""
            vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))  # noqa: E501
            y += 1.0
            if fdump:
                dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol)  # noqa: E501

                dtb = target
                vtb = vt.bbox()
                bxvt = make_box(vtb)
                bxdt = make_box(dtb)
                dtb.enlarge(pyx.unit.u_pt)
                # Arrow from the value text to the hex dump; skipped when
                # the connector cannot be built. yd is never reassigned in
                # this method, so the else branch is the one taken.
                try:
                    if yd < 0:
                        cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90)  # noqa: E501
                    else:
                        cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90)  # noqa: E501
                except Exception:
                    pass
                else:
                    canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col])  # noqa: E501

                canvas.insert(dt)

            canvas.insert(ft)
            canvas.insert(st)
            canvas.insert(vt)
        last_y += layer_shift

    return canvas

990 

def extract_padding(self, s):
    # type: (bytes) -> Tuple[bytes, Optional[bytes]]
    """
    DEV: to be overloaded to extract the current layer's padding.

    The default implementation treats everything as layer data.

    :param s: the current layer
    :return: a pair (actual layer, padding); padding is None here
    """
    padding = None
    return s, padding

1000 

def post_dissect(self, s):
    # type: (bytes) -> bytes
    """DEV: hook called right after the current layer has been dissected.

    The default implementation returns ``s`` unchanged.
    """
    remaining = s
    return remaining

1005 

def pre_dissect(self, s):
    # type: (bytes) -> bytes
    """DEV: hook called right before the current layer is dissected.

    The default implementation returns ``s`` unchanged.
    """
    incoming = s
    return incoming

1010 

def do_dissect(self, s):
    # type: (bytes) -> bytes
    """Dissect the fields of this layer from ``s``.

    Every field of ``fields_desc`` consumes its part of ``s`` through
    ``getfield``. The consumed bytes are stored in ``raw_packet_cache``
    and the layer is marked explicit.

    :return: the bytes left after the last dissected field
    """
    original = s
    self.raw_packet_cache_fields = {}
    for fld in self.fields_desc:
        s, value = fld.getfield(self, s)
        # Skip unused ConditionalField
        if isinstance(fld, ConditionalField) and value is None:
            continue
        # We need to track fields with mutable values to discard
        # .raw_packet_cache when needed.
        if fld.islist or fld.holds_packets or fld.ismutable:
            if value is not None:
                self.raw_packet_cache_fields[fld.name] = \
                    self._raw_packet_cache_field_value(fld, value, copy=True)
        self.fields[fld.name] = value
        # Nothing left to dissect
        if not s and (isinstance(fld, MayEnd) or
                      (value is not None and
                       isinstance(fld, ConditionalField) and
                       isinstance(fld.fld, MayEnd))):
            break
    if s:
        self.raw_packet_cache = original[:-len(s)]
    else:
        self.raw_packet_cache = original
    self.explicit = 1
    return s

1034 

1035 def do_dissect_payload(self, s): 

1036 # type: (bytes) -> None 

1037 """ 

1038 Perform the dissection of the layer's payload 

1039 

1040 :param str s: the raw layer 

1041 """ 

1042 if s: 

1043 if ( 

1044 self.stop_dissection_after and 

1045 isinstance(self, self.stop_dissection_after) 

1046 ): 

1047 # stop dissection here 

1048 p = conf.raw_layer(s, _internal=1, _underlayer=self) 

1049 self.add_payload(p) 

1050 return 

1051 cls = self.guess_payload_class(s) 

1052 try: 

1053 p = cls( 

1054 s, 

1055 stop_dissection_after=self.stop_dissection_after, 

1056 _internal=1, 

1057 _underlayer=self, 

1058 ) 

1059 except KeyboardInterrupt: 

1060 raise 

1061 except Exception: 

1062 if conf.debug_dissector: 

1063 if issubtype(cls, Packet): 

1064 log_runtime.error("%s dissector failed", cls.__name__) 

1065 else: 

1066 log_runtime.error("%s.guess_payload_class() returned " 

1067 "[%s]", 

1068 self.__class__.__name__, repr(cls)) 

1069 if cls is not None: 

1070 raise 

1071 p = conf.raw_layer(s, _internal=1, _underlayer=self) 

1072 self.add_payload(p) 

1073 

1074 def dissect(self, s): 

1075 # type: (bytes) -> None 

1076 s = self.pre_dissect(s) 

1077 

1078 s = self.do_dissect(s) 

1079 

1080 s = self.post_dissect(s) 

1081 

1082 payl, pad = self.extract_padding(s) 

1083 self.do_dissect_payload(payl) 

1084 if pad and conf.padding: 

1085 self.add_payload(conf.padding_layer(pad)) 

1086 

1087 def guess_payload_class(self, payload): 

1088 # type: (bytes) -> Type[Packet] 

1089 """ 

1090 DEV: Guesses the next payload class from layer bonds. 

1091 Can be overloaded to use a different mechanism. 

1092 

1093 :param str payload: the layer's payload 

1094 :return: the payload class 

1095 """ 

1096 for t in self.aliastypes: 

1097 for fval, cls in t.payload_guess: 

1098 try: 

1099 if all(v == self.getfieldval(k) 

1100 for k, v in fval.items()): 

1101 return cls # type: ignore 

1102 except AttributeError: 

1103 pass 

1104 return self.default_payload_class(payload) 

1105 

1106 def default_payload_class(self, payload): 

1107 # type: (bytes) -> Type[Packet] 

1108 """ 

1109 DEV: Returns the default payload class if nothing has been found by the 

1110 guess_payload_class() method. 

1111 

1112 :param str payload: the layer's payload 

1113 :return: the default payload class define inside the configuration file 

1114 """ 

1115 return conf.raw_layer 

1116 

1117 def hide_defaults(self): 

1118 # type: () -> None 

1119 """Removes fields' values that are the same as default values.""" 

1120 # use list(): self.fields is modified in the loop 

1121 for k, v in list(self.fields.items()): 

1122 v = self.fields[k] 

1123 if k in self.default_fields: 

1124 if self.default_fields[k] == v: 

1125 del self.fields[k] 

1126 self.payload.hide_defaults() 

1127 

1128 def clone_with(self, payload=None, **kargs): 

1129 # type: (Optional[Any], **Any) -> Any 

1130 pkt = self.__class__() 

1131 pkt.explicit = 1 

1132 pkt.fields = kargs 

1133 pkt.default_fields = self.copy_fields_dict(self.default_fields) 

1134 pkt.overloaded_fields = self.overloaded_fields.copy() 

1135 pkt.time = self.time 

1136 pkt.underlayer = self.underlayer 

1137 pkt.parent = self.parent 

1138 pkt.post_transforms = self.post_transforms 

1139 pkt.raw_packet_cache = self.raw_packet_cache 

1140 pkt.raw_packet_cache_fields = self.copy_fields_dict( 

1141 self.raw_packet_cache_fields 

1142 ) 

1143 pkt.wirelen = self.wirelen 

1144 pkt.comment = self.comment 

1145 pkt.sniffed_on = self.sniffed_on 

1146 pkt.direction = self.direction 

1147 if payload is not None: 

1148 pkt.add_payload(payload) 

1149 return pkt 

1150 

    def __iter__(self):
        # type: () -> Iterator[Packet]
        """
        Iterates through all sub-packets generated by this Packet.

        Each field named in ``todo`` is expanded in turn (through a Gen), and
        one packet is produced with clone_with() for every combination of
        values, combined with every packet yielded by the payload.

        :return: an iterator over the generated packets
        """
        def loop(todo, done, self=self):
            # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
            # todo: names of the fields still to expand
            # done: field name -> value chosen so far for the current packet
            if todo:
                eltname = todo.pop()
                elt = self.getfieldval(eltname)
                if not isinstance(elt, Gen):
                    # Wrap plain values so they can be iterated uniformly.
                    # The value of a list field is wrapped in a list so
                    # that it is passed to SetGen as a single element.
                    if self.get_field(eltname).islist:
                        elt = SetGen([elt])
                    else:
                        elt = SetGen(elt)
                for e in elt:
                    done[eltname] = e
                    # Recurse on a copy of todo: each branch pops its own
                    for x in loop(todo[:], done):
                        yield x
            else:
                # Every field has a value: combine with the payload(s)
                if isinstance(self.payload, NoPayload):
                    payloads = SetGen([None])  # type: SetGen[Packet]
                else:
                    payloads = self.payload
                for payl in payloads:
                    # Let's make sure subpackets are consistent
                    done2 = done.copy()
                    for k in done2:
                        if isinstance(done2[k], VolatileValue):
                            done2[k] = done2[k]._fix()
                    pkt = self.clone_with(payload=payl, **done2)
                    yield pkt

        if self.explicit or self.raw_packet_cache is not None:
            # No field is expanded: the current fields are used as they are
            todo = []
            done = self.fields
        else:
            # Expand the volatile default/overloaded fields and every
            # field that was explicitly set
            todo = [k for (k, v) in itertools.chain(self.default_fields.items(),
                                                    self.overloaded_fields.items())
                    if isinstance(v, VolatileValue)] + list(self.fields)
            done = {}
        return loop(todo, done)

1191 

1192 def iterpayloads(self): 

1193 # type: () -> Iterator[Packet] 

1194 """Used to iter through the payloads of a Packet. 

1195 Useful for DNS or 802.11 for instance. 

1196 """ 

1197 yield self 

1198 current = self 

1199 while current.payload: 

1200 current = current.payload 

1201 yield current 

1202 

1203 def __gt__(self, other): 

1204 # type: (Packet) -> int 

1205 """True if other is an answer from self (self ==> other).""" 

1206 if isinstance(other, Packet): 

1207 return other < self 

1208 elif isinstance(other, bytes): 

1209 return 1 

1210 else: 

1211 raise TypeError((self, other)) 

1212 

1213 def __lt__(self, other): 

1214 # type: (Packet) -> int 

1215 """True if self is an answer from other (other ==> self).""" 

1216 if isinstance(other, Packet): 

1217 return self.answers(other) 

1218 elif isinstance(other, bytes): 

1219 return 1 

1220 else: 

1221 raise TypeError((self, other)) 

1222 

1223 def __eq__(self, other): 

1224 # type: (Any) -> bool 

1225 if not isinstance(other, self.__class__): 

1226 return False 

1227 for f in self.fields_desc: 

1228 if f not in other.fields_desc: 

1229 return False 

1230 if self.getfieldval(f.name) != other.getfieldval(f.name): 

1231 return False 

1232 return self.payload == other.payload 

1233 

1234 def __ne__(self, other): 

1235 # type: (Any) -> bool 

1236 return not self.__eq__(other) 

1237 

    # This class defines __eq__, and instances are explicitly marked as
    # un-hashable. Note: setting __hash__ to None is the standard way
    # of making an object un-hashable. mypy doesn't know that, hence the
    # "type: ignore" below.
    __hash__ = None  # type: ignore

1241 

1242 def hashret(self): 

1243 # type: () -> bytes 

1244 """DEV: returns a string that has the same value for a request 

1245 and its answer.""" 

1246 return self.payload.hashret() 

1247 

1248 def answers(self, other): 

1249 # type: (Packet) -> int 

1250 """DEV: true if self is an answer from other""" 

1251 if other.__class__ == self.__class__: 

1252 return self.payload.answers(other.payload) 

1253 return 0 

1254 

1255 def layers(self): 

1256 # type: () -> List[Type[Packet]] 

1257 """returns a list of layer classes (including subclasses) in this packet""" # noqa: E501 

1258 layers = [] 

1259 lyr = self # type: Optional[Packet] 

1260 while lyr: 

1261 layers.append(lyr.__class__) 

1262 lyr = lyr.payload.getlayer(0, _subclass=True) 

1263 return layers 

1264 

1265 def haslayer(self, cls, _subclass=None): 

1266 # type: (Union[Type[Packet], str], Optional[bool]) -> int 

1267 """ 

1268 true if self has a layer that is an instance of cls. 

1269 Superseded by "cls in self" syntax. 

1270 """ 

1271 if _subclass is None: 

1272 _subclass = self.match_subclass or None 

1273 if _subclass: 

1274 match = issubtype 

1275 else: 

1276 match = lambda x, t: bool(x == t) 

1277 if cls is None or match(self.__class__, cls) \ 

1278 or cls in [self.__class__.__name__, self._name]: 

1279 return True 

1280 for f in self.packetfields: 

1281 fvalue_gen = self.getfieldval(f.name) 

1282 if fvalue_gen is None: 

1283 continue 

1284 if not f.islist: 

1285 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0) 

1286 for fvalue in fvalue_gen: 

1287 if isinstance(fvalue, Packet): 

1288 ret = fvalue.haslayer(cls, _subclass=_subclass) 

1289 if ret: 

1290 return ret 

1291 return self.payload.haslayer(cls, _subclass=_subclass) 

1292 

    def getlayer(self,
                 cls,  # type: Union[int, Type[Packet], str]
                 nb=1,  # type: int
                 _track=None,  # type: Optional[List[int]]
                 _subclass=None,  # type: Optional[bool]
                 **flt  # type: Any
                 ):
        # type: (...) -> Optional[Packet]
        """
        Return the nb^th layer that is an instance of cls, matching flt
        values.

        :param cls: a layer class, a layer name (optionally suffixed with
            ".field"), or an int; an int is a 0-based layer index
        :param nb: 1-based rank of the matching layer to return
        :param _track: internal; list passed on to the payload's getlayer()
        :param _subclass: whether subclasses of cls match too; defaults to
            ``self.match_subclass``
        :param flt: field values the layer must hold to match
        :return: the layer (or, when this layer matches and cls carries a
            ".field" suffix, that field's value)
        """
        if _subclass is None:
            _subclass = self.match_subclass or None
        if _subclass:
            match = issubtype
        else:
            match = lambda x, t: bool(x == t)
        # Note:
        # cls can be int, packet, str
        # string_class_name can be packet, str (packet or packet+field)
        # class_name can be packet, str (packet only)
        if isinstance(cls, int):
            # An int selects the layer by position: match any class
            nb = cls + 1
            string_class_name = ""  # type: Union[Type[Packet], str]
        else:
            string_class_name = cls
        class_name = ""  # type: Union[Type[Packet], str]
        fld = None  # type: Optional[str]
        if isinstance(string_class_name, str) and "." in string_class_name:
            class_name, fld = string_class_name.split(".", 1)
        else:
            class_name, fld = string_class_name, None
        if not class_name or match(self.__class__, class_name) \
                or class_name in [self.__class__.__name__, self._name]:
            if all(self.getfieldval(fldname) == fldvalue
                   for fldname, fldvalue in flt.items()):
                if nb == 1:
                    if fld is None:
                        return self
                    else:
                        return self.getfieldval(fld)  # type: ignore
                else:
                    # This layer matches but is not the requested one yet
                    nb -= 1
        # Look inside the packets held by the packet fields.
        # The recursive calls receive class_name, i.e. without the
        # ".field" suffix.
        for f in self.packetfields:
            fvalue_gen = self.getfieldval(f.name)
            if fvalue_gen is None:
                continue
            if not f.islist:
                fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
            for fvalue in fvalue_gen:
                if isinstance(fvalue, Packet):
                    track = []  # type: List[int]
                    ret = fvalue.getlayer(class_name, nb=nb, _track=track,
                                          _subclass=_subclass, **flt)
                    if ret is not None:
                        return ret
                    # Continue with the count reported back through track
                    nb = track[0]
        return self.payload.getlayer(class_name, nb=nb, _track=_track,
                                     _subclass=_subclass, **flt)

1352 

1353 def firstlayer(self): 

1354 # type: () -> Packet 

1355 q = self 

1356 while q.underlayer is not None: 

1357 q = q.underlayer 

1358 return q 

1359 

1360 def __getitem__(self, cls): 

1361 # type: (Union[Type[Packet], str]) -> Any 

1362 if isinstance(cls, slice): 

1363 lname = cls.start 

1364 if cls.stop: 

1365 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {})) 

1366 else: 

1367 ret = self.getlayer(cls.start, **(cls.step or {})) 

1368 else: 

1369 lname = cls 

1370 ret = self.getlayer(cls) 

1371 if ret is None: 

1372 if isinstance(lname, type): 

1373 name = lname.__name__ 

1374 elif not isinstance(lname, bytes): 

1375 name = repr(lname) 

1376 else: 

1377 name = cast(str, lname) 

1378 raise IndexError("Layer [%s] not found" % name) 

1379 return ret 

1380 

1381 def __delitem__(self, cls): 

1382 # type: (Type[Packet]) -> None 

1383 del self[cls].underlayer.payload 

1384 

1385 def __setitem__(self, cls, val): 

1386 # type: (Type[Packet], Packet) -> None 

1387 self[cls].underlayer.payload = val 

1388 

1389 def __contains__(self, cls): 

1390 # type: (Union[Type[Packet], str]) -> int 

1391 """ 

1392 "cls in self" returns true if self has a layer which is an 

1393 instance of cls. 

1394 """ 

1395 return self.haslayer(cls) 

1396 

1397 def route(self): 

1398 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]] 

1399 return self.payload.route() 

1400 

1401 def fragment(self, *args, **kargs): 

1402 # type: (*Any, **Any) -> List[Packet] 

1403 return self.payload.fragment(*args, **kargs) 

1404 

1405 def display(self, *args, **kargs): # Deprecated. Use show() 

1406 # type: (*Any, **Any) -> None 

1407 """Deprecated. Use show() method.""" 

1408 self.show(*args, **kargs) 

1409 

    def _show_or_dump(self,
                      dump=False,  # type: bool
                      indent=3,  # type: int
                      lvl="",  # type: str
                      label_lvl="",  # type: str
                      first_call=True  # type: bool
                      ):
        # type: (...) -> Optional[str]
        """
        Internal method that shows or dumps a hierarchical view of a packet.
        Called by show.

        :param dump: determine if it prints or returns the string value
        :param int indent: the size of indentation for each layer
        :param str lvl: additional information about the layer lvl
        :param str label_lvl: additional information about the layer fields
        :param first_call: determine if the current function is the first
            call (as opposed to a recursive call on a sub-packet or payload)
        :return: return a hierarchical view if dump, else print it
        """
        # NOTE(review): this block was recovered from a text rendering that
        # may have collapsed runs of spaces inside the string literals below.
        # Verify them against the upstream file before relying on them.

        if dump:
            from scapy.themes import ColorTheme, AnsiColorTheme
            ct: ColorTheme = AnsiColorTheme()  # No color for dump output
        else:
            ct = conf.color_theme
        # Layer header
        s = "%s%s %s %s\n" % (label_lvl,
                              ct.punct("###["),
                              ct.layer_name(self.name),
                              ct.punct("]###"))
        # Work on a copy: it is used as a queue and subfields get inserted
        fields = self.fields_desc.copy()
        while fields:
            f = fields.pop(0)
            if isinstance(f, ConditionalField) and not f._evalcond(self):
                continue
            if hasattr(f, "fields"):  # Field has subfields
                s += "%s %s =\n" % (
                    label_lvl + lvl,
                    ct.depreciate_field_name(f.name),
                )
                lvl += " " * indent * self.show_indent
                # Queue the subfields at the front, keeping their order
                for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)):
                    fields.insert(i, fld)
                continue
            # Pick the colors: emphasized fields use a distinct style
            if isinstance(f, Emph) or f in conf.emph:
                ncol = ct.emph_field_name
                vcol = ct.emph_field_value
            else:
                ncol = ct.field_name
                vcol = ct.field_value
            # Pad the field name up to 10 characters
            pad = max(0, 10 - len(f.name)) * " "
            fvalue = self.getfieldval(f.name)
            if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)):  # noqa: E501
                # The value holds packet(s): show each of them recursively
                s += "%s %s%s%s%s\n" % (label_lvl + lvl,
                                        ct.punct("\\"),
                                        ncol(f.name),
                                        pad,
                                        ct.punct("\\"))
                fvalue_gen = SetGen(
                    fvalue,
                    _iterpacket=0
                )  # type: SetGen[Packet]
                for fvalue in fvalue_gen:
                    s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False)  # noqa: E501
            else:
                begn = "%s %s%s%s " % (label_lvl + lvl,
                                       ncol(f.name),
                                       pad,
                                       ct.punct("="),)
                reprval = f.i2repr(self, fvalue)
                if isinstance(reprval, str):
                    # Indent the continuation lines of multi-line values
                    reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) +  # noqa: E501
                                                                  len(lvl) +
                                                                  len(f.name) +
                                                                  4))
                s += "%s%s\n" % (begn, vcol(reprval))
        if self.payload:
            s += self.payload._show_or_dump(  # type: ignore
                dump=dump,
                indent=indent,
                lvl=lvl + (" " * indent * self.show_indent),
                label_lvl=label_lvl,
                first_call=False
            )

        # Only the outermost call prints; recursive calls return their text
        if first_call and not dump:
            print(s)
            return None
        else:
            return s

1499 

1500 def show(self, dump=False, indent=3, lvl="", label_lvl=""): 

1501 # type: (bool, int, str, str) -> Optional[Any] 

1502 """ 

1503 Prints or returns (when "dump" is true) a hierarchical view of the 

1504 packet. 

1505 

1506 :param dump: determine if it prints or returns the string value 

1507 :param int indent: the size of indentation for each layer 

1508 :param str lvl: additional information about the layer lvl 

1509 :param str label_lvl: additional information about the layer fields 

1510 :return: return a hierarchical view if dump, else print it 

1511 """ 

1512 return self._show_or_dump(dump, indent, lvl, label_lvl) 

1513 

1514 def show2(self, dump=False, indent=3, lvl="", label_lvl=""): 

1515 # type: (bool, int, str, str) -> Optional[Any] 

1516 """ 

1517 Prints or returns (when "dump" is true) a hierarchical view of an 

1518 assembled version of the packet, so that automatic fields are 

1519 calculated (checksums, etc.) 

1520 

1521 :param dump: determine if it prints or returns the string value 

1522 :param int indent: the size of indentation for each layer 

1523 :param str lvl: additional information about the layer lvl 

1524 :param str label_lvl: additional information about the layer fields 

1525 :return: return a hierarchical view if dump, else print it 

1526 """ 

1527 return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl) 

1528 

    def sprintf(self, fmt, relax=1):
        # type: (str, int) -> str
        """
        sprintf(format, [relax=1]) -> str

        Where format is a string that can include directives. A directive
        begins and ends by % and has the following format:
        ``%[fmt[r],][cls[:nb].]field%``

        In a directive, fmt is a classic printf directive ("s" when
        omitted), and "r" can be appended for raw substitution
        (ex: IP.flags=0x18 instead of SA); nb is the number of the layer
        (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
        Special case : "%.time%" is the creation time.
        Ex::

          p.sprintf(
            "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
            "%03xr,IP.proto% %r,TCP.flags%"
          )

        Moreover, the format string can include conditional statements. A
        conditional statement looks like : {layer:string} where layer is a
        layer name, and string is the string to insert in place of the
        condition if it is true, i.e. if layer is present. If layer is
        preceded by a "!", the result is inverted. Conditions can be
        imbricated. A valid statement can be::

          p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
          p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")

        A side effect is that, to obtain "{" and "}" characters, you must use
        "%(" and "%)".

        :param fmt: the format string
        :param relax: passed along to the payload's sprintf() when a
            directive cannot be resolved by this layer
        :return: the formatted string
        :raises Scapy_Exception: on a condition without ":" or on a
            malformed directive
        """

        # "%%", "%(" and "%)" produce "%", "{" and "}"
        escape = {"%": "%",
                  "(": "{",
                  ")": "}"}

        # Evaluate conditions
        # The last "{" is processed first, so that imbricated conditions are
        # resolved from the innermost one outwards.
        while "{" in fmt:
            i = fmt.rindex("{")
            # NOTE(review): a "{" without a following "}" makes index()
            # raise ValueError here
            j = fmt[i + 1:].index("}")
            cond = fmt[i + 1:i + j + 1]
            k = cond.find(":")
            if k < 0:
                raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond)  # noqa: E501
            cond, format_ = cond[:k], cond[k + 1:]
            res = False
            # NOTE(review): an empty layer name ("{:text}") makes cond[0]
            # raise IndexError
            if cond[0] == "!":
                res = True
                cond = cond[1:]
            if self.haslayer(cond):
                res = not res
            if not res:
                format_ = ""
            # Replace the whole "{...}" statement by its result
            fmt = fmt[:i] + format_ + fmt[i + j + 2:]

        # Evaluate directives
        s = ""
        while "%" in fmt:
            i = fmt.index("%")
            s += fmt[:i]
            fmt = fmt[i + 1:]
            if fmt and fmt[0] in escape:
                s += escape[fmt[0]]
                fmt = fmt[1:]
                continue
            try:
                # Parse "[fmt,][cls[:nb].]field" up to the closing "%"
                i = fmt.index("%")
                sfclsfld = fmt[:i]
                fclsfld = sfclsfld.split(",")
                if len(fclsfld) == 1:
                    f = "s"
                    clsfld = fclsfld[0]
                elif len(fclsfld) == 2:
                    f, clsfld = fclsfld
                else:
                    raise Scapy_Exception
                if "." in clsfld:
                    cls, fld = clsfld.split(".")
                else:
                    # No class given: the directive targets this layer
                    cls = self.__class__.__name__
                    fld = clsfld
                num = 1
                if ":" in cls:
                    cls, snum = cls.split(":")
                    num = int(snum)
                fmt = fmt[i + 1:]
            except Exception:
                raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))  # noqa: E501
            else:
                if fld == "time":
                    # HH:MM:SS followed by the microseconds of self.time
                    val = time.strftime(
                        "%H:%M:%S.%%06i",
                        time.localtime(float(self.time))
                    ) % int((self.time - int(self.time)) * 1000000)
                elif cls == self.__class__.__name__ and hasattr(self, fld):
                    if num > 1:
                        # Not this occurrence: ask the payload for the
                        # (num - 1)th one; the result is already formatted
                        val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax)  # noqa: E501
                        f = "s"
                    else:
                        try:
                            val = self.getfieldval(fld)
                        except AttributeError:
                            val = getattr(self, fld)
                        if f[-1] == "r":  # Raw field value
                            f = f[:-1]
                            if not f:
                                f = "s"
                        else:
                            if fld in self.fieldtype:
                                val = self.fieldtype[fld].i2repr(self, val)
                else:
                    # Not for this layer: let the payload resolve the
                    # directive; the result is already formatted
                    val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
                    f = "s"
                s += ("%" + f) % val

        s += fmt
        return s

1649 

1650 def mysummary(self): 

1651 # type: () -> str 

1652 """DEV: can be overloaded to return a string that summarizes the layer. 

1653 Only one mysummary() is used in a whole packet summary: the one of the upper layer, # noqa: E501 

1654 except if a mysummary() also returns (as a couple) a list of layers whose # noqa: E501 

1655 mysummary() must be called if they are present.""" 

1656 return "" 

1657 

1658 def _do_summary(self): 

1659 # type: () -> Tuple[int, str, List[Any]] 

1660 found, s, needed = self.payload._do_summary() 

1661 ret = "" 

1662 if not found or self.__class__ in needed: 

1663 ret = self.mysummary() 

1664 if isinstance(ret, tuple): 

1665 ret, n = ret 

1666 needed += n 

1667 if ret or needed: 

1668 found = 1 

1669 if not ret: 

1670 ret = self.__class__.__name__ if self.show_summary else "" 

1671 if self.__class__ in conf.emph: 

1672 impf = [] 

1673 for f in self.fields_desc: 

1674 if f in conf.emph: 

1675 impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) # noqa: E501 

1676 ret = "%s [%s]" % (ret, " ".join(impf)) 

1677 if ret and s: 

1678 ret = "%s / %s" % (ret, s) 

1679 else: 

1680 ret = "%s%s" % (ret, s) 

1681 return found, ret, needed 

1682 

1683 def summary(self, intern=0): 

1684 # type: (int) -> str 

1685 """Prints a one line summary of a packet.""" 

1686 return self._do_summary()[1] 

1687 

1688 def lastlayer(self, layer=None): 

1689 # type: (Optional[Packet]) -> Packet 

1690 """Returns the uppest layer of the packet""" 

1691 return self.payload.lastlayer(self) 

1692 

1693 def decode_payload_as(self, cls): 

1694 # type: (Type[Packet]) -> None 

1695 """Reassembles the payload and decode it using another packet class""" 

1696 s = raw(self.payload) 

1697 self.payload = cls(s, _internal=1, _underlayer=self) 

1698 pp = self 

1699 while pp.underlayer is not None: 

1700 pp = pp.underlayer 

1701 self.payload.dissection_done(pp) 

1702 

    def _command(self, json=False):
        # type: (bool) -> List[Tuple[str, Any]]
        """
        Internal method used to generate command() and json()

        :param json: if true, go through every field of ``fields_desc`` and
            produce values meant for JSON; otherwise only go through the
            fields that were set (``self.fields``) and produce strings
            meant for a command line
        :return: a list of ``(field name, value)`` tuples
        """
        f = []
        iterator: Iterator[Tuple[str, Any]]
        if json:
            iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc)
        else:
            iterator = iter(self.fields.items())
        for fn, fv in iterator:
            fld = self.get_field(fn)
            # Skip empty containers when the default is also falsy
            if isinstance(fv, (list, dict, set)) and not fv and not fld.default:
                continue
            if isinstance(fv, Packet):
                # A single sub-packet
                if json:
                    fv = {k: v for (k, v) in fv._command(json=True)}
                else:
                    fv = fv.command()
            elif fld.islist and fld.holds_packets and isinstance(fv, list):
                # A list of sub-packets
                if json:
                    fv = [
                        {k: v for (k, v) in x}
                        for x in map(lambda y: Packet._command(y, json=True), fv)
                    ]
                else:
                    fv = "[%s]" % ",".join(map(Packet.command, fv))
            elif fld.islist and isinstance(fv, list):
                # A list of other values: use the element's own command()
                # when it has one, its repr() otherwise
                if json:
                    fv = [
                        getattr(x, 'command', lambda: repr(x))()
                        for x in fv
                    ]
                else:
                    fv = "[%s]" % ",".join(
                        getattr(x, 'command', lambda: repr(x))()
                        for x in fv
                    )
            elif isinstance(fv, FlagValue):
                fv = int(fv)
            elif callable(getattr(fv, 'command', None)):
                fv = fv.command(json=json)
            else:
                if json:
                    if isinstance(fv, bytes):
                        # Undecodable bytes are kept as backslash escapes
                        fv = fv.decode("utf-8", errors="backslashreplace")
                    else:
                        fv = fld.i2h(self, fv)
                else:
                    fv = repr(fld.i2h(self, fv))
            f.append((fn, fv))
        return f

1756 

1757 def command(self): 

1758 # type: () -> str 

1759 """ 

1760 Returns a string representing the command you have to type to 

1761 obtain the same packet 

1762 """ 

1763 c = "%s(%s)" % ( 

1764 self.__class__.__name__, 

1765 ", ".join("%s=%s" % x for x in self._command()) 

1766 ) 

1767 pc = self.payload.command() 

1768 if pc: 

1769 c += "/" + pc 

1770 return c 

1771 

1772 def json(self): 

1773 # type: () -> str 

1774 """ 

1775 Returns a JSON representing the packet. 

1776 

1777 Please note that this cannot be used for bijective usage: data loss WILL occur, 

1778 so it will not make sense to try to rebuild the packet from the output. 

1779 This must only be used for a grepping/displaying purpose. 

1780 """ 

1781 dump = json.dumps({k: v for (k, v) in self._command(json=True)}) 

1782 pc = self.payload.json() 

1783 if pc: 

1784 dump = dump[:-1] + ", \"payload\": %s}" % pc 

1785 return dump 

1786 

1787 

class NoPayload(Packet):
    """
    Placeholder packet used where a layer has no payload.

    One instance is created per class and reused. Most operations do
    nothing or return an empty value.
    """

    def __new__(cls, *args, **kargs):
        # type: (Type[Packet], *Any, **Any) -> NoPayload
        """Return the single instance of cls, creating it on first use."""
        existing = cls.__dict__.get("__singl__")
        if existing is not None:
            return cast(NoPayload, existing)
        created = Packet.__new__(cls)
        cls.__singl__ = created
        Packet.__init__(created)
        return cast(NoPayload, created)

    def __init__(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Nothing to do: the instance is initialized in __new__()."""

    def dissection_done(self, pkt):
        # type: (Packet) -> None
        """Nothing to do."""

    def add_payload(self, payload):
        # type: (Union[Packet, bytes]) -> NoReturn
        """Always fails: this placeholder cannot carry a payload."""
        raise Scapy_Exception("Can't add payload to NoPayload instance")

    def remove_payload(self):
        # type: () -> None
        """Nothing to do."""

    def add_underlayer(self, underlayer):
        # type: (Any) -> None
        """Nothing to do."""

    def remove_underlayer(self, other):
        # type: (Packet) -> None
        """Nothing to do."""

    def add_parent(self, parent):
        # type: (Any) -> None
        """Nothing to do."""

    def remove_parent(self, other):
        # type: (Packet) -> None
        """Nothing to do."""

    def copy(self):
        # type: () -> NoPayload
        """Return the instance itself."""
        return self

    def clear_cache(self):
        # type: () -> None
        """Nothing to do."""

    def __repr__(self):
        # type: () -> str
        return ""

    def __str__(self):
        # type: () -> str
        return ""

    def __bytes__(self):
        # type: () -> bytes
        return b""

    def __nonzero__(self):
        # type: () -> bool
        """A NoPayload is always false."""
        return False
    __bool__ = __nonzero__

    def do_build(self):
        # type: () -> bytes
        return b""

    def build(self):
        # type: () -> bytes
        return b""

    def build_padding(self):
        # type: () -> bytes
        return b""

    def build_done(self, p):
        # type: (bytes) -> bytes
        """Return p unchanged."""
        return p

    def build_ps(self, internal=0):
        # type: (int) -> Tuple[bytes, List[Any]]
        return b"", []

    def getfieldval(self, attr):
        # type: (str) -> NoReturn
        """Always fails: there is no field here."""
        raise AttributeError(attr)

    def getfield_and_val(self, attr):
        # type: (str) -> NoReturn
        """Always fails: there is no field here."""
        raise AttributeError(attr)

    def setfieldval(self, attr, val):
        # type: (str, Any) -> NoReturn
        """Always fails: there is no field here."""
        raise AttributeError(attr)

    def delfieldval(self, attr):
        # type: (str) -> NoReturn
        """Always fails: there is no field here."""
        raise AttributeError(attr)

    def hide_defaults(self):
        # type: () -> None
        """Nothing to do."""

    def __iter__(self):
        # type: () -> Iterator[Packet]
        """Return an empty iterator."""
        return iter([])

    def __eq__(self, other):
        # type: (Any) -> bool
        """Equal to any other NoPayload, and to nothing else."""
        return isinstance(other, NoPayload)

    def hashret(self):
        # type: () -> bytes
        return b""

    def answers(self, other):
        # type: (Packet) -> bool
        """True if other is a NoPayload or a padding layer."""
        accepted = (NoPayload, conf.padding_layer)
        return isinstance(other, accepted)  # noqa: E501

    def haslayer(self, cls, _subclass=None):
        # type: (Union[Type[Packet], str], Optional[bool]) -> int
        """No layer can be found here."""
        return 0

    def getlayer(self,
                 cls,  # type: Union[int, Type[Packet], str]
                 nb=1,  # type: int
                 _track=None,  # type: Optional[List[int]]
                 _subclass=None,  # type: Optional[bool]
                 **flt  # type: Any
                 ):
        # type: (...) -> Optional[Packet]
        """
        No layer can be found here: return None, after having appended nb
        to _track when a list was given.
        """
        if _track is not None:
            _track.append(nb)
        return None

    def fragment(self, *args, **kargs):
        # type: (*Any, **Any) -> List[Packet]
        """Always fails."""
        raise Scapy_Exception("cannot fragment this packet")

    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
        # type: (bool, int, str, str) -> None
        """Nothing to show."""

    def sprintf(self, fmt, relax=1):
        # type: (str, int) -> str
        """
        Reached when no layer could resolve fmt: return "??" if relax is
        true, raise otherwise.
        """
        if not relax:
            raise Scapy_Exception("Format not found [%s]" % fmt)
        return "??"

    def _do_summary(self):
        # type: () -> Tuple[int, str, List[Any]]
        return 0, "", []

    def layers(self):
        # type: () -> List[Type[Packet]]
        return []

    def lastlayer(self, layer=None):
        # type: (Optional[Packet]) -> Packet
        """Return layer if it is true, else the instance itself."""
        return layer or self

    def command(self):
        # type: () -> str
        return ""

    def json(self):
        # type: () -> str
        return ""

    def route(self):
        # type: () -> Tuple[None, None, None]
        return (None, None, None)

1966 

1967 

1968#################### 

1969# packet classes # 

1970#################### 

1971 

1972 

class Raw(Packet):
    """Generic layer holding bytes in a single "load" field."""
    name = "Raw"
    fields_desc = [StrField("load", b"")]

    def __init__(self, _pkt=b"", *args, **kwargs):
        # type: (bytes, *Any, **Any) -> None
        """
        :param _pkt: the data of the layer. Anything that is true and not
            bytes is passed through bytes_encode(); for a tuple, only its
            first item is converted.
        """
        needs_encoding = _pkt and not isinstance(_pkt, bytes)
        if needs_encoding and isinstance(_pkt, tuple):
            data, bn = _pkt
            _pkt = (bytes_encode(data), bn)
        elif needs_encoding:
            _pkt = bytes_encode(_pkt)
        super(Raw, self).__init__(_pkt, *args, **kwargs)

    def answers(self, other):
        # type: (Packet) -> int
        """A Raw layer answers anything."""
        return 1

    def mysummary(self):
        # type: () -> str
        """
        Summary controlled by ``conf.raw_summary``: when it is false, fall
        back on Packet.mysummary(); when it is callable, use it to render
        the load; otherwise show the repr of the load.
        """
        cs = conf.raw_summary
        if not cs:
            return Packet.mysummary(self)
        if callable(cs):
            return "Raw %s" % cs(self.load)
        return "Raw %r" % self.load

2000 

2001 

class Padding(Raw):
    """Raw layer whose bytes are emitted by build_padding() only."""
    name = "Padding"

    def self_build(self, field_pos_list=None):
        # type: (Optional[Any]) -> bytes
        """Contribute nothing when the layer itself is built."""
        return b""

    def build_padding(self):
        # type: () -> bytes
        """
        Return the padding bytes of this layer followed by the payload's
        padding. The cached raw bytes are used when available, the encoded
        load otherwise.
        """
        if self.raw_packet_cache is None:
            own = bytes_encode(self.load)
        else:
            own = self.raw_packet_cache
        return own + self.payload.build_padding()

2015 

2016 

# Register the generic layers defined above in the configuration
conf.raw_layer = Raw
conf.padding_layer = Padding
# Only set a default link layer if none is configured yet
if conf.default_l2 is None:
    conf.default_l2 = Raw

2021 

2022################# 

2023# Bind layers # 

2024################# 

2025 

2026 

def bind_bottom_up(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    r"""Bind 2 layers for dissection.
    The upper layer will be chosen for dissection on top of the lower layer, if
    ALL the passed arguments are validated. If multiple calls are made with
    the same layers, the last one will be used as default.

    The conditions may be given as keyword arguments and/or as a dict passed
    as third positional argument. The binding is appended to a copy of
    ``lower.payload_guess``, which then replaces that list.

    ex:
        >>> bind_bottom_up(Ether, SNAP, type=0x1234)
        >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00')  # noqa: E501
        <Ether  dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP  OUI=0x0 code=0x0 |>>  # noqa: E501
    """
    if __fval is not None:
        fval.update(__fval)
    guesses = lower.payload_guess[:]
    guesses.append((fval, upper))
    lower.payload_guess = guesses

2047 

2048 

def bind_top_down(lower,  # type: Type[Packet]
                  upper,  # type: Type[Packet]
                  __fval=None,  # type: Optional[Any]
                  **fval  # type: Any
                  ):
    # type: (...) -> None
    """Register field values for building ``lower / upper``.

    The field/value pairs (keyword arguments, optionally completed by the
    ``__fval`` mapping) are stored in ``upper._overload_fields`` under the
    key ``lower``.

    ex:
        >>> bind_top_down(Ether, SNAP, type=0x1234)
        >>> Ether()/SNAP()
        <Ether type=0x1234 |<SNAP |>>
    """
    overrides = dict(fval)
    if __fval is not None:
        overrides.update(__fval)
    # Work on a copy so a mapping shared with another class is not modified.
    overloads = upper._overload_fields.copy()
    overloads[lower] = overrides
    upper._overload_fields = overloads  # type: ignore

2068 

2069 

@conf.commands.register
def bind_layers(lower,  # type: Type[Packet]
                upper,  # type: Type[Packet]
                __fval=None,  # type: Optional[Dict[str, int]]
                **fval  # type: Any
                ):
    # type: (...) -> None
    """Bind 2 layers on some specific fields' values.

    The given field values are forwarded to both bind_top_down (building)
    and bind_bottom_up (dissection).

    Please have a look at their docs:
     - help(bind_bottom_up)
     - help(bind_top_down)
    """
    values = dict(fval)
    if __fval is not None:
        values.update(__fval)
    for binder in (bind_top_down, bind_bottom_up):
        binder(lower, upper, **values)

2093 

2094 

def split_bottom_up(lower,  # type: Type[Packet]
                    upper,  # type: Type[Packet]
                    __fval=None,  # type: Optional[Any]
                    **fval  # type: Any
                    ):
    # type: (...) -> None
    """Undo an association that was made using bind_bottom_up.

    Every ``lower.payload_guess`` entry that targets ``upper`` and whose
    parameters contain all the given field values is removed.
    Have a look at help(bind_bottom_up)
    """
    if __fval is not None:
        fval.update(__fval)

    def is_kept(params, cls):
        # type: (Dict[str, int], Type[Packet]) -> bool
        # Entries for another upper layer are never touched.
        if cls != upper:
            return True
        # An entry for `upper` survives only if it lacks one of the
        # requested values.
        for key, expected in fval.items():
            if key not in params or params[key] != expected:
                return True
        return False

    lower.payload_guess = [
        entry for entry in lower.payload_guess if is_kept(*entry)
    ]

2114 

2115 

def split_top_down(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    """Undo an association that was made using bind_top_down.

    The entry stored for ``lower`` in ``upper._overload_fields`` is removed,
    provided it contains all the given field values.
    Have a look at help(bind_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    if lower not in upper._overload_fields:
        return
    registered = upper._overload_fields[lower]
    for key, expected in fval.items():
        if key not in registered or registered[key] != expected:
            # The stored binding does not match the request: keep it.
            return
    # Replace the mapping with a trimmed copy rather than editing it in place.
    remaining = upper._overload_fields.copy()
    del remaining[lower]
    upper._overload_fields = remaining  # type: ignore

2133 

2134 

@conf.commands.register
def split_layers(lower,  # type: Type[Packet]
                 upper,  # type: Type[Packet]
                 __fval=None,  # type: Optional[Any]
                 **fval  # type: Any
                 ):
    # type: (...) -> None
    """Split 2 layers previously bound.

    This is the opposite of bind_layers: the given field values are
    forwarded to both split_bottom_up and split_top_down.

    Please have a look at their docs:
     - help(split_bottom_up)
     - help(split_top_down)
    """
    values = dict(fval)
    if __fval is not None:
        values.update(__fval)
    for splitter in (split_bottom_up, split_top_down):
        splitter(lower, upper, **values)

2154 

2155 

@conf.commands.register
def explore(layer=None):
    # type: (Optional[str]) -> None
    """Function used to discover the Scapy layers and protocols.
    It helps to see which packets exist in contrib or layer files.

    params:
     - layer: If specified, the function will explore the layer. If not,
              the GUI mode will be activated, to browse the available layers

    examples:
      >>> explore()  # Launches the GUI
      >>> explore("dns")  # Explore scapy.layers.dns
      >>> explore("http2")  # Explore scapy.contrib.http2
      >>> explore(scapy.layers.bluetooth4LE)

    Note: to search a packet by name, use ls("name") rather than explore.

    :raises Scapy_Exception: if GUI mode is requested while
        ``conf.interactive`` is false, or if the module is unknown.
    :raises ImportError: if GUI mode is requested and prompt_toolkit is
        missing or older than 2.0.
    """
    if layer is None:  # GUI MODE
        if not conf.interactive:
            raise Scapy_Exception("explore() GUI-mode cannot be run in "
                                  "non-interactive mode. Please provide a "
                                  "'layer' parameter !")
        # 0 - Imports
        try:
            import prompt_toolkit
        except ImportError:
            raise ImportError("prompt_toolkit is not installed ! "
                              "You may install IPython, which contains it, via"
                              " `pip install ipython`")
        if not _version_checker(prompt_toolkit, (2, 0)):
            raise ImportError("prompt_toolkit >= 2.0.0 is required !")
        # Only available with prompt_toolkit > 2.0, not released on PyPi yet
        from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
            button_dialog
        from prompt_toolkit.formatted_text import HTML
        # Check for prompt_toolkit >= 3.0.0: there the dialog object has to
        # be run explicitly, while below 3.0 it is used as the result itself.
        call_ptk = lambda x: cast(str, x)  # type: Callable[[Any], str]
        if _version_checker(prompt_toolkit, (3, 0)):
            call_ptk = lambda x: x.run()
        # 1 - Ask for layer or contrib
        btn_diag = button_dialog(
            title="Scapy v%s" % conf.version,
            text=HTML(
                '<style bg="white" fg="red">Choose the type of packets'
                ' you want to explore:</style>'
            ),
            buttons=[
                ("Layers", "layers"),
                ("Contribs", "contribs"),
                ("Cancel", "cancel")
            ])
        action = call_ptk(btn_diag)
        # 2 - Retrieve list of Packets
        if action == "layers":
            # Get all loaded layers
            lvalues = conf.layers.layers()
            # Restrict to layers-only (not contribs) + packet.py and asn1*.py
            values = [x for x in lvalues if ("layers" in x[0] or
                                             "packet" in x[0] or
                                             "asn1" in x[0])]
        elif action == "contribs":
            # Get all existing contribs
            from scapy.main import list_contrib
            cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
            values = [(x['name'], x['description'])
                      for x in cvalues]
            # Remove very specific modules
            values = [x for x in values if "can" not in x[0]]
        else:
            # Escape/Cancel was pressed
            return
        # Build tree
        if action == "contribs":
            # A tree is a dictionary. Each layer contains a keyword
            # _l which contains the files in the layer, and a _name
            # argument which is its name. The other keys are the subfolders,
            # which are similar dictionaries
            tree = defaultdict(list)  # type: Dict[str, Union[List[Any], Dict[str, Any]]]  # noqa: E501
            for name, desc in values:
                if "." in name:  # Folder detected
                    parts = name.split(".")
                    subtree = tree
                    for pa in parts[:-1]:
                        if pa not in subtree:
                            subtree[pa] = {}
                        # one layer deeper
                        subtree = subtree[pa]  # type: ignore
                        subtree["_name"] = pa  # type: ignore
                    if "_l" not in subtree:
                        subtree["_l"] = []
                    subtree["_l"].append((parts[-1], desc))  # type: ignore
                else:
                    tree["_l"].append((name, desc))  # type: ignore
        elif action == "layers":
            tree = {"_l": values}
        # 3 - Ask for the layer/contrib module to explore
        current = tree  # type: Any
        previous = []  # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]]  # noqa: E501
        while True:
            # Generate tests & form
            folders = list(current.keys())
            # Sub-folders are prefixed with "$" so that the selection can be
            # told apart from a plain module entry below.
            _radio_values = [
                ("$" + name, str('[+] ' + name.capitalize()))
                for name in folders if not name.startswith("_")
            ] + current.get("_l", [])  # type: List[str]
            cur_path = ""
            if previous:
                cur_path = ".".join(
                    itertools.chain(
                        (x["_name"] for x in previous[1:]),  # type: ignore
                        (current["_name"],)
                    )
                )
            extra_text = (
                '\n<style bg="white" fg="green">> scapy.%s</style>'
            ) % (action + ("." + cur_path if cur_path else ""))
            # Show popup
            rd_diag = radiolist_dialog(
                values=_radio_values,
                title="Scapy v%s" % conf.version,
                text=HTML(
                    (
                        '<style bg="white" fg="red">Please select a file '
                        'among the following, to see all layers contained in'
                        ' it:</style>'
                    ) + extra_text
                ),
                cancel_text="Back" if previous else "Cancel"
            )
            result = call_ptk(rd_diag)
            if result is None:
                # User pressed "Cancel/Back"
                if previous:  # Back
                    current = previous.pop()
                    continue
                else:  # Cancel
                    return
            if result.startswith("$"):
                previous.append(current)
                current = current[result[1:]]
            else:
                # Enter on layer
                if previous:  # In subfolder
                    result = cur_path + "." + result
                break
        # 4 - (Contrib only): load contrib
        if action == "contribs":
            from scapy.main import load_contrib
            load_contrib(result)
            result = "scapy.contrib." + result
    else:  # NON-GUI MODE
        # We handle layer as a short layer name, full layer name
        # or the module itself
        if isinstance(layer, types.ModuleType):
            layer = layer.__name__
        if isinstance(layer, str):
            if layer.startswith("scapy.layers."):
                result = layer
            else:
                if layer.startswith("scapy.contrib."):
                    layer = layer.replace("scapy.contrib.", "")
                from scapy.main import load_contrib
                load_contrib(layer)
                result_layer, result_contrib = (("scapy.layers.%s" % layer),
                                                ("scapy.contrib.%s" % layer))
                if result_layer in conf.layers.ldict:
                    result = result_layer
                elif result_contrib in conf.layers.ldict:
                    result = result_contrib
                else:
                    raise Scapy_Exception("Unknown scapy module '%s'" % layer)
        else:
            warning("Wrong usage ! Check out help(explore)")
            return

    # COMMON PART
    # Get the list of all Packets contained in that module
    try:
        all_layers = conf.layers.ldict[result]
    except KeyError:
        # Report the resolved module name: `layer` is None in GUI mode.
        raise Scapy_Exception("Unknown scapy module '%s'" % result)
    # Print
    print(conf.color_theme.layer_name("Packets contained in %s:" % result))
    rtlst = [
        (lay.__name__ or "", cast(str, lay._name) or "")
        for lay in all_layers
    ]  # type: List[Tuple[Union[str, List[str]], ...]]
    print(pretty_list(rtlst, [("Class", "Name")], borders=True))

2343 

2344 

def _pkt_ls(obj,  # type: Union[Packet, Type[Packet]]
            verbose=False,  # type: bool
            ):
    # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]]  # noqa: E501
    """Internal function used to resolve `fields_desc` to display it.

    :param obj: a packet object or class
    :param verbose: also collect the value names of enumeration and flags
        fields
    :returns: a list containing tuples [(name, clsname, clsname_extras,
        default, long_attrs)]
    :raises ValueError: if obj is neither a Packet instance nor a Packet
        class
    """
    is_pkt = isinstance(obj, Packet)
    if not (issubtype(obj, Packet) or is_pkt):
        raise ValueError
    rows = []
    for desc in obj.fields_desc:
        extras = []  # type: List[str]
        details = []  # type: List[str]
        # Unwrap decorating fields to reach the field that holds the data
        inner = desc
        while isinstance(inner, (Emph, ConditionalField)):
            if isinstance(inner, ConditionalField):
                extras.append(inner.__class__.__name__[:4])
            inner = inner.fld
        fld_name = inner.name
        fld_default = inner.default
        if verbose and isinstance(inner, EnumField) \
                and hasattr(inner, "i2s") and inner.i2s:
            # Very large enumerations are not listed
            if len(inner.i2s or []) < 50:
                for numval, strval in sorted(inner.i2s.items()):
                    details.append("%s: %d" % (strval, numval))
        elif isinstance(inner, MultiEnumField):
            # The enumeration depends on the packet: use obj itself or
            # a default instance of the class
            obj_pkt = obj if isinstance(obj, Packet) else obj()
            fld_depend = inner.depends_on(obj_pkt)
            extras.append("Depends on %s" % fld_depend)
            if verbose:
                cur_i2s = inner.i2s_multi.get(
                    inner.depends_on(obj_pkt), {}
                )
                if len(cur_i2s) < 50:
                    for numval, strval in sorted(cur_i2s.items()):
                        details.append("%s: %d" % (strval, numval))
        elif verbose and isinstance(inner, FlagsField):
            details.append(", ".join(inner.names))
        elif isinstance(inner, MultipleTypeField):
            fld_default = inner.dflt.default
            candidates = itertools.chain(inner.flds, [(inner.dflt,)])
            extras.append(", ".join(
                cand[0].__class__.__name__ for cand in candidates
            ))

        if extras:
            class_name_extras = "(%s)" % ", ".join(extras)
        else:
            class_name_extras = ""
        if isinstance(inner, BitField):
            plural = "s" if inner.size > 1 else ""
            class_name_extras += " (%d bit%s)" % (inner.size, plural)
        rows.append(
            (fld_name,
             inner.__class__,
             class_name_extras,
             repr(fld_default),
             details)
        )
    return rows

2421 

2422 

@conf.commands.register
def ls(obj=None,  # type: Optional[Union[str, Packet, Type[Packet]]]
       case_sensitive=False,  # type: bool
       verbose=False  # type: bool
       ):
    # type: (...) -> None
    """List available layers, or infos on a given layer class or name.

    :param obj: Packet / packet name to use. A string is used as a regular
        expression searched in the class name and in the ``name``
        attribute of every layer of ``conf.layers``.
    :param case_sensitive: if obj is a string, is it case sensitive?
    :param verbose: passed to the field listing, to also print the value
        names of enumeration and flags fields
    """
    if obj is None or isinstance(obj, str):
        tip = False
        if obj is None:
            tip = True
            all_layers = sorted(conf.layers, key=lambda x: x.__name__)
        else:
            pattern = re.compile(
                obj,
                0 if case_sensitive else re.I
            )

            # We first order by accuracy, then length
            def sorter(x):
                # type: (Type[Packet]) -> Tuple[int, int]
                # A layer may have been selected through its `name`
                # attribute only, or through a regular expression, so a
                # plain substring lookup in `__name__` could fail: use the
                # position of the pattern match and rank layers that do
                # not match on `__name__` after those that do.
                cls_name = x.__name__
                hit = pattern.search(cls_name)
                position = hit.start() if hit else len(cls_name)
                return (position, len(cls_name))

            all_layers = sorted((layer for layer in conf.layers
                                 if (isinstance(layer.__name__, str) and
                                     pattern.search(layer.__name__)) or
                                 (isinstance(layer.name, str) and
                                     pattern.search(layer.name))),
                                key=sorter)
        for layer in all_layers:
            print("%-10s : %s" % (layer.__name__, layer._name))
        if tip and conf.interactive:
            print("\nTIP: You may use explore() to navigate through all "
                  "layers using a clear GUI")
    else:
        try:
            fields = _pkt_ls(
                obj,
                verbose=verbose
            )
            is_pkt = isinstance(obj, Packet)
            # Print
            for fname, cls, clsne, dflt, long_attrs in fields:
                clsinfo = cls.__name__ + " " + clsne
                print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
                if is_pkt:
                    # For an instance, show the current value before the
                    # default one
                    print("%-15r" % (getattr(obj, fname),), end=' ')
                print("(%r)" % (dflt,))
                for attr in long_attrs:
                    print("%-15s%s" % ("", attr))
            # Restart for payload if any
            if is_pkt:
                obj = cast(Packet, obj)
                if isinstance(obj.payload, NoPayload):
                    return
                print("--")
                # Keep the requested verbosity for the upper layers
                ls(obj.payload, verbose=verbose)
        except ValueError:
            print("Not a packet class or name. Type 'ls()' to list packet classes.")  # noqa: E501

2488 

2489 

@conf.commands.register
def rfc(cls, ret=False, legend=True):
    # type: (Type[Packet], bool, bool) -> Optional[str]
    """
    Generate an RFC-like representation of a packet def.

    :param cls: the Packet class
    :param ret: return the result instead of printing (def. False)
    :param legend: show text under the diagram (default True)
    :raises TypeError: if cls is not a Packet class

    Ex::

        >>> rfc(Ether)

    """
    if not issubclass(cls, Packet):
        raise TypeError("Packet class expected")
    full_rule = "+-" * 32 + "+\n"
    bits_used = 0  # bits consumed on the row being filled
    row = []
    rows = []
    # Generate packet groups. The counter gives every field a unique id, so
    # that the pieces of a field split over several rows can be recognized.
    for field_id, fld in enumerate(cls.fields_desc, 1):
        remaining = int(fld.sz * 8)
        bits_used += remaining
        # Fancy field name
        label = fld.name.upper().replace("_", " ")
        # The field might exceed the current row or
        # take more than one row. Copy it as required
        while True:
            overflow = max(0, bits_used - 32)  # bits that do not fit
            # Width in characters of the part that fits on this row
            width = 2 * (remaining - overflow) - 1
            row.append((label[:width], width, field_id))
            if bits_used < 32:
                # End of the field
                break
            # Current row is full. start a new row
            rows.append(row)
            bits_used = remaining = overflow
            label = ""  # do not repeat the field
            row = []
            if not overflow:
                # there is no data left
                break
    # Add the last row if un-finished
    if row:
        rows.append(row)
    # Calculate separations between rows
    seps = [full_rule]
    for above, below in zip(rows, rows[1:]):
        # Start with a full line
        sep = full_rule
        # The last field of above is shared with below
        if above[-1][2] == below[0][2]:
            leading = above[:-1]
            # where the field in "above" starts
            pos_above = sum(x[1] for x in leading) + len(leading) - 1
            # where the field in "below" ends
            pos_below = below[0][1]
            if pos_above < pos_below:
                # they are overlapping.
                # Now crop the space between those pos
                # and fill it with " "
                pos_above += pos_above % 2
                sep = "".join((
                    sep[:1 + pos_above],
                    " " * (pos_below - pos_above),
                    sep[1 + pos_below:],
                ))
        seps.append(sep)
    # Graph, starting with the bytes markers then the bits markers
    chunks = [
        " " + (" " * 19).join(str(x) for x in range(4)) + "\n",
        " " + " ".join(str(x % 10) for x in range(32)) + "\n",
    ]
    # Add fields and their separations
    for cells, sep in zip(rows, seps):
        chunks.append(sep)
        chunks.extend(
            "|" + text.center(width, " ") for text, width, _ in cells
        )
        chunks.append("|\n")
    chunks.append("+-" * (bits_used or 32) + "+\n")
    # Annotate with the figure name
    if legend:
        chunks.append("\n" + ("Fig. " + cls.__name__).center(66, " "))
    result = "".join(chunks)
    # return if asked for, else print
    if ret:
        return result
    print(result)
    return None

2594 

2595 

2596############# 

2597# Fuzzing # 

2598############# 

2599 

# Type variable bound to Packet, used by the signature of fuzz() below
_P = TypeVar("_P", bound=Packet)

2601 

2602 

@conf.commands.register
def fuzz(p,  # type: _P
         _inplace=0,  # type: int
         ):
    # type: (...) -> _P
    """
    Transform a layer into a fuzzy layer by replacing some default values
    by random objects.

    :param p: the Packet instance to fuzz
    :param _inplace: when true, modify p itself instead of a copy
    :return: the fuzzed packet.
    """
    if not _inplace:
        p = p.copy()
    layer = cast(Packet, p)
    while not isinstance(layer, NoPayload):
        randomized = {}
        deferred = []  # type: List[str]
        for fld in layer.fields_desc:
            if isinstance(fld, PacketListField):
                # Fuzz the packets held by the list, in place
                for sub_pkt in getattr(layer, fld.name):
                    fuzz(sub_pkt, _inplace=1)
                continue
            if isinstance(fld, MultipleTypeField):
                # the type of the field will depend on others
                deferred.append(fld.name)
                continue
            if fld.default is None:
                continue
            if isinstance(fld, ConditionalField) and not fld._evalcond(layer):
                continue
            rnd = fld.randval()
            if rnd is not None:
                randomized[fld.name] = rnd
        # Process packets with MultipleTypeFields
        if deferred:
            # freeze the other random values
            frozen = {}
            for key, val in randomized.items():
                if isinstance(val, VolatileValue):
                    val = val._fix()
                frozen[key] = val
            layer.default_fields.update(frozen)
            # add the random values of the MultipleTypeFields
            randomized = {}
            for name in deferred:
                multi = cast(MultipleTypeField, layer.get_field(name))
                rnd = multi._find_fld_pkt(layer).randval()
                if rnd is not None:
                    randomized[name] = rnd
        layer.default_fields.update(randomized)
        layer = layer.payload
    return p