Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/plist.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

384 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7PacketList: holds several packets and allows to do operations on them. 

8""" 

9 

10 

11import os 

12from collections import defaultdict 

13from typing import Sequence, NamedTuple 

14 

15from scapy.config import conf 

16from scapy.base_classes import ( 

17 BasePacket, 

18 BasePacketList, 

19 PacketList_metaclass, 

20 SetGen, 

21 _CanvasDumpExtended, 

22) 

23from scapy.utils import do_graph, hexdump, make_table, make_lined_table, \ 

24 make_tex_table, issubtype 

25from functools import reduce 

26 

27# typings 

28from typing import ( 

29 Any, 

30 Callable, 

31 DefaultDict, 

32 Dict, 

33 Generic, 

34 Iterator, 

35 List, 

36 Optional, 

37 Tuple, 

38 Type, 

39 TypeVar, 

40 Union, 

41 TYPE_CHECKING, 

42) 

43from scapy.packet import Packet 

44 

45try: 

46 import pyx 

47except ImportError: 

48 pass 

49 

50if TYPE_CHECKING: 

51 from scapy.libs.matplot import Line2D 

52 

53############# 

54# Results # 

55############# 

56 

57 

58QueryAnswer = NamedTuple( 

59 "QueryAnswer", 

60 [("query", Packet), ("answer", Packet)] 

61) 

62 

63_Inner = TypeVar("_Inner", Packet, QueryAnswer) 

64 

65 

66class _PacketList(Generic[_Inner], metaclass=PacketList_metaclass): 

67 __slots__ = ["stats", "res", "listname"] 

68 

69 def __init__(self, 

70 res=None, # type: Optional[Union[_PacketList[_Inner], List[_Inner]]] # noqa: E501 

71 name="PacketList", # type: str 

72 stats=None # type: Optional[List[Type[Packet]]] 

73 ): 

74 # type: (...) -> None 

75 """create a packet list from a list of packets 

76 res: the list of packets 

77 stats: a list of classes that will appear in the stats (defaults to [TCP,UDP,ICMP])""" # noqa: E501 

78 if stats is None: 

79 stats = conf.stats_classic_protocols 

80 self.stats = stats 

81 if res is None: 

82 self.res = [] # type: List[_Inner] 

83 elif isinstance(res, _PacketList): 

84 self.res = res.res 

85 else: 

86 self.res = res 

87 self.listname = name 

88 

89 def __len__(self): 

90 # type: () -> int 

91 return len(self.res) 

92 

93 def _elt2pkt(self, elt): 

94 # type: (_Inner) -> Packet 

95 return elt # type: ignore 

96 

97 def _elt2sum(self, elt): 

98 # type: (_Inner) -> str 

99 return elt.summary() # type: ignore 

100 

101 def _elt2show(self, elt): 

102 # type: (_Inner) -> str 

103 return self._elt2sum(elt) 

104 

105 def __repr__(self): 

106 # type: () -> str 

107 stats = {x: 0 for x in self.stats} 

108 other = 0 

109 for r in self.res: 

110 f = 0 

111 for p in stats: 

112 if self._elt2pkt(r).haslayer(p): 

113 stats[p] += 1 

114 f = 1 

115 break 

116 if not f: 

117 other += 1 

118 s = "" 

119 ct = conf.color_theme 

120 for p in self.stats: 

121 s += " %s%s%s" % (ct.packetlist_proto(p._name), 

122 ct.punct(":"), 

123 ct.packetlist_value(stats[p])) 

124 s += " %s%s%s" % (ct.packetlist_proto("Other"), 

125 ct.punct(":"), 

126 ct.packetlist_value(other)) 

127 return "%s%s%s%s%s" % (ct.punct("<"), 

128 ct.packetlist_name(self.listname), 

129 ct.punct(":"), 

130 s, 

131 ct.punct(">")) 

132 

133 def __getstate__(self): 

134 # type: () -> Dict[str, Any] 

135 """ 

136 Creates a basic representation of the instance, used in 

137 conjunction with __setstate__() e.g. by pickle 

138 

139 :returns: dict representing this instance 

140 """ 

141 state = { 

142 'res': self.res, 

143 'stats': self.stats, 

144 'listname': self.listname 

145 } 

146 return state 

147 

148 def __setstate__(self, state): 

149 # type: (Dict[str, Any]) -> None 

150 """ 

151 Sets instance attributes to values given by state, used in 

152 conjunction with __getstate__() e.g. by pickle 

153 

154 :param state: dict representing this instance 

155 """ 

156 self.res = state['res'] 

157 self.stats = state['stats'] 

158 self.listname = state['listname'] 

159 

160 def __iter__(self): 

161 # type: () -> Iterator[_Inner] 

162 return self.res.__iter__() 

163 

164 def __getattr__(self, attr): 

165 # type: (str) -> Any 

166 return getattr(self.res, attr) 

167 

168 def __getitem__(self, item): 

169 # type: (Any) -> Any 

170 if issubtype(item, BasePacket): 

171 return self.__class__([x for x in self.res if item in self._elt2pkt(x)], # noqa: E501 

172 name="%s from %s" % (item.__name__, self.listname)) # noqa: E501 

173 if isinstance(item, slice): 

174 return self.__class__(self.res.__getitem__(item), 

175 name="mod %s" % self.listname) 

176 return self.res.__getitem__(item) 

177 

178 _T = TypeVar('_T', 'SndRcvList', 'PacketList') 

179 

180 # Hinting hack: type self 

181 def __add__(self, # type: _PacketList._T # type: ignore 

182 other # type: _PacketList._T 

183 ): 

184 # type: (...) -> _PacketList._T 

185 return self.__class__( 

186 self.res + other.res, 

187 name="%s+%s" % ( 

188 self.listname, 

189 other.listname 

190 ) 

191 ) 

192 

193 def summary(self, 

194 prn=None, # type: Optional[Callable[..., Any]] 

195 lfilter=None # type: Optional[Callable[..., bool]] 

196 ): 

197 # type: (...) -> None 

198 """prints a summary of each packet 

199 

200 :param prn: function to apply to each packet instead of 

201 lambda x:x.summary() 

202 :param lfilter: truth function to apply to each packet to decide 

203 whether it will be displayed 

204 """ 

205 for r in self.res: 

206 if lfilter is not None: 

207 if not lfilter(*r): 

208 continue 

209 if prn is None: 

210 print(self._elt2sum(r)) 

211 else: 

212 print(prn(*r)) 

213 

214 def nsummary(self, 

215 prn=None, # type: Optional[Callable[..., Any]] 

216 lfilter=None # type: Optional[Callable[..., bool]] 

217 ): 

218 # type: (...) -> None 

219 """prints a summary of each packet with the packet's number 

220 

221 :param prn: function to apply to each packet instead of 

222 lambda x:x.summary() 

223 :param lfilter: truth function to apply to each packet to decide 

224 whether it will be displayed 

225 """ 

226 for i, res in enumerate(self.res): 

227 if lfilter is not None: 

228 if not lfilter(*res): 

229 continue 

230 print(conf.color_theme.id(i, fmt="%04i"), end=' ') 

231 if prn is None: 

232 print(self._elt2sum(res)) 

233 else: 

234 print(prn(*res)) 

235 

236 def show(self, *args, **kargs): 

237 # type: (*Any, **Any) -> None 

238 """Best way to display the packet list. Defaults to nsummary() method""" # noqa: E501 

239 return self.nsummary(*args, **kargs) 

240 

241 def filter(self, func): 

242 # type: (Callable[..., bool]) -> _PacketList[_Inner] 

243 """Returns a packet list filtered by a truth function. This truth 

244 function has to take a packet as the only argument and return 

245 a boolean value. 

246 """ 

247 return self.__class__([x for x in self.res if func(*x)], 

248 name="filtered %s" % self.listname) 

249 

250 def make_table(self, *args, **kargs): 

251 # type: (Any, Any) -> Optional[str] 

252 """Prints a table using a function that returns for each packet its head column value, head row value and displayed value # noqa: E501 

253 ex: p.make_table(lambda x:(x[IP].dst, x[TCP].dport, x[TCP].sprintf("%flags%")) """ # noqa: E501 

254 return make_table(self.res, *args, **kargs) 

255 

256 def make_lined_table(self, *args, **kargs): 

257 # type: (Any, Any) -> Optional[str] 

258 """Same as make_table, but print a table with lines""" 

259 return make_lined_table(self.res, *args, **kargs) 

260 

261 def make_tex_table(self, *args, **kargs): 

262 # type: (Any, Any) -> Optional[str] 

263 """Same as make_table, but print a table with LaTeX syntax""" 

264 return make_tex_table(self.res, *args, **kargs) 

265 

266 def plot(self, 

267 f, # type: Callable[..., Any] 

268 lfilter=None, # type: Optional[Callable[..., bool]] 

269 plot_xy=False, # type: bool 

270 **kargs # type: Any 

271 ): 

272 # type: (...) -> Line2D 

273 """Applies a function to each packet to get a value that will be plotted 

274 with matplotlib. A list of matplotlib.lines.Line2D is returned. 

275 

276 lfilter: a truth function that decides whether a packet must be plotted 

277 """ 

278 # Defer imports of matplotlib until its needed 

279 # because it has a heavy dep chain 

280 from scapy.libs.matplot import ( 

281 plt, 

282 MATPLOTLIB_INLINED, 

283 MATPLOTLIB_DEFAULT_PLOT_KARGS 

284 ) 

285 

286 # Get the list of packets 

287 if lfilter is None: 

288 lst_pkts = [f(*e) for e in self.res] 

289 else: 

290 lst_pkts = [f(*e) for e in self.res if lfilter(*e)] 

291 

292 # Mimic the default gnuplot output 

293 if kargs == {}: 

294 kargs = MATPLOTLIB_DEFAULT_PLOT_KARGS 

295 if plot_xy: 

296 lines = plt.plot(*zip(*lst_pkts), **kargs) 

297 else: 

298 lines = plt.plot(lst_pkts, **kargs) 

299 

300 # Call show() if matplotlib is not inlined 

301 if not MATPLOTLIB_INLINED: 

302 plt.show() 

303 

304 return lines 

305 

306 def diffplot(self, 

307 f, # type: Callable[..., Any] 

308 delay=1, # type: int 

309 lfilter=None, # type: Optional[Callable[..., bool]] 

310 **kargs # type: Any 

311 ): 

312 # type: (...) -> Line2D 

313 """diffplot(f, delay=1, lfilter=None) 

314 Applies a function to couples (l[i],l[i+delay]) 

315 

316 A list of matplotlib.lines.Line2D is returned. 

317 """ 

318 # Defer imports of matplotlib until its needed 

319 # because it has a heavy dep chain 

320 from scapy.libs.matplot import ( 

321 plt, 

322 MATPLOTLIB_INLINED, 

323 MATPLOTLIB_DEFAULT_PLOT_KARGS 

324 ) 

325 

326 # Get the list of packets 

327 if lfilter is None: 

328 lst_pkts = [f(self.res[i], self.res[i + 1]) 

329 for i in range(len(self.res) - delay)] 

330 else: 

331 lst_pkts = [f(self.res[i], self.res[i + 1]) 

332 for i in range(len(self.res) - delay) 

333 if lfilter(self.res[i])] 

334 

335 # Mimic the default gnuplot output 

336 if kargs == {}: 

337 kargs = MATPLOTLIB_DEFAULT_PLOT_KARGS 

338 lines = plt.plot(lst_pkts, **kargs) 

339 

340 # Call show() if matplotlib is not inlined 

341 if not MATPLOTLIB_INLINED: 

342 plt.show() 

343 

344 return lines 

345 

346 def multiplot(self, 

347 f, # type: Callable[..., Any] 

348 lfilter=None, # type: Optional[Callable[..., Any]] 

349 plot_xy=False, # type: bool 

350 **kargs # type: Any 

351 ): 

352 # type: (...) -> Line2D 

353 """Uses a function that returns a label and a value for this label, then 

354 plots all the values label by label. 

355 

356 A list of matplotlib.lines.Line2D is returned. 

357 """ 

358 # Defer imports of matplotlib until its needed 

359 # because it has a heavy dep chain 

360 from scapy.libs.matplot import ( 

361 plt, 

362 MATPLOTLIB_INLINED, 

363 MATPLOTLIB_DEFAULT_PLOT_KARGS 

364 ) 

365 

366 # Get the list of packets 

367 if lfilter is None: 

368 lst_pkts = (f(*e) for e in self.res) 

369 else: 

370 lst_pkts = (f(*e) for e in self.res if lfilter(*e)) 

371 

372 # Apply the function f to the packets 

373 d = {} # type: Dict[str, List[float]] 

374 for k, v in lst_pkts: 

375 d.setdefault(k, []).append(v) 

376 

377 # Mimic the default gnuplot output 

378 if not kargs: 

379 kargs = MATPLOTLIB_DEFAULT_PLOT_KARGS 

380 

381 if plot_xy: 

382 lines = [plt.plot(*list(zip(*pl)), **dict(kargs, label=k)) 

383 for k, pl in d.items()] 

384 else: 

385 lines = [plt.plot(pl, **dict(kargs, label=k)) 

386 for k, pl in d.items()] 

387 plt.legend(loc="center right", bbox_to_anchor=(1.5, 0.5)) 

388 

389 # Call show() if matplotlib is not inlined 

390 if not MATPLOTLIB_INLINED: 

391 plt.show() 

392 

393 return lines 

394 

395 def rawhexdump(self): 

396 # type: () -> None 

397 """Prints an hexadecimal dump of each packet in the list""" 

398 for p in self: 

399 hexdump(self._elt2pkt(p)) 

400 

401 def hexraw(self, lfilter=None): 

402 # type: (Optional[Callable[..., bool]]) -> None 

403 """Same as nsummary(), except that if a packet has a Raw layer, it will be hexdumped # noqa: E501 

404 lfilter: a truth function that decides whether a packet must be displayed""" # noqa: E501 

405 for i, res in enumerate(self.res): 

406 p = self._elt2pkt(res) 

407 if lfilter is not None and not lfilter(p): 

408 continue 

409 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"), 

410 p.sprintf("%.time%"), 

411 self._elt2sum(res))) 

412 if p.haslayer(conf.raw_layer): 

413 hexdump(p.getlayer(conf.raw_layer).load) # type: ignore 

414 

415 def hexdump(self, lfilter=None): 

416 # type: (Optional[Callable[..., bool]]) -> None 

417 """Same as nsummary(), except that packets are also hexdumped 

418 lfilter: a truth function that decides whether a packet must be displayed""" # noqa: E501 

419 for i, res in enumerate(self.res): 

420 p = self._elt2pkt(res) 

421 if lfilter is not None and not lfilter(p): 

422 continue 

423 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"), 

424 p.sprintf("%.time%"), 

425 self._elt2sum(res))) 

426 hexdump(p) 

427 

428 def padding(self, lfilter=None): 

429 # type: (Optional[Callable[..., bool]]) -> None 

430 """Same as hexraw(), for Padding layer""" 

431 for i, res in enumerate(self.res): 

432 p = self._elt2pkt(res) 

433 if p.haslayer(conf.padding_layer): 

434 if lfilter is None or lfilter(p): 

435 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"), 

436 p.sprintf("%.time%"), 

437 self._elt2sum(res))) 

438 hexdump( 

439 p.getlayer(conf.padding_layer).load # type: ignore 

440 ) 

441 

442 def nzpadding(self, lfilter=None): 

443 # type: (Optional[Callable[..., bool]]) -> None 

444 """Same as padding() but only non null padding""" 

445 for i, res in enumerate(self.res): 

446 p = self._elt2pkt(res) 

447 if p.haslayer(conf.padding_layer): 

448 pad = p.getlayer(conf.padding_layer).load # type: ignore 

449 if pad == pad[:1] * len(pad): 

450 continue 

451 if lfilter is None or lfilter(p): 

452 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"), 

453 p.sprintf("%.time%"), 

454 self._elt2sum(res))) 

455 hexdump( 

456 p.getlayer(conf.padding_layer).load # type: ignore 

457 ) 

458 

459 def conversations(self, 

460 getsrcdst=None, # type: Optional[Callable[[Packet], Tuple[Any, ...]]] # noqa: E501 

461 **kargs # type: Any 

462 ): 

463 # type: (...) -> Any 

464 """Graphes a conversations between sources and destinations and display it 

465 (using graphviz and imagemagick) 

466 

467 :param getsrcdst: a function that takes an element of the list and 

468 returns the source, the destination and optionally 

469 a label. By default, returns the IP source and 

470 destination from IP and ARP layers 

471 :param type: output type (svg, ps, gif, jpg, etc.), passed to dot's 

472 "-T" option 

473 :param target: filename or redirect. Defaults pipe to Imagemagick's 

474 display program 

475 :param prog: which graphviz program to use 

476 """ 

477 if getsrcdst is None: 

478 def _getsrcdst(pkt): 

479 # type: (Packet) -> Tuple[str, str] 

480 """Extract src and dst addresses""" 

481 if 'IP' in pkt: 

482 return (pkt['IP'].src, pkt['IP'].dst) 

483 if 'IPv6' in pkt: 

484 return (pkt['IPv6'].src, pkt['IPv6'].dst) 

485 if 'ARP' in pkt: 

486 return (pkt['ARP'].psrc, pkt['ARP'].pdst) 

487 raise TypeError() 

488 getsrcdst = _getsrcdst 

489 conv = {} # type: Dict[Tuple[Any, ...], Any] 

490 for elt in self.res: 

491 p = self._elt2pkt(elt) 

492 try: 

493 c = getsrcdst(p) 

494 except Exception: 

495 # No warning here: it's OK that getsrcdst() raises an 

496 # exception, since it might be, for example, a 

497 # function that expects a specific layer in each 

498 # packet. The try/except approach is faster and 

499 # considered more Pythonic than adding tests. 

500 continue 

501 if len(c) == 3: 

502 conv.setdefault(c[:2], set()).add(c[2]) 

503 else: 

504 conv[c] = conv.get(c, 0) + 1 

505 gr = 'digraph "conv" {\n' 

506 for (s, d), l in conv.items(): 

507 gr += '\t "%s" -> "%s" [label="%s"]\n' % ( 

508 s, d, ', '.join(str(x) for x in l) if isinstance(l, set) else l 

509 ) 

510 gr += "}\n" 

511 return do_graph(gr, **kargs) 

512 

513 def afterglow(self, 

514 src=None, # type: Optional[Callable[[_Inner], Any]] 

515 event=None, # type: Optional[Callable[[_Inner], Any]] 

516 dst=None, # type: Optional[Callable[[_Inner], Any]] 

517 **kargs # type: Any 

518 ): 

519 # type: (...) -> Any 

520 """Experimental clone attempt of http://sourceforge.net/projects/afterglow 

521 each datum is reduced as src -> event -> dst and the data are graphed. 

522 by default we have IP.src -> IP.dport -> IP.dst""" 

523 if src is None: 

524 src = lambda *x: x[0]['IP'].src 

525 if event is None: 

526 event = lambda *x: x[0]['IP'].dport 

527 if dst is None: 

528 dst = lambda *x: x[0]['IP'].dst 

529 sl = {} # type: Dict[Any, Tuple[Union[float, int], List[Any]]] 

530 el = {} # type: Dict[Any, Tuple[Union[float, int], List[Any]]] 

531 dl = {} # type: Dict[Any, int] 

532 for i in self.res: 

533 try: 

534 s, e, d = src(i), event(i), dst(i) 

535 if s in sl: 

536 n, lst = sl[s] 

537 n += 1 

538 if e not in lst: 

539 lst.append(e) 

540 sl[s] = (n, lst) 

541 else: 

542 sl[s] = (1, [e]) 

543 if e in el: 

544 n, lst = el[e] 

545 n += 1 

546 if d not in lst: 

547 lst.append(d) 

548 el[e] = (n, lst) 

549 else: 

550 el[e] = (1, [d]) 

551 dl[d] = dl.get(d, 0) + 1 

552 except Exception: 

553 continue 

554 

555 def minmax(x): 

556 # type: (Any) -> Tuple[int, int] 

557 m, M = reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1])), 

558 ((a, a) for a in x)) 

559 if m == M: 

560 m = 0 

561 if M == 0: 

562 M = 1 

563 return m, M 

564 

565 mins, maxs = minmax(x for x, _ in sl.values()) 

566 mine, maxe = minmax(x for x, _ in el.values()) 

567 mind, maxd = minmax(dl.values()) 

568 

569 gr = 'digraph "afterglow" {\n\tedge [len=2.5];\n' 

570 

571 gr += "# src nodes\n" 

572 for s in sl: 

573 n, _ = sl[s] 

574 n = 1 + float(n - mins) / (maxs - mins) 

575 gr += '"src.%s" [label = "%s", shape=box, fillcolor="#FF0000", style=filled, fixedsize=1, height=%.2f,width=%.2f];\n' % (repr(s), repr(s), n, n) # noqa: E501 

576 gr += "# event nodes\n" 

577 for e in el: 

578 n, _ = el[e] 

579 n = 1 + float(n - mine) / (maxe - mine) 

580 gr += '"evt.%s" [label = "%s", shape=circle, fillcolor="#00FFFF", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(e), repr(e), n, n) # noqa: E501 

581 for d in dl: 

582 n = dl[d] 

583 n = 1 + float(n - mind) / (maxd - mind) 

584 gr += '"dst.%s" [label = "%s", shape=triangle, fillcolor="#0000ff", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(d), repr(d), n, n) # noqa: E501 

585 

586 gr += "###\n" 

587 for s in sl: 

588 n, lst1 = sl[s] 

589 for e in lst1: 

590 gr += ' "src.%s" -> "evt.%s";\n' % (repr(s), repr(e)) 

591 for e in el: 

592 n, lst2 = el[e] 

593 for d in lst2: 

594 gr += ' "evt.%s" -> "dst.%s";\n' % (repr(e), repr(d)) 

595 

596 gr += "}" 

597 return do_graph(gr, **kargs) 

598 

599 def canvas_dump(self, layer_shift=0, rebuild=1): 

600 # type: (int, int) -> 'pyx.canvas.canvas' 

601 d = pyx.document.document() 

602 len_res = len(self.res) 

603 for i, res in enumerate(self.res): 

604 c = self._elt2pkt(res).canvas_dump(layer_shift=layer_shift, 

605 rebuild=rebuild) 

606 cbb = c.bbox() 

607 c.text(cbb.left(), cbb.top() + 1, r"\font\cmssfont=cmss12\cmssfont{Frame %i/%i}" % (i, len_res), [pyx.text.size.LARGE]) # noqa: E501 

608 if conf.verb >= 2: 

609 os.write(1, b".") 

610 d.append(pyx.document.page(c, paperformat=pyx.document.paperformat.A4, # noqa: E501 

611 margin=1 * pyx.unit.t_cm, 

612 fittosize=1)) 

613 return d 

614 

615 def sessions( 

616 self, 

617 session_extractor=None # type: Optional[Callable[[Packet], str]] 

618 ): 

619 # type: (...) -> Dict[str, _PacketList[_Inner]] 

620 if session_extractor is None: 

621 def _session_extractor(p): 

622 # type: (Packet) -> str 

623 """Extract sessions from packets""" 

624 if 'Ether' in p: 

625 if 'IP' in p or 'IPv6' in p: 

626 ip_src_fmt = "{IP:%IP.src%}{IPv6:%IPv6.src%}" 

627 ip_dst_fmt = "{IP:%IP.dst%}{IPv6:%IPv6.dst%}" 

628 addr_fmt = (ip_src_fmt, ip_dst_fmt) 

629 if 'TCP' in p: 

630 fmt = "TCP {}:%r,TCP.sport% > {}:%r,TCP.dport%" 

631 elif 'UDP' in p: 

632 fmt = "UDP {}:%r,UDP.sport% > {}:%r,UDP.dport%" 

633 elif 'ICMP' in p: 

634 fmt = "ICMP {} > {} type=%r,ICMP.type% code=%r," \ 

635 "ICMP.code% id=%ICMP.id%" 

636 elif 'ICMPv6' in p: 

637 fmt = "ICMPv6 {} > {} type=%r,ICMPv6.type% " \ 

638 "code=%r,ICMPv6.code%" 

639 elif 'IPv6' in p: 

640 fmt = "IPv6 {} > {} nh=%IPv6.nh%" 

641 else: 

642 fmt = "IP {} > {} proto=%IP.proto%" 

643 return p.sprintf(fmt.format(*addr_fmt)) 

644 elif 'ARP' in p: 

645 return p.sprintf("ARP %ARP.psrc% > %ARP.pdst%") 

646 else: 

647 return p.sprintf("Ethernet type=%04xr,Ether.type%") 

648 return "Other" 

649 session_extractor = _session_extractor 

650 sessions = defaultdict(self.__class__) # type: DefaultDict[str, _PacketList[_Inner]] # noqa: E501 

651 for p in self.res: 

652 sess = session_extractor( 

653 self._elt2pkt(p) 

654 ) 

655 sessions[sess].append(p) 

656 return dict(sessions) 

657 

658 def replace(self, *args, **kargs): 

659 # type: (Any, Any) -> PacketList 

660 """ 

661 lst.replace(<field>,[<oldvalue>,]<newvalue>) 

662 lst.replace( (fld,[ov],nv),(fld,[ov,]nv),...) 

663 if ov is None, all values are replaced 

664 ex: 

665 lst.replace( IP.src, "192.168.1.1", "10.0.0.1" ) 

666 lst.replace( IP.ttl, 64 ) 

667 lst.replace( (IP.ttl, 64), (TCP.sport, 666, 777), ) 

668 """ 

669 delete_checksums = kargs.get("delete_checksums", False) 

670 x = PacketList(name="Replaced %s" % self.listname) 

671 if not isinstance(args[0], tuple): 

672 args = (args,) 

673 for _p in self.res: 

674 p = self._elt2pkt(_p) 

675 copied = False 

676 for scheme in args: 

677 fld = scheme[0] 

678 old = scheme[1] # not used if len(scheme) == 2 

679 new = scheme[-1] 

680 for o in fld.owners: 

681 if o in p: 

682 if len(scheme) == 2 or p[o].getfieldval(fld.name) == old: # noqa: E501 

683 if not copied: 

684 p = p.copy() 

685 if delete_checksums: 

686 p.delete_checksums() 

687 copied = True 

688 setattr(p[o], fld.name, new) 

689 x.append(p) 

690 return x 

691 

692 def getlayer(self, cls, # type: Packet 

693 nb=None, # type: Optional[int] 

694 flt=None, # type: Optional[Dict[str, Any]] 

695 name=None, # type: Optional[str] 

696 stats=None # type: Optional[List[Type[Packet]]] 

697 ): 

698 # type: (...) -> PacketList 

699 """Returns the packet list from a given layer. 

700 

701 See ``Packet.getlayer`` for more info. 

702 

703 :param cls: search for a layer that is an instance of ``cls`` 

704 :type cls: Type[scapy.packet.Packet] 

705 

706 :param nb: return the nb^th layer that is an instance of ``cls`` 

707 :type nb: Optional[int] 

708 

709 :param flt: filter parameters for ``Packet.getlayer`` 

710 :type flt: Optional[Dict[str, Any]] 

711 

712 :param name: optional name for the new PacketList 

713 :type name: Optional[str] 

714 

715 :param stats: optional list of protocols to give stats on; if not 

716 specified, inherits from this PacketList. 

717 :type stats: Optional[List[Type[scapy.packet.Packet]]] 

718 :rtype: scapy.plist.PacketList 

719 """ 

720 if name is None: 

721 name = "{} layer {}".format(self.listname, cls.__name__) 

722 if stats is None: 

723 stats = self.stats 

724 

725 getlayer_arg = {} # type: Dict[str, Any] 

726 if flt is not None: 

727 getlayer_arg.update(flt) 

728 getlayer_arg['cls'] = cls 

729 if nb is not None: 

730 getlayer_arg['nb'] = nb 

731 

732 # Only return non-None getlayer results 

733 return PacketList([ 

734 pc for pc in ( 

735 self._elt2pkt(p).getlayer(**getlayer_arg) for p in self.res 

736 ) if pc is not None], 

737 name, stats 

738 ) 

739 

740 

741class PacketList(_PacketList[Packet], 

742 BasePacketList[Packet], 

743 _CanvasDumpExtended): 

744 def sr(self, multi=False, lookahead=None): 

745 # type: (bool, Optional[int]) -> Tuple[SndRcvList, PacketList] 

746 """ 

747 Matches packets in the list 

748 

749 :param multi: True if a packet can have multiple answers 

750 :param lookahead: Maximum number of packets between packet and answer. 

751 If 0 or None, full remaining list is 

752 scanned for answers 

753 :return: ( (matched couples), (unmatched packets) ) 

754 """ 

755 remain = self.res[:] 

756 sr = [] # type: List[QueryAnswer] 

757 i = 0 

758 if lookahead is None or lookahead == 0: 

759 lookahead = len(remain) 

760 while i < len(remain): 

761 s = remain[i] 

762 j = i 

763 while j < min(lookahead + i, len(remain) - 1): 

764 j += 1 

765 r = remain[j] 

766 if r.answers(s): 

767 sr.append(QueryAnswer(s, r)) 

768 if multi: 

769 remain[i]._answered = 1 

770 remain[j]._answered = 2 

771 continue 

772 del remain[j] 

773 del remain[i] 

774 i -= 1 

775 break 

776 i += 1 

777 if multi: 

778 remain = [x for x in remain if not hasattr(x, "_answered")] 

779 return SndRcvList(sr), PacketList(remain) 

780 

781 

782_PacketIterable = Union[ 

783 Sequence[Packet], 

784 Packet, 

785 SetGen[Packet], 

786 _PacketList[Packet] 

787] 

788 

789 

790class SndRcvList(_PacketList[QueryAnswer], 

791 BasePacketList[QueryAnswer], 

792 _CanvasDumpExtended): 

793 __slots__ = [] # type: List[str] 

794 

795 def __init__(self, 

796 res=None, # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]] # noqa: E501 

797 name="Results", # type: str 

798 stats=None # type: Optional[List[Type[Packet]]] 

799 ): 

800 # type: (...) -> None 

801 super(SndRcvList, self).__init__(res, name, stats) 

802 

803 def _elt2pkt(self, elt): 

804 # type: (QueryAnswer) -> Packet 

805 return elt[1] 

806 

807 def _elt2sum(self, elt): 

808 # type: (QueryAnswer) -> str 

809 return "%s ==> %s" % (elt[0].summary(), elt[1].summary())