Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/as_resolvers.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Resolve Autonomous Systems (AS).
8"""
11import socket
12from scapy.config import conf
13from scapy.compat import plain_str
15from typing import (
16 Any,
17 Optional,
18 Tuple,
19 List,
20)
class AS_resolver:
    """Base resolver mapping IP addresses to Autonomous Systems over whois.

    One TCP connection is opened per :meth:`resolve` call; every address is
    queried in turn on that connection and the textual answer is scanned for
    ``origin:`` and ``descr:`` fields.

    Class attributes (overridden by subclasses or by ``__init__``):
      - ``server``: host name of the whois server (``None`` here, so the
        base class is only usable once a server has been supplied).
      - ``options``: option string sent once, right after connecting, or
        ``None``/empty to send nothing.
    """
    server = None
    options = "-k"  # type: Optional[str]

    def __init__(self, server=None, port=43, options=None):
        # type: (Optional[str], int, Optional[str]) -> None
        """Configure the resolver.

        :param server: whois server host; ``None`` keeps the class default.
        :param port: TCP port of the server.
        :param options: option string sent after connecting; ``None`` keeps
            the class default.
        """
        if server is not None:
            self.server = server
        self.port = port
        if options is not None:
            self.options = options

    def _start(self):
        # type: () -> None
        """Open the connection and, if configured, send the option string."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.s.connect((self.server, self.port))
            if self.options:
                # sendall() retries until the whole request has been written,
                # whereas send() may stop after a partial write.
                self.s.sendall(self.options.encode("utf8") + b"\n")
                # Read (and discard) whatever the server answers to the
                # option line before the first query is sent.
                self.s.recv(8192)
        except Exception:
            # Do not leak the file descriptor when the handshake fails.
            self.s.close()
            raise

    def _stop(self):
        # type: () -> None
        """Close the connection opened by :meth:`_start`."""
        self.s.close()

    def _parse_whois(self, txt):
        # type: (bytes) -> Tuple[Optional[str], str]
        """Extract the AS number and description from a raw whois answer.

        :param txt: raw bytes received from the server.
        :returns: ``(asn, description)``; ``asn`` is ``None`` when no
            ``origin:`` line was found, ``description`` may be empty.
        """
        asn, desc = None, b""
        for line in txt.splitlines():
            # Only the first non-empty origin is kept.
            if not asn and line.startswith(b"origin:"):
                asn = plain_str(line[7:].strip())
            if line.startswith(b"descr:"):
                # Several descr lines are joined with newlines.
                if desc:
                    desc += b"\n"
                desc += line[6:].strip()
            # Stop as soon as both pieces of information are available.
            if asn is not None and desc:
                break
        return asn, plain_str(desc.strip())

    def _resolve_one(self, ip):
        # type: (str) -> Tuple[str, Optional[str], str]
        """Query a single address on the already open connection.

        :param ip: address to look up.
        :returns: ``(ip, asn, description)`` as produced by
            :meth:`_parse_whois`.
        """
        self.s.sendall(("%s\n" % ip).encode("utf8"))
        x = b""
        # Keep reading until the answer contains a "%" or the word "source",
        # or the peer closes the connection.
        while not (b"%" in x or b"source" in x):
            d = self.s.recv(8192)
            if not d:
                break
            x += d
        asn, desc = self._parse_whois(x)
        return ip, asn, desc

    def resolve(self,
                *ips  # type: str
                ):
        # type: (...) -> List[Tuple[str, Optional[str], str]]
        """Resolve every given address.

        :param ips: addresses to look up.
        :returns: list of ``(ip, asn, description)`` tuples, restricted to
            the addresses for which an AS number was found.
        :raises socket.error: on connection or transfer failure.
        """
        self._start()
        try:
            ret = []  # type: List[Tuple[str, Optional[str], str]]
            for ip in ips:
                ip, asn, desc = self._resolve_one(ip)
                if asn is not None:
                    ret.append((ip, asn, desc))
        finally:
            # Always release the socket, even when a lookup fails midway.
            self._stop()
        return ret
class AS_resolver_riswhois(AS_resolver):
    """AS resolver preconfigured for the ``riswhois.ripe.net`` server."""

    # Host queried by the inherited connection logic.
    server = "riswhois.ripe.net"
    # Option string sent to the server right after connecting.
    options = "-k -M -1"
class AS_resolver_radb(AS_resolver):
    """AS resolver preconfigured for the ``whois.ra.net`` server."""

    # Host queried by the inherited connection logic.
    server = "whois.ra.net"
    # Option string sent to the server right after connecting.
    options = "-k -M"
class AS_resolver_cymru(AS_resolver):
    """AS resolver using the bulk interface of ``whois.cymru.com``.

    Unlike the base class, all addresses are submitted in a single
    ``begin`` / ``end`` request and the whole answer is read until the
    server closes the connection.
    """
    server = "whois.cymru.com"
    options = None

    def resolve(self,
                *ips  # type: str
                ):
        # type: (...) -> List[Tuple[str, Optional[str], str]]
        """Resolve all given addresses with one bulk query.

        :param ips: addresses to look up.
        :returns: list of ``(ip, asn, description)`` tuples as built by
            :meth:`parse`.
        :raises socket.error: on connection or transfer failure.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((self.server, self.port))
            # sendall() guarantees the complete request is written; send()
            # could silently truncate a long address list.
            s.sendall(
                b"begin\r\n" +
                b"\r\n".join(ip.encode() for ip in ips) +
                b"\r\nend\r\n"
            )
            chunks = []  # type: List[bytes]
            while True:
                chunk = s.recv(8192)
                # An empty read means the server closed the connection.
                if chunk == b"":
                    break
                chunks.append(chunk)
        finally:
            # Release the socket even when connect/send/recv fails.
            s.close()

        return self.parse(b"".join(chunks))

    def parse(self, data):
        # type: (bytes) -> List[Tuple[str, Optional[str], str]]
        """Parse bulk cymru data.

        The first line of ``data`` is skipped. Every following line is
        expected to look like ``ASN | IP | description``; lines without
        three fields and lines whose ASN is ``NA`` are ignored.

        :param data: raw bytes returned by the server.
        :returns: list of ``(ip, "AS<number>", description)`` tuples.
        """

        ASNlist = []  # type: List[Tuple[str, Optional[str], str]]
        for line in plain_str(data).splitlines()[1:]:
            # Split on the first two separators only, so that a "|" inside
            # the description does not break the three-way unpacking.
            fields = [elt.strip() for elt in line.split('|', 2)]
            if len(fields) != 3:
                continue
            asn, ip, desc = fields
            if asn == "NA":
                continue
            asn = "AS%s" % asn
            ASNlist.append((ip, asn, desc))
        return ASNlist
class AS_resolver_multi(AS_resolver):
    """Chain several resolvers, falling back from one to the next.

    Each resolver is asked only for the addresses that the previous ones
    did not resolve.
    """

    def __init__(self, *reslist):
        # type: (*AS_resolver) -> None
        """Build the chain.

        :param reslist: resolvers to try, in order. When none is given,
            an ``AS_resolver_radb`` followed by an ``AS_resolver_cymru``
            is used.
        """
        AS_resolver.__init__(self)
        if reslist:
            self.resolvers_list = reslist
        else:
            self.resolvers_list = (AS_resolver_radb(),
                                   AS_resolver_cymru())

    def resolve(self, *ips):
        # type: (*Any) -> List[Tuple[str, Optional[str], str]]
        """Resolve addresses using each resolver of the chain in turn.

        A resolver that fails with ``socket.error`` is skipped.

        :param ips: addresses to look up.
        :returns: concatenation of the ``(ip, asn, description)`` tuples
            returned by the resolvers that were queried.
        """
        todo = ips
        ret = []  # type: List[Tuple[str, Optional[str], str]]
        for ASres in self.resolvers_list:
            try:
                res = ASres.resolve(*todo)
            except socket.error:
                # Best effort: an unreachable server must not prevent the
                # remaining resolvers from being tried.
                continue
            # Build the set of resolved addresses once instead of
            # rebuilding a list for every element of ``todo``.
            resolved = {r[0] for r in res}
            todo = tuple(ip for ip in todo if ip not in resolved)
            ret += res
            if not todo:
                break
        return ret
# Register a multi-resolver, built with its default resolver chain, as the
# AS resolver stored in the global configuration when this module is imported.
conf.AS_resolver = AS_resolver_multi()