1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Resolve Autonomous Systems (AS).
8"""
9
10
11import socket
12from scapy.config import conf
13from scapy.compat import plain_str
14
15from typing import (
16 Any,
17 Optional,
18 Tuple,
19 List,
20)
21
22
23class AS_resolver:
24 server = None
25 options = "-k" # type: Optional[str]
26
27 def __init__(self, server=None, port=43, options=None):
28 # type: (Optional[str], int, Optional[str]) -> None
29 if server is not None:
30 self.server = server
31 self.port = port
32 if options is not None:
33 self.options = options
34
35 def _start(self):
36 # type: () -> None
37 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
38 self.s.connect((self.server, self.port))
39 if self.options:
40 self.s.send(self.options.encode("utf8") + b"\n")
41 self.s.recv(8192)
42
43 def _stop(self):
44 # type: () -> None
45 self.s.close()
46
47 def _parse_whois(self, txt):
48 # type: (bytes) -> Tuple[Optional[str], str]
49 asn, desc = None, b""
50 for line in txt.splitlines():
51 if not asn and line.startswith(b"origin:"):
52 asn = plain_str(line[7:].strip())
53 if line.startswith(b"descr:"):
54 if desc:
55 desc += b"\n"
56 desc += line[6:].strip()
57 if asn is not None and desc:
58 break
59 return asn, plain_str(desc.strip())
60
61 def _resolve_one(self, ip):
62 # type: (str) -> Tuple[str, Optional[str], str]
63 self.s.send(("%s\n" % ip).encode("utf8"))
64 x = b""
65 while not (b"%" in x or b"source" in x):
66 d = self.s.recv(8192)
67 if not d:
68 break
69 x += d
70 asn, desc = self._parse_whois(x)
71 return ip, asn, desc
72
73 def resolve(self,
74 *ips # type: str
75 ):
76 # type: (...) -> List[Tuple[str, Optional[str], str]]
77 self._start()
78 ret = [] # type: List[Tuple[str, Optional[str], str]]
79 for ip in ips:
80 ip, asn, desc = self._resolve_one(ip)
81 if asn is not None:
82 ret.append((ip, asn, desc))
83 self._stop()
84 return ret
85
86
87class AS_resolver_riswhois(AS_resolver):
88 server = "riswhois.ripe.net"
89 options = "-k -M -1"
90
91
92class AS_resolver_radb(AS_resolver):
93 server = "whois.ra.net"
94 options = "-k -M"
95
96
97class AS_resolver_cymru(AS_resolver):
98 server = "whois.cymru.com"
99 options = None
100
101 def resolve(self,
102 *ips # type: str
103 ):
104 # type: (...) -> List[Tuple[str, Optional[str], str]]
105 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
106 s.connect((self.server, self.port))
107 s.send(
108 b"begin\r\n" +
109 b"\r\n".join(ip.encode() for ip in ips) +
110 b"\r\nend\r\n"
111 )
112 r = b""
113 while True:
114 line = s.recv(8192)
115 if line == b"":
116 break
117 r += line
118 s.close()
119
120 return self.parse(r)
121
122 def parse(self, data):
123 # type: (bytes) -> List[Tuple[str, Optional[str], str]]
124 """Parse bulk cymru data"""
125
126 ASNlist = [] # type: List[Tuple[str, Optional[str], str]]
127 for line in plain_str(data).splitlines()[1:]:
128 if "|" not in line:
129 continue
130 asn, ip, desc = [elt.strip() for elt in line.split('|')]
131 if asn == "NA":
132 continue
133 asn = "AS%s" % asn
134 ASNlist.append((ip, asn, desc))
135 return ASNlist
136
137
138class AS_resolver_multi(AS_resolver):
139 def __init__(self, *reslist):
140 # type: (*AS_resolver) -> None
141 AS_resolver.__init__(self)
142 if reslist:
143 self.resolvers_list = reslist
144 else:
145 self.resolvers_list = (AS_resolver_radb(),
146 AS_resolver_cymru())
147
148 def resolve(self, *ips):
149 # type: (*Any) -> List[Tuple[str, Optional[str], str]]
150 todo = ips
151 ret = []
152 for ASres in self.resolvers_list:
153 try:
154 res = ASres.resolve(*todo)
155 except socket.error:
156 continue
157 todo = tuple(ip for ip in todo if ip not in [r[0] for r in res])
158 ret += res
159 if not todo:
160 break
161 return ret
162
163
164conf.AS_resolver = AS_resolver_multi()