Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/fields.py: 63%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1920 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5# Copyright (C) Michael Farrell <micolous+git@gmail.com> 

6 

7 

8""" 

9Fields: basic data structures that make up parts of packets. 

10""" 

11 

12import calendar 

13import collections 

14import copy 

15import datetime 

16import inspect 

17import math 

18import socket 

19import struct 

20import time 

21import warnings 

22 

23from types import MethodType 

24from uuid import UUID 

25from enum import Enum 

26 

27from scapy.config import conf 

28from scapy.dadict import DADict 

29from scapy.volatile import RandBin, RandByte, RandEnumKeys, RandInt, \ 

30 RandIP, RandIP6, RandLong, RandMAC, RandNum, RandShort, RandSInt, \ 

31 RandSByte, RandTermString, RandUUID, VolatileValue, RandSShort, \ 

32 RandSLong, RandFloat 

33from scapy.data import EPOCH 

34from scapy.error import log_runtime, Scapy_Exception 

35from scapy.compat import bytes_hex, chb, orb, plain_str, raw, bytes_encode 

36from scapy.pton_ntop import inet_ntop, inet_pton 

37from scapy.utils import inet_aton, inet_ntoa, lhex, mac2str, str2mac, EDecimal 

38from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \ 

39 in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo 

40from scapy.base_classes import Gen, Net, BasePacket, Field_metaclass 

41from scapy.error import warning 

42 

43# Typing imports 

44from typing import ( 

45 Any, 

46 AnyStr, 

47 Callable, 

48 Dict, 

49 List, 

50 Generic, 

51 Optional, 

52 Set, 

53 Tuple, 

54 Type, 

55 TypeVar, 

56 Union, 

57 # func 

58 cast, 

59 TYPE_CHECKING, 

60) 

61 

62if TYPE_CHECKING: 

63 # Do not import on runtime ! (import loop) 

64 from scapy.packet import Packet 

65 

66 

67class RawVal: 

68 r""" 

69 A raw value that will not be processed by the field and inserted 

70 as-is in the packet string. 

71 

72 Example:: 

73 

74 >>> a = IP(len=RawVal("####")) 

75 >>> bytes(a) 

76 b'F\x00####\x00\x01\x00\x005\xb5\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00' 

77 

78 """ 

79 

80 def __init__(self, val=b""): 

81 # type: (bytes) -> None 

82 self.val = bytes_encode(val) 

83 

84 def __str__(self): 

85 # type: () -> str 

86 return str(self.val) 

87 

88 def __bytes__(self): 

89 # type: () -> bytes 

90 return self.val 

91 

92 def __len__(self): 

93 # type: () -> int 

94 return len(self.val) 

95 

96 def __repr__(self): 

97 # type: () -> str 

98 return "<RawVal [%r]>" % self.val 

99 

100 

101class ObservableDict(Dict[int, str]): 

102 """ 

103 Helper class to specify a protocol extendable for runtime modifications 

104 """ 

105 

106 def __init__(self, *args, **kw): 

107 # type: (*Dict[int, str], **Any) -> None 

108 self.observers = [] # type: List[_EnumField[Any]] 

109 super(ObservableDict, self).__init__(*args, **kw) 

110 

111 def observe(self, observer): 

112 # type: (_EnumField[Any]) -> None 

113 self.observers.append(observer) 

114 

115 def __setitem__(self, key, value): 

116 # type: (int, str) -> None 

117 for o in self.observers: 

118 o.notify_set(self, key, value) 

119 super(ObservableDict, self).__setitem__(key, value) 

120 

121 def __delitem__(self, key): 

122 # type: (int) -> None 

123 for o in self.observers: 

124 o.notify_del(self, key) 

125 super(ObservableDict, self).__delitem__(key) 

126 

127 def update(self, anotherDict): # type: ignore 

128 for k in anotherDict: 

129 self[k] = anotherDict[k] 

130 

131 

132############ 

133# Fields # 

134############ 

135 

136I = TypeVar('I') # Internal storage # noqa: E741 

137M = TypeVar('M') # Machine storage 

138 

139 

140class Field(Generic[I, M], metaclass=Field_metaclass): 

141 """ 

142 For more information on how this works, please refer to the 

143 'Adding new protocols' chapter in the online documentation: 

144 

145 https://scapy.readthedocs.io/en/stable/build_dissect.html 

146 """ 

147 __slots__ = [ 

148 "name", 

149 "fmt", 

150 "default", 

151 "sz", 

152 "owners", 

153 "struct" 

154 ] 

155 islist = 0 

156 ismutable = False 

157 holds_packets = 0 

158 

159 def __init__(self, name, default, fmt="H"): 

160 # type: (str, Any, str) -> None 

161 if not isinstance(name, str): 

162 raise ValueError("name should be a string") 

163 self.name = name 

164 if fmt[0] in "@=<>!": 

165 self.fmt = fmt 

166 else: 

167 self.fmt = "!" + fmt 

168 self.struct = struct.Struct(self.fmt) 

169 self.default = self.any2i(None, default) 

170 self.sz = struct.calcsize(self.fmt) # type: int 

171 self.owners = [] # type: List[Type[Packet]] 

172 

173 def register_owner(self, cls): 

174 # type: (Type[Packet]) -> None 

175 self.owners.append(cls) 

176 

177 def i2len(self, 

178 pkt, # type: Packet 

179 x, # type: Any 

180 ): 

181 # type: (...) -> int 

182 """Convert internal value to a length usable by a FieldLenField""" 

183 if isinstance(x, RawVal): 

184 return len(x) 

185 return self.sz 

186 

187 def i2count(self, pkt, x): 

188 # type: (Optional[Packet], I) -> int 

189 """Convert internal value to a number of elements usable by a FieldLenField. 

190 Always 1 except for list fields""" 

191 return 1 

192 

193 def h2i(self, pkt, x): 

194 # type: (Optional[Packet], Any) -> I 

195 """Convert human value to internal value""" 

196 return cast(I, x) 

197 

198 def i2h(self, pkt, x): 

199 # type: (Optional[Packet], I) -> Any 

200 """Convert internal value to human value""" 

201 return x 

202 

203 def m2i(self, pkt, x): 

204 # type: (Optional[Packet], M) -> I 

205 """Convert machine value to internal value""" 

206 return cast(I, x) 

207 

208 def i2m(self, pkt, x): 

209 # type: (Optional[Packet], Optional[I]) -> M 

210 """Convert internal value to machine value""" 

211 if x is None: 

212 return cast(M, 0) 

213 elif isinstance(x, str): 

214 return cast(M, bytes_encode(x)) 

215 return cast(M, x) 

216 

217 def any2i(self, pkt, x): 

218 # type: (Optional[Packet], Any) -> Optional[I] 

219 """Try to understand the most input values possible and make an internal value from them""" # noqa: E501 

220 return self.h2i(pkt, x) 

221 

222 def i2repr(self, pkt, x): 

223 # type: (Optional[Packet], I) -> str 

224 """Convert internal value to a nice representation""" 

225 return repr(self.i2h(pkt, x)) 

226 

227 def addfield(self, pkt, s, val): 

228 # type: (Packet, bytes, Optional[I]) -> bytes 

229 """Add an internal value to a string 

230 

231 Copy the network representation of field `val` (belonging to layer 

232 `pkt`) to the raw string packet `s`, and return the new string packet. 

233 """ 

234 try: 

235 return s + self.struct.pack(self.i2m(pkt, val)) 

236 except struct.error as ex: 

237 raise ValueError( 

238 "Incorrect type of value for field %s:\n" % self.name + 

239 "struct.error('%s')\n" % ex + 

240 "To inject bytes into the field regardless of the type, " + 

241 "use RawVal. See help(RawVal)" 

242 ) 

243 

244 def getfield(self, pkt, s): 

245 # type: (Packet, bytes) -> Tuple[bytes, I] 

246 """Extract an internal value from a string 

247 

248 Extract from the raw packet `s` the field value belonging to layer 

249 `pkt`. 

250 

251 Returns a two-element list, 

252 first the raw packet string after having removed the extracted field, 

253 second the extracted field itself in internal representation. 

254 """ 

255 return s[self.sz:], self.m2i(pkt, self.struct.unpack(s[:self.sz])[0]) 

256 

257 def do_copy(self, x): 

258 # type: (I) -> I 

259 if isinstance(x, list): 

260 x = x[:] # type: ignore 

261 for i in range(len(x)): 

262 if isinstance(x[i], BasePacket): 

263 x[i] = x[i].copy() 

264 return x # type: ignore 

265 if hasattr(x, "copy"): 

266 return x.copy() # type: ignore 

267 return x 

268 

269 def __repr__(self): 

270 # type: () -> str 

271 return "<%s (%s).%s>" % ( 

272 self.__class__.__name__, 

273 ",".join(x.__name__ for x in self.owners), 

274 self.name 

275 ) 

276 

277 def copy(self): 

278 # type: () -> Field[I, M] 

279 return copy.copy(self) 

280 

281 def randval(self): 

282 # type: () -> VolatileValue[Any] 

283 """Return a volatile object whose value is both random and suitable for this field""" # noqa: E501 

284 fmtt = self.fmt[-1] 

285 if fmtt in "BbHhIiQq": 

286 return {"B": RandByte, "b": RandSByte, 

287 "H": RandShort, "h": RandSShort, 

288 "I": RandInt, "i": RandSInt, 

289 "Q": RandLong, "q": RandSLong}[fmtt]() 

290 elif fmtt == "s": 

291 if self.fmt[0] in "0123456789": 

292 value = int(self.fmt[:-1]) 

293 else: 

294 value = int(self.fmt[1:-1]) 

295 return RandBin(value) 

296 else: 

297 raise ValueError( 

298 "no random class for [%s] (fmt=%s)." % ( 

299 self.name, self.fmt 

300 ) 

301 ) 

302 

303 

304class _FieldContainer(object): 

305 """ 

306 A field that acts as a container for another field 

307 """ 

308 __slots__ = ["fld"] 

309 

310 def __getattr__(self, attr): 

311 # type: (str) -> Any 

312 return getattr(self.fld, attr) 

313 

314 

315AnyField = Union[Field[Any, Any], _FieldContainer] 

316 

317 

318class Emph(_FieldContainer): 

319 """Empathize sub-layer for display""" 

320 __slots__ = ["fld"] 

321 

322 def __init__(self, fld): 

323 # type: (Any) -> None 

324 self.fld = fld 

325 

326 def __eq__(self, other): 

327 # type: (Any) -> bool 

328 return bool(self.fld == other) 

329 

330 def __hash__(self): 

331 # type: () -> int 

332 return hash(self.fld) 

333 

334 

335class MayEnd(_FieldContainer): 

336 """ 

337 Allow packet dissection to end after the dissection of this field 

338 if no bytes are left. 

339 

340 A good example would be a length field that can be 0 or a set value, 

341 and where it would be too annoying to use multiple ConditionalFields 

342 

343 Important note: any field below this one MUST default 

344 to an empty value, else the behavior will be unexpected. 

345 """ 

346 __slots__ = ["fld"] 

347 

348 def __init__(self, fld): 

349 # type: (Any) -> None 

350 self.fld = fld 

351 

352 def __eq__(self, other): 

353 # type: (Any) -> bool 

354 return bool(self.fld == other) 

355 

356 def __hash__(self): 

357 # type: () -> int 

358 return hash(self.fld) 

359 

360 

361class ActionField(_FieldContainer): 

362 __slots__ = ["fld", "_action_method", "_privdata"] 

363 

364 def __init__(self, fld, action_method, **kargs): 

365 # type: (Field[Any, Any], str, **Any) -> None 

366 self.fld = fld 

367 self._action_method = action_method 

368 self._privdata = kargs 

369 

370 def any2i(self, pkt, val): 

371 # type: (Optional[Packet], int) -> Any 

372 getattr(pkt, self._action_method)(val, self.fld, **self._privdata) 

373 return getattr(self.fld, "any2i")(pkt, val) 

374 

375 

376class ConditionalField(_FieldContainer): 

377 __slots__ = ["fld", "cond"] 

378 

379 def __init__(self, 

380 fld, # type: AnyField 

381 cond # type: Callable[[Packet], bool] 

382 ): 

383 # type: (...) -> None 

384 self.fld = fld 

385 self.cond = cond 

386 

387 def _evalcond(self, pkt): 

388 # type: (Packet) -> bool 

389 return bool(self.cond(pkt)) 

390 

391 def any2i(self, pkt, x): 

392 # type: (Optional[Packet], Any) -> Any 

393 # BACKWARD COMPATIBILITY 

394 # Note: we shouldn't need this function. (it's not correct) 

395 # However, having i2h implemented (#2364), it changes the default 

396 # behavior and broke all packets that wrongly use two ConditionalField 

397 # with the same name. Those packets are the problem: they are wrongly 

398 # built (they should either be re-using the same conditional field, or 

399 # using a MultipleTypeField). 

400 # But I don't want to dive into fixing all of them just yet, 

401 # so for now, let's keep this this way, even though it's not correct. 

402 if type(self.fld) is Field: 

403 return x 

404 return self.fld.any2i(pkt, x) 

405 

406 def i2h(self, pkt, val): 

407 # type: (Optional[Packet], Any) -> Any 

408 if pkt and not self._evalcond(pkt): 

409 return None 

410 return self.fld.i2h(pkt, val) 

411 

412 def getfield(self, pkt, s): 

413 # type: (Packet, bytes) -> Tuple[bytes, Any] 

414 if self._evalcond(pkt): 

415 return self.fld.getfield(pkt, s) 

416 else: 

417 return s, None 

418 

419 def addfield(self, pkt, s, val): 

420 # type: (Packet, bytes, Any) -> bytes 

421 if self._evalcond(pkt): 

422 return self.fld.addfield(pkt, s, val) 

423 else: 

424 return s 

425 

426 def __getattr__(self, attr): 

427 # type: (str) -> Any 

428 return getattr(self.fld, attr) 

429 

430 

431class MultipleTypeField(_FieldContainer): 

432 """ 

433 MultipleTypeField are used for fields that can be implemented by 

434 various Field subclasses, depending on conditions on the packet. 

435 

436 It is initialized with `flds` and `dflt`. 

437 

438 :param dflt: is the default field type, to be used when none of the 

439 conditions matched the current packet. 

440 :param flds: is a list of tuples (`fld`, `cond`) or (`fld`, `cond`, `hint`) 

441 where `fld` if a field type, and `cond` a "condition" to 

442 determine if `fld` is the field type that should be used. 

443 

444 ``cond`` is either: 

445 

446 - a callable `cond_pkt` that accepts one argument (the packet) and 

447 returns True if `fld` should be used, False otherwise. 

448 - a tuple (`cond_pkt`, `cond_pkt_val`), where `cond_pkt` is the same 

449 as in the previous case and `cond_pkt_val` is a callable that 

450 accepts two arguments (the packet, and the value to be set) and 

451 returns True if `fld` should be used, False otherwise. 

452 

453 See scapy.layers.l2.ARP (type "help(ARP)" in Scapy) for an example of 

454 use. 

455 """ 

456 

457 __slots__ = ["flds", "dflt", "hints", "name", "default"] 

458 

459 def __init__( 

460 self, 

461 flds: List[Union[ 

462 Tuple[Field[Any, Any], Any, str], 

463 Tuple[Field[Any, Any], Any] 

464 ]], 

465 dflt: Field[Any, Any] 

466 ) -> None: 

467 self.hints = { 

468 x[0]: x[2] 

469 for x in flds 

470 if len(x) == 3 

471 } 

472 self.flds = [ 

473 (x[0], x[1]) for x in flds 

474 ] 

475 self.dflt = dflt 

476 self.default = None # So that we can detect changes in defaults 

477 self.name = self.dflt.name 

478 if any(x[0].name != self.name for x in self.flds): 

479 warnings.warn( 

480 ("All fields should have the same name in a " 

481 "MultipleTypeField (%s). Use hints.") % self.name, 

482 SyntaxWarning 

483 ) 

484 

485 def _iterate_fields_cond(self, pkt, val, use_val): 

486 # type: (Optional[Packet], Any, bool) -> Field[Any, Any] 

487 """Internal function used by _find_fld_pkt & _find_fld_pkt_val""" 

488 # Iterate through the fields 

489 for fld, cond in self.flds: 

490 if isinstance(cond, tuple): 

491 if use_val: 

492 if val is None: 

493 val = self.dflt.default 

494 if cond[1](pkt, val): 

495 return fld 

496 continue 

497 else: 

498 cond = cond[0] 

499 if cond(pkt): 

500 return fld 

501 return self.dflt 

502 

503 def _find_fld_pkt(self, pkt): 

504 # type: (Optional[Packet]) -> Field[Any, Any] 

505 """Given a Packet instance `pkt`, returns the Field subclass to be 

506used. If you know the value to be set (e.g., in .addfield()), use 

507._find_fld_pkt_val() instead. 

508 

509 """ 

510 return self._iterate_fields_cond(pkt, None, False) 

511 

512 def _find_fld_pkt_val(self, 

513 pkt, # type: Optional[Packet] 

514 val, # type: Any 

515 ): 

516 # type: (...) -> Tuple[Field[Any, Any], Any] 

517 """Given a Packet instance `pkt` and the value `val` to be set, 

518returns the Field subclass to be used, and the updated `val` if necessary. 

519 

520 """ 

521 fld = self._iterate_fields_cond(pkt, val, True) 

522 if val is None: 

523 val = fld.default 

524 return fld, val 

525 

526 def _find_fld(self): 

527 # type: () -> Field[Any, Any] 

528 """Returns the Field subclass to be used, depending on the Packet 

529instance, or the default subclass. 

530 

531DEV: since the Packet instance is not provided, we have to use a hack 

532to guess it. It should only be used if you cannot provide the current 

533Packet instance (for example, because of the current Scapy API). 

534 

535If you have the current Packet instance, use ._find_fld_pkt_val() (if 

536the value to set is also known) of ._find_fld_pkt() instead. 

537 

538 """ 

539 # Hack to preserve current Scapy API 

540 # See https://stackoverflow.com/a/7272464/3223422 

541 frame = inspect.currentframe().f_back.f_back # type: ignore 

542 while frame is not None: 

543 try: 

544 pkt = frame.f_locals['self'] 

545 except KeyError: 

546 pass 

547 else: 

548 if isinstance(pkt, tuple(self.dflt.owners)): 

549 if not pkt.default_fields: 

550 # Packet not initialized 

551 return self.dflt 

552 return self._find_fld_pkt(pkt) 

553 frame = frame.f_back 

554 return self.dflt 

555 

556 def getfield(self, 

557 pkt, # type: Packet 

558 s, # type: bytes 

559 ): 

560 # type: (...) -> Tuple[bytes, Any] 

561 return self._find_fld_pkt(pkt).getfield(pkt, s) 

562 

563 def addfield(self, pkt, s, val): 

564 # type: (Packet, bytes, Any) -> bytes 

565 fld, val = self._find_fld_pkt_val(pkt, val) 

566 return fld.addfield(pkt, s, val) 

567 

568 def any2i(self, pkt, val): 

569 # type: (Optional[Packet], Any) -> Any 

570 fld, val = self._find_fld_pkt_val(pkt, val) 

571 return fld.any2i(pkt, val) 

572 

573 def h2i(self, pkt, val): 

574 # type: (Optional[Packet], Any) -> Any 

575 fld, val = self._find_fld_pkt_val(pkt, val) 

576 return fld.h2i(pkt, val) 

577 

578 def i2h(self, 

579 pkt, # type: Packet 

580 val, # type: Any 

581 ): 

582 # type: (...) -> Any 

583 fld, val = self._find_fld_pkt_val(pkt, val) 

584 return fld.i2h(pkt, val) 

585 

586 def i2m(self, pkt, val): 

587 # type: (Optional[Packet], Optional[Any]) -> Any 

588 fld, val = self._find_fld_pkt_val(pkt, val) 

589 return fld.i2m(pkt, val) 

590 

591 def i2len(self, pkt, val): 

592 # type: (Packet, Any) -> int 

593 fld, val = self._find_fld_pkt_val(pkt, val) 

594 return fld.i2len(pkt, val) 

595 

596 def i2repr(self, pkt, val): 

597 # type: (Optional[Packet], Any) -> str 

598 fld, val = self._find_fld_pkt_val(pkt, val) 

599 hint = "" 

600 if fld in self.hints: 

601 hint = " (%s)" % self.hints[fld] 

602 return fld.i2repr(pkt, val) + hint 

603 

604 def register_owner(self, cls): 

605 # type: (Type[Packet]) -> None 

606 for fld, _ in self.flds: 

607 fld.owners.append(cls) 

608 self.dflt.owners.append(cls) 

609 

610 def get_fields_list(self): 

611 # type: () -> List[Any] 

612 return [self] 

613 

614 @property 

615 def fld(self): 

616 # type: () -> Field[Any, Any] 

617 return self._find_fld() 

618 

619 

620class PadField(_FieldContainer): 

621 """Add bytes after the proxified field so that it ends at the specified 

622 alignment from its beginning""" 

623 __slots__ = ["fld", "_align", "_padwith"] 

624 

625 def __init__(self, fld, align, padwith=None): 

626 # type: (AnyField, int, Optional[bytes]) -> None 

627 self.fld = fld 

628 self._align = align 

629 self._padwith = padwith or b"\x00" 

630 

631 def padlen(self, flen, pkt): 

632 # type: (int, Packet) -> int 

633 return -flen % self._align 

634 

635 def getfield(self, 

636 pkt, # type: Packet 

637 s, # type: bytes 

638 ): 

639 # type: (...) -> Tuple[bytes, Any] 

640 remain, val = self.fld.getfield(pkt, s) 

641 padlen = self.padlen(len(s) - len(remain), pkt) 

642 return remain[padlen:], val 

643 

644 def addfield(self, 

645 pkt, # type: Packet 

646 s, # type: bytes 

647 val, # type: Any 

648 ): 

649 # type: (...) -> bytes 

650 sval = self.fld.addfield(pkt, b"", val) 

651 return s + sval + ( 

652 self.padlen(len(sval), pkt) * self._padwith 

653 ) 

654 

655 

656class ReversePadField(PadField): 

657 """Add bytes BEFORE the proxified field so that it starts at the specified 

658 alignment from its beginning""" 

659 

660 def original_length(self, pkt): 

661 # type: (Packet) -> int 

662 return len(pkt.original) 

663 

664 def getfield(self, 

665 pkt, # type: Packet 

666 s, # type: bytes 

667 ): 

668 # type: (...) -> Tuple[bytes, Any] 

669 # We need to get the length that has already been dissected 

670 padlen = self.padlen(self.original_length(pkt) - len(s), pkt) 

671 return self.fld.getfield(pkt, s[padlen:]) 

672 

673 def addfield(self, 

674 pkt, # type: Packet 

675 s, # type: bytes 

676 val, # type: Any 

677 ): 

678 # type: (...) -> bytes 

679 sval = self.fld.addfield(pkt, b"", val) 

680 return s + struct.pack("%is" % ( 

681 self.padlen(len(s), pkt) 

682 ), self._padwith) + sval 

683 

684 

685class TrailerBytes(bytes): 

686 """ 

687 Reverses slice operations to take from the back of the packet, 

688 not the front 

689 """ 

690 

691 def __getitem__(self, item): # type: ignore 

692 # type: (Union[int, slice]) -> Union[int, bytes] 

693 if isinstance(item, int): 

694 if item < 0: 

695 item = 1 + item 

696 else: 

697 item = len(self) - 1 - item 

698 elif isinstance(item, slice): 

699 start, stop, step = item.start, item.stop, item.step 

700 new_start = -stop if stop else None 

701 new_stop = -start if start else None 

702 item = slice(new_start, new_stop, step) 

703 return super(self.__class__, self).__getitem__(item) 

704 

705 

706class TrailerField(_FieldContainer): 

707 """Special Field that gets its value from the end of the *packet* 

708 (Note: not layer, but packet). 

709 

710 Mostly used for FCS 

711 """ 

712 __slots__ = ["fld"] 

713 

714 def __init__(self, fld): 

715 # type: (Field[Any, Any]) -> None 

716 self.fld = fld 

717 

718 # Note: this is ugly. Very ugly. 

719 # Do not copy this crap elsewhere, so that if one day we get 

720 # brave enough to refactor it, it'll be easier. 

721 

722 def getfield(self, pkt, s): 

723 # type: (Packet, bytes) -> Tuple[bytes, int] 

724 previous_post_dissect = pkt.post_dissect 

725 

726 def _post_dissect(self, s): 

727 # type: (Packet, bytes) -> bytes 

728 # Reset packet to allow post_build 

729 self.raw_packet_cache = None 

730 self.post_dissect = previous_post_dissect # type: ignore 

731 return previous_post_dissect(s) 

732 pkt.post_dissect = MethodType(_post_dissect, pkt) # type: ignore 

733 s = TrailerBytes(s) 

734 s, val = self.fld.getfield(pkt, s) 

735 return bytes(s), val 

736 

737 def addfield(self, pkt, s, val): 

738 # type: (Packet, bytes, Optional[int]) -> bytes 

739 previous_post_build = pkt.post_build 

740 value = self.fld.addfield(pkt, b"", val) 

741 

742 def _post_build(self, p, pay): 

743 # type: (Packet, bytes, bytes) -> bytes 

744 pay += value 

745 self.post_build = previous_post_build # type: ignore 

746 return previous_post_build(p, pay) 

747 pkt.post_build = MethodType(_post_build, pkt) # type: ignore 

748 return s 

749 

750 

751class FCSField(TrailerField): 

752 """ 

753 A FCS field that gets appended at the end of the *packet* (not layer). 

754 """ 

755 

756 def __init__(self, *args, **kwargs): 

757 # type: (*Any, **Any) -> None 

758 super(FCSField, self).__init__(Field(*args, **kwargs)) 

759 

760 def i2repr(self, pkt, x): 

761 # type: (Optional[Packet], int) -> str 

762 return lhex(self.i2h(pkt, x)) 

763 

764 

765class DestField(Field[str, bytes]): 

766 __slots__ = ["defaultdst"] 

767 # Each subclass must have its own bindings attribute 

768 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]] 

769 

770 def __init__(self, name, default): 

771 # type: (str, str) -> None 

772 self.defaultdst = default 

773 

774 def dst_from_pkt(self, pkt): 

775 # type: (Packet) -> str 

776 for addr, condition in self.bindings.get(pkt.payload.__class__, []): 

777 try: 

778 if all(pkt.payload.getfieldval(field) == value 

779 for field, value in condition.items()): 

780 return addr # type: ignore 

781 except AttributeError: 

782 pass 

783 return self.defaultdst 

784 

785 @classmethod 

786 def bind_addr(cls, layer, addr, **condition): 

787 # type: (Type[Packet], str, **Any) -> None 

788 cls.bindings.setdefault(layer, []).append( # type: ignore 

789 (addr, condition) 

790 ) 

791 

792 

793class MACField(Field[Optional[str], bytes]): 

794 def __init__(self, name, default): 

795 # type: (str, Optional[Any]) -> None 

796 Field.__init__(self, name, default, "6s") 

797 

798 def i2m(self, pkt, x): 

799 # type: (Optional[Packet], Optional[str]) -> bytes 

800 if not x: 

801 return b"\0\0\0\0\0\0" 

802 try: 

803 y = mac2str(x) 

804 except (struct.error, OverflowError): 

805 y = bytes_encode(x) 

806 return y 

807 

808 def m2i(self, pkt, x): 

809 # type: (Optional[Packet], bytes) -> str 

810 return str2mac(x) 

811 

812 def any2i(self, pkt, x): 

813 # type: (Optional[Packet], Any) -> str 

814 if isinstance(x, bytes) and len(x) == 6: 

815 return self.m2i(pkt, x) 

816 return cast(str, x) 

817 

818 def i2repr(self, pkt, x): 

819 # type: (Optional[Packet], Optional[str]) -> str 

820 x = self.i2h(pkt, x) 

821 if x is None: 

822 return repr(x) 

823 if self in conf.resolve: 

824 x = conf.manufdb._resolve_MAC(x) 

825 return x 

826 

827 def randval(self): 

828 # type: () -> RandMAC 

829 return RandMAC() 

830 

831 

832class LEMACField(MACField): 

833 def i2m(self, pkt, x): 

834 # type: (Optional[Packet], Optional[str]) -> bytes 

835 return MACField.i2m(self, pkt, x)[::-1] 

836 

837 def m2i(self, pkt, x): 

838 # type: (Optional[Packet], bytes) -> str 

839 return MACField.m2i(self, pkt, x[::-1]) 

840 

841 

842class IPField(Field[Union[str, Net], bytes]): 

843 def __init__(self, name, default): 

844 # type: (str, Optional[str]) -> None 

845 Field.__init__(self, name, default, "4s") 

846 

847 def h2i(self, pkt, x): 

848 # type: (Optional[Packet], Union[AnyStr, List[AnyStr]]) -> Any 

849 if isinstance(x, bytes): 

850 x = plain_str(x) # type: ignore 

851 if isinstance(x, str): 

852 try: 

853 inet_aton(x) 

854 except socket.error: 

855 return Net(x) 

856 elif isinstance(x, tuple): 

857 if len(x) != 2: 

858 raise ValueError("Invalid IP format") 

859 return Net(*x) 

860 elif isinstance(x, list): 

861 return [self.h2i(pkt, n) for n in x] 

862 return x 

863 

864 def i2h(self, pkt, x): 

865 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str 

866 return cast(str, x) 

867 

868 def resolve(self, x): 

869 # type: (str) -> str 

870 if self in conf.resolve: 

871 try: 

872 ret = socket.gethostbyaddr(x)[0] 

873 except Exception: 

874 pass 

875 else: 

876 if ret: 

877 return ret 

878 return x 

879 

880 def i2m(self, pkt, x): 

881 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 

882 if x is None: 

883 return b'\x00\x00\x00\x00' 

884 return inet_aton(plain_str(x)) 

885 

886 def m2i(self, pkt, x): 

887 # type: (Optional[Packet], bytes) -> str 

888 return inet_ntoa(x) 

889 

890 def any2i(self, pkt, x): 

891 # type: (Optional[Packet], Any) -> Any 

892 return self.h2i(pkt, x) 

893 

894 def i2repr(self, pkt, x): 

895 # type: (Optional[Packet], Union[str, Net]) -> str 

896 r = self.resolve(self.i2h(pkt, x)) 

897 return r if isinstance(r, str) else repr(r) 

898 

899 def randval(self): 

900 # type: () -> RandIP 

901 return RandIP() 

902 

903 

904class SourceIPField(IPField): 

905 __slots__ = ["dstname"] 

906 

907 def __init__(self, name, dstname): 

908 # type: (str, Optional[str]) -> None 

909 IPField.__init__(self, name, None) 

910 self.dstname = dstname 

911 

912 def __findaddr(self, pkt): 

913 # type: (Packet) -> str 

914 if conf.route is None: 

915 # unused import, only to initialize conf.route 

916 import scapy.route # noqa: F401 

917 dst = ("0.0.0.0" if self.dstname is None 

918 else getattr(pkt, self.dstname) or "0.0.0.0") 

919 if isinstance(dst, (Gen, list)): 

920 r = { 

921 conf.route.route(str(daddr)) 

922 for daddr in dst 

923 } # type: Set[Tuple[str, str, str]] 

924 if len(r) > 1: 

925 warning("More than one possible route for %r" % (dst,)) 

926 return min(r)[1] 

927 return conf.route.route(dst)[1] 

928 

929 def i2m(self, pkt, x): 

930 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 

931 if x is None and pkt is not None: 

932 x = self.__findaddr(pkt) 

933 return super(SourceIPField, self).i2m(pkt, x) 

934 

935 def i2h(self, pkt, x): 

936 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str 

937 if x is None and pkt is not None: 

938 x = self.__findaddr(pkt) 

939 return super(SourceIPField, self).i2h(pkt, x) 

940 

941 

942class IP6Field(Field[Optional[Union[str, Net6]], bytes]): 

943 def __init__(self, name, default): 

944 # type: (str, Optional[str]) -> None 

945 Field.__init__(self, name, default, "16s") 

946 

947 def h2i(self, pkt, x): 

948 # type: (Optional[Packet], Optional[str]) -> str 

949 if isinstance(x, bytes): 

950 x = plain_str(x) 

951 if isinstance(x, str): 

952 try: 

953 x = in6_ptop(x) 

954 except socket.error: 

955 return Net6(x) # type: ignore 

956 elif isinstance(x, tuple): 

957 if len(x) != 2: 

958 raise ValueError("Invalid IPv6 format") 

959 return Net6(*x) 

960 elif isinstance(x, list): 

961 x = [self.h2i(pkt, n) for n in x] 

962 return x # type: ignore 

963 

964 def i2h(self, pkt, x): 

965 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 

966 return cast(str, x) 

967 

968 def i2m(self, pkt, x): 

969 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 

970 if x is None: 

971 x = "::" 

972 return inet_pton(socket.AF_INET6, plain_str(x)) 

973 

974 def m2i(self, pkt, x): 

975 # type: (Optional[Packet], bytes) -> str 

976 return inet_ntop(socket.AF_INET6, x) 

977 

978 def any2i(self, pkt, x): 

979 # type: (Optional[Packet], Optional[str]) -> str 

980 return self.h2i(pkt, x) 

981 

982 def i2repr(self, pkt, x): 

983 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 

984 if x is None: 

985 return self.i2h(pkt, x) 

986 elif not isinstance(x, Net6) and not isinstance(x, list): 

987 if in6_isaddrTeredo(x): # print Teredo info 

988 server, _, maddr, mport = teredoAddrExtractInfo(x) 

989 return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr, mport) # noqa: E501 

990 elif in6_isaddr6to4(x): # print encapsulated address 

991 vaddr = in6_6to4ExtractAddr(x) 

992 return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr) 

993 r = self.i2h(pkt, x) # No specific information to return 

994 return r if isinstance(r, str) else repr(r) 

995 

996 def randval(self): 

997 # type: () -> RandIP6 

998 return RandIP6() 

999 

1000 

1001class SourceIP6Field(IP6Field): 

1002 __slots__ = ["dstname"] 

1003 

1004 def __init__(self, name, dstname): 

1005 # type: (str, str) -> None 

1006 IP6Field.__init__(self, name, None) 

1007 self.dstname = dstname 

1008 

1009 def i2m(self, pkt, x): 

1010 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 

1011 if x is None: 

1012 dst = ("::" if self.dstname is None else 

1013 getattr(pkt, self.dstname) or "::") 

1014 iff, x, nh = conf.route6.route(dst) 

1015 return super(SourceIP6Field, self).i2m(pkt, x) 

1016 

1017 def i2h(self, pkt, x): 

1018 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 

1019 if x is None: 

1020 if conf.route6 is None: 

1021 # unused import, only to initialize conf.route6 

1022 import scapy.route6 # noqa: F401 

1023 dst = ("::" if self.dstname is None else getattr(pkt, self.dstname)) # noqa: E501 

1024 if isinstance(dst, (Gen, list)): 

1025 r = {conf.route6.route(str(daddr)) 

1026 for daddr in dst} 

1027 if len(r) > 1: 

1028 warning("More than one possible route for %r" % (dst,)) 

1029 x = min(r)[1] 

1030 else: 

1031 x = conf.route6.route(dst)[1] 

1032 return super(SourceIP6Field, self).i2h(pkt, x) 

1033 

1034 

1035class DestIP6Field(IP6Field, DestField): 

1036 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]] 

1037 

1038 def __init__(self, name, default): 

1039 # type: (str, str) -> None 

1040 IP6Field.__init__(self, name, None) 

1041 DestField.__init__(self, name, default) 

1042 

1043 def i2m(self, pkt, x): 

1044 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 

1045 if x is None and pkt is not None: 

1046 x = self.dst_from_pkt(pkt) 

1047 return IP6Field.i2m(self, pkt, x) 

1048 

1049 def i2h(self, pkt, x): 

1050 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 

1051 if x is None and pkt is not None: 

1052 x = self.dst_from_pkt(pkt) 

1053 return super(DestIP6Field, self).i2h(pkt, x) 

1054 

1055 

1056class ByteField(Field[int, int]): 

1057 def __init__(self, name, default): 

1058 # type: (str, Optional[int]) -> None 

1059 Field.__init__(self, name, default, "B") 

1060 

1061 

1062class XByteField(ByteField): 

1063 def i2repr(self, pkt, x): 

1064 # type: (Optional[Packet], int) -> str 

1065 return lhex(self.i2h(pkt, x)) 

1066 

1067 

1068# XXX Unused field: at least add some tests 

1069class OByteField(ByteField): 

1070 def i2repr(self, pkt, x): 

1071 # type: (Optional[Packet], int) -> str 

1072 return "%03o" % self.i2h(pkt, x) 

1073 

1074 

1075class ThreeBytesField(Field[int, int]): 

1076 def __init__(self, name, default): 

1077 # type: (str, int) -> None 

1078 Field.__init__(self, name, default, "!I") 

1079 

1080 def addfield(self, pkt, s, val): 

1081 # type: (Packet, bytes, Optional[int]) -> bytes 

1082 return s + struct.pack(self.fmt, self.i2m(pkt, val))[1:4] 

1083 

1084 def getfield(self, pkt, s): 

1085 # type: (Packet, bytes) -> Tuple[bytes, int] 

1086 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00" + s[:3])[0]) # noqa: E501 

1087 

1088 

1089class X3BytesField(ThreeBytesField, XByteField): 

1090 def i2repr(self, pkt, x): 

1091 # type: (Optional[Packet], int) -> str 

1092 return XByteField.i2repr(self, pkt, x) 

1093 

1094 

1095class LEThreeBytesField(ByteField): 

1096 def __init__(self, name, default): 

1097 # type: (str, Optional[int]) -> None 

1098 Field.__init__(self, name, default, "<I") 

1099 

1100 def addfield(self, pkt, s, val): 

1101 # type: (Packet, bytes, Optional[int]) -> bytes 

1102 return s + struct.pack(self.fmt, self.i2m(pkt, val))[:3] 

1103 

1104 def getfield(self, pkt, s): 

1105 # type: (Optional[Packet], bytes) -> Tuple[bytes, int] 

1106 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, s[:3] + b"\x00")[0]) # noqa: E501 

1107 

1108 

1109class XLE3BytesField(LEThreeBytesField, XByteField): 

1110 def i2repr(self, pkt, x): 

1111 # type: (Optional[Packet], int) -> str 

1112 return XByteField.i2repr(self, pkt, x) 

1113 

1114 

1115def LEX3BytesField(*args, **kwargs): 

1116 # type: (*Any, **Any) -> Any 

1117 warnings.warn( 

1118 "LEX3BytesField is deprecated. Use XLE3BytesField", 

1119 DeprecationWarning 

1120 ) 

1121 return XLE3BytesField(*args, **kwargs) 

1122 

1123 

1124class NBytesField(Field[int, List[int]]): 

1125 def __init__(self, name, default, sz): 

1126 # type: (str, Optional[int], int) -> None 

1127 Field.__init__(self, name, default, "<" + "B" * sz) 

1128 

1129 def i2m(self, pkt, x): 

1130 # type: (Optional[Packet], Optional[int]) -> List[int] 

1131 if x is None: 

1132 return [0] * self.sz 

1133 x2m = list() 

1134 for _ in range(self.sz): 

1135 x2m.append(x % 256) 

1136 x //= 256 

1137 return x2m[::-1] 

1138 

1139 def m2i(self, pkt, x): 

1140 # type: (Optional[Packet], Union[List[int], int]) -> int 

1141 if isinstance(x, int): 

1142 return x 

1143 # x can be a tuple when coming from struct.unpack (from getfield) 

1144 if isinstance(x, (list, tuple)): 

1145 return sum(d * (256 ** i) for i, d in enumerate(reversed(x))) 

1146 return 0 

1147 

1148 def i2repr(self, pkt, x): 

1149 # type: (Optional[Packet], int) -> str 

1150 if isinstance(x, int): 

1151 return '%i' % x 

1152 return super(NBytesField, self).i2repr(pkt, x) 

1153 

1154 def addfield(self, pkt, s, val): 

1155 # type: (Optional[Packet], bytes, Optional[int]) -> bytes 

1156 return s + self.struct.pack(*self.i2m(pkt, val)) 

1157 

1158 def getfield(self, pkt, s): 

1159 # type: (Optional[Packet], bytes) -> Tuple[bytes, int] 

1160 return (s[self.sz:], 

1161 self.m2i(pkt, self.struct.unpack(s[:self.sz]))) # type: ignore 

1162 

1163 def randval(self): 

1164 # type: () -> RandNum 

1165 return RandNum(0, 2 ** (self.sz * 8) - 1) 

1166 

1167 

1168class XNBytesField(NBytesField): 

1169 def i2repr(self, pkt, x): 

1170 # type: (Optional[Packet], int) -> str 

1171 if isinstance(x, int): 

1172 return '0x%x' % x 

1173 # x can be a tuple when coming from struct.unpack (from getfield) 

1174 if isinstance(x, (list, tuple)): 

1175 return "0x" + "".join("%02x" % b for b in x) 

1176 return super(XNBytesField, self).i2repr(pkt, x) 

1177 

1178 

1179class SignedByteField(Field[int, int]): 

1180 def __init__(self, name, default): 

1181 # type: (str, Optional[int]) -> None 

1182 Field.__init__(self, name, default, "b") 

1183 

1184 

1185class FieldValueRangeException(Scapy_Exception): 

1186 pass 

1187 

1188 

1189class MaximumItemsCount(Scapy_Exception): 

1190 pass 

1191 

1192 

1193class FieldAttributeException(Scapy_Exception): 

1194 pass 

1195 

1196 

1197class YesNoByteField(ByteField): 

1198 """ 

1199 A byte based flag field that shows representation of its number 

1200 based on a given association 

1201 

1202 In its default configuration the following representation is generated: 

1203 x == 0 : 'no' 

1204 x != 0 : 'yes' 

1205 

1206 In more sophisticated use-cases (e.g. yes/no/invalid) one can use the 

1207 config attribute to configure. 

1208 Key-value, key-range and key-value-set associations that will be used to 

1209 generate the values representation. 

1210 

1211 - A range is given by a tuple (<first-val>, <last-value>) including the 

1212 last value. 

1213 - A single-value tuple is treated as scalar. 

1214 - A list defines a set of (probably non consecutive) values that should be 

1215 associated to a given key. 

1216 

1217 All values not associated with a key will be shown as number of type 

1218 unsigned byte. 

1219 

1220 **For instance**:: 

1221 

1222 config = { 

1223 'no' : 0, 

1224 'foo' : (1,22), 

1225 'yes' : 23, 

1226 'bar' : [24,25, 42, 48, 87, 253] 

1227 } 

1228 

1229 Generates the following representations:: 

1230 

1231 x == 0 : 'no' 

1232 x == 15: 'foo' 

1233 x == 23: 'yes' 

1234 x == 42: 'bar' 

1235 x == 43: 43 

1236 

1237 Another example, using the config attribute one could also revert 

1238 the stock-yes-no-behavior:: 

1239 

1240 config = { 

1241 'yes' : 0, 

1242 'no' : (1,255) 

1243 } 

1244 

1245 Will generate the following value representation:: 

1246 

1247 x == 0 : 'yes' 

1248 x != 0 : 'no' 

1249 

1250 """ 

1251 __slots__ = ['eval_fn'] 

1252 

1253 def _build_config_representation(self, config): 

1254 # type: (Dict[str, Any]) -> None 

1255 assoc_table = dict() 

1256 for key in config: 

1257 value_spec = config[key] 

1258 

1259 value_spec_type = type(value_spec) 

1260 

1261 if value_spec_type is int: 

1262 if value_spec < 0 or value_spec > 255: 

1263 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 

1264 'must be in range [0..255]'.format(value_spec)) # noqa: E501 

1265 assoc_table[value_spec] = key 

1266 

1267 elif value_spec_type is list: 

1268 for value in value_spec: 

1269 if value < 0 or value > 255: 

1270 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 

1271 'must be in range [0..255]'.format(value)) # noqa: E501 

1272 assoc_table[value] = key 

1273 

1274 elif value_spec_type is tuple: 

1275 value_spec_len = len(value_spec) 

1276 if value_spec_len != 2: 

1277 raise FieldAttributeException('invalid length {} of given config item tuple {} - must be ' # noqa: E501 

1278 '(<start-range>, <end-range>).'.format(value_spec_len, value_spec)) # noqa: E501 

1279 

1280 value_range_start = value_spec[0] 

1281 if value_range_start < 0 or value_range_start > 255: 

1282 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 

1283 'must be in range [0..255]'.format(value_range_start)) # noqa: E501 

1284 

1285 value_range_end = value_spec[1] 

1286 if value_range_end < 0 or value_range_end > 255: 

1287 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 

1288 'must be in range [0..255]'.format(value_range_end)) # noqa: E501 

1289 

1290 for value in range(value_range_start, value_range_end + 1): 

1291 

1292 assoc_table[value] = key 

1293 

1294 self.eval_fn = lambda x: assoc_table[x] if x in assoc_table else x 

1295 

1296 def __init__(self, name, default, config=None): 

1297 # type: (str, int, Optional[Dict[str, Any]]) -> None 

1298 

1299 if not config: 

1300 # this represents the common use case and therefore it is kept small # noqa: E501 

1301 self.eval_fn = lambda x: 'no' if x == 0 else 'yes' 

1302 else: 

1303 self._build_config_representation(config) 

1304 ByteField.__init__(self, name, default) 

1305 

1306 def i2repr(self, pkt, x): 

1307 # type: (Optional[Packet], int) -> str 

1308 return self.eval_fn(x) # type: ignore 

1309 

1310 

1311class ShortField(Field[int, int]): 

1312 def __init__(self, name, default): 

1313 # type: (str, Optional[int]) -> None 

1314 Field.__init__(self, name, default, "H") 

1315 

1316 

1317class SignedShortField(Field[int, int]): 

1318 def __init__(self, name, default): 

1319 # type: (str, Optional[int]) -> None 

1320 Field.__init__(self, name, default, "h") 

1321 

1322 

1323class LEShortField(Field[int, int]): 

1324 def __init__(self, name, default): 

1325 # type: (str, Optional[int]) -> None 

1326 Field.__init__(self, name, default, "<H") 

1327 

1328 

1329class LESignedShortField(Field[int, int]): 

1330 def __init__(self, name, default): 

1331 # type: (str, Optional[int]) -> None 

1332 Field.__init__(self, name, default, "<h") 

1333 

1334 

1335class XShortField(ShortField): 

1336 def i2repr(self, pkt, x): 

1337 # type: (Optional[Packet], int) -> str 

1338 return lhex(self.i2h(pkt, x)) 

1339 

1340 

1341class IntField(Field[int, int]): 

1342 def __init__(self, name, default): 

1343 # type: (str, Optional[int]) -> None 

1344 Field.__init__(self, name, default, "I") 

1345 

1346 

1347class SignedIntField(Field[int, int]): 

1348 def __init__(self, name, default): 

1349 # type: (str, int) -> None 

1350 Field.__init__(self, name, default, "i") 

1351 

1352 

1353class LEIntField(Field[int, int]): 

1354 def __init__(self, name, default): 

1355 # type: (str, Optional[int]) -> None 

1356 Field.__init__(self, name, default, "<I") 

1357 

1358 

1359class LESignedIntField(Field[int, int]): 

1360 def __init__(self, name, default): 

1361 # type: (str, int) -> None 

1362 Field.__init__(self, name, default, "<i") 

1363 

1364 

1365class XIntField(IntField): 

1366 def i2repr(self, pkt, x): 

1367 # type: (Optional[Packet], int) -> str 

1368 return lhex(self.i2h(pkt, x)) 

1369 

1370 

1371class XLEIntField(LEIntField, XIntField): 

1372 def i2repr(self, pkt, x): 

1373 # type: (Optional[Packet], int) -> str 

1374 return XIntField.i2repr(self, pkt, x) 

1375 

1376 

1377class XLEShortField(LEShortField, XShortField): 

1378 def i2repr(self, pkt, x): 

1379 # type: (Optional[Packet], int) -> str 

1380 return XShortField.i2repr(self, pkt, x) 

1381 

1382 

1383class LongField(Field[int, int]): 

1384 def __init__(self, name, default): 

1385 # type: (str, int) -> None 

1386 Field.__init__(self, name, default, "Q") 

1387 

1388 

1389class SignedLongField(Field[int, int]): 

1390 def __init__(self, name, default): 

1391 # type: (str, Optional[int]) -> None 

1392 Field.__init__(self, name, default, "q") 

1393 

1394 

1395class LELongField(LongField): 

1396 def __init__(self, name, default): 

1397 # type: (str, Optional[int]) -> None 

1398 Field.__init__(self, name, default, "<Q") 

1399 

1400 

1401class LESignedLongField(Field[int, int]): 

1402 def __init__(self, name, default): 

1403 # type: (str, Optional[Any]) -> None 

1404 Field.__init__(self, name, default, "<q") 

1405 

1406 

1407class XLongField(LongField): 

1408 def i2repr(self, pkt, x): 

1409 # type: (Optional[Packet], int) -> str 

1410 return lhex(self.i2h(pkt, x)) 

1411 

1412 

1413class XLELongField(LELongField, XLongField): 

1414 def i2repr(self, pkt, x): 

1415 # type: (Optional[Packet], int) -> str 

1416 return XLongField.i2repr(self, pkt, x) 

1417 

1418 

1419class IEEEFloatField(Field[int, int]): 

1420 def __init__(self, name, default): 

1421 # type: (str, Optional[int]) -> None 

1422 Field.__init__(self, name, default, "f") 

1423 

1424 

1425class IEEEDoubleField(Field[int, int]): 

1426 def __init__(self, name, default): 

1427 # type: (str, Optional[int]) -> None 

1428 Field.__init__(self, name, default, "d") 

1429 

1430 

1431class _StrField(Field[I, bytes]): 

1432 __slots__ = ["remain"] 

1433 

1434 def __init__(self, name, default, fmt="H", remain=0): 

1435 # type: (str, Optional[I], str, int) -> None 

1436 Field.__init__(self, name, default, fmt) 

1437 self.remain = remain 

1438 

1439 def i2len(self, pkt, x): 

1440 # type: (Optional[Packet], Any) -> int 

1441 if x is None: 

1442 return 0 

1443 return len(x) 

1444 

1445 def any2i(self, pkt, x): 

1446 # type: (Optional[Packet], Any) -> I 

1447 if isinstance(x, str): 

1448 x = bytes_encode(x) 

1449 return super(_StrField, self).any2i(pkt, x) # type: ignore 

1450 

1451 def i2repr(self, pkt, x): 

1452 # type: (Optional[Packet], I) -> str 

1453 if x and isinstance(x, bytes): 

1454 return repr(x) 

1455 return super(_StrField, self).i2repr(pkt, x) 

1456 

1457 def i2m(self, pkt, x): 

1458 # type: (Optional[Packet], Optional[I]) -> bytes 

1459 if x is None: 

1460 return b"" 

1461 if not isinstance(x, bytes): 

1462 return bytes_encode(x) 

1463 return x 

1464 

1465 def addfield(self, pkt, s, val): 

1466 # type: (Packet, bytes, Optional[I]) -> bytes 

1467 return s + self.i2m(pkt, val) 

1468 

1469 def getfield(self, pkt, s): 

1470 # type: (Packet, bytes) -> Tuple[bytes, I] 

1471 if self.remain == 0: 

1472 return b"", self.m2i(pkt, s) 

1473 else: 

1474 return s[-self.remain:], self.m2i(pkt, s[:-self.remain]) 

1475 

1476 def randval(self): 

1477 # type: () -> RandBin 

1478 return RandBin(RandNum(0, 1200)) 

1479 

1480 

1481class StrField(_StrField[bytes]): 

1482 pass 

1483 

1484 

1485class StrFieldUtf16(StrField): 

1486 def any2i(self, pkt, x): 

1487 # type: (Optional[Packet], Optional[str]) -> bytes 

1488 if isinstance(x, str): 

1489 return self.h2i(pkt, x) 

1490 return super(StrFieldUtf16, self).any2i(pkt, x) 

1491 

1492 def i2repr(self, pkt, x): 

1493 # type: (Optional[Packet], bytes) -> str 

1494 return plain_str(self.i2h(pkt, x)) 

1495 

1496 def h2i(self, pkt, x): 

1497 # type: (Optional[Packet], Optional[str]) -> bytes 

1498 return plain_str(x).encode('utf-16-le', errors="replace") 

1499 

1500 def i2h(self, pkt, x): 

1501 # type: (Optional[Packet], bytes) -> str 

1502 return bytes_encode(x).decode('utf-16-le', errors="replace") 

1503 

1504 

1505class _StrEnumField: 

1506 def __init__(self, **kwargs): 

1507 # type: (**Any) -> None 

1508 self.enum = kwargs.pop("enum", {}) 

1509 

1510 def i2repr(self, pkt, v): 

1511 # type: (Optional[Packet], bytes) -> str 

1512 r = v.rstrip(b"\0") 

1513 rr = repr(r) 

1514 if self.enum: 

1515 if v in self.enum: 

1516 rr = "%s (%s)" % (rr, self.enum[v]) 

1517 elif r in self.enum: 

1518 rr = "%s (%s)" % (rr, self.enum[r]) 

1519 return rr 

1520 

1521 

1522class StrEnumField(_StrEnumField, StrField): 

1523 __slots__ = ["enum"] 

1524 

1525 def __init__( 

1526 self, 

1527 name, # type: str 

1528 default, # type: bytes 

1529 enum=None, # type: Optional[Dict[str, str]] 

1530 **kwargs # type: Any 

1531 ): 

1532 # type: (...) -> None 

1533 StrField.__init__(self, name, default, **kwargs) # type: ignore 

1534 self.enum = enum 

1535 

1536 

1537K = TypeVar('K', List[BasePacket], BasePacket, Optional[BasePacket]) 

1538 

1539 

1540class _PacketField(_StrField[K]): 

1541 __slots__ = ["cls"] 

1542 holds_packets = 1 

1543 

1544 def __init__(self, 

1545 name, # type: str 

1546 default, # type: Optional[K] 

1547 pkt_cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501 

1548 ): 

1549 # type: (...) -> None 

1550 super(_PacketField, self).__init__(name, default) 

1551 self.cls = pkt_cls 

1552 

1553 def i2m(self, 

1554 pkt, # type: Optional[Packet] 

1555 i, # type: Any 

1556 ): 

1557 # type: (...) -> bytes 

1558 if i is None: 

1559 return b"" 

1560 return raw(i) 

1561 

1562 def m2i(self, pkt, m): # type: ignore 

1563 # type: (Optional[Packet], bytes) -> Packet 

1564 try: 

1565 # we want to set parent wherever possible 

1566 return self.cls(m, _parent=pkt) # type: ignore 

1567 except TypeError: 

1568 return self.cls(m) 

1569 

1570 

1571class _PacketFieldSingle(_PacketField[K]): 

1572 def any2i(self, pkt, x): 

1573 # type: (Optional[Packet], Any) -> K 

1574 if x and pkt and hasattr(x, "add_parent"): 

1575 cast("Packet", x).add_parent(pkt) 

1576 return super(_PacketFieldSingle, self).any2i(pkt, x) 

1577 

1578 def getfield(self, 

1579 pkt, # type: Packet 

1580 s, # type: bytes 

1581 ): 

1582 # type: (...) -> Tuple[bytes, K] 

1583 i = self.m2i(pkt, s) 

1584 remain = b"" 

1585 if conf.padding_layer in i: 

1586 r = i[conf.padding_layer] 

1587 del r.underlayer.payload 

1588 remain = r.load 

1589 return remain, i # type: ignore 

1590 

1591 

1592class PacketField(_PacketFieldSingle[BasePacket]): 

1593 def randval(self): # type: ignore 

1594 # type: () -> Packet 

1595 from scapy.packet import fuzz 

1596 return fuzz(self.cls()) # type: ignore 

1597 

1598 

1599class PacketLenField(_PacketFieldSingle[Optional[BasePacket]]): 

1600 __slots__ = ["length_from"] 

1601 

1602 def __init__(self, 

1603 name, # type: str 

1604 default, # type: Packet 

1605 cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501 

1606 length_from=None # type: Optional[Callable[[Packet], int]] # noqa: E501 

1607 ): 

1608 # type: (...) -> None 

1609 super(PacketLenField, self).__init__(name, default, cls) 

1610 self.length_from = length_from or (lambda x: 0) 

1611 

1612 def getfield(self, 

1613 pkt, # type: Packet 

1614 s, # type: bytes 

1615 ): 

1616 # type: (...) -> Tuple[bytes, Optional[BasePacket]] 

1617 len_pkt = self.length_from(pkt) 

1618 i = None 

1619 if len_pkt: 

1620 try: 

1621 i = self.m2i(pkt, s[:len_pkt]) 

1622 except Exception: 

1623 if conf.debug_dissector: 

1624 raise 

1625 i = conf.raw_layer(load=s[:len_pkt]) 

1626 return s[len_pkt:], i 

1627 

1628 

1629class PacketListField(_PacketField[List[BasePacket]]): 

1630 """PacketListField represents a list containing a series of Packet instances 

1631 that might occur right in the middle of another Packet field. 

1632 This field type may also be used to indicate that a series of Packet 

1633 instances have a sibling semantic instead of a parent/child relationship 

1634 (i.e. a stack of layers). All elements in PacketListField have current 

1635 packet referenced in parent field. 

1636 """ 

1637 __slots__ = ["count_from", "length_from", "next_cls_cb", "max_count"] 

1638 islist = 1 

1639 

1640 def __init__( 

1641 self, 

1642 name, # type: str 

1643 default, # type: Optional[List[BasePacket]] 

1644 pkt_cls=None, # type: Optional[Union[Callable[[bytes], Packet], Type[Packet]]] # noqa: E501 

1645 count_from=None, # type: Optional[Callable[[Packet], int]] 

1646 length_from=None, # type: Optional[Callable[[Packet], int]] 

1647 next_cls_cb=None, # type: Optional[Callable[[Packet, List[BasePacket], Optional[Packet], bytes], Type[Packet]]] # noqa: E501 

1648 max_count=None, # type: Optional[int] 

1649 ): 

1650 # type: (...) -> None 

1651 """ 

1652 The number of Packet instances that are dissected by this field can 

1653 be parametrized using one of three different mechanisms/parameters: 

1654 

1655 * count_from: a callback that returns the number of Packet 

1656 instances to dissect. The callback prototype is:: 

1657 

1658 count_from(pkt:Packet) -> int 

1659 

1660 * length_from: a callback that returns the number of bytes that 

1661 must be dissected by this field. The callback prototype is:: 

1662 

1663 length_from(pkt:Packet) -> int 

1664 

1665 * next_cls_cb: a callback that enables a Scapy developer to 

1666 dynamically discover if another Packet instance should be 

1667 dissected or not. See below for this callback prototype. 

1668 

1669 The bytes that are not consumed during the dissection of this field 

1670 are passed to the next field of the current packet. 

1671 

1672 For the serialization of such a field, the list of Packets that are 

1673 contained in a PacketListField can be heterogeneous and is 

1674 unrestricted. 

1675 

1676 The type of the Packet instances that are dissected with this field is 

1677 specified or discovered using one of the following mechanism: 

1678 

1679 * the pkt_cls parameter may contain a callable that returns an 

1680 instance of the dissected Packet. This may either be a 

1681 reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py) 

1682 to generate an homogeneous PacketListField or a function 

1683 deciding the type of the Packet instance 

1684 (e.g. _CDPGuessAddrRecord in contrib/cdp.py) 

1685 

1686 * the pkt_cls parameter may contain a class object with a defined 

1687 ``dispatch_hook`` classmethod. That method must return a Packet 

1688 instance. The ``dispatch_hook`` callmethod must implement the 

1689 following prototype:: 

1690 

1691 dispatch_hook(cls, 

1692 _pkt:Optional[Packet], 

1693 *args, **kargs 

1694 ) -> Type[Packet] 

1695 

1696 The _pkt parameter may contain a reference to the packet 

1697 instance containing the PacketListField that is being 

1698 dissected. 

1699 

1700 * the ``next_cls_cb`` parameter may contain a callable whose 

1701 prototype is:: 

1702 

1703 cbk(pkt:Packet, 

1704 lst:List[Packet], 

1705 cur:Optional[Packet], 

1706 remain:str 

1707 ) -> Optional[Type[Packet]] 

1708 

1709 The pkt argument contains a reference to the Packet instance 

1710 containing the PacketListField that is being dissected. 

1711 The lst argument is the list of all Packet instances that were 

1712 previously parsed during the current ``PacketListField`` 

1713 dissection, saved for the very last Packet instance. 

1714 The cur argument contains a reference to that very last parsed 

1715 ``Packet`` instance. The remain argument contains the bytes 

1716 that may still be consumed by the current PacketListField 

1717 dissection operation. 

1718 

1719 This callback returns either the type of the next Packet to 

1720 dissect or None to indicate that no more Packet are to be 

1721 dissected. 

1722 

1723 These four arguments allows a variety of dynamic discovery of 

1724 the number of Packet to dissect and of the type of each one of 

1725 these Packets, including: type determination based on current 

1726 Packet instances or its underlayers, continuation based on the 

1727 previously parsed Packet instances within that PacketListField, 

1728 continuation based on a look-ahead on the bytes to be 

1729 dissected... 

1730 

1731 The pkt_cls and next_cls_cb parameters are semantically exclusive, 

1732 although one could specify both. If both are specified, pkt_cls is 

1733 silently ignored. The same is true for count_from and next_cls_cb. 

1734 

1735 length_from and next_cls_cb are compatible and the dissection will 

1736 end, whichever of the two stop conditions comes first. 

1737 

1738 :param name: the name of the field 

1739 :param default: the default value of this field; generally an empty 

1740 Python list 

1741 :param pkt_cls: either a callable returning a Packet instance or a 

1742 class object defining a ``dispatch_hook`` class method 

1743 :param count_from: a callback returning the number of Packet 

1744 instances to dissect. 

1745 :param length_from: a callback returning the number of bytes to dissect 

1746 :param next_cls_cb: a callback returning either None or the type of 

1747 the next Packet to dissect. 

1748 :param max_count: an int containing the max amount of results. This is 

1749 a safety mechanism, exceeding this value will raise a Scapy_Exception. 

1750 """ 

1751 if default is None: 

1752 default = [] # Create a new list for each instance 

1753 super(PacketListField, self).__init__( 

1754 name, 

1755 default, 

1756 pkt_cls # type: ignore 

1757 ) 

1758 self.count_from = count_from 

1759 self.length_from = length_from 

1760 self.next_cls_cb = next_cls_cb 

1761 self.max_count = max_count 

1762 

1763 def any2i(self, pkt, x): 

1764 # type: (Optional[Packet], Any) -> List[BasePacket] 

1765 if not isinstance(x, list): 

1766 if x and pkt and hasattr(x, "add_parent"): 

1767 x.add_parent(pkt) 

1768 return [x] 

1769 elif pkt: 

1770 for i in x: 

1771 if not i or not hasattr(i, "add_parent"): 

1772 continue 

1773 i.add_parent(pkt) 

1774 return x 

1775 

1776 def i2count(self, 

1777 pkt, # type: Optional[Packet] 

1778 val, # type: List[BasePacket] 

1779 ): 

1780 # type: (...) -> int 

1781 if isinstance(val, list): 

1782 return len(val) 

1783 return 1 

1784 

1785 def i2len(self, 

1786 pkt, # type: Optional[Packet] 

1787 val, # type: List[Packet] 

1788 ): 

1789 # type: (...) -> int 

1790 return sum(len(self.i2m(pkt, p)) for p in val) 

1791 

1792 def getfield(self, pkt, s): 

1793 # type: (Packet, bytes) -> Tuple[bytes, List[BasePacket]] 

1794 c = len_pkt = cls = None 

1795 if self.length_from is not None: 

1796 len_pkt = self.length_from(pkt) 

1797 elif self.count_from is not None: 

1798 c = self.count_from(pkt) 

1799 if self.next_cls_cb is not None: 

1800 cls = self.next_cls_cb(pkt, [], None, s) 

1801 c = 1 

1802 if cls is None: 

1803 c = 0 

1804 

1805 lst = [] # type: List[BasePacket] 

1806 ret = b"" 

1807 remain = s 

1808 if len_pkt is not None: 

1809 remain, ret = s[:len_pkt], s[len_pkt:] 

1810 while remain: 

1811 if c is not None: 

1812 if c <= 0: 

1813 break 

1814 c -= 1 

1815 try: 

1816 if cls is not None: 

1817 try: 

1818 # we want to set parent wherever possible 

1819 p = cls(remain, _parent=pkt) 

1820 except TypeError: 

1821 p = cls(remain) 

1822 else: 

1823 p = self.m2i(pkt, remain) 

1824 except Exception: 

1825 if conf.debug_dissector: 

1826 raise 

1827 p = conf.raw_layer(load=remain) 

1828 remain = b"" 

1829 else: 

1830 if conf.padding_layer in p: 

1831 pad = p[conf.padding_layer] 

1832 remain = pad.load 

1833 del pad.underlayer.payload 

1834 if self.next_cls_cb is not None: 

1835 cls = self.next_cls_cb(pkt, lst, p, remain) 

1836 if cls is not None: 

1837 c = 0 if c is None else c 

1838 c += 1 

1839 else: 

1840 remain = b"" 

1841 lst.append(p) 

1842 if len(lst) > (self.max_count or conf.max_list_count): 

1843 raise MaximumItemsCount( 

1844 "Maximum amount of items reached in PacketListField: %s " 

1845 "(defaults to conf.max_list_count)" 

1846 % (self.max_count or conf.max_list_count) 

1847 ) 

1848 

1849 if isinstance(remain, tuple): 

1850 remain, nb = remain 

1851 return (remain + ret, nb), lst 

1852 else: 

1853 return remain + ret, lst 

1854 

1855 def i2m(self, 

1856 pkt, # type: Optional[Packet] 

1857 i, # type: Any 

1858 ): 

1859 # type: (...) -> bytes 

1860 return bytes_encode(i) 

1861 

1862 def addfield(self, pkt, s, val): 

1863 # type: (Packet, bytes, Any) -> bytes 

1864 return s + b"".join(self.i2m(pkt, v) for v in val) 

1865 

1866 

1867class StrFixedLenField(StrField): 

1868 __slots__ = ["length_from"] 

1869 

1870 def __init__( 

1871 self, 

1872 name, # type: str 

1873 default, # type: Optional[bytes] 

1874 length=None, # type: Optional[int] 

1875 length_from=None, # type: Optional[Callable[[Packet], int]] # noqa: E501 

1876 ): 

1877 # type: (...) -> None 

1878 super(StrFixedLenField, self).__init__(name, default) 

1879 self.length_from = length_from or (lambda x: 0) 

1880 if length is not None: 

1881 self.sz = length 

1882 self.length_from = lambda x, length=length: length # type: ignore 

1883 

1884 def i2repr(self, 

1885 pkt, # type: Optional[Packet] 

1886 v, # type: bytes 

1887 ): 

1888 # type: (...) -> str 

1889 if isinstance(v, bytes): 

1890 v = v.rstrip(b"\0") 

1891 return super(StrFixedLenField, self).i2repr(pkt, v) 

1892 

1893 def getfield(self, pkt, s): 

1894 # type: (Packet, bytes) -> Tuple[bytes, bytes] 

1895 len_pkt = self.length_from(pkt) 

1896 if len_pkt == 0: 

1897 return s, b"" 

1898 return s[len_pkt:], self.m2i(pkt, s[:len_pkt]) 

1899 

1900 def addfield(self, pkt, s, val): 

1901 # type: (Packet, bytes, Optional[bytes]) -> bytes 

1902 len_pkt = self.length_from(pkt) 

1903 if len_pkt is None: 

1904 return s + self.i2m(pkt, val) 

1905 return s + struct.pack("%is" % len_pkt, self.i2m(pkt, val)) 

1906 

1907 def randval(self): 

1908 # type: () -> RandBin 

1909 try: 

1910 return RandBin(self.length_from(None)) # type: ignore 

1911 except Exception: 

1912 return RandBin(RandNum(0, 200)) 

1913 

1914 

1915class StrFixedLenFieldUtf16(StrFixedLenField, StrFieldUtf16): 

1916 pass 

1917 

1918 

1919class StrFixedLenEnumField(_StrEnumField, StrFixedLenField): 

1920 __slots__ = ["enum"] 

1921 

1922 def __init__( 

1923 self, 

1924 name, # type: str 

1925 default, # type: bytes 

1926 enum=None, # type: Optional[Dict[str, str]] 

1927 length=None, # type: Optional[int] 

1928 length_from=None # type: Optional[Callable[[Optional[Packet]], int]] # noqa: E501 

1929 ): 

1930 # type: (...) -> None 

1931 StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from) # noqa: E501 

1932 self.enum = enum 

1933 

1934 

1935class NetBIOSNameField(StrFixedLenField): 

1936 def __init__(self, name, default, length=31): 

1937 # type: (str, bytes, int) -> None 

1938 StrFixedLenField.__init__(self, name, default, length) 

1939 

1940 def i2m(self, pkt, y): 

1941 # type: (Optional[Packet], Optional[bytes]) -> bytes 

1942 if pkt: 

1943 len_pkt = self.length_from(pkt) // 2 

1944 else: 

1945 len_pkt = 0 

1946 x = bytes_encode(y or b"") # type: bytes 

1947 x += b" " * len_pkt 

1948 x = x[:len_pkt] 

1949 x = b"".join( 

1950 chb(0x41 + (orb(b) >> 4)) + 

1951 chb(0x41 + (orb(b) & 0xf)) 

1952 for b in x 

1953 ) # noqa: E501 

1954 return b" " + x 

1955 

1956 def m2i(self, pkt, x): 

1957 # type: (Optional[Packet], bytes) -> bytes 

1958 x = x[1:].strip(b"\x00").strip(b" ") 

1959 return b"".join(map( 

1960 lambda x, y: chb( 

1961 (((orb(x) - 1) & 0xf) << 4) + ((orb(y) - 1) & 0xf) 

1962 ), 

1963 x[::2], x[1::2] 

1964 )) 

1965 

1966 

1967class StrLenField(StrField): 

1968 """ 

1969 StrField with a length 

1970 

1971 :param length_from: a function that returns the size of the string 

1972 :param max_length: max size to use as randval 

1973 """ 

1974 __slots__ = ["length_from", "max_length"] 

1975 ON_WIRE_SIZE_UTF16 = True 

1976 

1977 def __init__( 

1978 self, 

1979 name, # type: str 

1980 default, # type: bytes 

1981 length_from=None, # type: Optional[Callable[[Packet], int]] 

1982 max_length=None, # type: Optional[Any] 

1983 ): 

1984 # type: (...) -> None 

1985 super(StrLenField, self).__init__(name, default) 

1986 self.length_from = length_from 

1987 self.max_length = max_length 

1988 

1989 def getfield(self, pkt, s): 

1990 # type: (Any, bytes) -> Tuple[bytes, bytes] 

1991 len_pkt = (self.length_from or (lambda x: 0))(pkt) 

1992 if not self.ON_WIRE_SIZE_UTF16: 

1993 len_pkt *= 2 

1994 if len_pkt == 0: 

1995 return s, b"" 

1996 return s[len_pkt:], self.m2i(pkt, s[:len_pkt]) 

1997 

1998 def randval(self): 

1999 # type: () -> RandBin 

2000 return RandBin(RandNum(0, self.max_length or 1200)) 

2001 

2002 

2003class _XStrField(Field[bytes, bytes]): 

2004 def i2repr(self, pkt, x): 

2005 # type: (Optional[Packet], bytes) -> str 

2006 if isinstance(x, bytes): 

2007 return bytes_hex(x).decode() 

2008 return super(_XStrField, self).i2repr(pkt, x) 

2009 

2010 

2011class XStrField(_XStrField, StrField): 

2012 """ 

2013 StrField which value is printed as hexadecimal. 

2014 """ 

2015 

2016 

2017class XStrLenField(_XStrField, StrLenField): 

2018 """ 

2019 StrLenField which value is printed as hexadecimal. 

2020 """ 

2021 

2022 

2023class XStrFixedLenField(_XStrField, StrFixedLenField): 

2024 """ 

2025 StrFixedLenField which value is printed as hexadecimal. 

2026 """ 

2027 

2028 

2029class XLEStrLenField(XStrLenField): 

2030 def i2m(self, pkt, x): 

2031 # type: (Optional[Packet], Optional[bytes]) -> bytes 

2032 if not x: 

2033 return b"" 

2034 return x[:: -1] 

2035 

2036 def m2i(self, pkt, x): 

2037 # type: (Optional[Packet], bytes) -> bytes 

2038 return x[:: -1] 

2039 

2040 

2041class StrLenFieldUtf16(StrLenField, StrFieldUtf16): 

2042 pass 

2043 

2044 

2045class StrLenEnumField(_StrEnumField, StrLenField): 

2046 __slots__ = ["enum"] 

2047 

2048 def __init__( 

2049 self, 

2050 name, # type: str 

2051 default, # type: bytes 

2052 enum=None, # type: Optional[Dict[str, str]] 

2053 **kwargs # type: Any 

2054 ): 

2055 # type: (...) -> None 

2056 StrLenField.__init__(self, name, default, **kwargs) 

2057 self.enum = enum 

2058 

2059 

2060class BoundStrLenField(StrLenField): 

2061 __slots__ = ["minlen", "maxlen"] 

2062 

2063 def __init__( 

2064 self, 

2065 name, # type: str 

2066 default, # type: bytes 

2067 minlen=0, # type: int 

2068 maxlen=255, # type: int 

2069 length_from=None # type: Optional[Callable[[Packet], int]] 

2070 ): 

2071 # type: (...) -> None 

2072 StrLenField.__init__(self, name, default, length_from=length_from) 

2073 self.minlen = minlen 

2074 self.maxlen = maxlen 

2075 

2076 def randval(self): 

2077 # type: () -> RandBin 

2078 return RandBin(RandNum(self.minlen, self.maxlen)) 

2079 

2080 

2081class FieldListField(Field[List[Any], List[Any]]): 

2082 __slots__ = ["field", "count_from", "length_from", "max_count"] 

2083 islist = 1 

2084 

2085 def __init__( 

2086 self, 

2087 name, # type: str 

2088 default, # type: Optional[List[AnyField]] 

2089 field, # type: AnyField 

2090 length_from=None, # type: Optional[Callable[[Packet], int]] 

2091 count_from=None, # type: Optional[Callable[[Packet], int]] 

2092 max_count=None, # type: Optional[int] 

2093 ): 

2094 # type: (...) -> None 

2095 if default is None: 

2096 default = [] # Create a new list for each instance 

2097 self.field = field 

2098 Field.__init__(self, name, default) 

2099 self.count_from = count_from 

2100 self.length_from = length_from 

2101 self.max_count = max_count 

2102 

2103 def i2count(self, pkt, val): 

2104 # type: (Optional[Packet], List[Any]) -> int 

2105 if isinstance(val, list): 

2106 return len(val) 

2107 return 1 

2108 

2109 def i2len(self, pkt, val): 

2110 # type: (Packet, List[Any]) -> int 

2111 return int(sum(self.field.i2len(pkt, v) for v in val)) 

2112 

2113 def any2i(self, pkt, x): 

2114 # type: (Optional[Packet], List[Any]) -> List[Any] 

2115 if not isinstance(x, list): 

2116 return [self.field.any2i(pkt, x)] 

2117 else: 

2118 return [self.field.any2i(pkt, e) for e in x] 

2119 

2120 def i2repr(self, 

2121 pkt, # type: Optional[Packet] 

2122 x, # type: List[Any] 

2123 ): 

2124 # type: (...) -> str 

2125 return "[%s]" % ", ".join(self.field.i2repr(pkt, v) for v in x) 

2126 

2127 def addfield(self, 

2128 pkt, # type: Packet 

2129 s, # type: bytes 

2130 val, # type: Optional[List[Any]] 

2131 ): 

2132 # type: (...) -> bytes 

2133 val = self.i2m(pkt, val) 

2134 for v in val: 

2135 s = self.field.addfield(pkt, s, v) 

2136 return s 

2137 

2138 def getfield(self, 

2139 pkt, # type: Packet 

2140 s, # type: bytes 

2141 ): 

2142 # type: (...) -> Any 

2143 c = len_pkt = None 

2144 if self.length_from is not None: 

2145 len_pkt = self.length_from(pkt) 

2146 elif self.count_from is not None: 

2147 c = self.count_from(pkt) 

2148 

2149 val = [] 

2150 ret = b"" 

2151 if len_pkt is not None: 

2152 s, ret = s[:len_pkt], s[len_pkt:] 

2153 

2154 while s: 

2155 if c is not None: 

2156 if c <= 0: 

2157 break 

2158 c -= 1 

2159 s, v = self.field.getfield(pkt, s) 

2160 val.append(v) 

2161 if len(val) > (self.max_count or conf.max_list_count): 

2162 raise MaximumItemsCount( 

2163 "Maximum amount of items reached in FieldListField: %s " 

2164 "(defaults to conf.max_list_count)" 

2165 % (self.max_count or conf.max_list_count) 

2166 ) 

2167 

2168 if isinstance(s, tuple): 

2169 s, bn = s 

2170 return (s + ret, bn), val 

2171 else: 

2172 return s + ret, val 

2173 

2174 

2175class FieldLenField(Field[int, int]): 

2176 __slots__ = ["length_of", "count_of", "adjust"] 

2177 

2178 def __init__( 

2179 self, 

2180 name, # type: str 

2181 default, # type: Optional[Any] 

2182 length_of=None, # type: Optional[str] 

2183 fmt="H", # type: str 

2184 count_of=None, # type: Optional[str] 

2185 adjust=lambda pkt, x: x, # type: Callable[[Packet, int], int] 

2186 ): 

2187 # type: (...) -> None 

2188 Field.__init__(self, name, default, fmt) 

2189 self.length_of = length_of 

2190 self.count_of = count_of 

2191 self.adjust = adjust 

2192 

2193 def i2m(self, pkt, x): 

2194 # type: (Optional[Packet], Optional[int]) -> int 

2195 if x is None and pkt is not None: 

2196 if self.length_of is not None: 

2197 fld, fval = pkt.getfield_and_val(self.length_of) 

2198 f = fld.i2len(pkt, fval) 

2199 elif self.count_of is not None: 

2200 fld, fval = pkt.getfield_and_val(self.count_of) 

2201 f = fld.i2count(pkt, fval) 

2202 else: 

2203 raise ValueError( 

2204 "Field should have either length_of or count_of" 

2205 ) 

2206 x = self.adjust(pkt, f) 

2207 elif x is None: 

2208 x = 0 

2209 return x 

2210 

2211 

2212class StrNullField(StrField): 

2213 DELIMITER = b"\x00" 

2214 

2215 def addfield(self, pkt, s, val): 

2216 # type: (Packet, bytes, Optional[bytes]) -> bytes 

2217 return s + self.i2m(pkt, val) + self.DELIMITER 

2218 

2219 def getfield(self, 

2220 pkt, # type: Packet 

2221 s, # type: bytes 

2222 ): 

2223 # type: (...) -> Tuple[bytes, bytes] 

2224 len_str = 0 

2225 while True: 

2226 len_str = s.find(self.DELIMITER, len_str) 

2227 if len_str < 0: 

2228 # DELIMITER not found: return empty 

2229 return b"", s 

2230 if len_str % len(self.DELIMITER): 

2231 len_str += 1 

2232 else: 

2233 break 

2234 return s[len_str + len(self.DELIMITER):], self.m2i(pkt, s[:len_str]) 

2235 

2236 def randval(self): 

2237 # type: () -> RandTermString 

2238 return RandTermString(RandNum(0, 1200), self.DELIMITER) 

2239 

2240 def i2len(self, pkt, x): 

2241 # type: (Optional[Packet], Any) -> int 

2242 return super(StrNullField, self).i2len(pkt, x) + 1 

2243 

2244 

2245class StrNullFieldUtf16(StrNullField, StrFieldUtf16): 

2246 DELIMITER = b"\x00\x00" 

2247 

2248 

2249class StrStopField(StrField): 

2250 __slots__ = ["stop", "additional"] 

2251 

2252 def __init__(self, name, default, stop, additional=0): 

2253 # type: (str, str, bytes, int) -> None 

2254 Field.__init__(self, name, default) 

2255 self.stop = stop 

2256 self.additional = additional 

2257 

2258 def getfield(self, pkt, s): 

2259 # type: (Optional[Packet], bytes) -> Tuple[bytes, bytes] 

2260 len_str = s.find(self.stop) 

2261 if len_str < 0: 

2262 return b"", s 

2263 len_str += len(self.stop) + self.additional 

2264 return s[len_str:], s[:len_str] 

2265 

2266 def randval(self): 

2267 # type: () -> RandTermString 

2268 return RandTermString(RandNum(0, 1200), self.stop) 

2269 

2270 

2271class LenField(Field[int, int]): 

2272 """ 

2273 If None, will be filled with the size of the payload 

2274 """ 

2275 __slots__ = ["adjust"] 

2276 

2277 def __init__(self, name, default, fmt="H", adjust=lambda x: x): 

2278 # type: (str, Optional[Any], str, Callable[[int], int]) -> None 

2279 Field.__init__(self, name, default, fmt) 

2280 self.adjust = adjust 

2281 

2282 def i2m(self, 

2283 pkt, # type: Optional[Packet] 

2284 x, # type: Optional[int] 

2285 ): 

2286 # type: (...) -> int 

2287 if x is None: 

2288 x = 0 

2289 if pkt is not None: 

2290 x = self.adjust(len(pkt.payload)) 

2291 return x 

2292 

2293 

2294class BCDFloatField(Field[float, int]): 

2295 def i2m(self, pkt, x): 

2296 # type: (Optional[Packet], Optional[float]) -> int 

2297 if x is None: 

2298 return 0 

2299 return int(256 * x) 

2300 

2301 def m2i(self, pkt, x): 

2302 # type: (Optional[Packet], int) -> float 

2303 return x / 256.0 

2304 

2305 

2306class _BitField(Field[I, int]): 

2307 """ 

2308 Field to handle bits. 

2309 

2310 :param name: name of the field 

2311 :param default: default value 

2312 :param size: size (in bits). If negative, Low endian 

2313 :param tot_size: size of the total group of bits (in bytes) the bitfield 

2314 is in. If negative, Low endian. 

2315 :param end_tot_size: same but for the BitField ending a group. 

2316 

2317 Example - normal usage:: 

2318 

2319 0 1 2 3 

2320 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 

2321 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 

2322 | A | B | C | 

2323 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 

2324 

2325 Fig. TestPacket 

2326 

2327 class TestPacket(Packet): 

2328 fields_desc = [ 

2329 BitField("a", 0, 14), 

2330 BitField("b", 0, 16), 

2331 BitField("c", 0, 2), 

2332 ] 

2333 

2334 Example - Low endian stored as 16 bits on the network:: 

2335 

2336 x x x x x x x x x x x x x x x x 

2337 a [b] [ c ] [ a ] 

2338 

2339 Will first get reversed during dissecion: 

2340 

2341 x x x x x x x x x x x x x x x x 

2342 [ a ] [b] [ c ] 

2343 

2344 class TestPacket(Packet): 

2345 fields_desc = [ 

2346 BitField("a", 0, 9, tot_size=-2), 

2347 BitField("b", 0, 2), 

2348 BitField("c", 0, 5, end_tot_size=-2) 

2349 ] 

2350 

2351 """ 

2352 __slots__ = ["rev", "size", "tot_size", "end_tot_size"] 

2353 

2354 def __init__(self, name, default, size, 

2355 tot_size=0, end_tot_size=0): 

2356 # type: (str, Optional[I], int, int, int) -> None 

2357 Field.__init__(self, name, default) 

2358 if callable(size): 

2359 size = size(self) 

2360 self.rev = size < 0 or tot_size < 0 or end_tot_size < 0 

2361 self.size = abs(size) 

2362 if not tot_size: 

2363 tot_size = self.size // 8 

2364 self.tot_size = abs(tot_size) 

2365 if not end_tot_size: 

2366 end_tot_size = self.size // 8 

2367 self.end_tot_size = abs(end_tot_size) 

2368 # Fields always have a round sz except BitField 

2369 # so to keep it simple, we'll ignore it here. 

2370 self.sz = self.size / 8. # type: ignore 

2371 

2372 # We need to # type: ignore a few things because of how special 

2373 # BitField is 

2374 def addfield(self, # type: ignore 

2375 pkt, # type: Packet 

2376 s, # type: Union[Tuple[bytes, int, int], bytes] 

2377 ival, # type: I 

2378 ): 

2379 # type: (...) -> Union[Tuple[bytes, int, int], bytes] 

2380 val = self.i2m(pkt, ival) 

2381 if isinstance(s, tuple): 

2382 s, bitsdone, v = s 

2383 else: 

2384 bitsdone = 0 

2385 v = 0 

2386 v <<= self.size 

2387 v |= val & ((1 << self.size) - 1) 

2388 bitsdone += self.size 

2389 while bitsdone >= 8: 

2390 bitsdone -= 8 

2391 s = s + struct.pack("!B", v >> bitsdone) 

2392 v &= (1 << bitsdone) - 1 

2393 if bitsdone: 

2394 return s, bitsdone, v 

2395 else: 

2396 # Apply LE if necessary 

2397 if self.rev and self.end_tot_size > 1: 

2398 s = s[:-self.end_tot_size] + s[-self.end_tot_size:][::-1] 

2399 return s 

2400 

2401 def getfield(self, # type: ignore 

2402 pkt, # type: Packet 

2403 s, # type: Union[Tuple[bytes, int], bytes] 

2404 ): 

2405 # type: (...) -> Union[Tuple[Tuple[bytes, int], I], Tuple[bytes, I]] # noqa: E501 

2406 if isinstance(s, tuple): 

2407 s, bn = s 

2408 else: 

2409 bn = 0 

2410 # Apply LE if necessary 

2411 if self.rev and self.tot_size > 1: 

2412 s = s[:self.tot_size][::-1] + s[self.tot_size:] 

2413 

2414 # we don't want to process all the string 

2415 nb_bytes = (self.size + bn - 1) // 8 + 1 

2416 w = s[:nb_bytes] 

2417 

2418 # split the substring byte by byte 

2419 _bytes = struct.unpack('!%dB' % nb_bytes, w) 

2420 

2421 b = 0 

2422 for c in range(nb_bytes): 

2423 b |= int(_bytes[c]) << (nb_bytes - c - 1) * 8 

2424 

2425 # get rid of high order bits 

2426 b &= (1 << (nb_bytes * 8 - bn)) - 1 

2427 

2428 # remove low order bits 

2429 b = b >> (nb_bytes * 8 - self.size - bn) 

2430 

2431 bn += self.size 

2432 s = s[bn // 8:] 

2433 bn = bn % 8 

2434 b2 = self.m2i(pkt, b) 

2435 if bn: 

2436 return (s, bn), b2 

2437 else: 

2438 return s, b2 

2439 

2440 def randval(self): 

2441 # type: () -> RandNum 

2442 return RandNum(0, 2**self.size - 1) 

2443 

2444 def i2len(self, pkt, x): # type: ignore 

2445 # type: (Optional[Packet], Optional[float]) -> float 

2446 return float(self.size) / 8 

2447 

2448 

2449class BitField(_BitField[int]): 

2450 __doc__ = _BitField.__doc__ 

2451 

2452 

2453class BitLenField(BitField): 

2454 __slots__ = ["length_from"] 

2455 

2456 def __init__(self, 

2457 name, # type: str 

2458 default, # type: Optional[int] 

2459 length_from # type: Callable[[Packet], int] 

2460 ): 

2461 # type: (...) -> None 

2462 self.length_from = length_from 

2463 super(BitLenField, self).__init__(name, default, 0) 

2464 

2465 def getfield(self, # type: ignore 

2466 pkt, # type: Packet 

2467 s, # type: Union[Tuple[bytes, int], bytes] 

2468 ): 

2469 # type: (...) -> Union[Tuple[Tuple[bytes, int], int], Tuple[bytes, int]] # noqa: E501 

2470 self.size = self.length_from(pkt) 

2471 return super(BitLenField, self).getfield(pkt, s) 

2472 

2473 def addfield(self, # type: ignore 

2474 pkt, # type: Packet 

2475 s, # type: Union[Tuple[bytes, int, int], bytes] 

2476 val # type: int 

2477 ): 

2478 # type: (...) -> Union[Tuple[bytes, int, int], bytes] 

2479 self.size = self.length_from(pkt) 

2480 return super(BitLenField, self).addfield(pkt, s, val) 

2481 

2482 

2483class BitFieldLenField(BitField): 

2484 __slots__ = ["length_of", "count_of", "adjust", "tot_size", "end_tot_size"] 

2485 

2486 def __init__(self, 

2487 name, # type: str 

2488 default, # type: Optional[int] 

2489 size, # type: int 

2490 length_of=None, # type: Optional[Union[Callable[[Optional[Packet]], int], str]] # noqa: E501 

2491 count_of=None, # type: Optional[str] 

2492 adjust=lambda pkt, x: x, # type: Callable[[Optional[Packet], int], int] # noqa: E501 

2493 tot_size=0, # type: int 

2494 end_tot_size=0, # type: int 

2495 ): 

2496 # type: (...) -> None 

2497 super(BitFieldLenField, self).__init__(name, default, size, 

2498 tot_size, end_tot_size) 

2499 self.length_of = length_of 

2500 self.count_of = count_of 

2501 self.adjust = adjust 

2502 

2503 def i2m(self, pkt, x): 

2504 # type: (Optional[Packet], Optional[Any]) -> int 

2505 return FieldLenField.i2m(self, pkt, x) # type: ignore 

2506 

2507 

2508class XBitField(BitField): 

2509 def i2repr(self, pkt, x): 

2510 # type: (Optional[Packet], int) -> str 

2511 return lhex(self.i2h(pkt, x)) 

2512 

2513 

2514class _EnumField(Field[Union[List[I], I], I]): 

2515 def __init__(self, 

2516 name, # type: str 

2517 default, # type: Optional[I] 

2518 enum, # type: Union[Dict[I, str], Dict[str, I], List[str], DADict[I, str], Type[Enum], Tuple[Callable[[I], str], Callable[[str], I]]] # noqa: E501 

2519 fmt="H", # type: str 

2520 ): 

2521 # type: (...) -> None 

2522 """ Initializes enum fields. 

2523 

2524 @param name: name of this field 

2525 @param default: default value of this field 

2526 @param enum: either an enum, a dict or a tuple of two callables. 

2527 Dict keys are the internal values, while the dict 

2528 values are the user-friendly representations. If the 

2529 tuple is provided, the first callable receives the 

2530 internal value as parameter and returns the 

2531 user-friendly representation and the second callable 

2532 does the converse. The first callable may return None 

2533 to default to a literal string (repr()) representation. 

2534 @param fmt: struct.pack format used to parse and serialize the 

2535 internal value from and to machine representation. 

2536 """ 

2537 if isinstance(enum, ObservableDict): 

2538 cast(ObservableDict, enum).observe(self) 

2539 

2540 if isinstance(enum, tuple): 

2541 self.i2s_cb = enum[0] # type: Optional[Callable[[I], str]] 

2542 self.s2i_cb = enum[1] # type: Optional[Callable[[str], I]] 

2543 self.i2s = None # type: Optional[Dict[I, str]] 

2544 self.s2i = None # type: Optional[Dict[str, I]] 

2545 elif isinstance(enum, type) and issubclass(enum, Enum): 

2546 # Python's Enum 

2547 i2s = self.i2s = {} 

2548 s2i = self.s2i = {} 

2549 self.i2s_cb = None 

2550 self.s2i_cb = None 

2551 names = [x.name for x in enum] 

2552 for n in names: 

2553 value = enum[n].value 

2554 i2s[value] = n 

2555 s2i[n] = value 

2556 else: 

2557 i2s = self.i2s = {} 

2558 s2i = self.s2i = {} 

2559 self.i2s_cb = None 

2560 self.s2i_cb = None 

2561 keys = [] # type: List[I] 

2562 if isinstance(enum, list): 

2563 keys = list(range(len(enum))) # type: ignore 

2564 elif isinstance(enum, DADict): 

2565 keys = enum.keys() 

2566 else: 

2567 keys = list(enum) # type: ignore 

2568 if any(isinstance(x, str) for x in keys): 

2569 i2s, s2i = s2i, i2s # type: ignore 

2570 for k in keys: 

2571 value = cast(str, enum[k]) # type: ignore 

2572 i2s[k] = value 

2573 s2i[value] = k 

2574 Field.__init__(self, name, default, fmt) 

2575 

2576 def any2i_one(self, pkt, x): 

2577 # type: (Optional[Packet], Any) -> I 

2578 if isinstance(x, Enum): 

2579 return cast(I, x.value) 

2580 elif isinstance(x, str): 

2581 if self.s2i: 

2582 x = self.s2i[x] 

2583 elif self.s2i_cb: 

2584 x = self.s2i_cb(x) 

2585 return cast(I, x) 

2586 

2587 def _i2repr(self, pkt, x): 

2588 # type: (Optional[Packet], I) -> str 

2589 return repr(x) 

2590 

2591 def i2repr_one(self, pkt, x): 

2592 # type: (Optional[Packet], I) -> str 

2593 if self not in conf.noenum and not isinstance(x, VolatileValue): 

2594 if self.i2s: 

2595 try: 

2596 return self.i2s[x] 

2597 except KeyError: 

2598 pass 

2599 elif self.i2s_cb: 

2600 ret = self.i2s_cb(x) 

2601 if ret is not None: 

2602 return ret 

2603 return self._i2repr(pkt, x) 

2604 

2605 def any2i(self, pkt, x): 

2606 # type: (Optional[Packet], Any) -> Union[I, List[I]] 

2607 if isinstance(x, list): 

2608 return [self.any2i_one(pkt, z) for z in x] 

2609 else: 

2610 return self.any2i_one(pkt, x) 

2611 

2612 def i2repr(self, pkt, x): # type: ignore 

2613 # type: (Optional[Packet], Any) -> Union[List[str], str] 

2614 if isinstance(x, list): 

2615 return [self.i2repr_one(pkt, z) for z in x] 

2616 else: 

2617 return self.i2repr_one(pkt, x) 

2618 

2619 def notify_set(self, enum, key, value): 

2620 # type: (ObservableDict, I, str) -> None 

2621 ks = "0x%x" if isinstance(key, int) else "%s" 

2622 log_runtime.debug( 

2623 "At %s: Change to %s at " + ks, self, value, key 

2624 ) 

2625 if self.i2s is not None and self.s2i is not None: 

2626 self.i2s[key] = value 

2627 self.s2i[value] = key 

2628 

2629 def notify_del(self, enum, key): 

2630 # type: (ObservableDict, I) -> None 

2631 ks = "0x%x" if isinstance(key, int) else "%s" 

2632 log_runtime.debug("At %s: Delete value at " + ks, self, key) 

2633 if self.i2s is not None and self.s2i is not None: 

2634 value = self.i2s[key] 

2635 del self.i2s[key] 

2636 del self.s2i[value] 

2637 

2638 

2639class EnumField(_EnumField[I]): 

2640 __slots__ = ["i2s", "s2i", "s2i_cb", "i2s_cb"] 

2641 

2642 

2643class CharEnumField(EnumField[str]): 

2644 def __init__(self, 

2645 name, # type: str 

2646 default, # type: str 

2647 enum, # type: Union[Dict[str, str], Tuple[Callable[[str], str], Callable[[str], str]]] # noqa: E501 

2648 fmt="1s", # type: str 

2649 ): 

2650 # type: (...) -> None 

2651 super(CharEnumField, self).__init__(name, default, enum, fmt) 

2652 if self.i2s is not None: 

2653 k = list(self.i2s) 

2654 if k and len(k[0]) != 1: 

2655 self.i2s, self.s2i = self.s2i, self.i2s 

2656 

2657 def any2i_one(self, pkt, x): 

2658 # type: (Optional[Packet], str) -> str 

2659 if len(x) != 1: 

2660 if self.s2i: 

2661 x = self.s2i[x] 

2662 elif self.s2i_cb: 

2663 x = self.s2i_cb(x) 

2664 return x 

2665 

2666 

2667class BitEnumField(_BitField[Union[List[int], int]], _EnumField[int]): 

2668 __slots__ = EnumField.__slots__ 

2669 

2670 def __init__(self, name, default, size, enum, **kwargs): 

2671 # type: (str, Optional[int], int, Dict[int, str], **Any) -> None 

2672 _EnumField.__init__(self, name, default, enum) 

2673 _BitField.__init__(self, name, default, size, **kwargs) 

2674 

2675 def any2i(self, pkt, x): 

2676 # type: (Optional[Packet], Any) -> Union[List[int], int] 

2677 return _EnumField.any2i(self, pkt, x) 

2678 

2679 def i2repr(self, 

2680 pkt, # type: Optional[Packet] 

2681 x, # type: Union[List[int], int] 

2682 ): 

2683 # type: (...) -> Any 

2684 return _EnumField.i2repr(self, pkt, x) 

2685 

2686 

2687class BitLenEnumField(BitLenField, _EnumField[int]): 

2688 __slots__ = EnumField.__slots__ 

2689 

2690 def __init__(self, 

2691 name, # type: str 

2692 default, # type: Optional[int] 

2693 length_from, # type: Callable[[Packet], int] 

2694 enum, # type: Dict[int, str] 

2695 **kwargs, # type: Any 

2696 ): 

2697 # type: (...) -> None 

2698 _EnumField.__init__(self, name, default, enum) 

2699 BitLenField.__init__(self, name, default, length_from, **kwargs) 

2700 

2701 def any2i(self, pkt, x): 

2702 # type: (Optional[Packet], Any) -> int 

2703 return _EnumField.any2i(self, pkt, x) # type: ignore 

2704 

2705 def i2repr(self, 

2706 pkt, # type: Optional[Packet] 

2707 x, # type: Union[List[int], int] 

2708 ): 

2709 # type: (...) -> Any 

2710 return _EnumField.i2repr(self, pkt, x) 

2711 

2712 

2713class ShortEnumField(EnumField[int]): 

2714 __slots__ = EnumField.__slots__ 

2715 

2716 def __init__(self, 

2717 name, # type: str 

2718 default, # type: int 

2719 enum, # type: Union[Dict[int, str], Dict[str, int], Tuple[Callable[[int], str], Callable[[str], int]], DADict[int, str]] # noqa: E501 

2720 ): 

2721 # type: (...) -> None 

2722 super(ShortEnumField, self).__init__(name, default, enum, "H") 

2723 

2724 

2725class LEShortEnumField(EnumField[int]): 

2726 def __init__(self, name, default, enum): 

2727 # type: (str, int, Union[Dict[int, str], List[str]]) -> None 

2728 super(LEShortEnumField, self).__init__(name, default, enum, "<H") 

2729 

2730 

2731class LELongEnumField(EnumField[int]): 

2732 def __init__(self, name, default, enum): 

2733 # type: (str, int, Union[Dict[int, str], List[str]]) -> None 

2734 super(LELongEnumField, self).__init__(name, default, enum, "<Q") 

2735 

2736 

2737class ByteEnumField(EnumField[int]): 

2738 def __init__(self, name, default, enum): 

2739 # type: (str, Optional[int], Dict[int, str]) -> None 

2740 super(ByteEnumField, self).__init__(name, default, enum, "B") 

2741 

2742 

2743class XByteEnumField(ByteEnumField): 

2744 def i2repr_one(self, pkt, x): 

2745 # type: (Optional[Packet], int) -> str 

2746 if self not in conf.noenum and not isinstance(x, VolatileValue): 

2747 if self.i2s: 

2748 try: 

2749 return self.i2s[x] 

2750 except KeyError: 

2751 pass 

2752 elif self.i2s_cb: 

2753 ret = self.i2s_cb(x) 

2754 if ret is not None: 

2755 return ret 

2756 return lhex(x) 

2757 

2758 

2759class IntEnumField(EnumField[int]): 

2760 def __init__(self, name, default, enum): 

2761 # type: (str, Optional[int], Dict[int, str]) -> None 

2762 super(IntEnumField, self).__init__(name, default, enum, "I") 

2763 

2764 

2765class SignedIntEnumField(EnumField[int]): 

2766 def __init__(self, name, default, enum): 

2767 # type: (str, Optional[int], Dict[int, str]) -> None 

2768 super(SignedIntEnumField, self).__init__(name, default, enum, "i") 

2769 

2770 

2771class LEIntEnumField(EnumField[int]): 

2772 def __init__(self, name, default, enum): 

2773 # type: (str, int, Dict[int, str]) -> None 

2774 super(LEIntEnumField, self).__init__(name, default, enum, "<I") 

2775 

2776 

2777class XShortEnumField(ShortEnumField): 

2778 def _i2repr(self, pkt, x): 

2779 # type: (Optional[Packet], Any) -> str 

2780 return lhex(x) 

2781 

2782 

2783class LE3BytesEnumField(LEThreeBytesField, _EnumField[int]): 

2784 __slots__ = EnumField.__slots__ 

2785 

2786 def __init__(self, name, default, enum): 

2787 # type: (str, Optional[int], Dict[int, str]) -> None 

2788 _EnumField.__init__(self, name, default, enum) 

2789 LEThreeBytesField.__init__(self, name, default) 

2790 

2791 def any2i(self, pkt, x): 

2792 # type: (Optional[Packet], Any) -> int 

2793 return _EnumField.any2i(self, pkt, x) # type: ignore 

2794 

2795 def i2repr(self, pkt, x): # type: ignore 

2796 # type: (Optional[Packet], Any) -> Union[List[str], str] 

2797 return _EnumField.i2repr(self, pkt, x) 

2798 

2799 

2800class XLE3BytesEnumField(LE3BytesEnumField): 

2801 def _i2repr(self, pkt, x): 

2802 # type: (Optional[Packet], Any) -> str 

2803 return lhex(x) 

2804 

2805 

2806class _MultiEnumField(_EnumField[I]): 

2807 def __init__(self, 

2808 name, # type: str 

2809 default, # type: int 

2810 enum, # type: Dict[I, Dict[I, str]] 

2811 depends_on, # type: Callable[[Optional[Packet]], I] 

2812 fmt="H" # type: str 

2813 ): 

2814 # type: (...) -> None 

2815 

2816 self.depends_on = depends_on 

2817 self.i2s_multi = enum 

2818 self.s2i_multi = {} # type: Dict[I, Dict[str, I]] 

2819 self.s2i_all = {} # type: Dict[str, I] 

2820 for m in enum: 

2821 s2i = {} # type: Dict[str, I] 

2822 self.s2i_multi[m] = s2i 

2823 for k, v in enum[m].items(): 

2824 s2i[v] = k 

2825 self.s2i_all[v] = k 

2826 Field.__init__(self, name, default, fmt) 

2827 

2828 def any2i_one(self, pkt, x): 

2829 # type: (Optional[Packet], Any) -> I 

2830 if isinstance(x, str): 

2831 v = self.depends_on(pkt) 

2832 if v in self.s2i_multi: 

2833 s2i = self.s2i_multi[v] 

2834 if x in s2i: 

2835 return s2i[x] 

2836 return self.s2i_all[x] 

2837 return cast(I, x) 

2838 

2839 def i2repr_one(self, pkt, x): 

2840 # type: (Optional[Packet], I) -> str 

2841 v = self.depends_on(pkt) 

2842 if isinstance(v, VolatileValue): 

2843 return repr(v) 

2844 if v in self.i2s_multi: 

2845 return str(self.i2s_multi[v].get(x, x)) 

2846 return str(x) 

2847 

2848 

2849class MultiEnumField(_MultiEnumField[int], EnumField[int]): 

2850 __slots__ = ["depends_on", "i2s_multi", "s2i_multi", "s2i_all"] 

2851 

2852 

2853class BitMultiEnumField(_BitField[Union[List[int], int]], 

2854 _MultiEnumField[int]): 

2855 __slots__ = EnumField.__slots__ + MultiEnumField.__slots__ 

2856 

2857 def __init__( 

2858 self, 

2859 name, # type: str 

2860 default, # type: int 

2861 size, # type: int 

2862 enum, # type: Dict[int, Dict[int, str]] 

2863 depends_on # type: Callable[[Optional[Packet]], int] 

2864 ): 

2865 # type: (...) -> None 

2866 _MultiEnumField.__init__(self, name, default, enum, depends_on) 

2867 self.rev = size < 0 

2868 self.size = abs(size) 

2869 self.sz = self.size / 8. # type: ignore 

2870 

2871 def any2i(self, pkt, x): 

2872 # type: (Optional[Packet], Any) -> Union[List[int], int] 

2873 return _MultiEnumField[int].any2i( 

2874 self, # type: ignore 

2875 pkt, 

2876 x 

2877 ) 

2878 

2879 def i2repr( # type: ignore 

2880 self, 

2881 pkt, # type: Optional[Packet] 

2882 x # type: Union[List[int], int] 

2883 ): 

2884 # type: (...) -> Union[str, List[str]] 

2885 return _MultiEnumField[int].i2repr( 

2886 self, # type: ignore 

2887 pkt, 

2888 x 

2889 ) 

2890 

2891 

2892class ByteEnumKeysField(ByteEnumField): 

2893 """ByteEnumField that picks valid values when fuzzed. """ 

2894 

2895 def randval(self): 

2896 # type: () -> RandEnumKeys 

2897 return RandEnumKeys(self.i2s or {}) 

2898 

2899 

2900class ShortEnumKeysField(ShortEnumField): 

2901 """ShortEnumField that picks valid values when fuzzed. """ 

2902 

2903 def randval(self): 

2904 # type: () -> RandEnumKeys 

2905 return RandEnumKeys(self.i2s or {}) 

2906 

2907 

2908class IntEnumKeysField(IntEnumField): 

2909 """IntEnumField that picks valid values when fuzzed. """ 

2910 

2911 def randval(self): 

2912 # type: () -> RandEnumKeys 

2913 return RandEnumKeys(self.i2s or {}) 

2914 

2915 

2916# Little endian fixed length field 

2917 

2918 

2919class LEFieldLenField(FieldLenField): 

2920 def __init__( 

2921 self, 

2922 name, # type: str 

2923 default, # type: Optional[Any] 

2924 length_of=None, # type: Optional[str] 

2925 fmt="<H", # type: str 

2926 count_of=None, # type: Optional[str] 

2927 adjust=lambda pkt, x: x, # type: Callable[[Packet, int], int] 

2928 ): 

2929 # type: (...) -> None 

2930 FieldLenField.__init__( 

2931 self, name, default, 

2932 length_of=length_of, 

2933 fmt=fmt, 

2934 count_of=count_of, 

2935 adjust=adjust 

2936 ) 

2937 

2938 

2939class FlagValueIter(object): 

2940 

2941 __slots__ = ["flagvalue", "cursor"] 

2942 

2943 def __init__(self, flagvalue): 

2944 # type: (FlagValue) -> None 

2945 self.flagvalue = flagvalue 

2946 self.cursor = 0 

2947 

2948 def __iter__(self): 

2949 # type: () -> FlagValueIter 

2950 return self 

2951 

2952 def __next__(self): 

2953 # type: () -> str 

2954 x = int(self.flagvalue) 

2955 x >>= self.cursor 

2956 while x: 

2957 self.cursor += 1 

2958 if x & 1: 

2959 return self.flagvalue.names[self.cursor - 1] 

2960 x >>= 1 

2961 raise StopIteration 

2962 

2963 next = __next__ 

2964 

2965 

2966class FlagValue(object): 

2967 __slots__ = ["value", "names", "multi"] 

2968 

2969 def _fixvalue(self, value): 

2970 # type: (Any) -> int 

2971 if not value: 

2972 return 0 

2973 if isinstance(value, str): 

2974 value = value.split('+') if self.multi else list(value) 

2975 if isinstance(value, list): 

2976 y = 0 

2977 for i in value: 

2978 y |= 1 << self.names.index(i) 

2979 value = y 

2980 return int(value) 

2981 

2982 def __init__(self, value, names): 

2983 # type: (Union[List[str], int, str], Union[List[str], str]) -> None 

2984 self.multi = isinstance(names, list) 

2985 self.names = names 

2986 self.value = self._fixvalue(value) 

2987 

2988 def __hash__(self): 

2989 # type: () -> int 

2990 return hash(self.value) 

2991 

2992 def __int__(self): 

2993 # type: () -> int 

2994 return self.value 

2995 

2996 def __eq__(self, other): 

2997 # type: (Any) -> bool 

2998 return self.value == self._fixvalue(other) 

2999 

3000 def __lt__(self, other): 

3001 # type: (Any) -> bool 

3002 return self.value < self._fixvalue(other) 

3003 

3004 def __le__(self, other): 

3005 # type: (Any) -> bool 

3006 return self.value <= self._fixvalue(other) 

3007 

3008 def __gt__(self, other): 

3009 # type: (Any) -> bool 

3010 return self.value > self._fixvalue(other) 

3011 

3012 def __ge__(self, other): 

3013 # type: (Any) -> bool 

3014 return self.value >= self._fixvalue(other) 

3015 

3016 def __ne__(self, other): 

3017 # type: (Any) -> bool 

3018 return self.value != self._fixvalue(other) 

3019 

3020 def __and__(self, other): 

3021 # type: (int) -> FlagValue 

3022 return self.__class__(self.value & self._fixvalue(other), self.names) 

3023 __rand__ = __and__ 

3024 

3025 def __or__(self, other): 

3026 # type: (int) -> FlagValue 

3027 return self.__class__(self.value | self._fixvalue(other), self.names) 

3028 __ror__ = __or__ 

3029 __add__ = __or__ # + is an alias for | 

3030 

3031 def __sub__(self, other): 

3032 # type: (int) -> FlagValue 

3033 return self.__class__( 

3034 self.value & (2 ** len(self.names) - 1 - self._fixvalue(other)), 

3035 self.names 

3036 ) 

3037 

3038 def __xor__(self, other): 

3039 # type: (int) -> FlagValue 

3040 return self.__class__(self.value ^ self._fixvalue(other), self.names) 

3041 

3042 def __lshift__(self, other): 

3043 # type: (int) -> int 

3044 return self.value << self._fixvalue(other) 

3045 

3046 def __rshift__(self, other): 

3047 # type: (int) -> int 

3048 return self.value >> self._fixvalue(other) 

3049 

3050 def __nonzero__(self): 

3051 # type: () -> bool 

3052 return bool(self.value) 

3053 __bool__ = __nonzero__ 

3054 

3055 def flagrepr(self): 

3056 # type: () -> str 

3057 warnings.warn( 

3058 "obj.flagrepr() is obsolete. Use str(obj) instead.", 

3059 DeprecationWarning 

3060 ) 

3061 return str(self) 

3062 

3063 def __str__(self): 

3064 # type: () -> str 

3065 i = 0 

3066 r = [] 

3067 x = int(self) 

3068 while x: 

3069 if x & 1: 

3070 try: 

3071 name = self.names[i] 

3072 except IndexError: 

3073 name = "?" 

3074 r.append(name) 

3075 i += 1 

3076 x >>= 1 

3077 return ("+" if self.multi else "").join(r) 

3078 

3079 def __iter__(self): 

3080 # type: () -> FlagValueIter 

3081 return FlagValueIter(self) 

3082 

3083 def __repr__(self): 

3084 # type: () -> str 

3085 return "<Flag %d (%s)>" % (self, self) 

3086 

3087 def __deepcopy__(self, memo): 

3088 # type: (Dict[Any, Any]) -> FlagValue 

3089 return self.__class__(int(self), self.names) 

3090 

3091 def __getattr__(self, attr): 

3092 # type: (str) -> Any 

3093 if attr in self.__slots__: 

3094 return super(FlagValue, self).__getattribute__(attr) 

3095 try: 

3096 if self.multi: 

3097 return bool((2 ** self.names.index(attr)) & int(self)) 

3098 return all(bool((2 ** self.names.index(flag)) & int(self)) 

3099 for flag in attr) 

3100 except ValueError: 

3101 if '_' in attr: 

3102 try: 

3103 return self.__getattr__(attr.replace('_', '-')) 

3104 except AttributeError: 

3105 pass 

3106 return super(FlagValue, self).__getattribute__(attr) 

3107 

3108 def __setattr__(self, attr, value): 

3109 # type: (str, Union[List[str], int, str]) -> None 

3110 if attr == "value" and not isinstance(value, int): 

3111 raise ValueError(value) 

3112 if attr in self.__slots__: 

3113 return super(FlagValue, self).__setattr__(attr, value) 

3114 if attr in self.names: 

3115 if value: 

3116 self.value |= (2 ** self.names.index(attr)) 

3117 else: 

3118 self.value &= ~(2 ** self.names.index(attr)) 

3119 else: 

3120 return super(FlagValue, self).__setattr__(attr, value) 

3121 

3122 def copy(self): 

3123 # type: () -> FlagValue 

3124 return self.__class__(self.value, self.names) 

3125 

3126 

3127class FlagsField(_BitField[Optional[Union[int, FlagValue]]]): 

3128 """ Handle Flag type field 

3129 

3130 Make sure all your flags have a label 

3131 

3132 Example (list): 

3133 >>> from scapy.packet import Packet 

3134 >>> class FlagsTest(Packet): 

3135 fields_desc = [FlagsField("flags", 0, 8, ["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"])] # noqa: E501 

3136 >>> FlagsTest(flags=9).show2() 

3137 ###[ FlagsTest ]### 

3138 flags = f0+f3 

3139 

3140 Example (str): 

3141 >>> from scapy.packet import Packet 

3142 >>> class TCPTest(Packet): 

3143 fields_desc = [ 

3144 BitField("reserved", 0, 7), 

3145 FlagsField("flags", 0x2, 9, "FSRPAUECN") 

3146 ] 

3147 >>> TCPTest(flags=3).show2() 

3148 ###[ FlagsTest ]### 

3149 reserved = 0 

3150 flags = FS 

3151 

3152 Example (dict): 

3153 >>> from scapy.packet import Packet 

3154 >>> class FlagsTest2(Packet): 

3155 fields_desc = [ 

3156 FlagsField("flags", 0x2, 16, { 

3157 0x0001: "A", 

3158 0x0008: "B", 

3159 }) 

3160 ] 

3161 

3162 :param name: field's name 

3163 :param default: default value for the field 

3164 :param size: number of bits in the field (in bits). if negative, LE 

3165 :param names: (list or str or dict) label for each flag 

3166 If it's a str or a list, the least Significant Bit tag's name 

3167 is written first. 

3168 """ 

3169 ismutable = True 

3170 __slots__ = ["names"] 

3171 

3172 def __init__(self, 

3173 name, # type: str 

3174 default, # type: Optional[Union[int, FlagValue]] 

3175 size, # type: int 

3176 names # type: Union[List[str], str, Dict[int, str]] 

3177 ): 

3178 # type: (...) -> None 

3179 # Convert the dict to a list 

3180 if isinstance(names, dict): 

3181 tmp = ["bit_%d" % i for i in range(abs(size))] 

3182 for i, v in names.items(): 

3183 tmp[int(math.floor(math.log(i, 2)))] = v 

3184 names = tmp 

3185 # Store the names as str or list 

3186 self.names = names 

3187 super(FlagsField, self).__init__(name, default, size) 

3188 

3189 def _fixup_val(self, x): 

3190 # type: (Any) -> Optional[FlagValue] 

3191 """Returns a FlagValue instance when needed. Internal method, to be 

3192used in *2i() and i2*() methods. 

3193 

3194 """ 

3195 if isinstance(x, (FlagValue, VolatileValue)): 

3196 return x # type: ignore 

3197 if x is None: 

3198 return None 

3199 return FlagValue(x, self.names) 

3200 

3201 def any2i(self, pkt, x): 

3202 # type: (Optional[Packet], Any) -> Optional[FlagValue] 

3203 return self._fixup_val(super(FlagsField, self).any2i(pkt, x)) 

3204 

3205 def m2i(self, pkt, x): 

3206 # type: (Optional[Packet], int) -> Optional[FlagValue] 

3207 return self._fixup_val(super(FlagsField, self).m2i(pkt, x)) 

3208 

3209 def i2h(self, pkt, x): 

3210 # type: (Optional[Packet], Any) -> Optional[FlagValue] 

3211 return self._fixup_val(super(FlagsField, self).i2h(pkt, x)) 

3212 

3213 def i2repr(self, 

3214 pkt, # type: Optional[Packet] 

3215 x, # type: Any 

3216 ): 

3217 # type: (...) -> str 

3218 if isinstance(x, (list, tuple)): 

3219 return repr(type(x)( 

3220 "None" if v is None else str(self._fixup_val(v)) for v in x 

3221 )) 

3222 return "None" if x is None else str(self._fixup_val(x)) 

3223 

3224 

3225MultiFlagsEntry = collections.namedtuple('MultiFlagsEntry', ['short', 'long']) 

3226 

3227 

3228class MultiFlagsField(_BitField[Set[str]]): 

3229 __slots__ = FlagsField.__slots__ + ["depends_on"] 

3230 

3231 def __init__(self, 

3232 name, # type: str 

3233 default, # type: Set[str] 

3234 size, # type: int 

3235 names, # type: Dict[int, Dict[int, MultiFlagsEntry]] 

3236 depends_on, # type: Callable[[Optional[Packet]], int] 

3237 ): 

3238 # type: (...) -> None 

3239 self.names = names 

3240 self.depends_on = depends_on 

3241 super(MultiFlagsField, self).__init__(name, default, size) 

3242 

3243 def any2i(self, pkt, x): 

3244 # type: (Optional[Packet], Any) -> Set[str] 

3245 if not isinstance(x, (set, int)): 

3246 raise ValueError('set expected') 

3247 

3248 if pkt is not None: 

3249 if isinstance(x, int): 

3250 return self.m2i(pkt, x) 

3251 else: 

3252 v = self.depends_on(pkt) 

3253 if v is not None: 

3254 assert v in self.names, 'invalid dependency' 

3255 these_names = self.names[v] 

3256 s = set() 

3257 for i in x: 

3258 for val in these_names.values(): 

3259 if val.short == i: 

3260 s.add(i) 

3261 break 

3262 else: 

3263 assert False, 'Unknown flag "{}" with this dependency'.format(i) # noqa: E501 

3264 continue 

3265 return s 

3266 if isinstance(x, int): 

3267 return set() 

3268 return x 

3269 

3270 def i2m(self, pkt, x): 

3271 # type: (Optional[Packet], Optional[Set[str]]) -> int 

3272 v = self.depends_on(pkt) 

3273 these_names = self.names.get(v, {}) 

3274 

3275 r = 0 

3276 if x is None: 

3277 return r 

3278 for flag_set in x: 

3279 for i, val in these_names.items(): 

3280 if val.short == flag_set: 

3281 r |= 1 << i 

3282 break 

3283 else: 

3284 r |= 1 << int(flag_set[len('bit '):]) 

3285 return r 

3286 

3287 def m2i(self, pkt, x): 

3288 # type: (Optional[Packet], int) -> Set[str] 

3289 v = self.depends_on(pkt) 

3290 these_names = self.names.get(v, {}) 

3291 

3292 r = set() 

3293 i = 0 

3294 while x: 

3295 if x & 1: 

3296 if i in these_names: 

3297 r.add(these_names[i].short) 

3298 else: 

3299 r.add('bit {}'.format(i)) 

3300 x >>= 1 

3301 i += 1 

3302 return r 

3303 

3304 def i2repr(self, pkt, x): 

3305 # type: (Optional[Packet], Set[str]) -> str 

3306 v = self.depends_on(pkt) 

3307 these_names = self.names.get(v, {}) 

3308 

3309 r = set() 

3310 for flag_set in x: 

3311 for i in these_names.values(): 

3312 if i.short == flag_set: 

3313 r.add("{} ({})".format(i.long, i.short)) 

3314 break 

3315 else: 

3316 r.add(flag_set) 

3317 return repr(r) 

3318 

3319 

3320class FixedPointField(BitField): 

3321 __slots__ = ['frac_bits'] 

3322 

3323 def __init__(self, name, default, size, frac_bits=16): 

3324 # type: (str, int, int, int) -> None 

3325 self.frac_bits = frac_bits 

3326 super(FixedPointField, self).__init__(name, default, size) 

3327 

3328 def any2i(self, pkt, val): 

3329 # type: (Optional[Packet], Optional[float]) -> Optional[int] 

3330 if val is None: 

3331 return val 

3332 ival = int(val) 

3333 fract = int((val - ival) * 2**self.frac_bits) 

3334 return (ival << self.frac_bits) | fract 

3335 

3336 def i2h(self, pkt, val): 

3337 # type: (Optional[Packet], Optional[int]) -> Optional[EDecimal] 

3338 # A bit of trickery to get precise floats 

3339 if val is None: 

3340 return val 

3341 int_part = val >> self.frac_bits 

3342 pw = 2.0**self.frac_bits 

3343 frac_part = EDecimal(val & (1 << self.frac_bits) - 1) 

3344 frac_part /= pw # type: ignore 

3345 return int_part + frac_part.normalize(int(math.log10(pw))) 

3346 

3347 def i2repr(self, pkt, val): 

3348 # type: (Optional[Packet], int) -> str 

3349 return str(self.i2h(pkt, val)) 

3350 

3351 

3352# Base class for IPv4 and IPv6 Prefixes inspired by IPField and IP6Field. 

3353# Machine values are encoded in a multiple of wordbytes bytes. 

3354class _IPPrefixFieldBase(Field[Tuple[str, int], Tuple[bytes, int]]): 

3355 __slots__ = ["wordbytes", "maxbytes", "aton", "ntoa", "length_from"] 

3356 

3357 def __init__( 

3358 self, 

3359 name, # type: str 

3360 default, # type: Tuple[str, int] 

3361 wordbytes, # type: int 

3362 maxbytes, # type: int 

3363 aton, # type: Callable[..., Any] 

3364 ntoa, # type: Callable[..., Any] 

3365 length_from=None # type: Optional[Callable[[Packet], int]] 

3366 ): 

3367 # type: (...) -> None 

3368 self.wordbytes = wordbytes 

3369 self.maxbytes = maxbytes 

3370 self.aton = aton 

3371 self.ntoa = ntoa 

3372 Field.__init__(self, name, default, "%is" % self.maxbytes) 

3373 if length_from is None: 

3374 length_from = lambda x: 0 

3375 self.length_from = length_from 

3376 

3377 def _numbytes(self, pfxlen): 

3378 # type: (int) -> int 

3379 wbits = self.wordbytes * 8 

3380 return ((pfxlen + (wbits - 1)) // wbits) * self.wordbytes 

3381 

3382 def h2i(self, pkt, x): 

3383 # type: (Optional[Packet], str) -> Tuple[str, int] 

3384 # "fc00:1::1/64" -> ("fc00:1::1", 64) 

3385 [pfx, pfxlen] = x.split('/') 

3386 self.aton(pfx) # check for validity 

3387 return (pfx, int(pfxlen)) 

3388 

3389 def i2h(self, pkt, x): 

3390 # type: (Optional[Packet], Tuple[str, int]) -> str 

3391 # ("fc00:1::1", 64) -> "fc00:1::1/64" 

3392 (pfx, pfxlen) = x 

3393 return "%s/%i" % (pfx, pfxlen) 

3394 

3395 def i2m(self, 

3396 pkt, # type: Optional[Packet] 

3397 x # type: Optional[Tuple[str, int]] 

3398 ): 

3399 # type: (...) -> Tuple[bytes, int] 

3400 # ("fc00:1::1", 64) -> (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) # noqa: E501 

3401 if x is None: 

3402 pfx, pfxlen = "", 0 

3403 else: 

3404 (pfx, pfxlen) = x 

3405 s = self.aton(pfx) 

3406 return (s[:self._numbytes(pfxlen)], pfxlen) 

3407 

3408 def m2i(self, pkt, x): 

3409 # type: (Optional[Packet], Tuple[bytes, int]) -> Tuple[str, int] 

3410 # (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64) # noqa: E501 

3411 (s, pfxlen) = x 

3412 

3413 if len(s) < self.maxbytes: 

3414 s = s + (b"\0" * (self.maxbytes - len(s))) 

3415 return (self.ntoa(s), pfxlen) 

3416 

3417 def any2i(self, pkt, x): 

3418 # type: (Optional[Packet], Optional[Any]) -> Tuple[str, int] 

3419 if x is None: 

3420 return (self.ntoa(b"\0" * self.maxbytes), 1) 

3421 

3422 return self.h2i(pkt, x) 

3423 

3424 def i2len(self, pkt, x): 

3425 # type: (Packet, Tuple[str, int]) -> int 

3426 (_, pfxlen) = x 

3427 return pfxlen 

3428 

3429 def addfield(self, pkt, s, val): 

3430 # type: (Packet, bytes, Optional[Tuple[str, int]]) -> bytes 

3431 (rawpfx, pfxlen) = self.i2m(pkt, val) 

3432 fmt = "!%is" % self._numbytes(pfxlen) 

3433 return s + struct.pack(fmt, rawpfx) 

3434 

3435 def getfield(self, pkt, s): 

3436 # type: (Packet, bytes) -> Tuple[bytes, Tuple[str, int]] 

3437 pfxlen = self.length_from(pkt) 

3438 numbytes = self._numbytes(pfxlen) 

3439 fmt = "!%is" % numbytes 

3440 return s[numbytes:], self.m2i(pkt, (struct.unpack(fmt, s[:numbytes])[0], pfxlen)) # noqa: E501 

3441 

3442 

3443class IPPrefixField(_IPPrefixFieldBase): 

3444 def __init__( 

3445 self, 

3446 name, # type: str 

3447 default, # type: Tuple[str, int] 

3448 wordbytes=1, # type: int 

3449 length_from=None # type: Optional[Callable[[Packet], int]] 

3450 ): 

3451 _IPPrefixFieldBase.__init__( 

3452 self, 

3453 name, 

3454 default, 

3455 wordbytes, 

3456 4, 

3457 inet_aton, 

3458 inet_ntoa, 

3459 length_from 

3460 ) 

3461 

3462 

3463class IP6PrefixField(_IPPrefixFieldBase): 

3464 def __init__( 

3465 self, 

3466 name, # type: str 

3467 default, # type: Tuple[str, int] 

3468 wordbytes=1, # type: int 

3469 length_from=None # type: Optional[Callable[[Packet], int]] 

3470 ): 

3471 # type: (...) -> None 

3472 _IPPrefixFieldBase.__init__( 

3473 self, 

3474 name, 

3475 default, 

3476 wordbytes, 

3477 16, 

3478 lambda a: inet_pton(socket.AF_INET6, a), 

3479 lambda n: inet_ntop(socket.AF_INET6, n), 

3480 length_from 

3481 ) 

3482 

3483 

3484class UTCTimeField(Field[float, int]): 

3485 __slots__ = ["epoch", "delta", "strf", 

3486 "use_msec", "use_micro", "use_nano", "custom_scaling"] 

3487 

3488 def __init__(self, 

3489 name, # type: str 

3490 default, # type: int 

3491 use_msec=False, # type: bool 

3492 use_micro=False, # type: bool 

3493 use_nano=False, # type: bool 

3494 epoch=None, # type: Optional[Tuple[int, int, int, int, int, int, int, int, int]] # noqa: E501 

3495 strf="%a, %d %b %Y %H:%M:%S %z", # type: str 

3496 custom_scaling=None, # type: Optional[int] 

3497 fmt="I" # type: str 

3498 ): 

3499 # type: (...) -> None 

3500 Field.__init__(self, name, default, fmt=fmt) 

3501 mk_epoch = EPOCH if epoch is None else calendar.timegm(epoch) 

3502 self.epoch = mk_epoch 

3503 self.delta = mk_epoch - EPOCH 

3504 self.strf = strf 

3505 self.use_msec = use_msec 

3506 self.use_micro = use_micro 

3507 self.use_nano = use_nano 

3508 self.custom_scaling = custom_scaling 

3509 

3510 def i2repr(self, pkt, x): 

3511 # type: (Optional[Packet], float) -> str 

3512 if x is None: 

3513 x = time.time() - self.delta 

3514 elif self.use_msec: 

3515 x = x / 1e3 

3516 elif self.use_micro: 

3517 x = x / 1e6 

3518 elif self.use_nano: 

3519 x = x / 1e9 

3520 elif self.custom_scaling: 

3521 x = x / self.custom_scaling 

3522 x += self.delta 

3523 # To make negative timestamps work on all plateforms (e.g. Windows), 

3524 # we need a trick. 

3525 t = ( 

3526 datetime.datetime(1970, 1, 1) + 

3527 datetime.timedelta(seconds=x) 

3528 ).strftime(self.strf) 

3529 return "%s (%d)" % (t, int(x)) 

3530 

3531 def i2m(self, pkt, x): 

3532 # type: (Optional[Packet], Optional[float]) -> int 

3533 if x is None: 

3534 x = time.time() - self.delta 

3535 if self.use_msec: 

3536 x = x * 1e3 

3537 elif self.use_micro: 

3538 x = x * 1e6 

3539 elif self.use_nano: 

3540 x = x * 1e9 

3541 elif self.custom_scaling: 

3542 x = x * self.custom_scaling 

3543 return int(x) 

3544 return int(x) 

3545 

3546 

3547class SecondsIntField(Field[float, int]): 

3548 __slots__ = ["use_msec", "use_micro", "use_nano"] 

3549 

3550 def __init__(self, name, default, 

3551 use_msec=False, 

3552 use_micro=False, 

3553 use_nano=False): 

3554 # type: (str, int, bool, bool, bool) -> None 

3555 Field.__init__(self, name, default, "I") 

3556 self.use_msec = use_msec 

3557 self.use_micro = use_micro 

3558 self.use_nano = use_nano 

3559 

3560 def i2repr(self, pkt, x): 

3561 # type: (Optional[Packet], Optional[float]) -> str 

3562 if x is None: 

3563 y = 0 # type: Union[int, float] 

3564 elif self.use_msec: 

3565 y = x / 1e3 

3566 elif self.use_micro: 

3567 y = x / 1e6 

3568 elif self.use_nano: 

3569 y = x / 1e9 

3570 else: 

3571 y = x 

3572 return "%s sec" % y 

3573 

3574 

3575class _ScalingField(object): 

3576 def __init__(self, 

3577 name, # type: str 

3578 default, # type: float 

3579 scaling=1, # type: Union[int, float] 

3580 unit="", # type: str 

3581 offset=0, # type: Union[int, float] 

3582 ndigits=3, # type: int 

3583 fmt="B", # type: str 

3584 ): 

3585 # type: (...) -> None 

3586 self.scaling = scaling 

3587 self.unit = unit 

3588 self.offset = offset 

3589 self.ndigits = ndigits 

3590 Field.__init__(self, name, default, fmt) # type: ignore 

3591 

3592 def i2m(self, 

3593 pkt, # type: Optional[Packet] 

3594 x # type: Optional[Union[int, float]] 

3595 ): 

3596 # type: (...) -> Union[int, float] 

3597 if x is None: 

3598 x = 0 

3599 x = (x - self.offset) / self.scaling 

3600 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore 

3601 x = int(round(x)) 

3602 return x 

3603 

3604 def m2i(self, pkt, x): 

3605 # type: (Optional[Packet], Union[int, float]) -> Union[int, float] 

3606 x = x * self.scaling + self.offset 

3607 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore 

3608 x = round(x, self.ndigits) 

3609 return x 

3610 

3611 def any2i(self, pkt, x): 

3612 # type: (Optional[Packet], Any) -> Union[int, float] 

3613 if isinstance(x, (str, bytes)): 

3614 x = struct.unpack(self.fmt, bytes_encode(x))[0] # type: ignore 

3615 x = self.m2i(pkt, x) 

3616 if not isinstance(x, (int, float)): 

3617 raise ValueError("Unknown type") 

3618 return x 

3619 

3620 def i2repr(self, pkt, x): 

3621 # type: (Optional[Packet], Union[int, float]) -> str 

3622 return "%s %s" % ( 

3623 self.i2h(pkt, x), # type: ignore 

3624 self.unit 

3625 ) 

3626 

3627 def randval(self): 

3628 # type: () -> RandFloat 

3629 value = Field.randval(self) # type: ignore 

3630 if value is not None: 

3631 min_val = round(value.min * self.scaling + self.offset, 

3632 self.ndigits) 

3633 max_val = round(value.max * self.scaling + self.offset, 

3634 self.ndigits) 

3635 

3636 return RandFloat(min(min_val, max_val), max(min_val, max_val)) 

3637 

3638 

3639class ScalingField(_ScalingField, 

3640 Field[Union[int, float], Union[int, float]]): 

3641 """ Handle physical values which are scaled and/or offset for communication 

3642 

3643 Example: 

3644 >>> from scapy.packet import Packet 

3645 >>> class ScalingFieldTest(Packet): 

3646 fields_desc = [ScalingField('data', 0, scaling=0.1, offset=-1, unit='mV')] # noqa: E501 

3647 >>> ScalingFieldTest(data=10).show2() 

3648 ###[ ScalingFieldTest ]### 

3649 data= 10.0 mV 

3650 >>> hexdump(ScalingFieldTest(data=10)) 

3651 0000 6E n 

3652 >>> hexdump(ScalingFieldTest(data=b"\x6D")) 

3653 0000 6D m 

3654 >>> ScalingFieldTest(data=b"\x6D").show2() 

3655 ###[ ScalingFieldTest ]### 

3656 data= 9.9 mV 

3657 

3658 bytes(ScalingFieldTest(...)) will produce 0x6E in this example. 

3659 0x6E is 110 (decimal). This is calculated through the scaling factor 

3660 and the offset. "data" was set to 10, which means, we want to transfer 

3661 the physical value 10 mV. To calculate the value, which has to be 

3662 sent on the bus, the offset has to subtracted and the scaling has to be 

3663 applied by division through the scaling factor. 

3664 bytes = (data - offset) / scaling 

3665 bytes = ( 10 - (-1) ) / 0.1 

3666 bytes = 110 = 0x6E 

3667 

3668 If you want to force a certain internal value, you can assign a byte- 

3669 string to the field (data=b"\x6D"). If a string of a bytes object is 

3670 given to the field, no internal value conversion will be applied 

3671 

3672 :param name: field's name 

3673 :param default: default value for the field 

3674 :param scaling: scaling factor for the internal value conversion 

3675 :param unit: string for the unit representation of the internal value 

3676 :param offset: value to offset the internal value during conversion 

3677 :param ndigits: number of fractional digits for the internal conversion 

3678 :param fmt: struct.pack format used to parse and serialize the internal value from and to machine representation # noqa: E501 

3679 """ 

3680 

3681 

3682class BitScalingField(_ScalingField, BitField): # type: ignore 

3683 """ 

3684 A ScalingField that is a BitField 

3685 """ 

3686 

3687 def __init__(self, name, default, size, *args, **kwargs): 

3688 # type: (str, int, int, *Any, **Any) -> None 

3689 _ScalingField.__init__(self, name, default, *args, **kwargs) 

3690 BitField.__init__(self, name, default, size) # type: ignore 

3691 

3692 

3693class OUIField(X3BytesField): 

3694 """ 

3695 A field designed to carry a OUI (3 bytes) 

3696 """ 

3697 

3698 def i2repr(self, pkt, val): 

3699 # type: (Optional[Packet], int) -> str 

3700 by_val = struct.pack("!I", val or 0)[1:] 

3701 oui = str2mac(by_val + b"\0" * 3)[:8] 

3702 if conf.manufdb: 

3703 fancy = conf.manufdb._get_manuf(oui) 

3704 if fancy != oui: 

3705 return "%s (%s)" % (fancy, oui) 

3706 return oui 

3707 

3708 

3709class UUIDField(Field[UUID, bytes]): 

3710 """Field for UUID storage, wrapping Python's uuid.UUID type. 

3711 

3712 The internal storage format of this field is ``uuid.UUID`` from the Python 

3713 standard library. 

3714 

3715 There are three formats (``uuid_fmt``) for this field type: 

3716 

3717 * ``FORMAT_BE`` (default): the UUID is six fields in big-endian byte order, 

3718 per RFC 4122. 

3719 

3720 This format is used by DHCPv6 (RFC 6355) and most network protocols. 

3721 

3722 * ``FORMAT_LE``: the UUID is six fields, with ``time_low``, ``time_mid`` 

3723 and ``time_high_version`` in little-endian byte order. This *doesn't* 

3724 change the arrangement of the fields from RFC 4122. 

3725 

3726 This format is used by Microsoft's COM/OLE libraries. 

3727 

3728 * ``FORMAT_REV``: the UUID is a single 128-bit integer in little-endian 

3729 byte order. This *changes the arrangement* of the fields. 

3730 

3731 This format is used by Bluetooth Low Energy. 

3732 

3733 Note: You should use the constants here. 

3734 

3735 The "human encoding" of this field supports a number of different input 

3736 formats, and wraps Python's ``uuid.UUID`` library appropriately: 

3737 

3738 * Given a bytearray, bytes or str of 16 bytes, this class decodes UUIDs in 

3739 wire format. 

3740 

3741 * Given a bytearray, bytes or str of other lengths, this delegates to 

3742 ``uuid.UUID`` the Python standard library. This supports a number of 

3743 different encoding options -- see the Python standard library 

3744 documentation for more details. 

3745 

3746 * Given an int or long, presumed to be a 128-bit integer to pass to 

3747 ``uuid.UUID``. 

3748 

3749 * Given a tuple: 

3750 

3751 * Tuples of 11 integers are treated as having the last 6 integers forming 

3752 the ``node`` field, and are merged before being passed as a tuple of 6 

3753 integers to ``uuid.UUID``. 

3754 

3755 * Otherwise, the tuple is passed as the ``fields`` parameter to 

3756 ``uuid.UUID`` directly without modification. 

3757 

3758 ``uuid.UUID`` expects a tuple of 6 integers. 

3759 

3760 Other types (such as ``uuid.UUID``) are passed through. 

3761 """ 

3762 

3763 __slots__ = ["uuid_fmt"] 

3764 

3765 FORMAT_BE = 0 

3766 FORMAT_LE = 1 

3767 FORMAT_REV = 2 

3768 

3769 # Change this when we get new formats 

3770 FORMATS = (FORMAT_BE, FORMAT_LE, FORMAT_REV) 

3771 

3772 def __init__(self, name, default, uuid_fmt=FORMAT_BE): 

3773 # type: (str, Optional[int], int) -> None 

3774 self.uuid_fmt = uuid_fmt 

3775 self._check_uuid_fmt() 

3776 Field.__init__(self, name, default, "16s") 

3777 

3778 def _check_uuid_fmt(self): 

3779 # type: () -> None 

3780 """Checks .uuid_fmt, and raises an exception if it is not valid.""" 

3781 if self.uuid_fmt not in UUIDField.FORMATS: 

3782 raise FieldValueRangeException( 

3783 "Unsupported uuid_fmt ({})".format(self.uuid_fmt)) 

3784 

3785 def i2m(self, pkt, x): 

3786 # type: (Optional[Packet], Optional[UUID]) -> bytes 

3787 self._check_uuid_fmt() 

3788 if x is None: 

3789 return b'\0' * 16 

3790 if self.uuid_fmt == UUIDField.FORMAT_BE: 

3791 return x.bytes 

3792 elif self.uuid_fmt == UUIDField.FORMAT_LE: 

3793 return x.bytes_le 

3794 elif self.uuid_fmt == UUIDField.FORMAT_REV: 

3795 return x.bytes[::-1] 

3796 else: 

3797 raise FieldAttributeException("Unknown fmt") 

3798 

3799 def m2i(self, 

3800 pkt, # type: Optional[Packet] 

3801 x, # type: bytes 

3802 ): 

3803 # type: (...) -> UUID 

3804 self._check_uuid_fmt() 

3805 if self.uuid_fmt == UUIDField.FORMAT_BE: 

3806 return UUID(bytes=x) 

3807 elif self.uuid_fmt == UUIDField.FORMAT_LE: 

3808 return UUID(bytes_le=x) 

3809 elif self.uuid_fmt == UUIDField.FORMAT_REV: 

3810 return UUID(bytes=x[::-1]) 

3811 else: 

3812 raise FieldAttributeException("Unknown fmt") 

3813 

3814 def any2i(self, 

3815 pkt, # type: Optional[Packet] 

3816 x # type: Any # noqa: E501 

3817 ): 

3818 # type: (...) -> Optional[UUID] 

3819 # Python's uuid doesn't handle bytearray, so convert to an immutable 

3820 # type first. 

3821 if isinstance(x, bytearray): 

3822 x = bytes_encode(x) 

3823 

3824 if isinstance(x, int): 

3825 u = UUID(int=x) 

3826 elif isinstance(x, tuple): 

3827 if len(x) == 11: 

3828 # For compatibility with dce_rpc: this packs into a tuple where 

3829 # elements 7..10 are the 48-bit node ID. 

3830 node = 0 

3831 for i in x[5:]: 

3832 node = (node << 8) | i 

3833 

3834 x = (x[0], x[1], x[2], x[3], x[4], node) 

3835 

3836 u = UUID(fields=x) 

3837 elif isinstance(x, (str, bytes)): 

3838 if len(x) == 16: 

3839 # Raw bytes 

3840 u = self.m2i(pkt, bytes_encode(x)) 

3841 else: 

3842 u = UUID(plain_str(x)) 

3843 elif isinstance(x, (UUID, RandUUID)): 

3844 u = cast(UUID, x) 

3845 else: 

3846 return None 

3847 return u 

3848 

3849 @staticmethod 

3850 def randval(): 

3851 # type: () -> RandUUID 

3852 return RandUUID() 

3853 

3854 

3855class UUIDEnumField(UUIDField, _EnumField[UUID]): 

3856 __slots__ = EnumField.__slots__ 

3857 

3858 def __init__(self, name, default, enum, uuid_fmt=0): 

3859 # type: (str, Optional[int], Any, int) -> None 

3860 _EnumField.__init__(self, name, default, enum, "16s") # type: ignore 

3861 UUIDField.__init__(self, name, default, uuid_fmt=uuid_fmt) 

3862 

3863 def any2i(self, pkt, x): 

3864 # type: (Optional[Packet], Any) -> UUID 

3865 return _EnumField.any2i(self, pkt, x) # type: ignore 

3866 

3867 def i2repr(self, 

3868 pkt, # type: Optional[Packet] 

3869 x, # type: UUID 

3870 ): 

3871 # type: (...) -> Any 

3872 return _EnumField.i2repr(self, pkt, x) 

3873 

3874 

3875class BitExtendedField(Field[Optional[int], bytes]): 

3876 """ 

3877 Bit Extended Field 

3878 

3879 This type of field has a variable number of bytes. Each byte is defined 

3880 as follows: 

3881 - 7 bits of data 

3882 - 1 bit an an extension bit: 

3883 

3884 + 0 means it is last byte of the field ("stopping bit") 

3885 + 1 means there is another byte after this one ("forwarding bit") 

3886 

3887 To get the actual data, it is necessary to hop the binary data byte per 

3888 byte and to check the extension bit until 0 

3889 """ 

3890 

3891 __slots__ = ["extension_bit"] 

3892 

3893 def prepare_byte(self, x): 

3894 # type: (int) -> int 

3895 # Moves the forwarding bit to the LSB 

3896 x = int(x) 

3897 fx_bit = (x & 2**self.extension_bit) >> self.extension_bit 

3898 lsb_bits = x & 2**self.extension_bit - 1 

3899 msb_bits = x >> (self.extension_bit + 1) 

3900 x = (msb_bits << (self.extension_bit + 1)) + (lsb_bits << 1) + fx_bit 

3901 return x 

3902 

3903 def str2extended(self, x=b""): 

3904 # type: (bytes) -> Tuple[bytes, Optional[int]] 

3905 # For convenience, we reorder the byte so that the forwarding 

3906 # bit is always the LSB. We then apply the same algorithm 

3907 # whatever the real forwarding bit position 

3908 

3909 # First bit is the stopping bit at zero 

3910 bits = 0b0 

3911 end = None 

3912 

3913 # We retrieve 7 bits. 

3914 # If "forwarding bit" is 1 then we continue on another byte 

3915 i = 0 

3916 for c in bytearray(x): 

3917 c = self.prepare_byte(c) 

3918 bits = bits << 7 | (int(c) >> 1) 

3919 if not int(c) & 0b1: 

3920 end = x[i + 1:] 

3921 break 

3922 i = i + 1 

3923 if end is None: 

3924 # We reached the end of the data but there was no 

3925 # "ending bit". This is not normal. 

3926 return b"", None 

3927 else: 

3928 return end, bits 

3929 

3930 def extended2str(self, x): 

3931 # type: (Optional[int]) -> bytes 

3932 if x is None: 

3933 return b"" 

3934 x = int(x) 

3935 s = [] 

3936 LSByte = True 

3937 FX_Missing = True 

3938 bits = 0b0 

3939 i = 0 

3940 while (x > 0 or FX_Missing): 

3941 if i == 8: 

3942 # End of byte 

3943 i = 0 

3944 s.append(bits) 

3945 bits = 0b0 

3946 FX_Missing = True 

3947 else: 

3948 if i % 8 == self.extension_bit: 

3949 # This is extension bit 

3950 if LSByte: 

3951 bits = bits | 0b0 << i 

3952 LSByte = False 

3953 else: 

3954 bits = bits | 0b1 << i 

3955 FX_Missing = False 

3956 else: 

3957 bits = bits | (x & 0b1) << i 

3958 x = x >> 1 

3959 # Still some bits 

3960 i = i + 1 

3961 s.append(bits) 

3962 

3963 result = "".encode() 

3964 for x in s[:: -1]: 

3965 result = result + struct.pack(">B", x) 

3966 return result 

3967 

3968 def __init__(self, name, default, extension_bit): 

3969 # type: (str, Optional[Any], int) -> None 

3970 Field.__init__(self, name, default, "B") 

3971 self.extension_bit = extension_bit 

3972 

3973 def i2m(self, pkt, x): 

3974 # type: (Optional[Any], Optional[int]) -> bytes 

3975 return self.extended2str(x) 

3976 

3977 def m2i(self, pkt, x): 

3978 # type: (Optional[Any], bytes) -> Optional[int] 

3979 return self.str2extended(x)[1] 

3980 

3981 def addfield(self, pkt, s, val): 

3982 # type: (Optional[Packet], bytes, Optional[int]) -> bytes 

3983 return s + self.i2m(pkt, val) 

3984 

3985 def getfield(self, pkt, s): 

3986 # type: (Optional[Any], bytes) -> Tuple[bytes, Optional[int]] 

3987 return self.str2extended(s) 

3988 

3989 

3990class LSBExtendedField(BitExtendedField): 

3991 # This is a BitExtendedField with the extension bit on LSB 

3992 def __init__(self, name, default): 

3993 # type: (str, Optional[Any]) -> None 

3994 BitExtendedField.__init__(self, name, default, extension_bit=0) 

3995 

3996 

3997class MSBExtendedField(BitExtendedField): 

3998 # This is a BitExtendedField with the extension bit on MSB 

3999 def __init__(self, name, default): 

4000 # type: (str, Optional[Any]) -> None 

4001 BitExtendedField.__init__(self, name, default, extension_bit=7)