Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/packet.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1472 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Packet class 

8 

9Provides: 

10 - the default Packet classes 

11 - binding mechanisms 

12 - fuzz() method 

13 - exploration methods: explore() / ls() 

14""" 

15 

16from collections import defaultdict 

17 

18import json 

19import re 

20import time 

21import itertools 

22import copy 

23import types 

24import warnings 

25 

26from scapy.fields import ( 

27 AnyField, 

28 BitField, 

29 ConditionalField, 

30 Emph, 

31 EnumField, 

32 Field, 

33 FlagsField, 

34 FlagValue, 

35 MayEnd, 

36 MultiEnumField, 

37 MultipleTypeField, 

38 PadField, 

39 PacketListField, 

40 RawVal, 

41 StrField, 

42) 

43from scapy.config import conf, _version_checker 

44from scapy.compat import raw, orb, bytes_encode 

45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \ 

46 _CanvasDumpExtended 

47from scapy.interfaces import _GlobInterfaceType 

48from scapy.volatile import RandField, VolatileValue 

49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \ 

50 pretty_list, EDecimal 

51from scapy.error import Scapy_Exception, log_runtime, warning 

52from scapy.libs.test_pyx import PYX 

53 

54# Typing imports 

55from typing import ( 

56 Any, 

57 Callable, 

58 Dict, 

59 Iterator, 

60 List, 

61 NoReturn, 

62 Optional, 

63 Set, 

64 Tuple, 

65 Type, 

66 TypeVar, 

67 Union, 

68 Sequence, 

69 cast, 

70) 

71from scapy.compat import Self 

72 

73try: 

74 import pyx 

75except ImportError: 

76 pass 

77 

78 

79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]]) 

80 

81 

82class Packet( 

83 BasePacket, 

84 _CanvasDumpExtended, 

85 metaclass=Packet_metaclass 

86): 

87 __slots__ = [ 

88 "time", "sent_time", "name", 

89 "default_fields", "fields", "fieldtype", 

90 "overload_fields", "overloaded_fields", 

91 "packetfields", 

92 "original", "explicit", "raw_packet_cache", 

93 "raw_packet_cache_fields", "_pkt", "post_transforms", 

94 "stop_dissection_after", 

95 # then payload, underlayer and parent 

96 "payload", "underlayer", "parent", 

97 "name", 

98 # used for sr() 

99 "_answered", 

100 # used when sniffing 

101 "direction", "sniffed_on", 

102 # handle snaplen Vs real length 

103 "wirelen", 

104 "comments", 

105 "process_information" 

106 ] 

107 name = None 

108 fields_desc = [] # type: List[AnyField] 

109 deprecated_fields = {} # type: Dict[str, Tuple[str, str]] 

110 overload_fields = {} # type: Dict[Type[Packet], Dict[str, Any]] 

111 payload_guess = [] # type: List[Tuple[Dict[str, Any], Type[Packet]]] 

112 show_indent = 1 

113 show_summary = True 

114 match_subclass = False 

115 class_dont_cache = {} # type: Dict[Type[Packet], bool] 

116 class_packetfields = {} # type: Dict[Type[Packet], Any] 

117 class_default_fields = {} # type: Dict[Type[Packet], Dict[str, Any]] 

118 class_default_fields_ref = {} # type: Dict[Type[Packet], List[str]] 

119 class_fieldtype = {} # type: Dict[Type[Packet], Dict[str, AnyField]] # noqa: E501 

120 

121 @classmethod 

122 def from_hexcap(cls): 

123 # type: (Type[Packet]) -> Packet 

124 return cls(import_hexcap()) 

125 

126 @classmethod 

127 def upper_bonds(self): 

128 # type: () -> None 

129 for fval, upper in self.payload_guess: 

130 print( 

131 "%-20s %s" % ( 

132 upper.__name__, 

133 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()), 

134 ) 

135 ) 

136 

137 @classmethod 

138 def lower_bonds(self): 

139 # type: () -> None 

140 for lower, fval in self._overload_fields.items(): 

141 print( 

142 "%-20s %s" % ( 

143 lower.__name__, 

144 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()), 

145 ) 

146 ) 

147 

148 def __init__(self, 

149 _pkt=b"", # type: Union[bytes, bytearray] 

150 post_transform=None, # type: Any 

151 _internal=0, # type: int 

152 _underlayer=None, # type: Optional[Packet] 

153 _parent=None, # type: Optional[Packet] 

154 stop_dissection_after=None, # type: Optional[Type[Packet]] 

155 **fields # type: Any 

156 ): 

157 # type: (...) -> None 

158 self.time = time.time() # type: Union[EDecimal, float] 

159 self.sent_time = None # type: Union[EDecimal, float, None] 

160 self.name = (self.__class__.__name__ 

161 if self._name is None else 

162 self._name) 

163 self.default_fields = {} # type: Dict[str, Any] 

164 self.overload_fields = self._overload_fields 

165 self.overloaded_fields = {} # type: Dict[str, Any] 

166 self.fields = {} # type: Dict[str, Any] 

167 self.fieldtype = {} # type: Dict[str, AnyField] 

168 self.packetfields = [] # type: List[AnyField] 

169 self.payload = NoPayload() # type: Packet 

170 self.init_fields(bool(_pkt)) 

171 self.underlayer = _underlayer 

172 self.parent = _parent 

173 if isinstance(_pkt, bytearray): 

174 _pkt = bytes(_pkt) 

175 self.original = _pkt 

176 self.explicit = 0 

177 self.raw_packet_cache = None # type: Optional[bytes] 

178 self.raw_packet_cache_fields = None # type: Optional[Dict[str, Any]] # noqa: E501 

179 self.wirelen = None # type: Optional[int] 

180 self.direction = None # type: Optional[int] 

181 self.sniffed_on = None # type: Optional[_GlobInterfaceType] 

182 self.comments = None # type: Optional[List[bytes]] 

183 self.process_information = None # type: Optional[Dict[str, Any]] 

184 self.stop_dissection_after = stop_dissection_after 

185 if _pkt: 

186 self.dissect(_pkt) 

187 if not _internal: 

188 self.dissection_done(self) 

189 # We use this strange initialization so that the fields 

190 # are initialized in their declaration order. 

191 # It is required to always support MultipleTypeField 

192 for field in self.fields_desc: 

193 fname = field.name 

194 try: 

195 value = fields.pop(fname) 

196 except KeyError: 

197 continue 

198 self.fields[fname] = value if isinstance(value, RawVal) else \ 

199 self.get_field(fname).any2i(self, value) 

200 # The remaining fields are unknown 

201 for fname in fields: 

202 if fname in self.deprecated_fields: 

203 # Resolve deprecated fields 

204 value = fields[fname] 

205 fname = self._resolve_alias(fname) 

206 self.fields[fname] = value if isinstance(value, RawVal) else \ 

207 self.get_field(fname).any2i(self, value) 

208 continue 

209 raise AttributeError(fname) 

210 if isinstance(post_transform, list): 

211 self.post_transforms = post_transform 

212 elif post_transform is None: 

213 self.post_transforms = [] 

214 else: 

215 self.post_transforms = [post_transform] 

216 

217 _PickleType = Tuple[ 

218 Union[EDecimal, float], 

219 Optional[Union[EDecimal, float, None]], 

220 Optional[int], 

221 Optional[_GlobInterfaceType], 

222 Optional[int], 

223 Optional[bytes], 

224 ] 

225 _PickleStateType = Union[_PickleType, Dict[str, Any]] 

226 

227 @property 

228 def comment(self): 

229 # type: () -> Optional[bytes] 

230 """Get the comment of the packet""" 

231 if self.comments and len(self.comments): 

232 return self.comments[0] 

233 return None 

234 

235 @comment.setter 

236 def comment(self, value): 

237 # type: (Optional[bytes]) -> None 

238 """ 

239 Set the comment of the packet. 

240 If value is None, it will clear the comments. 

241 """ 

242 if value is not None: 

243 self.comments = [value] 

244 else: 

245 self.comments = None 

246 

247 @classmethod 

248 def _rebuild_pkt(cls, raw_packet): 

249 # type: (Type[Packet], bytes) -> Packet 

250 """Helper used by pickle to reconstruct Packet from raw bytes.""" 

251 return cls(raw_packet) 

252 

253 def __reduce__(self): 

254 # type: () -> Tuple[Any, ...] 

255 """Used by pickling methods""" 

256 state = { 

257 "pickle_state_version": 2, 

258 "time": self.time, 

259 "sent_time": self.sent_time, 

260 "direction": self.direction, 

261 "sniffed_on": self.sniffed_on, 

262 "wirelen": self.wirelen, 

263 # Keep both keys for compatibility with historical/transition code. 

264 "comment": self.comment, 

265 "comments": self.comments, 

266 } 

267 extra_slots = {} 

268 for attr in type(self).__all_slots__ - set(Packet.__slots__): 

269 if hasattr(self, attr): 

270 extra_slots[attr] = getattr(self, attr) 

271 if extra_slots: 

272 state["extra_slots"] = extra_slots # type: ignore 

273 return (type(self)._rebuild_pkt, (self.build(),), state) 

274 

275 def __setstate__(self, state): 

276 # type: (Packet._PickleStateType) -> Packet 

277 """Rebuild state using pickable methods""" 

278 # Legacy format: tuple produced by older Packet.__reduce__. 

279 if isinstance(state, tuple): 

280 self.time = state[0] 

281 self.sent_time = state[1] 

282 self.direction = state[2] 

283 self.sniffed_on = state[3] 

284 self.wirelen = state[4] 

285 self.comment = state[5] 

286 return self 

287 

288 # New format: versioned dict metadata. 

289 self.time = state.get("time", self.time) 

290 self.sent_time = state.get("sent_time", self.sent_time) 

291 self.direction = state.get("direction", self.direction) 

292 self.sniffed_on = state.get("sniffed_on", self.sniffed_on) 

293 self.wirelen = state.get("wirelen", self.wirelen) 

294 

295 if "comments" in state: 

296 self.comments = state["comments"] 

297 elif "comment" in state: 

298 self.comment = state["comment"] 

299 

300 extra_slots = state.get("extra_slots", {}) 

301 if isinstance(extra_slots, dict): 

302 for attr, value in extra_slots.items(): 

303 # Only restore known subclass slots; ignore stale/unknown entries. 

304 if attr in type(self).__all_slots__ and attr not in Packet.__slots__: 

305 try: 

306 setattr(self, attr, value) 

307 except AttributeError: 

308 pass 

309 return self 

310 

311 def __deepcopy__(self, 

312 memo, # type: Any 

313 ): 

314 # type: (...) -> Packet 

315 """Used by copy.deepcopy""" 

316 return self.copy() 

317 

318 def init_fields(self, for_dissect_only=False): 

319 # type: (bool) -> None 

320 """ 

321 Initialize each fields of the fields_desc dict 

322 """ 

323 

324 if self.class_dont_cache.get(self.__class__, False): 

325 self.do_init_fields(self.fields_desc) 

326 else: 

327 self.do_init_cached_fields(for_dissect_only=for_dissect_only) 

328 

329 def do_init_fields(self, 

330 flist, # type: Sequence[AnyField] 

331 ): 

332 # type: (...) -> None 

333 """ 

334 Initialize each fields of the fields_desc dict 

335 """ 

336 default_fields = {} 

337 for f in flist: 

338 default_fields[f.name] = copy.deepcopy(f.default) 

339 self.fieldtype[f.name] = f 

340 if f.holds_packets: 

341 self.packetfields.append(f) 

342 # We set default_fields last to avoid race issues 

343 self.default_fields = default_fields 

344 

345 def do_init_cached_fields(self, for_dissect_only=False): 

346 # type: (bool) -> None 

347 """ 

348 Initialize each fields of the fields_desc dict, or use the cached 

349 fields information 

350 """ 

351 

352 cls_name = self.__class__ 

353 

354 # Build the fields information 

355 if Packet.class_default_fields.get(cls_name, None) is None: 

356 self.prepare_cached_fields(self.fields_desc) 

357 

358 # Use fields information from cache 

359 default_fields = Packet.class_default_fields.get(cls_name, None) 

360 if default_fields: 

361 self.default_fields = default_fields 

362 self.fieldtype = Packet.class_fieldtype[cls_name] 

363 self.packetfields = Packet.class_packetfields[cls_name] 

364 

365 # Optimization: no need for references when only dissecting. 

366 if for_dissect_only: 

367 return 

368 

369 # Deepcopy default references 

370 for fname in Packet.class_default_fields_ref[cls_name]: 

371 value = self.default_fields[fname] 

372 try: 

373 self.fields[fname] = value.copy() 

374 except AttributeError: 

375 # Python 2.7 - list only 

376 self.fields[fname] = value[:] 

377 

378 def prepare_cached_fields(self, flist): 

379 # type: (Sequence[AnyField]) -> None 

380 """ 

381 Prepare the cached fields of the fields_desc dict 

382 """ 

383 

384 cls_name = self.__class__ 

385 

386 # Fields cache initialization 

387 if not flist: 

388 return 

389 

390 class_default_fields = dict() 

391 class_default_fields_ref = list() 

392 class_fieldtype = dict() 

393 class_packetfields = list() 

394 

395 # Fields initialization 

396 for f in flist: 

397 if isinstance(f, MultipleTypeField): 

398 # Abort 

399 self.class_dont_cache[cls_name] = True 

400 self.do_init_fields(self.fields_desc) 

401 return 

402 

403 class_default_fields[f.name] = copy.deepcopy(f.default) 

404 class_fieldtype[f.name] = f 

405 if f.holds_packets: 

406 class_packetfields.append(f) 

407 

408 # Remember references 

409 if isinstance(f.default, (list, dict, set, RandField, Packet)): 

410 class_default_fields_ref.append(f.name) 

411 

412 # Apply 

413 Packet.class_default_fields_ref[cls_name] = class_default_fields_ref 

414 Packet.class_fieldtype[cls_name] = class_fieldtype 

415 Packet.class_packetfields[cls_name] = class_packetfields 

416 # Last to avoid racing issues 

417 Packet.class_default_fields[cls_name] = class_default_fields 

418 

419 def dissection_done(self, pkt): 

420 # type: (Packet) -> None 

421 """DEV: will be called after a dissection is completed""" 

422 self.post_dissection(pkt) 

423 self.payload.dissection_done(pkt) 

424 

425 def post_dissection(self, pkt): 

426 # type: (Packet) -> None 

427 """DEV: is called after the dissection of the whole packet""" 

428 pass 

429 

430 def get_field(self, fld): 

431 # type: (str) -> AnyField 

432 """DEV: returns the field instance from the name of the field""" 

433 return self.fieldtype[fld] 

434 

435 def add_payload(self, payload): 

436 # type: (Union[Packet, bytes]) -> None 

437 if payload is None: 

438 return 

439 elif not isinstance(self.payload, NoPayload): 

440 self.payload.add_payload(payload) 

441 else: 

442 if isinstance(payload, Packet): 

443 self.payload = payload 

444 payload.add_underlayer(self) 

445 for t in self.aliastypes: 

446 if t in payload.overload_fields: 

447 self.overloaded_fields = payload.overload_fields[t] 

448 break 

449 elif isinstance(payload, (bytes, str, bytearray, memoryview)): 

450 self.payload = conf.raw_layer(load=bytes_encode(payload)) 

451 else: 

452 raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload)) # noqa: E501 

453 

454 def remove_payload(self): 

455 # type: () -> None 

456 self.payload.remove_underlayer(self) 

457 self.payload = NoPayload() 

458 self.overloaded_fields = {} 

459 

460 def add_underlayer(self, underlayer): 

461 # type: (Packet) -> None 

462 self.underlayer = underlayer 

463 

464 def remove_underlayer(self, other): 

465 # type: (Packet) -> None 

466 self.underlayer = None 

467 

468 def add_parent(self, parent): 

469 # type: (Packet) -> None 

470 """Set packet parent. 

471 When packet is an element in PacketListField, parent field would 

472 point to the list owner packet.""" 

473 self.parent = parent 

474 

475 def remove_parent(self, other): 

476 # type: (Packet) -> None 

477 """Remove packet parent. 

478 When packet is an element in PacketListField, parent field would 

479 point to the list owner packet.""" 

480 self.parent = None 

481 

482 def copy(self) -> Self: 

483 """Returns a deep copy of the instance.""" 

484 clone = self.__class__() 

485 clone.fields = self.copy_fields_dict(self.fields) 

486 clone.default_fields = self.copy_fields_dict(self.default_fields) 

487 clone.overloaded_fields = self.overloaded_fields.copy() 

488 clone.underlayer = self.underlayer 

489 clone.parent = self.parent 

490 clone.explicit = self.explicit 

491 clone.raw_packet_cache = self.raw_packet_cache 

492 clone.raw_packet_cache_fields = self.copy_fields_dict( 

493 self.raw_packet_cache_fields 

494 ) 

495 clone.wirelen = self.wirelen 

496 clone.post_transforms = self.post_transforms[:] 

497 clone.payload = self.payload.copy() 

498 clone.payload.add_underlayer(clone) 

499 clone.time = self.time 

500 clone.comments = self.comments 

501 clone.direction = self.direction 

502 clone.sniffed_on = self.sniffed_on 

503 return clone 

504 

505 def _resolve_alias(self, attr): 

506 # type: (str) -> str 

507 new_attr, version = self.deprecated_fields[attr] 

508 warnings.warn( 

509 "%s has been deprecated in favor of %s since %s !" % ( 

510 attr, new_attr, version 

511 ), DeprecationWarning 

512 ) 

513 return new_attr 

514 

515 def getfieldval(self, attr): 

516 # type: (str) -> Any 

517 if self.deprecated_fields and attr in self.deprecated_fields: 

518 attr = self._resolve_alias(attr) 

519 if attr in self.fields: 

520 return self.fields[attr] 

521 if attr in self.overloaded_fields: 

522 return self.overloaded_fields[attr] 

523 if attr in self.default_fields: 

524 return self.default_fields[attr] 

525 return self.payload.getfieldval(attr) 

526 

527 def getfield_and_val(self, attr): 

528 # type: (str) -> Tuple[AnyField, Any] 

529 if self.deprecated_fields and attr in self.deprecated_fields: 

530 attr = self._resolve_alias(attr) 

531 if attr in self.fields: 

532 return self.get_field(attr), self.fields[attr] 

533 if attr in self.overloaded_fields: 

534 return self.get_field(attr), self.overloaded_fields[attr] 

535 if attr in self.default_fields: 

536 return self.get_field(attr), self.default_fields[attr] 

537 raise ValueError 

538 

539 def __getattr__(self, attr): 

540 # type: (str) -> Any 

541 try: 

542 fld, v = self.getfield_and_val(attr) 

543 except ValueError: 

544 return self.payload.__getattr__(attr) 

545 if fld is not None: 

546 return v if isinstance(v, RawVal) else fld.i2h(self, v) 

547 return v 

548 

549 def setfieldval(self, attr, val): 

550 # type: (str, Any) -> None 

551 if self.deprecated_fields and attr in self.deprecated_fields: 

552 attr = self._resolve_alias(attr) 

553 if attr in self.default_fields: 

554 fld = self.get_field(attr) 

555 if fld is None: 

556 any2i = lambda x, y: y # type: Callable[..., Any] 

557 else: 

558 any2i = fld.any2i 

559 self.fields[attr] = val if isinstance(val, RawVal) else \ 

560 any2i(self, val) 

561 self.explicit = 0 

562 self.raw_packet_cache = None 

563 self.raw_packet_cache_fields = None 

564 self.wirelen = None 

565 elif attr == "payload": 

566 self.remove_payload() 

567 self.add_payload(val) 

568 else: 

569 self.payload.setfieldval(attr, val) 

570 

571 def __setattr__(self, attr, val): 

572 # type: (str, Any) -> None 

573 if attr in self.__all_slots__: 

574 return object.__setattr__(self, attr, val) 

575 try: 

576 return self.setfieldval(attr, val) 

577 except AttributeError: 

578 pass 

579 return object.__setattr__(self, attr, val) 

580 

581 def delfieldval(self, attr): 

582 # type: (str) -> None 

583 if attr in self.fields: 

584 del self.fields[attr] 

585 self.explicit = 0 # in case a default value must be explicit 

586 self.raw_packet_cache = None 

587 self.raw_packet_cache_fields = None 

588 self.wirelen = None 

589 elif attr in self.default_fields: 

590 pass 

591 elif attr == "payload": 

592 self.remove_payload() 

593 else: 

594 self.payload.delfieldval(attr) 

595 

596 def __delattr__(self, attr): 

597 # type: (str) -> None 

598 if attr == "payload": 

599 return self.remove_payload() 

600 if attr in self.__all_slots__: 

601 return object.__delattr__(self, attr) 

602 try: 

603 return self.delfieldval(attr) 

604 except AttributeError: 

605 pass 

606 return object.__delattr__(self, attr) 

607 

608 def _superdir(self): 

609 # type: () -> Set[str] 

610 """ 

611 Return a list of slots and methods, including those from subclasses. 

612 """ 

613 attrs = set() # type: Set[str] 

614 cls = self.__class__ 

615 if hasattr(cls, '__all_slots__'): 

616 attrs.update(cls.__all_slots__) 

617 for bcls in cls.__mro__: 

618 if hasattr(bcls, '__dict__'): 

619 attrs.update(bcls.__dict__) 

620 return attrs 

621 

622 def __dir__(self): 

623 # type: () -> List[str] 

624 """ 

625 Add fields to tab completion list. 

626 """ 

627 return sorted(itertools.chain(self._superdir(), self.default_fields)) 

628 

629 def __repr__(self): 

630 # type: () -> str 

631 s = "" 

632 ct = conf.color_theme 

633 for f in self.fields_desc: 

634 if isinstance(f, ConditionalField) and not f._evalcond(self): 

635 continue 

636 if f.name in self.fields: 

637 fval = self.fields[f.name] 

638 if isinstance(fval, (list, dict, set)) and len(fval) == 0: 

639 continue 

640 val = f.i2repr(self, fval) 

641 elif f.name in self.overloaded_fields: 

642 fover = self.overloaded_fields[f.name] 

643 if isinstance(fover, (list, dict, set)) and len(fover) == 0: 

644 continue 

645 val = f.i2repr(self, fover) 

646 else: 

647 continue 

648 if isinstance(f, Emph) or f in conf.emph: 

649 ncol = ct.emph_field_name 

650 vcol = ct.emph_field_value 

651 else: 

652 ncol = ct.field_name 

653 vcol = ct.field_value 

654 

655 s += " %s%s%s" % (ncol(f.name), 

656 ct.punct("="), 

657 vcol(val)) 

658 return "%s%s %s %s%s%s" % (ct.punct("<"), 

659 ct.layer_name(self.__class__.__name__), 

660 s, 

661 ct.punct("|"), 

662 repr(self.payload), 

663 ct.punct(">")) 

664 

665 def __str__(self): 

666 # type: () -> str 

667 return self.summary() 

668 

669 def __bytes__(self): 

670 # type: () -> bytes 

671 return self.build() 

672 

673 def __div__(self, other): 

674 # type: (Any) -> Self 

675 if isinstance(other, Packet): 

676 cloneA = self.copy() 

677 cloneB = other.copy() 

678 cloneA.add_payload(cloneB) 

679 return cloneA 

680 elif isinstance(other, (bytes, str, bytearray, memoryview)): 

681 return self / conf.raw_layer(load=bytes_encode(other)) 

682 else: 

683 return other.__rdiv__(self) # type: ignore 

684 __truediv__ = __div__ 

685 

686 def __rdiv__(self, other): 

687 # type: (Any) -> Packet 

688 if isinstance(other, (bytes, str, bytearray, memoryview)): 

689 return conf.raw_layer(load=bytes_encode(other)) / self 

690 else: 

691 raise TypeError 

692 __rtruediv__ = __rdiv__ 

693 

694 def __mul__(self, other): 

695 # type: (Any) -> List[Packet] 

696 if isinstance(other, int): 

697 return [self] * other 

698 else: 

699 raise TypeError 

700 

701 def __rmul__(self, other): 

702 # type: (Any) -> List[Packet] 

703 return self.__mul__(other) 

704 

705 def __nonzero__(self): 

706 # type: () -> bool 

707 return True 

708 __bool__ = __nonzero__ 

709 

710 def __len__(self): 

711 # type: () -> int 

712 return len(self.__bytes__()) 

713 

714 def copy_field_value(self, fieldname, value): 

715 # type: (str, Any) -> Any 

716 return self.get_field(fieldname).do_copy(value) 

717 

718 def copy_fields_dict(self, fields): 

719 # type: (_T) -> _T 

720 if fields is None: 

721 return None 

722 return {fname: self.copy_field_value(fname, fval) 

723 for fname, fval in fields.items()} 

724 

725 def _raw_packet_cache_field_value(self, fld, val, copy=False): 

726 # type: (AnyField, Any, bool) -> Optional[Any] 

727 """Get a value representative of a mutable field to detect changes""" 

728 _cpy = lambda x: fld.do_copy(x) if copy else x # type: Callable[[Any], Any] 

729 if fld.holds_packets: 

730 # avoid copying whole packets (perf: #GH3894) 

731 if fld.islist: 

732 return [ 

733 (_cpy(x.fields), x.payload.raw_packet_cache) for x in val 

734 ] 

735 else: 

736 return (_cpy(val.fields), val.payload.raw_packet_cache) 

737 elif fld.islist or fld.ismutable: 

738 return _cpy(val) 

739 return None 

740 

741 def clear_cache(self): 

742 # type: () -> None 

743 """Clear the raw packet cache for the field and all its subfields""" 

744 self.raw_packet_cache = None 

745 for fname, fval in self.fields.items(): 

746 fld = self.get_field(fname) 

747 if fld.holds_packets: 

748 if isinstance(fval, Packet): 

749 fval.clear_cache() 

750 elif isinstance(fval, list): 

751 for fsubval in fval: 

752 fsubval.clear_cache() 

753 self.payload.clear_cache() 

754 

755 def self_build(self): 

756 # type: () -> bytes 

757 """ 

758 Create the default layer regarding fields_desc dict 

759 """ 

760 if self.raw_packet_cache is not None and \ 

761 self.raw_packet_cache_fields is not None: 

762 for fname, fval in self.raw_packet_cache_fields.items(): 

763 fld, val = self.getfield_and_val(fname) 

764 if self._raw_packet_cache_field_value(fld, val) != fval: 

765 self.raw_packet_cache = None 

766 self.raw_packet_cache_fields = None 

767 self.wirelen = None 

768 break 

769 if self.raw_packet_cache is not None: 

770 return self.raw_packet_cache 

771 p = b"" 

772 for f in self.fields_desc: 

773 val = self.getfieldval(f.name) 

774 if isinstance(val, RawVal): 

775 p += bytes(val) 

776 else: 

777 try: 

778 p = f.addfield(self, p, val) 

779 except Exception as ex: 

780 try: 

781 ex.args = ( 

782 "While building field '%s': " % f.name + 

783 ex.args[0], 

784 ) + ex.args[1:] 

785 except (AttributeError, IndexError): 

786 pass 

787 raise ex 

788 return p 

789 

790 def do_build_payload(self): 

791 # type: () -> bytes 

792 """ 

793 Create the default version of the payload layer 

794 

795 :return: a string of payload layer 

796 """ 

797 return self.payload.do_build() 

798 

799 def do_build(self): 

800 # type: () -> bytes 

801 """ 

802 Create the default version of the layer 

803 

804 :return: a string of the packet with the payload 

805 """ 

806 if not self.explicit: 

807 self = next(iter(self)) 

808 pkt = self.self_build() 

809 for t in self.post_transforms: 

810 pkt = t(pkt) 

811 pay = self.do_build_payload() 

812 if self.raw_packet_cache is None: 

813 return self.post_build(pkt, pay) 

814 else: 

815 return pkt + pay 

816 

817 def build_padding(self): 

818 # type: () -> bytes 

819 return self.payload.build_padding() 

820 

821 def build(self): 

822 # type: () -> bytes 

823 """ 

824 Create the current layer 

825 

826 :return: string of the packet with the payload 

827 """ 

828 p = self.do_build() 

829 p += self.build_padding() 

830 p = self.build_done(p) 

831 return p 

832 

833 def post_build(self, pkt, pay): 

834 # type: (bytes, bytes) -> bytes 

835 """ 

836 DEV: called right after the current layer is build. 

837 

838 :param str pkt: the current packet (build by self_build function) 

839 :param str pay: the packet payload (build by do_build_payload function) 

840 :return: a string of the packet with the payload 

841 """ 

842 return pkt + pay 

843 

844 def build_done(self, p): 

845 # type: (bytes) -> bytes 

846 return self.payload.build_done(p) 

847 

848 def do_build_ps(self): 

849 # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]] # noqa: E501 

850 p = b"" 

851 pl = [] 

852 q = b"" 

853 for f in self.fields_desc: 

854 if isinstance(f, ConditionalField) and not f._evalcond(self): 

855 continue 

856 p = f.addfield(self, p, self.getfieldval(f.name)) 

857 if isinstance(p, bytes): 

858 r = p[len(q):] 

859 q = p 

860 else: 

861 r = b"" 

862 pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r)) 

863 

864 pkt, lst = self.payload.build_ps(internal=1) 

865 p += pkt 

866 lst.append((self, pl)) 

867 

868 return p, lst 

869 

870 def build_ps(self, internal=0): 

871 # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]] # noqa: E501 

872 p, lst = self.do_build_ps() 

873# if not internal: 

874# pkt = self 

875# while pkt.haslayer(conf.padding_layer): 

876# pkt = pkt.getlayer(conf.padding_layer) 

877# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) ) 

878# p += pkt.load 

879# pkt = pkt.payload 

880 return p, lst 

881 

882 def canvas_dump(self, layer_shift=0, rebuild=1): 

883 # type: (int, int) -> pyx.canvas.canvas 

884 if PYX == 0: 

885 raise ImportError("PyX and its dependencies must be installed") 

886 canvas = pyx.canvas.canvas() 

887 if rebuild: 

888 _, t = self.__class__(raw(self)).build_ps() 

889 else: 

890 _, t = self.build_ps() 

891 YTXTI = len(t) 

892 for _, l in t: 

893 YTXTI += len(l) 

894 YTXT = float(YTXTI) 

895 YDUMP = YTXT 

896 

897 XSTART = 1 

898 XDSTART = 10 

899 y = 0.0 

900 yd = 0.0 

901 XMUL = 0.55 

902 YMUL = 0.4 

903 

904 backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb) 

905 forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb) 

906# backcolor=makecol(0.376, 0.729, 0.525, 1.0) 

907 

908 def hexstr(x): 

909 # type: (bytes) -> str 

910 return " ".join("%02x" % orb(c) for c in x) 

911 

912 def make_dump_txt(x, y, txt): 

913 # type: (int, float, bytes) -> pyx.text.text 

914 return pyx.text.text( 

915 XDSTART + x * XMUL, 

916 (YDUMP - y) * YMUL, 

917 r"\tt{%s}" % hexstr(txt), 

918 [pyx.text.size.Large] 

919 ) 

920 

921 def make_box(o): 

922 # type: (pyx.bbox.bbox) -> pyx.bbox.bbox 

923 return pyx.box.rect( 

924 o.left(), o.bottom(), o.width(), o.height(), 

925 relcenter=(0.5, 0.5) 

926 ) 

927 

928 def make_frame(lst): 

929 # type: (List[Any]) -> pyx.path.path 

930 if len(lst) == 1: 

931 b = lst[0].bbox() 

932 b.enlarge(pyx.unit.u_pt) 

933 return b.path() 

934 else: 

935 fb = lst[0].bbox() 

936 fb.enlarge(pyx.unit.u_pt) 

937 lb = lst[-1].bbox() 

938 lb.enlarge(pyx.unit.u_pt) 

939 if len(lst) == 2 and fb.left() > lb.right(): 

940 return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()), 

941 pyx.path.lineto(fb.left(), fb.top()), 

942 pyx.path.lineto(fb.left(), fb.bottom()), # noqa: E501 

943 pyx.path.lineto(fb.right(), fb.bottom()), # noqa: E501 

944 pyx.path.moveto(lb.left(), lb.top()), 

945 pyx.path.lineto(lb.right(), lb.top()), 

946 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501 

947 pyx.path.lineto(lb.left(), lb.bottom())) # noqa: E501 

948 else: 

949 # XXX 

950 gb = lst[1].bbox() 

951 if gb != lb: 

952 gb.enlarge(pyx.unit.u_pt) 

953 kb = lst[-2].bbox() 

954 if kb != gb and kb != lb: 

955 kb.enlarge(pyx.unit.u_pt) 

956 return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()), 

957 pyx.path.lineto(fb.right(), fb.top()), 

958 pyx.path.lineto(fb.right(), kb.bottom()), # noqa: E501 

959 pyx.path.lineto(lb.right(), kb.bottom()), # noqa: E501 

960 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501 

961 pyx.path.lineto(lb.left(), lb.bottom()), # noqa: E501 

962 pyx.path.lineto(lb.left(), gb.top()), 

963 pyx.path.lineto(fb.left(), gb.top()), 

964 pyx.path.closepath(),) 

965 

966 def make_dump(s, # type: bytes 

967 shift=0, # type: int 

968 y=0., # type: float 

969 col=None, # type: pyx.color.color 

970 bkcol=None, # type: pyx.color.color 

971 large=16 # type: int 

972 ): 

973 # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float] # noqa: E501 

974 c = pyx.canvas.canvas() 

975 tlist = [] 

976 while s: 

977 dmp, s = s[:large - shift], s[large - shift:] 

978 txt = make_dump_txt(shift, y, dmp) 

979 tlist.append(txt) 

980 shift += len(dmp) 

981 if shift >= 16: 

982 shift = 0 

983 y += 1 

984 if col is None: 

985 col = pyx.color.rgb.red 

986 if bkcol is None: 

987 bkcol = pyx.color.rgb.white 

988 c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick]) # noqa: E501 

989 for txt in tlist: 

990 c.insert(txt) 

991 return c, tlist[-1].bbox(), shift, y 

992 

993 last_shift, last_y = 0, 0.0 

994 while t: 

995 bkcol = next(backcolor) 

996 proto, fields = t.pop() 

997 y += 0.5 

998 pt = pyx.text.text( 

999 XSTART, 

1000 (YTXT - y) * YMUL, 

1001 r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape( 

1002 str(proto.name) 

1003 ), 

1004 [pyx.text.size.Large] 

1005 ) 

1006 y += 1 

1007 ptbb = pt.bbox() 

1008 ptbb.enlarge(pyx.unit.u_pt * 2) 

1009 canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])]) # noqa: E501 

1010 canvas.insert(pt) 

1011 for field, fval, fdump in fields: 

1012 col = next(forecolor) 

1013 ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name)) # noqa: E501 

1014 if isinstance(field, BitField): 

1015 fsize = '%sb' % field.size 

1016 else: 

1017 fsize = '%sB' % len(fdump) 

1018 if (hasattr(field, 'field') and 

1019 'LE' in field.field.__class__.__name__[:3] or 

1020 'LE' in field.__class__.__name__[:3]): 

1021 fsize = r'$\scriptstyle\langle$' + fsize 

1022 st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright]) # noqa: E501 

1023 if isinstance(fval, str): 

1024 if len(fval) > 18: 

1025 fval = fval[:18] + "[...]" 

1026 else: 

1027 fval = "" 

1028 vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) # noqa: E501 

1029 y += 1.0 

1030 if fdump: 

1031 dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol) # noqa: E501 

1032 

1033 dtb = target 

1034 vtb = vt.bbox() 

1035 bxvt = make_box(vtb) 

1036 bxdt = make_box(dtb) 

1037 dtb.enlarge(pyx.unit.u_pt) 

1038 try: 

1039 if yd < 0: 

1040 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90) # noqa: E501 

1041 else: 

1042 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90) # noqa: E501 

1043 except Exception: 

1044 pass 

1045 else: 

1046 canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col]) # noqa: E501 

1047 

1048 canvas.insert(dt) 

1049 

1050 canvas.insert(ft) 

1051 canvas.insert(st) 

1052 canvas.insert(vt) 

1053 last_y += layer_shift 

1054 

1055 return canvas 

1056 

1057 def extract_padding(self, s): 

1058 # type: (bytes) -> Tuple[bytes, Optional[bytes]] 

1059 """ 

1060 DEV: to be overloaded to extract current layer's padding. 

1061 

1062 :param str s: the current layer 

1063 :return: a couple of strings (actual layer, padding) 

1064 """ 

1065 return s, None 

1066 

1067 def post_dissect(self, s): 

1068 # type: (bytes) -> bytes 

1069 """DEV: is called right after the current layer has been dissected""" 

1070 return s 

1071 

1072 def pre_dissect(self, s): 

1073 # type: (bytes) -> bytes 

1074 """DEV: is called right before the current layer is dissected""" 

1075 return s 

1076 

1077 def do_dissect(self, s): 

1078 # type: (bytes) -> bytes 

1079 _raw = s 

1080 self.raw_packet_cache_fields = {} 

1081 for f in self.fields_desc: 

1082 s, fval = f.getfield(self, s) 

1083 # Skip unused ConditionalField 

1084 if isinstance(f, ConditionalField) and fval is None: 

1085 continue 

1086 # We need to track fields with mutable values to discard 

1087 # .raw_packet_cache when needed. 

1088 if (f.islist or f.holds_packets or f.ismutable) and fval is not None: 

1089 self.raw_packet_cache_fields[f.name] = \ 

1090 self._raw_packet_cache_field_value(f, fval, copy=True) 

1091 self.fields[f.name] = fval 

1092 # Nothing left to dissect 

1093 if not s and (isinstance(f, MayEnd) or 

1094 (fval is not None and isinstance(f, ConditionalField) and 

1095 isinstance(f.fld, MayEnd))): 

1096 break 

1097 self.raw_packet_cache = _raw[:-len(s)] if s else _raw 

1098 self.explicit = 1 

1099 return s 

1100 

1101 def do_dissect_payload(self, s): 

1102 # type: (bytes) -> None 

1103 """ 

1104 Perform the dissection of the layer's payload 

1105 

1106 :param str s: the raw layer 

1107 """ 

1108 if s: 

1109 if ( 

1110 self.stop_dissection_after and 

1111 isinstance(self, self.stop_dissection_after) 

1112 ): 

1113 # stop dissection here 

1114 p = conf.raw_layer(s, _internal=1, _underlayer=self) 

1115 self.add_payload(p) 

1116 return 

1117 cls = self.guess_payload_class(s) 

1118 try: 

1119 p = cls( 

1120 s, 

1121 stop_dissection_after=self.stop_dissection_after, 

1122 _internal=1, 

1123 _underlayer=self, 

1124 ) 

1125 except KeyboardInterrupt: 

1126 raise 

1127 except Exception: 

1128 if conf.debug_dissector: 

1129 if issubtype(cls, Packet): 

1130 log_runtime.error("%s dissector failed", cls.__name__) 

1131 else: 

1132 log_runtime.error("%s.guess_payload_class() returned " 

1133 "[%s]", 

1134 self.__class__.__name__, repr(cls)) 

1135 if cls is not None: 

1136 raise 

1137 p = conf.raw_layer(s, _internal=1, _underlayer=self) 

1138 self.add_payload(p) 

1139 

1140 def dissect(self, s): 

1141 # type: (bytes) -> None 

1142 s = self.pre_dissect(s) 

1143 

1144 s = self.do_dissect(s) 

1145 

1146 s = self.post_dissect(s) 

1147 

1148 payl, pad = self.extract_padding(s) 

1149 self.do_dissect_payload(payl) 

1150 if pad and conf.padding: 

1151 self.add_payload(conf.padding_layer(pad)) 

1152 

1153 def guess_payload_class(self, payload): 

1154 # type: (bytes) -> Type[Packet] 

1155 """ 

1156 DEV: Guesses the next payload class from layer bonds. 

1157 Can be overloaded to use a different mechanism. 

1158 

1159 :param str payload: the layer's payload 

1160 :return: the payload class 

1161 """ 

1162 for t in self.aliastypes: 

1163 for fval, cls in t.payload_guess: 

1164 try: 

1165 if all(v == self.getfieldval(k) 

1166 for k, v in fval.items()): 

1167 return cls # type: ignore 

1168 except AttributeError: 

1169 pass 

1170 return self.default_payload_class(payload) 

1171 

1172 def default_payload_class(self, payload): 

1173 # type: (bytes) -> Type[Packet] 

1174 """ 

1175 DEV: Returns the default payload class if nothing has been found by the 

1176 guess_payload_class() method. 

1177 

1178 :param str payload: the layer's payload 

1179 :return: the default payload class define inside the configuration file 

1180 """ 

1181 return conf.raw_layer 

1182 

1183 def hide_defaults(self): 

1184 # type: () -> None 

1185 """Removes fields' values that are the same as default values.""" 

1186 # use list(): self.fields is modified in the loop 

1187 for k, v in list(self.fields.items()): 

1188 v = self.fields[k] 

1189 if k in self.default_fields: 

1190 if self.default_fields[k] == v: 

1191 del self.fields[k] 

1192 self.payload.hide_defaults() 

1193 

1194 def clone_with(self, payload=None, **kargs): 

1195 # type: (Optional[Any], **Any) -> Any 

1196 pkt = self.__class__() 

1197 pkt.explicit = 1 

1198 pkt.fields = kargs 

1199 pkt.default_fields = self.copy_fields_dict(self.default_fields) 

1200 pkt.overloaded_fields = self.overloaded_fields.copy() 

1201 pkt.time = self.time 

1202 pkt.underlayer = self.underlayer 

1203 pkt.parent = self.parent 

1204 pkt.post_transforms = self.post_transforms 

1205 pkt.raw_packet_cache = self.raw_packet_cache 

1206 pkt.raw_packet_cache_fields = self.copy_fields_dict( 

1207 self.raw_packet_cache_fields 

1208 ) 

1209 pkt.wirelen = self.wirelen 

1210 pkt.comments = self.comments 

1211 pkt.sniffed_on = self.sniffed_on 

1212 pkt.direction = self.direction 

1213 if payload is not None: 

1214 pkt.add_payload(payload) 

1215 return pkt 

1216 

1217 def __iter__(self): 

1218 # type: () -> Iterator[Packet] 

1219 """Iterates through all sub-packets generated by this Packet.""" 

1220 def loop(todo, done, self=self): 

1221 # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet] 

1222 if todo: 

1223 eltname = todo.pop() 

1224 elt = self.getfieldval(eltname) 

1225 if not isinstance(elt, Gen): 

1226 if self.get_field(eltname).islist: 

1227 elt = SetGen([elt]) 

1228 else: 

1229 elt = SetGen(elt) 

1230 for e in elt: 

1231 done[eltname] = e 

1232 for x in loop(todo[:], done): 

1233 yield x 

1234 else: 

1235 if isinstance(self.payload, NoPayload): 

1236 payloads = SetGen([None]) # type: SetGen[Packet] 

1237 else: 

1238 payloads = self.payload 

1239 for payl in payloads: 

1240 # Let's make sure subpackets are consistent 

1241 done2 = done.copy() 

1242 for k in done2: 

1243 if isinstance(done2[k], VolatileValue): 

1244 done2[k] = done2[k]._fix() 

1245 pkt = self.clone_with(payload=payl, **done2) 

1246 yield pkt 

1247 

1248 if self.explicit or self.raw_packet_cache is not None: 

1249 todo = [] 

1250 done = self.fields 

1251 else: 

1252 todo = [k for (k, v) in itertools.chain(self.default_fields.items(), 

1253 self.overloaded_fields.items()) 

1254 if isinstance(v, VolatileValue)] + list(self.fields) 

1255 done = {} 

1256 return loop(todo, done) 

1257 

1258 def iterpayloads(self): 

1259 # type: () -> Iterator[Packet] 

1260 """Used to iter through the payloads of a Packet. 

1261 Useful for DNS or 802.11 for instance. 

1262 """ 

1263 yield self 

1264 current = self 

1265 while current.payload: 

1266 current = current.payload 

1267 yield current 

1268 

1269 def __gt__(self, other): 

1270 # type: (Packet) -> int 

1271 """True if other is an answer from self (self ==> other).""" 

1272 if isinstance(other, Packet): 

1273 return other < self 

1274 elif isinstance(other, bytes): 

1275 return 1 

1276 else: 

1277 raise TypeError((self, other)) 

1278 

1279 def __lt__(self, other): 

1280 # type: (Packet) -> int 

1281 """True if self is an answer from other (other ==> self).""" 

1282 if isinstance(other, Packet): 

1283 return self.answers(other) 

1284 elif isinstance(other, bytes): 

1285 return 1 

1286 else: 

1287 raise TypeError((self, other)) 

1288 

1289 def __eq__(self, other): 

1290 # type: (Any) -> bool 

1291 if not isinstance(other, self.__class__): 

1292 return False 

1293 for f in self.fields_desc: 

1294 if f not in other.fields_desc: 

1295 return False 

1296 if self.getfieldval(f.name) != other.getfieldval(f.name): 

1297 return False 

1298 return self.payload == other.payload 

1299 

1300 def __ne__(self, other): 

1301 # type: (Any) -> bool 

1302 return not self.__eq__(other) 

1303 

1304 # Note: setting __hash__ to None is the standard way 

1305 # of making an object un-hashable. mypy doesn't know that 

1306 __hash__ = None # type: ignore 

1307 

1308 def hashret(self): 

1309 # type: () -> bytes 

1310 """DEV: returns a string that has the same value for a request 

1311 and its answer.""" 

1312 return self.payload.hashret() 

1313 

1314 def answers(self, other): 

1315 # type: (Packet) -> int 

1316 """DEV: true if self is an answer from other""" 

1317 if other.__class__ == self.__class__: 

1318 return self.payload.answers(other.payload) 

1319 return 0 

1320 

1321 def layers(self): 

1322 # type: () -> List[Type[Packet]] 

1323 """returns a list of layer classes (including subclasses) in this packet""" # noqa: E501 

1324 layers = [] 

1325 lyr = self # type: Optional[Packet] 

1326 while lyr: 

1327 layers.append(lyr.__class__) 

1328 lyr = lyr.payload.getlayer(0, _subclass=True) 

1329 return layers 

1330 

1331 def haslayer(self, cls, _subclass=None): 

1332 # type: (Union[Type[Packet], str], Optional[bool]) -> int 

1333 """ 

1334 true if self has a layer that is an instance of cls. 

1335 Superseded by "cls in self" syntax. 

1336 """ 

1337 if _subclass is None: 

1338 _subclass = self.match_subclass or None 

1339 if _subclass: 

1340 match = issubtype 

1341 else: 

1342 match = lambda x, t: bool(x == t) 

1343 if cls is None or match(self.__class__, cls) \ 

1344 or cls in [self.__class__.__name__, self._name]: 

1345 return True 

1346 for f in self.packetfields: 

1347 fvalue_gen = self.getfieldval(f.name) 

1348 if fvalue_gen is None: 

1349 continue 

1350 if not f.islist: 

1351 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0) 

1352 for fvalue in fvalue_gen: 

1353 if isinstance(fvalue, Packet): 

1354 ret = fvalue.haslayer(cls, _subclass=_subclass) 

1355 if ret: 

1356 return ret 

1357 return self.payload.haslayer(cls, _subclass=_subclass) 

1358 

1359 def getlayer(self, 

1360 cls, # type: Union[int, Type[Packet], str] 

1361 nb=1, # type: int 

1362 _track=None, # type: Optional[List[int]] 

1363 _subclass=None, # type: Optional[bool] 

1364 **flt # type: Any 

1365 ): 

1366 # type: (...) -> Optional[Packet] 

1367 """Return the nb^th layer that is an instance of cls, matching flt 

1368values. 

1369 """ 

1370 if _subclass is None: 

1371 _subclass = self.match_subclass or None 

1372 if _subclass: 

1373 match = issubtype 

1374 else: 

1375 match = lambda x, t: bool(x == t) 

1376 # Note: 

1377 # cls can be int, packet, str 

1378 # string_class_name can be packet, str (packet or packet+field) 

1379 # class_name can be packet, str (packet only) 

1380 if isinstance(cls, int): 

1381 nb = cls + 1 

1382 string_class_name = "" # type: Union[Type[Packet], str] 

1383 else: 

1384 string_class_name = cls 

1385 class_name = "" # type: Union[Type[Packet], str] 

1386 fld = None # type: Optional[str] 

1387 if isinstance(string_class_name, str) and "." in string_class_name: 

1388 class_name, fld = string_class_name.split(".", 1) 

1389 else: 

1390 class_name, fld = string_class_name, None 

1391 if not class_name or match(self.__class__, class_name) \ 

1392 or class_name in [self.__class__.__name__, self._name]: 

1393 if all(self.getfieldval(fldname) == fldvalue 

1394 for fldname, fldvalue in flt.items()): 

1395 if nb == 1: 

1396 if fld is None: 

1397 return self 

1398 else: 

1399 return self.getfieldval(fld) # type: ignore 

1400 else: 

1401 nb -= 1 

1402 for f in self.packetfields: 

1403 fvalue_gen = self.getfieldval(f.name) 

1404 if fvalue_gen is None: 

1405 continue 

1406 if not f.islist: 

1407 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0) 

1408 for fvalue in fvalue_gen: 

1409 if isinstance(fvalue, Packet): 

1410 track = [] # type: List[int] 

1411 ret = fvalue.getlayer(class_name, nb=nb, _track=track, 

1412 _subclass=_subclass, **flt) 

1413 if ret is not None: 

1414 return ret 

1415 nb = track[0] 

1416 return self.payload.getlayer(class_name, nb=nb, _track=_track, 

1417 _subclass=_subclass, **flt) 

1418 

1419 def firstlayer(self): 

1420 # type: () -> Packet 

1421 q = self 

1422 while q.underlayer is not None: 

1423 q = q.underlayer 

1424 return q 

1425 

1426 def __getitem__(self, cls): 

1427 # type: (Union[Type[Packet], str]) -> Any 

1428 if isinstance(cls, slice): 

1429 lname = cls.start 

1430 if cls.stop: 

1431 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {})) 

1432 else: 

1433 ret = self.getlayer(cls.start, **(cls.step or {})) 

1434 else: 

1435 lname = cls 

1436 ret = self.getlayer(cls) 

1437 if ret is None: 

1438 if isinstance(lname, type): 

1439 name = lname.__name__ 

1440 elif not isinstance(lname, bytes): 

1441 name = repr(lname) 

1442 else: 

1443 name = cast(str, lname) 

1444 raise IndexError("Layer [%s] not found" % name) 

1445 return ret 

1446 

1447 def __delitem__(self, cls): 

1448 # type: (Type[Packet]) -> None 

1449 del self[cls].underlayer.payload 

1450 

1451 def __setitem__(self, cls, val): 

1452 # type: (Type[Packet], Packet) -> None 

1453 self[cls].underlayer.payload = val 

1454 

1455 def __contains__(self, cls): 

1456 # type: (Union[Type[Packet], str]) -> int 

1457 """ 

1458 "cls in self" returns true if self has a layer which is an 

1459 instance of cls. 

1460 """ 

1461 return self.haslayer(cls) 

1462 

1463 def route(self): 

1464 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]] 

1465 return self.payload.route() 

1466 

1467 def fragment(self, *args, **kargs): 

1468 # type: (*Any, **Any) -> List[Packet] 

1469 return self.payload.fragment(*args, **kargs) 

1470 

1471 def display(self, *args, **kargs): # Deprecated. Use show() 

1472 # type: (*Any, **Any) -> None 

1473 """Deprecated. Use show() method.""" 

1474 self.show(*args, **kargs) 

1475 

1476 def _show_or_dump(self, 

1477 dump=False, # type: bool 

1478 indent=3, # type: int 

1479 lvl="", # type: str 

1480 label_lvl="", # type: str 

1481 first_call=True # type: bool 

1482 ): 

1483 # type: (...) -> Optional[str] 

1484 """ 

1485 Internal method that shows or dumps a hierarchical view of a packet. 

1486 Called by show. 

1487 

1488 :param dump: determine if it prints or returns the string value 

1489 :param int indent: the size of indentation for each layer 

1490 :param str lvl: additional information about the layer lvl 

1491 :param str label_lvl: additional information about the layer fields 

1492 :param first_call: determine if the current function is the first 

1493 :return: return a hierarchical view if dump, else print it 

1494 """ 

1495 

1496 if dump: 

1497 from scapy.themes import ColorTheme, AnsiColorTheme 

1498 ct: ColorTheme = AnsiColorTheme() # No color for dump output 

1499 else: 

1500 ct = conf.color_theme 

1501 s = "%s%s %s %s\n" % (label_lvl, 

1502 ct.punct("###["), 

1503 ct.layer_name(self.name), 

1504 ct.punct("]###")) 

1505 fields = self.fields_desc.copy() 

1506 while fields: 

1507 f = fields.pop(0) 

1508 if isinstance(f, ConditionalField) and not f._evalcond(self): 

1509 continue 

1510 if hasattr(f, "fields"): # Field has subfields 

1511 s += "%s %s =\n" % ( 

1512 label_lvl + lvl, 

1513 ct.depreciate_field_name(f.name), 

1514 ) 

1515 lvl += " " * indent * self.show_indent 

1516 for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)): 

1517 fields.insert(i, fld) 

1518 continue 

1519 if isinstance(f, Emph) or f in conf.emph: 

1520 ncol = ct.emph_field_name 

1521 vcol = ct.emph_field_value 

1522 else: 

1523 ncol = ct.field_name 

1524 vcol = ct.field_value 

1525 pad = max(0, 10 - len(f.name)) * " " 

1526 fvalue = self.getfieldval(f.name) 

1527 if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)): # noqa: E501 

1528 s += "%s %s%s%s%s\n" % (label_lvl + lvl, 

1529 ct.punct("\\"), 

1530 ncol(f.name), 

1531 pad, 

1532 ct.punct("\\")) 

1533 fvalue_gen = SetGen( 

1534 fvalue, 

1535 _iterpacket=0 

1536 ) # type: SetGen[Packet] 

1537 for fvalue in fvalue_gen: 

1538 s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False) # noqa: E501 

1539 else: 

1540 begn = "%s %s%s%s " % (label_lvl + lvl, 

1541 ncol(f.name), 

1542 pad, 

1543 ct.punct("="),) 

1544 reprval = f.i2repr(self, fvalue) 

1545 if isinstance(reprval, str): 

1546 reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) + # noqa: E501 

1547 len(lvl) + 

1548 len(f.name) + 

1549 4)) 

1550 s += "%s%s\n" % (begn, vcol(reprval)) 

1551 if self.payload: 

1552 s += self.payload._show_or_dump( # type: ignore 

1553 dump=dump, 

1554 indent=indent, 

1555 lvl=lvl + (" " * indent * self.show_indent), 

1556 label_lvl=label_lvl, 

1557 first_call=False 

1558 ) 

1559 

1560 if first_call and not dump: 

1561 print(s) 

1562 return None 

1563 else: 

1564 return s 

1565 

1566 def show(self, dump=False, indent=3, lvl="", label_lvl=""): 

1567 # type: (bool, int, str, str) -> Optional[Any] 

1568 """ 

1569 Prints or returns (when "dump" is true) a hierarchical view of the 

1570 packet. 

1571 

1572 :param dump: determine if it prints or returns the string value 

1573 :param int indent: the size of indentation for each layer 

1574 :param str lvl: additional information about the layer lvl 

1575 :param str label_lvl: additional information about the layer fields 

1576 :return: return a hierarchical view if dump, else print it 

1577 """ 

1578 return self._show_or_dump(dump, indent, lvl, label_lvl) 

1579 

1580 def show2(self, dump=False, indent=3, lvl="", label_lvl=""): 

1581 # type: (bool, int, str, str) -> Optional[Any] 

1582 """ 

1583 Prints or returns (when "dump" is true) a hierarchical view of an 

1584 assembled version of the packet, so that automatic fields are 

1585 calculated (checksums, etc.) 

1586 

1587 :param dump: determine if it prints or returns the string value 

1588 :param int indent: the size of indentation for each layer 

1589 :param str lvl: additional information about the layer lvl 

1590 :param str label_lvl: additional information about the layer fields 

1591 :return: return a hierarchical view if dump, else print it 

1592 """ 

1593 return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl) 

1594 

1595 def sprintf(self, fmt, relax=1): 

1596 # type: (str, int) -> str 

1597 """ 

1598 sprintf(format, [relax=1]) -> str 

1599 

1600 Where format is a string that can include directives. A directive 

1601 begins and ends by % and has the following format: 

1602 ``%[fmt[r],][cls[:nb].]field%`` 

1603 

1604 :param fmt: is a classic printf directive, "r" can be appended for raw 

1605 substitution: 

1606 (ex: IP.flags=0x18 instead of SA), nb is the number of the layer 

1607 (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer). 

1608 Special case : "%.time%" is the creation time. 

1609 Ex:: 

1610 

1611 p.sprintf( 

1612 "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% " 

1613 "%03xr,IP.proto% %r,TCP.flags%" 

1614 ) 

1615 

1616 Moreover, the format string can include conditional statements. A 

1617 conditional statement looks like : {layer:string} where layer is a 

1618 layer name, and string is the string to insert in place of the 

1619 condition if it is true, i.e. if layer is present. If layer is 

1620 preceded by a "!", the result is inverted. Conditions can be 

1621 imbricated. A valid statement can be:: 

1622 

1623 p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet") 

1624 p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}") 

1625 

1626 A side effect is that, to obtain "{" and "}" characters, you must use 

1627 "%(" and "%)". 

1628 """ 

1629 

1630 escape = {"%": "%", 

1631 "(": "{", 

1632 ")": "}"} 

1633 

1634 # Evaluate conditions 

1635 while "{" in fmt: 

1636 i = fmt.rindex("{") 

1637 j = fmt[i + 1:].index("}") 

1638 cond = fmt[i + 1:i + j + 1] 

1639 k = cond.find(":") 

1640 if k < 0: 

1641 raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond) # noqa: E501 

1642 cond, format_ = cond[:k], cond[k + 1:] 

1643 res = False 

1644 if cond[0] == "!": 

1645 res = True 

1646 cond = cond[1:] 

1647 if self.haslayer(cond): 

1648 res = not res 

1649 if not res: 

1650 format_ = "" 

1651 fmt = fmt[:i] + format_ + fmt[i + j + 2:] 

1652 

1653 # Evaluate directives 

1654 s = "" 

1655 while "%" in fmt: 

1656 i = fmt.index("%") 

1657 s += fmt[:i] 

1658 fmt = fmt[i + 1:] 

1659 if fmt and fmt[0] in escape: 

1660 s += escape[fmt[0]] 

1661 fmt = fmt[1:] 

1662 continue 

1663 try: 

1664 i = fmt.index("%") 

1665 sfclsfld = fmt[:i] 

1666 fclsfld = sfclsfld.split(",") 

1667 if len(fclsfld) == 1: 

1668 f = "s" 

1669 clsfld = fclsfld[0] 

1670 elif len(fclsfld) == 2: 

1671 f, clsfld = fclsfld 

1672 else: 

1673 raise Scapy_Exception 

1674 if "." in clsfld: 

1675 cls, fld = clsfld.split(".") 

1676 else: 

1677 cls = self.__class__.__name__ 

1678 fld = clsfld 

1679 num = 1 

1680 if ":" in cls: 

1681 cls, snum = cls.split(":") 

1682 num = int(snum) 

1683 fmt = fmt[i + 1:] 

1684 except Exception: 

1685 raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "...")) # noqa: E501 

1686 else: 

1687 if fld == "time": 

1688 val = time.strftime( 

1689 "%H:%M:%S.%%06i", 

1690 time.localtime(float(self.time)) 

1691 ) % int((self.time - int(self.time)) * 1000000) 

1692 elif cls == self.__class__.__name__ and hasattr(self, fld): 

1693 if num > 1: 

1694 val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax) # noqa: E501 

1695 f = "s" 

1696 else: 

1697 try: 

1698 val = self.getfieldval(fld) 

1699 except AttributeError: 

1700 val = getattr(self, fld) 

1701 if f[-1] == "r": # Raw field value 

1702 f = f[:-1] 

1703 if not f: 

1704 f = "s" 

1705 else: 

1706 if fld in self.fieldtype: 

1707 val = self.fieldtype[fld].i2repr(self, val) 

1708 else: 

1709 val = self.payload.sprintf("%%%s%%" % sfclsfld, relax) 

1710 f = "s" 

1711 s += ("%" + f) % val 

1712 

1713 s += fmt 

1714 return s 

1715 

1716 def mysummary(self): 

1717 # type: () -> str 

1718 """DEV: can be overloaded to return a string that summarizes the layer. 

1719 Only one mysummary() is used in a whole packet summary: the one of the upper layer, # noqa: E501 

1720 except if a mysummary() also returns (as a couple) a list of layers whose # noqa: E501 

1721 mysummary() must be called if they are present.""" 

1722 return "" 

1723 

1724 def _do_summary(self): 

1725 # type: () -> Tuple[int, str, List[Any]] 

1726 found, s, needed = self.payload._do_summary() 

1727 ret = "" 

1728 if not found or self.__class__ in needed: 

1729 ret = self.mysummary() 

1730 if isinstance(ret, tuple): 

1731 ret, n = ret 

1732 needed += n 

1733 if ret or needed: 

1734 found = 1 

1735 if not ret: 

1736 ret = self.__class__.__name__ if self.show_summary else "" 

1737 if self.__class__ in conf.emph: 

1738 impf = [] 

1739 for f in self.fields_desc: 

1740 if f in conf.emph: 

1741 impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) # noqa: E501 

1742 ret = "%s [%s]" % (ret, " ".join(impf)) 

1743 if ret and s: 

1744 ret = "%s / %s" % (ret, s) 

1745 else: 

1746 ret = "%s%s" % (ret, s) 

1747 return found, ret, needed 

1748 

1749 def summary(self, intern=0): 

1750 # type: (int) -> str 

1751 """Prints a one line summary of a packet.""" 

1752 return self._do_summary()[1] 

1753 

1754 def lastlayer(self, layer=None): 

1755 # type: (Optional[Packet]) -> Packet 

1756 """Returns the uppest layer of the packet""" 

1757 return self.payload.lastlayer(self) 

1758 

1759 def decode_payload_as(self, cls): 

1760 # type: (Type[Packet]) -> None 

1761 """Reassembles the payload and decode it using another packet class""" 

1762 s = raw(self.payload) 

1763 self.payload = cls(s, _internal=1, _underlayer=self) 

1764 pp = self 

1765 while pp.underlayer is not None: 

1766 pp = pp.underlayer 

1767 self.payload.dissection_done(pp) 

1768 

1769 def _command(self, json=False): 

1770 # type: (bool) -> List[Tuple[str, Any]] 

1771 """ 

1772 Internal method used to generate command() and json() 

1773 """ 

1774 f = [] 

1775 iterator: Iterator[Tuple[str, Any]] 

1776 if json: 

1777 iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc) 

1778 else: 

1779 iterator = iter(self.fields.items()) 

1780 for fn, fv in iterator: 

1781 fld = self.get_field(fn) 

1782 if isinstance(fv, (list, dict, set)) and not fv and not fld.default: 

1783 continue 

1784 if isinstance(fv, Packet): 

1785 if json: 

1786 fv = {k: v for (k, v) in fv._command(json=True)} 

1787 else: 

1788 fv = fv.command() 

1789 elif fld.islist and fld.holds_packets and isinstance(fv, list): 

1790 if json: 

1791 fv = [ 

1792 {k: v for (k, v) in x} 

1793 for x in map(lambda y: Packet._command(y, json=True), fv) 

1794 ] 

1795 else: 

1796 fv = "[%s]" % ",".join(map(Packet.command, fv)) 

1797 elif fld.islist and isinstance(fv, list): 

1798 if json: 

1799 fv = [ 

1800 getattr(x, 'command', lambda: repr(x))() 

1801 for x in fv 

1802 ] 

1803 else: 

1804 fv = "[%s]" % ",".join( 

1805 getattr(x, 'command', lambda: repr(x))() 

1806 for x in fv 

1807 ) 

1808 elif isinstance(fv, FlagValue): 

1809 fv = int(fv) 

1810 elif callable(getattr(fv, 'command', None)): 

1811 fv = fv.command(json=json) 

1812 else: 

1813 if json: 

1814 if isinstance(fv, bytes): 

1815 fv = fv.decode("utf-8", errors="backslashreplace") 

1816 else: 

1817 fv = fld.i2h(self, fv) 

1818 else: 

1819 fv = repr(fld.i2h(self, fv)) 

1820 f.append((fn, fv)) 

1821 return f 

1822 

1823 def command(self): 

1824 # type: () -> str 

1825 """ 

1826 Returns a string representing the command you have to type to 

1827 obtain the same packet 

1828 """ 

1829 c = "%s(%s)" % ( 

1830 self.__class__.__name__, 

1831 ", ".join("%s=%s" % x for x in self._command()) 

1832 ) 

1833 pc = self.payload.command() 

1834 if pc: 

1835 c += "/" + pc 

1836 return c 

1837 

1838 def json(self): 

1839 # type: () -> str 

1840 """ 

1841 Returns a JSON representing the packet. 

1842 

1843 Please note that this cannot be used for bijective usage: data loss WILL occur, 

1844 so it will not make sense to try to rebuild the packet from the output. 

1845 This must only be used for a grepping/displaying purpose. 

1846 """ 

1847 dump = json.dumps({k: v for (k, v) in self._command(json=True)}) 

1848 pc = self.payload.json() 

1849 if pc: 

1850 dump = dump[:-1] + ", \"payload\": %s}" % pc 

1851 return dump 

1852 

1853 

1854class NoPayload(Packet): 

1855 def __new__(cls, *args, **kargs): 

1856 # type: (Type[Packet], *Any, **Any) -> NoPayload 

1857 singl = cls.__dict__.get("__singl__") 

1858 if singl is None: 

1859 cls.__singl__ = singl = Packet.__new__(cls) 

1860 Packet.__init__(singl) 

1861 return cast(NoPayload, singl) 

1862 

1863 def __init__(self, *args, **kargs): 

1864 # type: (*Any, **Any) -> None 

1865 pass 

1866 

1867 def dissection_done(self, pkt): 

1868 # type: (Packet) -> None 

1869 pass 

1870 

1871 def add_payload(self, payload): 

1872 # type: (Union[Packet, bytes]) -> NoReturn 

1873 raise Scapy_Exception("Can't add payload to NoPayload instance") 

1874 

1875 def remove_payload(self): 

1876 # type: () -> None 

1877 pass 

1878 

1879 def add_underlayer(self, underlayer): 

1880 # type: (Any) -> None 

1881 pass 

1882 

1883 def remove_underlayer(self, other): 

1884 # type: (Packet) -> None 

1885 pass 

1886 

1887 def add_parent(self, parent): 

1888 # type: (Any) -> None 

1889 pass 

1890 

1891 def remove_parent(self, other): 

1892 # type: (Packet) -> None 

1893 pass 

1894 

1895 def copy(self): 

1896 # type: () -> NoPayload 

1897 return self 

1898 

1899 def clear_cache(self): 

1900 # type: () -> None 

1901 pass 

1902 

1903 def __repr__(self): 

1904 # type: () -> str 

1905 return "" 

1906 

1907 def __str__(self): 

1908 # type: () -> str 

1909 return "" 

1910 

1911 def __bytes__(self): 

1912 # type: () -> bytes 

1913 return b"" 

1914 

1915 def __nonzero__(self): 

1916 # type: () -> bool 

1917 return False 

1918 __bool__ = __nonzero__ 

1919 

1920 def do_build(self): 

1921 # type: () -> bytes 

1922 return b"" 

1923 

1924 def build(self): 

1925 # type: () -> bytes 

1926 return b"" 

1927 

1928 def build_padding(self): 

1929 # type: () -> bytes 

1930 return b"" 

1931 

1932 def build_done(self, p): 

1933 # type: (bytes) -> bytes 

1934 return p 

1935 

1936 def build_ps(self, internal=0): 

1937 # type: (int) -> Tuple[bytes, List[Any]] 

1938 return b"", [] 

1939 

1940 def getfieldval(self, attr): 

1941 # type: (str) -> NoReturn 

1942 raise AttributeError(attr) 

1943 

1944 def getfield_and_val(self, attr): 

1945 # type: (str) -> NoReturn 

1946 raise AttributeError(attr) 

1947 

1948 def setfieldval(self, attr, val): 

1949 # type: (str, Any) -> NoReturn 

1950 raise AttributeError(attr) 

1951 

1952 def delfieldval(self, attr): 

1953 # type: (str) -> NoReturn 

1954 raise AttributeError(attr) 

1955 

1956 def hide_defaults(self): 

1957 # type: () -> None 

1958 pass 

1959 

1960 def __iter__(self): 

1961 # type: () -> Iterator[Packet] 

1962 return iter([]) 

1963 

1964 def __eq__(self, other): 

1965 # type: (Any) -> bool 

1966 if isinstance(other, NoPayload): 

1967 return True 

1968 return False 

1969 

1970 def hashret(self): 

1971 # type: () -> bytes 

1972 return b"" 

1973 

1974 def answers(self, other): 

1975 # type: (Packet) -> bool 

1976 return isinstance(other, (NoPayload, conf.padding_layer)) # noqa: E501 

1977 

1978 def haslayer(self, cls, _subclass=None): 

1979 # type: (Union[Type[Packet], str], Optional[bool]) -> int 

1980 return 0 

1981 

1982 def getlayer(self, 

1983 cls, # type: Union[int, Type[Packet], str] 

1984 nb=1, # type: int 

1985 _track=None, # type: Optional[List[int]] 

1986 _subclass=None, # type: Optional[bool] 

1987 **flt # type: Any 

1988 ): 

1989 # type: (...) -> Optional[Packet] 

1990 if _track is not None: 

1991 _track.append(nb) 

1992 return None 

1993 

1994 def fragment(self, *args, **kargs): 

1995 # type: (*Any, **Any) -> List[Packet] 

1996 raise Scapy_Exception("cannot fragment this packet") 

1997 

1998 def show(self, dump=False, indent=3, lvl="", label_lvl=""): 

1999 # type: (bool, int, str, str) -> None 

2000 pass 

2001 

2002 def sprintf(self, fmt, relax=1): 

2003 # type: (str, int) -> str 

2004 if relax: 

2005 return "??" 

2006 else: 

2007 raise Scapy_Exception("Format not found [%s]" % fmt) 

2008 

2009 def _do_summary(self): 

2010 # type: () -> Tuple[int, str, List[Any]] 

2011 return 0, "", [] 

2012 

2013 def layers(self): 

2014 # type: () -> List[Type[Packet]] 

2015 return [] 

2016 

2017 def lastlayer(self, layer=None): 

2018 # type: (Optional[Packet]) -> Packet 

2019 return layer or self 

2020 

2021 def command(self): 

2022 # type: () -> str 

2023 return "" 

2024 

2025 def json(self): 

2026 # type: () -> str 

2027 return "" 

2028 

2029 def route(self): 

2030 # type: () -> Tuple[None, None, None] 

2031 return (None, None, None) 

2032 

2033 

2034#################### 

2035# packet classes # 

2036#################### 

2037 

2038 

2039class Raw(Packet): 

2040 name = "Raw" 

2041 fields_desc = [StrField("load", b"")] 

2042 

2043 def __init__(self, _pkt=b"", *args, **kwargs): 

2044 # type: (bytes, *Any, **Any) -> None 

2045 if _pkt and not isinstance(_pkt, bytes): 

2046 if isinstance(_pkt, tuple): 

2047 _pkt, bn = _pkt 

2048 _pkt = bytes_encode(_pkt), bn 

2049 else: 

2050 _pkt = bytes_encode(_pkt) 

2051 super(Raw, self).__init__(_pkt, *args, **kwargs) 

2052 

2053 def answers(self, other): 

2054 # type: (Packet) -> int 

2055 return 1 

2056 

2057 def mysummary(self): 

2058 # type: () -> str 

2059 cs = conf.raw_summary 

2060 if cs: 

2061 if callable(cs): 

2062 return "Raw %s" % cs(self.load) 

2063 else: 

2064 return "Raw %r" % self.load 

2065 return Packet.mysummary(self) 

2066 

2067 

2068class Padding(Raw): 

2069 name = "Padding" 

2070 

2071 def self_build(self): 

2072 # type: (Optional[Any]) -> bytes 

2073 return b"" 

2074 

2075 def build_padding(self): 

2076 # type: () -> bytes 

2077 return ( 

2078 bytes_encode(self.load) if self.raw_packet_cache is None 

2079 else self.raw_packet_cache 

2080 ) + self.payload.build_padding() 

2081 

2082 

2083conf.raw_layer = Raw 

2084conf.padding_layer = Padding 

2085if conf.default_l2 is None: 

2086 conf.default_l2 = Raw 

2087 

2088################# 

2089# Bind layers # 

2090################# 

2091 

2092 

2093def bind_bottom_up(lower, # type: Type[Packet] 

2094 upper, # type: Type[Packet] 

2095 __fval=None, # type: Optional[Any] 

2096 **fval # type: Any 

2097 ): 

2098 # type: (...) -> None 

2099 r"""Bind 2 layers for dissection. 

2100 The upper layer will be chosen for dissection on top of the lower layer, if 

2101 ALL the passed arguments are validated. If multiple calls are made with 

2102 the same layers, the last one will be used as default. 

2103 

2104 ex: 

2105 >>> bind_bottom_up(Ether, SNAP, type=0x1234) 

2106 >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00') # noqa: E501 

2107 <Ether dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP OUI=0x0 code=0x0 |>> # noqa: E501 

2108 """ 

2109 if __fval is not None: 

2110 fval.update(__fval) 

2111 lower.payload_guess = lower.payload_guess[:] 

2112 lower.payload_guess.append((fval, upper)) 

2113 

2114 

2115def bind_top_down(lower, # type: Type[Packet] 

2116 upper, # type: Type[Packet] 

2117 __fval=None, # type: Optional[Any] 

2118 **fval # type: Any 

2119 ): 

2120 # type: (...) -> None 

2121 """Bind 2 layers for building. 

2122 When the upper layer is added as a payload of the lower layer, all the 

2123 arguments will be applied to them. 

2124 

2125 ex: 

2126 >>> bind_top_down(Ether, SNAP, type=0x1234) 

2127 >>> Ether()/SNAP() 

2128 <Ether type=0x1234 |<SNAP |>> 

2129 """ 

2130 if __fval is not None: 

2131 fval.update(__fval) 

2132 upper._overload_fields = upper._overload_fields.copy() # type: ignore 

2133 upper._overload_fields[lower] = fval 

2134 

2135 

2136@conf.commands.register 

2137def bind_layers(lower, # type: Type[Packet] 

2138 upper, # type: Type[Packet] 

2139 __fval=None, # type: Optional[Dict[str, int]] 

2140 **fval # type: Any 

2141 ): 

2142 # type: (...) -> None 

2143 """Bind 2 layers on some specific fields' values. 

2144 

2145 It makes the packet being built and dissected when the arguments 

2146 are present. 

2147 

2148 This function calls both bind_bottom_up and bind_top_down, with 

2149 all passed arguments. 

2150 

2151 Please have a look at their docs: 

2152 - help(bind_bottom_up) 

2153 - help(bind_top_down) 

2154 """ 

2155 if __fval is not None: 

2156 fval.update(__fval) 

2157 bind_top_down(lower, upper, **fval) 

2158 bind_bottom_up(lower, upper, **fval) 

2159 

2160 

2161def split_bottom_up(lower, # type: Type[Packet] 

2162 upper, # type: Type[Packet] 

2163 __fval=None, # type: Optional[Any] 

2164 **fval # type: Any 

2165 ): 

2166 # type: (...) -> None 

2167 """This call un-links an association that was made using bind_bottom_up. 

2168 Have a look at help(bind_bottom_up) 

2169 """ 

2170 if __fval is not None: 

2171 fval.update(__fval) 

2172 

2173 def do_filter(params, cls): 

2174 # type: (Dict[str, int], Type[Packet]) -> bool 

2175 params_is_invalid = any( 

2176 k not in params or params[k] != v for k, v in fval.items() 

2177 ) 

2178 return cls != upper or params_is_invalid 

2179 lower.payload_guess = [x for x in lower.payload_guess if do_filter(*x)] 

2180 

2181 

2182def split_top_down(lower, # type: Type[Packet] 

2183 upper, # type: Type[Packet] 

2184 __fval=None, # type: Optional[Any] 

2185 **fval # type: Any 

2186 ): 

2187 # type: (...) -> None 

2188 """This call un-links an association that was made using bind_top_down. 

2189 Have a look at help(bind_top_down) 

2190 """ 

2191 if __fval is not None: 

2192 fval.update(__fval) 

2193 if lower in upper._overload_fields: 

2194 ofval = upper._overload_fields[lower] 

2195 if any(k not in ofval or ofval[k] != v for k, v in fval.items()): 

2196 return 

2197 upper._overload_fields = upper._overload_fields.copy() # type: ignore 

2198 del upper._overload_fields[lower] 

2199 

2200 

2201@conf.commands.register 

2202def split_layers(lower, # type: Type[Packet] 

2203 upper, # type: Type[Packet] 

2204 __fval=None, # type: Optional[Any] 

2205 **fval # type: Any 

2206 ): 

2207 # type: (...) -> None 

2208 """Split 2 layers previously bound. 

2209 This call un-links calls bind_top_down and bind_bottom_up. It is the opposite of # noqa: E501 

2210 bind_layers. 

2211 

2212 Please have a look at their docs: 

2213 - help(split_bottom_up) 

2214 - help(split_top_down) 

2215 """ 

2216 if __fval is not None: 

2217 fval.update(__fval) 

2218 split_bottom_up(lower, upper, **fval) 

2219 split_top_down(lower, upper, **fval) 

2220 

2221 

2222@conf.commands.register 

2223def explore(layer=None): 

2224 # type: (Optional[str]) -> None 

2225 """Function used to discover the Scapy layers and protocols. 

2226 It helps to see which packets exists in contrib or layer files. 

2227 

2228 params: 

2229 - layer: If specified, the function will explore the layer. If not, 

2230 the GUI mode will be activated, to browse the available layers 

2231 

2232 examples: 

2233 >>> explore() # Launches the GUI 

2234 >>> explore("dns") # Explore scapy.layers.dns 

2235 >>> explore("http2") # Explore scapy.contrib.http2 

2236 >>> explore(scapy.layers.bluetooth4LE) 

2237 

2238 Note: to search a packet by name, use ls("name") rather than explore. 

2239 """ 

2240 if layer is None: # GUI MODE 

2241 if not conf.interactive: 

2242 raise Scapy_Exception("explore() GUI-mode cannot be run in " 

2243 "interactive mode. Please provide a " 

2244 "'layer' parameter !") 

2245 # 0 - Imports 

2246 try: 

2247 import prompt_toolkit 

2248 except ImportError: 

2249 raise ImportError("prompt_toolkit is not installed ! " 

2250 "You may install IPython, which contains it, via" 

2251 " `pip install ipython`") 

2252 if not _version_checker(prompt_toolkit, (2, 0)): 

2253 raise ImportError("prompt_toolkit >= 2.0.0 is required !") 

2254 # Only available with prompt_toolkit > 2.0, not released on PyPi yet 

2255 from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \ 

2256 button_dialog 

2257 from prompt_toolkit.formatted_text import HTML 

2258 # Check for prompt_toolkit >= 3.0.0 

2259 call_ptk = lambda x: cast(str, x) # type: Callable[[Any], str] 

2260 if _version_checker(prompt_toolkit, (3, 0)): 

2261 call_ptk = lambda x: x.run() 

2262 # 1 - Ask for layer or contrib 

2263 btn_diag = button_dialog( 

2264 title="Scapy v%s" % conf.version, 

2265 text=HTML( 

2266 '<style bg="white" fg="red">Chose the type of packets' 

2267 ' you want to explore:</style>' 

2268 ), 

2269 buttons=[ 

2270 ("Layers", "layers"), 

2271 ("Contribs", "contribs"), 

2272 ("Cancel", "cancel") 

2273 ]) 

2274 action = call_ptk(btn_diag) 

2275 # 2 - Retrieve list of Packets 

2276 if action == "layers": 

2277 # Get all loaded layers 

2278 lvalues = conf.layers.layers() 

2279 # Restrict to layers-only (not contribs) + packet.py and asn1*.py 

2280 values = [x for x in lvalues if ("layers" in x[0] or 

2281 "packet" in x[0] or 

2282 "asn1" in x[0])] 

2283 elif action == "contribs": 

2284 # Get all existing contribs 

2285 from scapy.main import list_contrib 

2286 cvalues = cast(List[Dict[str, str]], list_contrib(ret=True)) 

2287 values = [(x['name'], x['description']) 

2288 for x in cvalues] 

2289 # Remove very specific modules 

2290 values = [x for x in values if "can" not in x[0]] 

2291 else: 

2292 # Escape/Cancel was pressed 

2293 return 

2294 # Build tree 

2295 if action == "contribs": 

2296 # A tree is a dictionary. Each layer contains a keyword 

2297 # _l which contains the files in the layer, and a _name 

2298 # argument which is its name. The other keys are the subfolders, 

2299 # which are similar dictionaries 

2300 tree = defaultdict(list) # type: Dict[str, Union[List[Any], Dict[str, Any]]] # noqa: E501 

2301 for name, desc in values: 

2302 if "." in name: # Folder detected 

2303 parts = name.split(".") 

2304 subtree = tree 

2305 for pa in parts[:-1]: 

2306 if pa not in subtree: 

2307 subtree[pa] = {} 

2308 # one layer deeper 

2309 subtree = subtree[pa] # type: ignore 

2310 subtree["_name"] = pa # type: ignore 

2311 if "_l" not in subtree: 

2312 subtree["_l"] = [] 

2313 subtree["_l"].append((parts[-1], desc)) # type: ignore 

2314 else: 

2315 tree["_l"].append((name, desc)) # type: ignore 

2316 elif action == "layers": 

2317 tree = {"_l": values} 

2318 # 3 - Ask for the layer/contrib module to explore 

2319 current = tree # type: Any 

2320 previous = [] # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]] # noqa: E501 

2321 while True: 

2322 # Generate tests & form 

2323 folders = list(current.keys()) 

2324 _radio_values = [ 

2325 ("$" + name, str('[+] ' + name.capitalize())) 

2326 for name in folders if not name.startswith("_") 

2327 ] + current.get("_l", []) # type: List[str] 

2328 cur_path = "" 

2329 if previous: 

2330 cur_path = ".".join( 

2331 itertools.chain( 

2332 (x["_name"] for x in previous[1:]), # type: ignore 

2333 (current["_name"],) 

2334 ) 

2335 ) 

2336 extra_text = ( 

2337 '\n<style bg="white" fg="green">> scapy.%s</style>' 

2338 ) % (action + ("." + cur_path if cur_path else "")) 

2339 # Show popup 

2340 rd_diag = radiolist_dialog( 

2341 values=_radio_values, 

2342 title="Scapy v%s" % conf.version, 

2343 text=HTML( 

2344 ( 

2345 '<style bg="white" fg="red">Please select a file' 

2346 'among the following, to see all layers contained in' 

2347 ' it:</style>' 

2348 ) + extra_text 

2349 ), 

2350 cancel_text="Back" if previous else "Cancel" 

2351 ) 

2352 result = call_ptk(rd_diag) 

2353 if result is None: 

2354 # User pressed "Cancel/Back" 

2355 if previous: # Back 

2356 current = previous.pop() 

2357 continue 

2358 else: # Cancel 

2359 return 

2360 if result.startswith("$"): 

2361 previous.append(current) 

2362 current = current[result[1:]] 

2363 else: 

2364 # Enter on layer 

2365 if previous: # In subfolder 

2366 result = cur_path + "." + result 

2367 break 

2368 # 4 - (Contrib only): load contrib 

2369 if action == "contribs": 

2370 from scapy.main import load_contrib 

2371 load_contrib(result) 

2372 result = "scapy.contrib." + result 

2373 else: # NON-GUI MODE 

2374 # We handle layer as a short layer name, full layer name 

2375 # or the module itself 

2376 if isinstance(layer, types.ModuleType): 

2377 layer = layer.__name__ 

2378 if isinstance(layer, str): 

2379 if layer.startswith("scapy.layers."): 

2380 result = layer 

2381 else: 

2382 if layer.startswith("scapy.contrib."): 

2383 layer = layer.replace("scapy.contrib.", "") 

2384 from scapy.main import load_contrib 

2385 load_contrib(layer) 

2386 result_layer, result_contrib = (("scapy.layers.%s" % layer), 

2387 ("scapy.contrib.%s" % layer)) 

2388 if result_layer in conf.layers.ldict: 

2389 result = result_layer 

2390 elif result_contrib in conf.layers.ldict: 

2391 result = result_contrib 

2392 else: 

2393 raise Scapy_Exception("Unknown scapy module '%s'" % layer) 

2394 else: 

2395 warning("Wrong usage ! Check out help(explore)") 

2396 return 

2397 

2398 # COMMON PART 

2399 # Get the list of all Packets contained in that module 

2400 try: 

2401 all_layers = conf.layers.ldict[result] 

2402 except KeyError: 

2403 raise Scapy_Exception("Unknown scapy module '%s'" % layer) 

2404 # Print 

2405 print(conf.color_theme.layer_name("Packets contained in %s:" % result)) 

2406 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]] 

2407 rtlst = [(lay.__name__ or "", cast(str, lay._name) or "") for lay in all_layers] 

2408 print(pretty_list(rtlst, [("Class", "Name")], borders=True)) 

2409 

2410 

2411def _pkt_ls(obj, # type: Union[Packet, Type[Packet]] 

2412 verbose=False, # type: bool 

2413 ): 

2414 # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]] # noqa: E501 

2415 """Internal function used to resolve `fields_desc` to display it. 

2416 

2417 :param obj: a packet object or class 

2418 :returns: a list containing tuples [(name, clsname, clsname_extras, 

2419 default, long_attrs)] 

2420 """ 

2421 is_pkt = isinstance(obj, Packet) 

2422 if not issubtype(obj, Packet) and not is_pkt: 

2423 raise ValueError 

2424 fields = [] 

2425 for f in obj.fields_desc: 

2426 cur_fld = f 

2427 attrs = [] # type: List[str] 

2428 long_attrs = [] # type: List[str] 

2429 while isinstance(cur_fld, (Emph, ConditionalField)): 

2430 if isinstance(cur_fld, ConditionalField): 

2431 attrs.append(cur_fld.__class__.__name__[:4]) 

2432 cur_fld = cur_fld.fld 

2433 name = cur_fld.name 

2434 default = cur_fld.default 

2435 if verbose and isinstance(cur_fld, EnumField) \ 

2436 and hasattr(cur_fld, "i2s") and cur_fld.i2s: 

2437 if len(cur_fld.i2s or []) < 50: 

2438 long_attrs.extend( 

2439 "%s: %d" % (strval, numval) 

2440 for numval, strval in 

2441 sorted(cur_fld.i2s.items()) 

2442 ) 

2443 elif isinstance(cur_fld, MultiEnumField): 

2444 if isinstance(obj, Packet): 

2445 obj_pkt = obj 

2446 else: 

2447 obj_pkt = obj() 

2448 fld_depend = cur_fld.depends_on(obj_pkt) 

2449 attrs.append("Depends on %s" % fld_depend) 

2450 if verbose: 

2451 cur_i2s = cur_fld.i2s_multi.get( 

2452 cur_fld.depends_on(obj_pkt), {} 

2453 ) 

2454 if len(cur_i2s) < 50: 

2455 long_attrs.extend( 

2456 "%s: %d" % (strval, numval) 

2457 for numval, strval in 

2458 sorted(cur_i2s.items()) 

2459 ) 

2460 elif verbose and isinstance(cur_fld, FlagsField): 

2461 names = cur_fld.names 

2462 long_attrs.append(", ".join(names)) 

2463 elif isinstance(cur_fld, MultipleTypeField): 

2464 default = cur_fld.dflt.default 

2465 attrs.append(", ".join( 

2466 x[0].__class__.__name__ for x in 

2467 itertools.chain(cur_fld.flds, [(cur_fld.dflt,)]) 

2468 )) 

2469 

2470 cls = cur_fld.__class__ 

2471 class_name_extras = "(%s)" % ( 

2472 ", ".join(attrs) 

2473 ) if attrs else "" 

2474 if isinstance(cur_fld, BitField): 

2475 class_name_extras += " (%d bit%s)" % ( 

2476 cur_fld.size, 

2477 "s" if cur_fld.size > 1 else "" 

2478 ) 

2479 fields.append( 

2480 (name, 

2481 cls, 

2482 class_name_extras, 

2483 repr(default), 

2484 long_attrs) 

2485 ) 

2486 return fields 

2487 

2488 

2489@conf.commands.register 

2490def ls(obj=None, # type: Optional[Union[str, Packet, Type[Packet]]] 

2491 case_sensitive=False, # type: bool 

2492 verbose=False # type: bool 

2493 ): 

2494 # type: (...) -> None 

2495 """List available layers, or infos on a given layer class or name. 

2496 

2497 :param obj: Packet / packet name to use 

2498 :param case_sensitive: if obj is a string, is it case sensitive? 

2499 :param verbose: 

2500 """ 

2501 if obj is None or isinstance(obj, str): 

2502 tip = False 

2503 if obj is None: 

2504 tip = True 

2505 all_layers = sorted(conf.layers, key=lambda x: x.__name__) 

2506 else: 

2507 pattern = re.compile( 

2508 obj, 

2509 0 if case_sensitive else re.I 

2510 ) 

2511 # We first order by accuracy, then length 

2512 if case_sensitive: 

2513 sorter = lambda x: (x.__name__.index(obj), len(x.__name__)) 

2514 else: 

2515 obj = obj.lower() 

2516 sorter = lambda x: (x.__name__.lower().index(obj), 

2517 len(x.__name__)) 

2518 all_layers = sorted((layer for layer in conf.layers 

2519 if (isinstance(layer.__name__, str) and 

2520 pattern.search(layer.__name__)) or 

2521 (isinstance(layer.name, str) and 

2522 pattern.search(layer.name))), 

2523 key=sorter) 

2524 for layer in all_layers: 

2525 print("%-10s : %s" % (layer.__name__, layer._name)) 

2526 if tip and conf.interactive: 

2527 print("\nTIP: You may use explore() to navigate through all " 

2528 "layers using a clear GUI") 

2529 else: 

2530 try: 

2531 fields = _pkt_ls( 

2532 obj, 

2533 verbose=verbose 

2534 ) 

2535 is_pkt = isinstance(obj, Packet) 

2536 # Print 

2537 for fname, cls, clsne, dflt, long_attrs in fields: 

2538 clsinfo = cls.__name__ + " " + clsne 

2539 print("%-10s : %-35s =" % (fname, clsinfo), end=' ') 

2540 if is_pkt: 

2541 print("%-15r" % (getattr(obj, fname),), end=' ') 

2542 print("(%r)" % (dflt,)) 

2543 for attr in long_attrs: 

2544 print("%-15s%s" % ("", attr)) 

2545 # Restart for payload if any 

2546 if is_pkt: 

2547 obj = cast(Packet, obj) 

2548 if isinstance(obj.payload, NoPayload): 

2549 return 

2550 print("--") 

2551 ls(obj.payload) 

2552 except ValueError: 

2553 print("Not a packet class or name. Type 'ls()' to list packet classes.") # noqa: E501 

2554 

2555 

2556@conf.commands.register 

2557def rfc(cls, ret=False, legend=True): 

2558 # type: (Type[Packet], bool, bool) -> Optional[str] 

2559 """ 

2560 Generate an RFC-like representation of a packet def. 

2561 

2562 :param cls: the Packet class 

2563 :param ret: return the result instead of printing (def. False) 

2564 :param legend: show text under the diagram (default True) 

2565 

2566 Ex:: 

2567 

2568 >>> rfc(Ether) 

2569 

2570 """ 

2571 if not issubclass(cls, Packet): 

2572 raise TypeError("Packet class expected") 

2573 cur_len = 0 

2574 cur_line = [] 

2575 lines = [] 

2576 # Get the size (width) that a field will take 

2577 # when formatted, from its length in bits 

2578 clsize = lambda x: 2 * x - 1 # type: Callable[[int], int] 

2579 ident = 0 # Fields UUID 

2580 

2581 # Generate packet groups 

2582 def _iterfields() -> Iterator[Tuple[str, int]]: 

2583 for f in cls.fields_desc: 

2584 # Fancy field name 

2585 fname = f.name.upper().replace("_", " ") 

2586 fsize = int(f.sz * 8) 

2587 yield fname, fsize 

2588 # Add padding optionally 

2589 if isinstance(f, PadField): 

2590 if isinstance(f._align, tuple): 

2591 pad = - cur_len % (f._align[0] * 8) 

2592 else: 

2593 pad = - cur_len % (f._align * 8) 

2594 if pad: 

2595 yield "padding", pad 

2596 for fname, flen in _iterfields(): 

2597 cur_len += flen 

2598 ident += 1 

2599 # The field might exceed the current line or 

2600 # take more than one line. Copy it as required 

2601 while True: 

2602 over = max(0, cur_len - 32) # Exceed 

2603 len1 = clsize(flen - over) # What fits 

2604 cur_line.append((fname[:len1], len1, ident)) 

2605 if cur_len >= 32: 

2606 # Current line is full. start a new line 

2607 lines.append(cur_line) 

2608 cur_len = flen = over 

2609 fname = "" # do not repeat the field 

2610 cur_line = [] 

2611 if not over: 

2612 # there is no data left 

2613 break 

2614 else: 

2615 # End of the field 

2616 break 

2617 # Add the last line if un-finished 

2618 if cur_line: 

2619 lines.append(cur_line) 

2620 # Calculate separations between lines 

2621 seps = [] 

2622 seps.append("+-" * 32 + "+\n") 

2623 for i in range(len(lines) - 1): 

2624 # Start with a full line 

2625 sep = "+-" * 32 + "+\n" 

2626 # Get the line above and below the current 

2627 # separation 

2628 above, below = lines[i], lines[i + 1] 

2629 # The last field of above is shared with below 

2630 if above[-1][2] == below[0][2]: 

2631 # where the field in "above" starts 

2632 pos_above = sum(x[1] for x in above[:-1]) + len(above[:-1]) - 1 

2633 # where the field in "below" ends 

2634 pos_below = below[0][1] 

2635 if pos_above < pos_below: 

2636 # they are overlapping. 

2637 # Now crop the space between those pos 

2638 # and fill it with " " 

2639 pos_above = pos_above + pos_above % 2 

2640 sep = ( 

2641 sep[:1 + pos_above] + 

2642 " " * (pos_below - pos_above) + 

2643 sep[1 + pos_below:] 

2644 ) 

2645 # line is complete 

2646 seps.append(sep) 

2647 # Graph 

2648 result = "" 

2649 # Bytes markers 

2650 result += " " + (" " * 19).join( 

2651 str(x) for x in range(4) 

2652 ) + "\n" 

2653 # Bits markers 

2654 result += " " + " ".join( 

2655 str(x % 10) for x in range(32) 

2656 ) + "\n" 

2657 # Add fields and their separations 

2658 for line, sep in zip(lines, seps): 

2659 result += sep 

2660 for elt, flen, _ in line: 

2661 result += "|" + elt.center(flen, " ") 

2662 result += "|\n" 

2663 result += "+-" * (cur_len or 32) + "+\n" 

2664 # Annotate with the figure name 

2665 if legend: 

2666 result += "\n" + ("Fig. " + cls.__name__).center(66, " ") 

2667 # return if asked for, else print 

2668 if ret: 

2669 return result 

2670 print(result) 

2671 return None 

2672 

2673 

2674############# 

2675# Fuzzing # 

2676############# 

2677 

2678_P = TypeVar('_P', bound=Packet) 

2679 

2680 

2681@conf.commands.register 

2682def fuzz(p, # type: _P 

2683 _inplace=0, # type: int 

2684 ): 

2685 # type: (...) -> _P 

2686 """ 

2687 Transform a layer into a fuzzy layer by replacing some default values 

2688 by random objects. 

2689 

2690 :param p: the Packet instance to fuzz 

2691 :return: the fuzzed packet. 

2692 """ 

2693 if not _inplace: 

2694 p = p.copy() 

2695 q = cast(Packet, p) 

2696 while not isinstance(q, NoPayload): 

2697 new_default_fields = {} 

2698 multiple_type_fields = [] # type: List[str] 

2699 for f in q.fields_desc: 

2700 if isinstance(f, PacketListField): 

2701 for r in getattr(q, f.name): 

2702 fuzz(r, _inplace=1) 

2703 elif isinstance(f, MultipleTypeField): 

2704 # the type of the field will depend on others 

2705 multiple_type_fields.append(f.name) 

2706 elif f.default is not None: 

2707 if not isinstance(f, ConditionalField) or f._evalcond(q): 

2708 rnd = f.randval() 

2709 if rnd is not None: 

2710 new_default_fields[f.name] = rnd 

2711 # Process packets with MultipleTypeFields 

2712 if multiple_type_fields: 

2713 # freeze the other random values 

2714 new_default_fields = { 

2715 key: (val._fix() if isinstance(val, VolatileValue) else val) 

2716 for key, val in new_default_fields.items() 

2717 } 

2718 q.default_fields.update(new_default_fields) 

2719 new_default_fields.clear() 

2720 # add the random values of the MultipleTypeFields 

2721 for name in multiple_type_fields: 

2722 fld = cast(MultipleTypeField, q.get_field(name)) 

2723 rnd = fld._find_fld_pkt(q).randval() 

2724 if rnd is not None: 

2725 new_default_fields[name] = rnd 

2726 q.default_fields.update(new_default_fields) 

2727 q = q.payload 

2728 return p