1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Resolve Autonomous Systems (AS).
8"""
9
10
11import socket
12from scapy.config import conf
13from scapy.compat import plain_str
14
15from typing import (
16 Any,
17 Optional,
18 Tuple,
19 List,
20)
21
22
class AS_resolver:
    """Resolve IP addresses to Autonomous Systems through a whois server.

    A single TCP connection is opened for each call to :meth:`resolve`.
    When ``options`` is set, it is sent as one line right after connecting;
    each address is then queried in turn on that same connection.

    Subclasses usually only override the ``server`` and ``options`` class
    attributes.
    """

    server = None  # type: Optional[str]
    options = "-k"  # type: Optional[str]

    def __init__(self, server=None, port=43, options=None):
        # type: (Optional[str], int, Optional[str]) -> None
        """Configure the resolver.

        :param server: whois host name; ``None`` keeps the class default.
        :param port: TCP port of the whois service.
        :param options: option line sent after connecting; ``None`` keeps
            the class default.
        """
        if server is not None:
            self.server = server
        self.port = port
        if options is not None:
            self.options = options

    def _start(self):
        # type: () -> None
        """Open the connection and send the option line, if any."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.s.connect((self.server, self.port))
            if self.options:
                # sendall() retries until the whole line has been written.
                self.s.sendall(self.options.encode("utf8") + b"\n")
                # The reply to the option line is read and discarded.
                self.s.recv(8192)
        except Exception:
            # Do not leave a half-initialised socket open on failure.
            self.s.close()
            raise

    def _stop(self):
        # type: () -> None
        """Close the connection opened by :meth:`_start`."""
        self.s.close()

    def _parse_whois(self, txt):
        # type: (bytes) -> Tuple[Optional[str], str]
        """Extract the origin AS and the description from a whois answer.

        :param txt: raw whois answer.
        :returns: ``(asn, description)``; ``asn`` is ``None`` when no
            ``origin:`` line was found.
        """
        asn, desc = None, b""
        for line in txt.splitlines():
            if not asn and line.startswith(b"origin:"):
                asn = plain_str(line[7:].strip())
            if line.startswith(b"descr:"):
                if desc:
                    desc += b"\n"
                desc += line[6:].strip()
            # Stop as soon as both pieces of information are available.
            if asn is not None and desc:
                break
        return asn, plain_str(desc.strip())

    def _resolve_one(self, ip):
        # type: (str) -> Tuple[str, Optional[str], str]
        """Query the open connection for a single address.

        :param ip: address to look up.
        :returns: ``(ip, asn, description)`` as parsed by
            :meth:`_parse_whois`.
        """
        self.s.sendall(("%s\n" % ip).encode("utf8"))
        x = b""
        # Keep reading until the answer contains b"%" or b"source".
        while not (b"%" in x or b"source" in x):
            chunk = self.s.recv(8192)
            if not chunk:
                # The peer closed the connection: recv() would keep
                # returning b"" and the loop would never terminate.
                break
            x += chunk
        asn, desc = self._parse_whois(x)
        return ip, asn, desc

    def resolve(self,
                *ips  # type: str
                ):
        # type: (...) -> List[Tuple[str, Optional[str], str]]
        """Resolve several addresses over one connection.

        :param ips: addresses to look up.
        :returns: one ``(ip, asn, description)`` tuple per address for
            which an AS was found; unresolved addresses are omitted.
        """
        self._start()
        ret = []  # type: List[Tuple[str, Optional[str], str]]
        try:
            for ip in ips:
                ip, asn, desc = self._resolve_one(ip)
                if asn is not None:
                    ret.append((ip, asn, desc))
        finally:
            # Always release the socket, even if a query raised.
            self._stop()
        return ret
82
83
class AS_resolver_riswhois(AS_resolver):
    """AS resolver preconfigured for ``riswhois.ripe.net``.

    Only the class-level defaults differ from :class:`AS_resolver`.
    """

    # Option line sent once after connecting.
    options = "-k -M -1"
    # Whois host queried by this resolver.
    server = "riswhois.ripe.net"
87
88
class AS_resolver_radb(AS_resolver):
    """AS resolver preconfigured for ``whois.ra.net``.

    Only the class-level defaults differ from :class:`AS_resolver`.
    """

    # Option line sent once after connecting.
    options = "-k -M"
    # Whois host queried by this resolver.
    server = "whois.ra.net"
92
93
class AS_resolver_cymru(AS_resolver):
    """AS resolver that sends all addresses to ``whois.cymru.com`` at once.

    Unlike :class:`AS_resolver`, the addresses are sent in a single
    ``begin`` ... ``end`` request and the whole reply is read until the
    server closes the connection.
    """

    server = "whois.cymru.com"
    options = None

    def resolve(self,
                *ips  # type: str
                ):
        # type: (...) -> List[Tuple[str, Optional[str], str]]
        """Resolve several addresses with one bulk request.

        :param ips: addresses to look up.
        :returns: the ``(ip, asn, description)`` tuples produced by
            :meth:`parse` from the server reply.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        chunks = []  # type: List[bytes]
        try:
            s.connect((self.server, self.port))
            # sendall() guarantees the whole request is written, which
            # matters for long address lists.
            s.sendall(
                b"begin\r\n" +
                b"\r\n".join(ip.encode() for ip in ips) +
                b"\r\nend\r\n"
            )
            # Read until the server closes the connection.
            while True:
                chunk = s.recv(8192)
                if chunk == b"":
                    break
                chunks.append(chunk)
        finally:
            # Release the socket even when connect/send/recv raises.
            s.close()

        return self.parse(b"".join(chunks))

    def parse(self, data):
        # type: (bytes) -> List[Tuple[str, Optional[str], str]]
        """Parse bulk cymru data.

        The first line of ``data`` is skipped. Every following line is
        expected to look like ``asn | ip | description``; lines without
        three ``|``-separated fields and lines whose AS is ``NA`` are
        ignored.

        :param data: raw reply of the server.
        :returns: a list of ``(ip, "AS<asn>", description)`` tuples.
        """

        ASNlist = []  # type: List[Tuple[str, Optional[str], str]]
        for line in plain_str(data).splitlines()[1:]:
            # Split at most twice so that a "|" inside the description
            # stays part of the description instead of breaking unpacking.
            fields = [elt.strip() for elt in line.split('|', 2)]
            if len(fields) != 3:
                continue
            asn, ip, desc = fields
            if asn == "NA":
                continue
            asn = "AS%s" % asn
            ASNlist.append((ip, asn, desc))
        return ASNlist
133
134
class AS_resolver_multi(AS_resolver):
    """Chain several resolvers, falling back from one to the next.

    Each resolver is only asked about the addresses that the previous
    ones could not resolve.
    """

    def __init__(self, *reslist):
        # type: (*AS_resolver) -> None
        """Build the chain.

        :param reslist: resolvers to try, in order. When empty, an
            :class:`AS_resolver_radb` followed by an
            :class:`AS_resolver_cymru` is used.
        """
        AS_resolver.__init__(self)
        if reslist:
            self.resolvers_list = reslist
        else:
            self.resolvers_list = (AS_resolver_radb(),
                                   AS_resolver_cymru())

    def resolve(self, *ips):
        # type: (*Any) -> List[Tuple[str, Optional[str], str]]
        """Resolve addresses using every configured resolver in turn.

        A resolver that raises ``socket.error`` is skipped. Iteration stops
        as soon as every address has been resolved.

        :param ips: addresses to look up.
        :returns: the concatenated results of the resolvers that answered.
        """
        todo = ips
        ret = []  # type: List[Tuple[str, Optional[str], str]]
        for ASres in self.resolvers_list:
            try:
                res = ASres.resolve(*todo)
            except socket.error:
                continue
            # Build the set of answered addresses once, instead of
            # rebuilding a list for every pending address.
            resolved = {r[0] for r in res}
            todo = tuple(ip for ip in todo if ip not in resolved)
            ret += res
            if not todo:
                break
        return ret
159
160
# Install a default resolver in the global configuration. Built without
# arguments, AS_resolver_multi tries AS_resolver_radb first and then
# AS_resolver_cymru.
conf.AS_resolver = AS_resolver_multi()