Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/plist.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7PacketList: holds several packets and allows to do operations on them.
8"""
11import os
12from collections import defaultdict
13from typing import Sequence, NamedTuple
15from scapy.config import conf
16from scapy.base_classes import (
17 BasePacket,
18 BasePacketList,
19 PacketList_metaclass,
20 SetGen,
21 _CanvasDumpExtended,
22)
23from scapy.utils import do_graph, hexdump, make_table, make_lined_table, \
24 make_tex_table, issubtype
25from functools import reduce
27# typings
28from typing import (
29 Any,
30 Callable,
31 DefaultDict,
32 Dict,
33 Generic,
34 Iterator,
35 List,
36 Optional,
37 Tuple,
38 Type,
39 TypeVar,
40 Union,
41 TYPE_CHECKING,
42)
43from scapy.packet import Packet
45try:
46 import pyx
47except ImportError:
48 pass
50if TYPE_CHECKING:
51 from scapy.libs.matplot import Line2D
53#############
54# Results #
55#############
58QueryAnswer = NamedTuple(
59 "QueryAnswer",
60 [("query", Packet), ("answer", Packet)]
61)
63_Inner = TypeVar("_Inner", Packet, QueryAnswer)
66class _PacketList(Generic[_Inner], metaclass=PacketList_metaclass):
67 __slots__ = ["stats", "res", "listname"]
69 def __init__(self,
70 res=None, # type: Optional[Union[_PacketList[_Inner], List[_Inner]]] # noqa: E501
71 name="PacketList", # type: str
72 stats=None # type: Optional[List[Type[Packet]]]
73 ):
74 # type: (...) -> None
75 """create a packet list from a list of packets
76 res: the list of packets
77 stats: a list of classes that will appear in the stats (defaults to [TCP,UDP,ICMP])""" # noqa: E501
78 if stats is None:
79 stats = conf.stats_classic_protocols
80 self.stats = stats
81 if res is None:
82 self.res = [] # type: List[_Inner]
83 elif isinstance(res, _PacketList):
84 self.res = res.res
85 else:
86 self.res = res
87 self.listname = name
89 def __len__(self):
90 # type: () -> int
91 return len(self.res)
93 def _elt2pkt(self, elt):
94 # type: (_Inner) -> Packet
95 return elt # type: ignore
97 def _elt2sum(self, elt):
98 # type: (_Inner) -> str
99 return elt.summary() # type: ignore
101 def _elt2show(self, elt):
102 # type: (_Inner) -> str
103 return self._elt2sum(elt)
105 def __repr__(self):
106 # type: () -> str
107 stats = {x: 0 for x in self.stats}
108 other = 0
109 for r in self.res:
110 f = 0
111 for p in stats:
112 if self._elt2pkt(r).haslayer(p):
113 stats[p] += 1
114 f = 1
115 break
116 if not f:
117 other += 1
118 s = ""
119 ct = conf.color_theme
120 for p in self.stats:
121 s += " %s%s%s" % (ct.packetlist_proto(p._name),
122 ct.punct(":"),
123 ct.packetlist_value(stats[p]))
124 s += " %s%s%s" % (ct.packetlist_proto("Other"),
125 ct.punct(":"),
126 ct.packetlist_value(other))
127 return "%s%s%s%s%s" % (ct.punct("<"),
128 ct.packetlist_name(self.listname),
129 ct.punct(":"),
130 s,
131 ct.punct(">"))
133 def __getstate__(self):
134 # type: () -> Dict[str, Any]
135 """
136 Creates a basic representation of the instance, used in
137 conjunction with __setstate__() e.g. by pickle
139 :returns: dict representing this instance
140 """
141 state = {
142 'res': self.res,
143 'stats': self.stats,
144 'listname': self.listname
145 }
146 return state
148 def __setstate__(self, state):
149 # type: (Dict[str, Any]) -> None
150 """
151 Sets instance attributes to values given by state, used in
152 conjunction with __getstate__() e.g. by pickle
154 :param state: dict representing this instance
155 """
156 self.res = state['res']
157 self.stats = state['stats']
158 self.listname = state['listname']
160 def __iter__(self):
161 # type: () -> Iterator[_Inner]
162 return self.res.__iter__()
164 def __getattr__(self, attr):
165 # type: (str) -> Any
166 return getattr(self.res, attr)
168 def __getitem__(self, item):
169 # type: (Any) -> Any
170 if issubtype(item, BasePacket):
171 return self.__class__([x for x in self.res if item in self._elt2pkt(x)], # noqa: E501
172 name="%s from %s" % (item.__name__, self.listname)) # noqa: E501
173 if isinstance(item, slice):
174 return self.__class__(self.res.__getitem__(item),
175 name="mod %s" % self.listname)
176 return self.res.__getitem__(item)
178 _T = TypeVar('_T', 'SndRcvList', 'PacketList')
180 # Hinting hack: type self
181 def __add__(self, # type: _PacketList._T # type: ignore
182 other # type: _PacketList._T
183 ):
184 # type: (...) -> _PacketList._T
185 return self.__class__(
186 self.res + other.res,
187 name="%s+%s" % (
188 self.listname,
189 other.listname
190 )
191 )
193 def summary(self,
194 prn=None, # type: Optional[Callable[..., Any]]
195 lfilter=None # type: Optional[Callable[..., bool]]
196 ):
197 # type: (...) -> None
198 """prints a summary of each packet
200 :param prn: function to apply to each packet instead of
201 lambda x:x.summary()
202 :param lfilter: truth function to apply to each packet to decide
203 whether it will be displayed
204 """
205 for r in self.res:
206 if lfilter is not None:
207 if not lfilter(*r):
208 continue
209 if prn is None:
210 print(self._elt2sum(r))
211 else:
212 print(prn(*r))
214 def nsummary(self,
215 prn=None, # type: Optional[Callable[..., Any]]
216 lfilter=None # type: Optional[Callable[..., bool]]
217 ):
218 # type: (...) -> None
219 """prints a summary of each packet with the packet's number
221 :param prn: function to apply to each packet instead of
222 lambda x:x.summary()
223 :param lfilter: truth function to apply to each packet to decide
224 whether it will be displayed
225 """
226 for i, res in enumerate(self.res):
227 if lfilter is not None:
228 if not lfilter(*res):
229 continue
230 print(conf.color_theme.id(i, fmt="%04i"), end=' ')
231 if prn is None:
232 print(self._elt2sum(res))
233 else:
234 print(prn(*res))
236 def show(self, *args, **kargs):
237 # type: (*Any, **Any) -> None
238 """Best way to display the packet list. Defaults to nsummary() method""" # noqa: E501
239 return self.nsummary(*args, **kargs)
241 def filter(self, func):
242 # type: (Callable[..., bool]) -> _PacketList[_Inner]
243 """Returns a packet list filtered by a truth function. This truth
244 function has to take a packet as the only argument and return
245 a boolean value.
246 """
247 return self.__class__([x for x in self.res if func(*x)],
248 name="filtered %s" % self.listname)
250 def make_table(self, *args, **kargs):
251 # type: (Any, Any) -> Optional[str]
252 """Prints a table using a function that returns for each packet its head column value, head row value and displayed value # noqa: E501
253 ex: p.make_table(lambda x:(x[IP].dst, x[TCP].dport, x[TCP].sprintf("%flags%")) """ # noqa: E501
254 return make_table(self.res, *args, **kargs)
256 def make_lined_table(self, *args, **kargs):
257 # type: (Any, Any) -> Optional[str]
258 """Same as make_table, but print a table with lines"""
259 return make_lined_table(self.res, *args, **kargs)
261 def make_tex_table(self, *args, **kargs):
262 # type: (Any, Any) -> Optional[str]
263 """Same as make_table, but print a table with LaTeX syntax"""
264 return make_tex_table(self.res, *args, **kargs)
266 def plot(self,
267 f, # type: Callable[..., Any]
268 lfilter=None, # type: Optional[Callable[..., bool]]
269 plot_xy=False, # type: bool
270 **kargs # type: Any
271 ):
272 # type: (...) -> Line2D
273 """Applies a function to each packet to get a value that will be plotted
274 with matplotlib. A list of matplotlib.lines.Line2D is returned.
276 lfilter: a truth function that decides whether a packet must be plotted
277 """
278 # Defer imports of matplotlib until its needed
279 # because it has a heavy dep chain
280 from scapy.libs.matplot import (
281 plt,
282 MATPLOTLIB_INLINED,
283 MATPLOTLIB_DEFAULT_PLOT_KARGS
284 )
286 # Get the list of packets
287 if lfilter is None:
288 lst_pkts = [f(*e) for e in self.res]
289 else:
290 lst_pkts = [f(*e) for e in self.res if lfilter(*e)]
292 # Mimic the default gnuplot output
293 if kargs == {}:
294 kargs = MATPLOTLIB_DEFAULT_PLOT_KARGS
295 if plot_xy:
296 lines = plt.plot(*zip(*lst_pkts), **kargs)
297 else:
298 lines = plt.plot(lst_pkts, **kargs)
300 # Call show() if matplotlib is not inlined
301 if not MATPLOTLIB_INLINED:
302 plt.show()
304 return lines
306 def diffplot(self,
307 f, # type: Callable[..., Any]
308 delay=1, # type: int
309 lfilter=None, # type: Optional[Callable[..., bool]]
310 **kargs # type: Any
311 ):
312 # type: (...) -> Line2D
313 """diffplot(f, delay=1, lfilter=None)
314 Applies a function to couples (l[i],l[i+delay])
316 A list of matplotlib.lines.Line2D is returned.
317 """
318 # Defer imports of matplotlib until its needed
319 # because it has a heavy dep chain
320 from scapy.libs.matplot import (
321 plt,
322 MATPLOTLIB_INLINED,
323 MATPLOTLIB_DEFAULT_PLOT_KARGS
324 )
326 # Get the list of packets
327 if lfilter is None:
328 lst_pkts = [f(self.res[i], self.res[i + 1])
329 for i in range(len(self.res) - delay)]
330 else:
331 lst_pkts = [f(self.res[i], self.res[i + 1])
332 for i in range(len(self.res) - delay)
333 if lfilter(self.res[i])]
335 # Mimic the default gnuplot output
336 if kargs == {}:
337 kargs = MATPLOTLIB_DEFAULT_PLOT_KARGS
338 lines = plt.plot(lst_pkts, **kargs)
340 # Call show() if matplotlib is not inlined
341 if not MATPLOTLIB_INLINED:
342 plt.show()
344 return lines
346 def multiplot(self,
347 f, # type: Callable[..., Any]
348 lfilter=None, # type: Optional[Callable[..., Any]]
349 plot_xy=False, # type: bool
350 **kargs # type: Any
351 ):
352 # type: (...) -> Line2D
353 """Uses a function that returns a label and a value for this label, then
354 plots all the values label by label.
356 A list of matplotlib.lines.Line2D is returned.
357 """
358 # Defer imports of matplotlib until its needed
359 # because it has a heavy dep chain
360 from scapy.libs.matplot import (
361 plt,
362 MATPLOTLIB_INLINED,
363 MATPLOTLIB_DEFAULT_PLOT_KARGS
364 )
366 # Get the list of packets
367 if lfilter is None:
368 lst_pkts = (f(*e) for e in self.res)
369 else:
370 lst_pkts = (f(*e) for e in self.res if lfilter(*e))
372 # Apply the function f to the packets
373 d = {} # type: Dict[str, List[float]]
374 for k, v in lst_pkts:
375 d.setdefault(k, []).append(v)
377 # Mimic the default gnuplot output
378 if not kargs:
379 kargs = MATPLOTLIB_DEFAULT_PLOT_KARGS
381 if plot_xy:
382 lines = [plt.plot(*list(zip(*pl)), **dict(kargs, label=k))
383 for k, pl in d.items()]
384 else:
385 lines = [plt.plot(pl, **dict(kargs, label=k))
386 for k, pl in d.items()]
387 plt.legend(loc="center right", bbox_to_anchor=(1.5, 0.5))
389 # Call show() if matplotlib is not inlined
390 if not MATPLOTLIB_INLINED:
391 plt.show()
393 return lines
395 def rawhexdump(self):
396 # type: () -> None
397 """Prints an hexadecimal dump of each packet in the list"""
398 for p in self:
399 hexdump(self._elt2pkt(p))
401 def hexraw(self, lfilter=None):
402 # type: (Optional[Callable[..., bool]]) -> None
403 """Same as nsummary(), except that if a packet has a Raw layer, it will be hexdumped # noqa: E501
404 lfilter: a truth function that decides whether a packet must be displayed""" # noqa: E501
405 for i, res in enumerate(self.res):
406 p = self._elt2pkt(res)
407 if lfilter is not None and not lfilter(p):
408 continue
409 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"),
410 p.sprintf("%.time%"),
411 self._elt2sum(res)))
412 if p.haslayer(conf.raw_layer):
413 hexdump(p.getlayer(conf.raw_layer).load) # type: ignore
415 def hexdump(self, lfilter=None):
416 # type: (Optional[Callable[..., bool]]) -> None
417 """Same as nsummary(), except that packets are also hexdumped
418 lfilter: a truth function that decides whether a packet must be displayed""" # noqa: E501
419 for i, res in enumerate(self.res):
420 p = self._elt2pkt(res)
421 if lfilter is not None and not lfilter(p):
422 continue
423 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"),
424 p.sprintf("%.time%"),
425 self._elt2sum(res)))
426 hexdump(p)
428 def padding(self, lfilter=None):
429 # type: (Optional[Callable[..., bool]]) -> None
430 """Same as hexraw(), for Padding layer"""
431 for i, res in enumerate(self.res):
432 p = self._elt2pkt(res)
433 if p.haslayer(conf.padding_layer):
434 if lfilter is None or lfilter(p):
435 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"),
436 p.sprintf("%.time%"),
437 self._elt2sum(res)))
438 hexdump(
439 p.getlayer(conf.padding_layer).load # type: ignore
440 )
442 def nzpadding(self, lfilter=None):
443 # type: (Optional[Callable[..., bool]]) -> None
444 """Same as padding() but only non null padding"""
445 for i, res in enumerate(self.res):
446 p = self._elt2pkt(res)
447 if p.haslayer(conf.padding_layer):
448 pad = p.getlayer(conf.padding_layer).load # type: ignore
449 if pad == pad[:1] * len(pad):
450 continue
451 if lfilter is None or lfilter(p):
452 print("%s %s %s" % (conf.color_theme.id(i, fmt="%04i"),
453 p.sprintf("%.time%"),
454 self._elt2sum(res)))
455 hexdump(
456 p.getlayer(conf.padding_layer).load # type: ignore
457 )
459 def conversations(self,
460 getsrcdst=None, # type: Optional[Callable[[Packet], Tuple[Any, ...]]] # noqa: E501
461 **kargs # type: Any
462 ):
463 # type: (...) -> Any
464 """Graphes a conversations between sources and destinations and display it
465 (using graphviz and imagemagick)
467 :param getsrcdst: a function that takes an element of the list and
468 returns the source, the destination and optionally
469 a label. By default, returns the IP source and
470 destination from IP and ARP layers
471 :param type: output type (svg, ps, gif, jpg, etc.), passed to dot's
472 "-T" option
473 :param target: filename or redirect. Defaults pipe to Imagemagick's
474 display program
475 :param prog: which graphviz program to use
476 """
477 if getsrcdst is None:
478 def _getsrcdst(pkt):
479 # type: (Packet) -> Tuple[str, str]
480 """Extract src and dst addresses"""
481 if 'IP' in pkt:
482 return (pkt['IP'].src, pkt['IP'].dst)
483 if 'IPv6' in pkt:
484 return (pkt['IPv6'].src, pkt['IPv6'].dst)
485 if 'ARP' in pkt:
486 return (pkt['ARP'].psrc, pkt['ARP'].pdst)
487 raise TypeError()
488 getsrcdst = _getsrcdst
489 conv = {} # type: Dict[Tuple[Any, ...], Any]
490 for elt in self.res:
491 p = self._elt2pkt(elt)
492 try:
493 c = getsrcdst(p)
494 except Exception:
495 # No warning here: it's OK that getsrcdst() raises an
496 # exception, since it might be, for example, a
497 # function that expects a specific layer in each
498 # packet. The try/except approach is faster and
499 # considered more Pythonic than adding tests.
500 continue
501 if len(c) == 3:
502 conv.setdefault(c[:2], set()).add(c[2])
503 else:
504 conv[c] = conv.get(c, 0) + 1
505 gr = 'digraph "conv" {\n'
506 for (s, d), l in conv.items():
507 gr += '\t "%s" -> "%s" [label="%s"]\n' % (
508 s, d, ', '.join(str(x) for x in l) if isinstance(l, set) else l
509 )
510 gr += "}\n"
511 return do_graph(gr, **kargs)
513 def afterglow(self,
514 src=None, # type: Optional[Callable[[_Inner], Any]]
515 event=None, # type: Optional[Callable[[_Inner], Any]]
516 dst=None, # type: Optional[Callable[[_Inner], Any]]
517 **kargs # type: Any
518 ):
519 # type: (...) -> Any
520 """Experimental clone attempt of http://sourceforge.net/projects/afterglow
521 each datum is reduced as src -> event -> dst and the data are graphed.
522 by default we have IP.src -> IP.dport -> IP.dst"""
523 if src is None:
524 src = lambda *x: x[0]['IP'].src
525 if event is None:
526 event = lambda *x: x[0]['IP'].dport
527 if dst is None:
528 dst = lambda *x: x[0]['IP'].dst
529 sl = {} # type: Dict[Any, Tuple[Union[float, int], List[Any]]]
530 el = {} # type: Dict[Any, Tuple[Union[float, int], List[Any]]]
531 dl = {} # type: Dict[Any, int]
532 for i in self.res:
533 try:
534 s, e, d = src(i), event(i), dst(i)
535 if s in sl:
536 n, lst = sl[s]
537 n += 1
538 if e not in lst:
539 lst.append(e)
540 sl[s] = (n, lst)
541 else:
542 sl[s] = (1, [e])
543 if e in el:
544 n, lst = el[e]
545 n += 1
546 if d not in lst:
547 lst.append(d)
548 el[e] = (n, lst)
549 else:
550 el[e] = (1, [d])
551 dl[d] = dl.get(d, 0) + 1
552 except Exception:
553 continue
555 def minmax(x):
556 # type: (Any) -> Tuple[int, int]
557 m, M = reduce(lambda a, b: (min(a[0], b[0]), max(a[1], b[1])),
558 ((a, a) for a in x))
559 if m == M:
560 m = 0
561 if M == 0:
562 M = 1
563 return m, M
565 mins, maxs = minmax(x for x, _ in sl.values())
566 mine, maxe = minmax(x for x, _ in el.values())
567 mind, maxd = minmax(dl.values())
569 gr = 'digraph "afterglow" {\n\tedge [len=2.5];\n'
571 gr += "# src nodes\n"
572 for s in sl:
573 n, _ = sl[s]
574 n = 1 + float(n - mins) / (maxs - mins)
575 gr += '"src.%s" [label = "%s", shape=box, fillcolor="#FF0000", style=filled, fixedsize=1, height=%.2f,width=%.2f];\n' % (repr(s), repr(s), n, n) # noqa: E501
576 gr += "# event nodes\n"
577 for e in el:
578 n, _ = el[e]
579 n = 1 + float(n - mine) / (maxe - mine)
580 gr += '"evt.%s" [label = "%s", shape=circle, fillcolor="#00FFFF", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(e), repr(e), n, n) # noqa: E501
581 for d in dl:
582 n = dl[d]
583 n = 1 + float(n - mind) / (maxd - mind)
584 gr += '"dst.%s" [label = "%s", shape=triangle, fillcolor="#0000ff", style=filled, fixedsize=1, height=%.2f, width=%.2f];\n' % (repr(d), repr(d), n, n) # noqa: E501
586 gr += "###\n"
587 for s in sl:
588 n, lst1 = sl[s]
589 for e in lst1:
590 gr += ' "src.%s" -> "evt.%s";\n' % (repr(s), repr(e))
591 for e in el:
592 n, lst2 = el[e]
593 for d in lst2:
594 gr += ' "evt.%s" -> "dst.%s";\n' % (repr(e), repr(d))
596 gr += "}"
597 return do_graph(gr, **kargs)
599 def canvas_dump(self, layer_shift=0, rebuild=1):
600 # type: (int, int) -> 'pyx.canvas.canvas'
601 d = pyx.document.document()
602 len_res = len(self.res)
603 for i, res in enumerate(self.res):
604 c = self._elt2pkt(res).canvas_dump(layer_shift=layer_shift,
605 rebuild=rebuild)
606 cbb = c.bbox()
607 c.text(cbb.left(), cbb.top() + 1, r"\font\cmssfont=cmss12\cmssfont{Frame %i/%i}" % (i, len_res), [pyx.text.size.LARGE]) # noqa: E501
608 if conf.verb >= 2:
609 os.write(1, b".")
610 d.append(pyx.document.page(c, paperformat=pyx.document.paperformat.A4, # noqa: E501
611 margin=1 * pyx.unit.t_cm,
612 fittosize=1))
613 return d
615 def sessions(
616 self,
617 session_extractor=None # type: Optional[Callable[[Packet], str]]
618 ):
619 # type: (...) -> Dict[str, _PacketList[_Inner]]
620 if session_extractor is None:
621 def _session_extractor(p):
622 # type: (Packet) -> str
623 """Extract sessions from packets"""
624 if 'Ether' in p:
625 if 'IP' in p or 'IPv6' in p:
626 ip_src_fmt = "{IP:%IP.src%}{IPv6:%IPv6.src%}"
627 ip_dst_fmt = "{IP:%IP.dst%}{IPv6:%IPv6.dst%}"
628 addr_fmt = (ip_src_fmt, ip_dst_fmt)
629 if 'TCP' in p:
630 fmt = "TCP {}:%r,TCP.sport% > {}:%r,TCP.dport%"
631 elif 'UDP' in p:
632 fmt = "UDP {}:%r,UDP.sport% > {}:%r,UDP.dport%"
633 elif 'ICMP' in p:
634 fmt = "ICMP {} > {} type=%r,ICMP.type% code=%r," \
635 "ICMP.code% id=%ICMP.id%"
636 elif 'ICMPv6' in p:
637 fmt = "ICMPv6 {} > {} type=%r,ICMPv6.type% " \
638 "code=%r,ICMPv6.code%"
639 elif 'IPv6' in p:
640 fmt = "IPv6 {} > {} nh=%IPv6.nh%"
641 else:
642 fmt = "IP {} > {} proto=%IP.proto%"
643 return p.sprintf(fmt.format(*addr_fmt))
644 elif 'ARP' in p:
645 return p.sprintf("ARP %ARP.psrc% > %ARP.pdst%")
646 else:
647 return p.sprintf("Ethernet type=%04xr,Ether.type%")
648 return "Other"
649 session_extractor = _session_extractor
650 sessions = defaultdict(self.__class__) # type: DefaultDict[str, _PacketList[_Inner]] # noqa: E501
651 for p in self.res:
652 sess = session_extractor(
653 self._elt2pkt(p)
654 )
655 sessions[sess].append(p)
656 return dict(sessions)
658 def replace(self, *args, **kargs):
659 # type: (Any, Any) -> PacketList
660 """
661 lst.replace(<field>,[<oldvalue>,]<newvalue>)
662 lst.replace( (fld,[ov],nv),(fld,[ov,]nv),...)
663 if ov is None, all values are replaced
664 ex:
665 lst.replace( IP.src, "192.168.1.1", "10.0.0.1" )
666 lst.replace( IP.ttl, 64 )
667 lst.replace( (IP.ttl, 64), (TCP.sport, 666, 777), )
668 """
669 delete_checksums = kargs.get("delete_checksums", False)
670 x = PacketList(name="Replaced %s" % self.listname)
671 if not isinstance(args[0], tuple):
672 args = (args,)
673 for _p in self.res:
674 p = self._elt2pkt(_p)
675 copied = False
676 for scheme in args:
677 fld = scheme[0]
678 old = scheme[1] # not used if len(scheme) == 2
679 new = scheme[-1]
680 for o in fld.owners:
681 if o in p:
682 if len(scheme) == 2 or p[o].getfieldval(fld.name) == old: # noqa: E501
683 if not copied:
684 p = p.copy()
685 if delete_checksums:
686 p.delete_checksums()
687 copied = True
688 setattr(p[o], fld.name, new)
689 x.append(p)
690 return x
692 def getlayer(self, cls, # type: Packet
693 nb=None, # type: Optional[int]
694 flt=None, # type: Optional[Dict[str, Any]]
695 name=None, # type: Optional[str]
696 stats=None # type: Optional[List[Type[Packet]]]
697 ):
698 # type: (...) -> PacketList
699 """Returns the packet list from a given layer.
701 See ``Packet.getlayer`` for more info.
703 :param cls: search for a layer that is an instance of ``cls``
704 :type cls: Type[scapy.packet.Packet]
706 :param nb: return the nb^th layer that is an instance of ``cls``
707 :type nb: Optional[int]
709 :param flt: filter parameters for ``Packet.getlayer``
710 :type flt: Optional[Dict[str, Any]]
712 :param name: optional name for the new PacketList
713 :type name: Optional[str]
715 :param stats: optional list of protocols to give stats on; if not
716 specified, inherits from this PacketList.
717 :type stats: Optional[List[Type[scapy.packet.Packet]]]
718 :rtype: scapy.plist.PacketList
719 """
720 if name is None:
721 name = "{} layer {}".format(self.listname, cls.__name__)
722 if stats is None:
723 stats = self.stats
725 getlayer_arg = {} # type: Dict[str, Any]
726 if flt is not None:
727 getlayer_arg.update(flt)
728 getlayer_arg['cls'] = cls
729 if nb is not None:
730 getlayer_arg['nb'] = nb
732 # Only return non-None getlayer results
733 return PacketList([
734 pc for pc in (
735 self._elt2pkt(p).getlayer(**getlayer_arg) for p in self.res
736 ) if pc is not None],
737 name, stats
738 )
741class PacketList(_PacketList[Packet],
742 BasePacketList[Packet],
743 _CanvasDumpExtended):
744 def sr(self, multi=False, lookahead=None):
745 # type: (bool, Optional[int]) -> Tuple[SndRcvList, PacketList]
746 """
747 Matches packets in the list
749 :param multi: True if a packet can have multiple answers
750 :param lookahead: Maximum number of packets between packet and answer.
751 If 0 or None, full remaining list is
752 scanned for answers
753 :return: ( (matched couples), (unmatched packets) )
754 """
755 remain = self.res[:]
756 sr = [] # type: List[QueryAnswer]
757 i = 0
758 if lookahead is None or lookahead == 0:
759 lookahead = len(remain)
760 while i < len(remain):
761 s = remain[i]
762 j = i
763 while j < min(lookahead + i, len(remain) - 1):
764 j += 1
765 r = remain[j]
766 if r.answers(s):
767 sr.append(QueryAnswer(s, r))
768 if multi:
769 remain[i]._answered = 1
770 remain[j]._answered = 2
771 continue
772 del remain[j]
773 del remain[i]
774 i -= 1
775 break
776 i += 1
777 if multi:
778 remain = [x for x in remain if not hasattr(x, "_answered")]
779 return SndRcvList(sr), PacketList(remain)
782_PacketIterable = Union[
783 Sequence[Packet],
784 Packet,
785 SetGen[Packet],
786 _PacketList[Packet]
787]
790class SndRcvList(_PacketList[QueryAnswer],
791 BasePacketList[QueryAnswer],
792 _CanvasDumpExtended):
793 __slots__ = [] # type: List[str]
795 def __init__(self,
796 res=None, # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]] # noqa: E501
797 name="Results", # type: str
798 stats=None # type: Optional[List[Type[Packet]]]
799 ):
800 # type: (...) -> None
801 super(SndRcvList, self).__init__(res, name, stats)
803 def _elt2pkt(self, elt):
804 # type: (QueryAnswer) -> Packet
805 return elt[1]
807 def _elt2sum(self, elt):
808 # type: (QueryAnswer) -> str
809 return "%s ==> %s" % (elt[0].summary(), elt[1].summary())