Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/as_resolvers.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

106 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Resolve Autonomous Systems (AS). 

8""" 

9 

10 

11import socket 

12from scapy.config import conf 

13from scapy.compat import plain_str 

14 

15from typing import ( 

16 Any, 

17 Optional, 

18 Tuple, 

19 List, 

20) 

21 

22 

23class AS_resolver: 

24 server = None 

25 options = "-k" # type: Optional[str] 

26 

27 def __init__(self, server=None, port=43, options=None): 

28 # type: (Optional[str], int, Optional[str]) -> None 

29 if server is not None: 

30 self.server = server 

31 self.port = port 

32 if options is not None: 

33 self.options = options 

34 

35 def _start(self): 

36 # type: () -> None 

37 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

38 self.s.connect((self.server, self.port)) 

39 if self.options: 

40 self.s.send(self.options.encode("utf8") + b"\n") 

41 self.s.recv(8192) 

42 

43 def _stop(self): 

44 # type: () -> None 

45 self.s.close() 

46 

47 def _parse_whois(self, txt): 

48 # type: (bytes) -> Tuple[Optional[str], str] 

49 asn, desc = None, b"" 

50 for line in txt.splitlines(): 

51 if not asn and line.startswith(b"origin:"): 

52 asn = plain_str(line[7:].strip()) 

53 if line.startswith(b"descr:"): 

54 if desc: 

55 desc += b"\n" 

56 desc += line[6:].strip() 

57 if asn is not None and desc: 

58 break 

59 return asn, plain_str(desc.strip()) 

60 

61 def _resolve_one(self, ip): 

62 # type: (str) -> Tuple[str, Optional[str], str] 

63 self.s.send(("%s\n" % ip).encode("utf8")) 

64 x = b"" 

65 while not (b"%" in x or b"source" in x): 

66 d = self.s.recv(8192) 

67 if not d: 

68 break 

69 x += d 

70 asn, desc = self._parse_whois(x) 

71 return ip, asn, desc 

72 

73 def resolve(self, 

74 *ips # type: str 

75 ): 

76 # type: (...) -> List[Tuple[str, Optional[str], str]] 

77 self._start() 

78 ret = [] # type: List[Tuple[str, Optional[str], str]] 

79 for ip in ips: 

80 ip, asn, desc = self._resolve_one(ip) 

81 if asn is not None: 

82 ret.append((ip, asn, desc)) 

83 self._stop() 

84 return ret 

85 

86 

87class AS_resolver_riswhois(AS_resolver): 

88 server = "riswhois.ripe.net" 

89 options = "-k -M -1" 

90 

91 

92class AS_resolver_radb(AS_resolver): 

93 server = "whois.ra.net" 

94 options = "-k -M" 

95 

96 

97class AS_resolver_cymru(AS_resolver): 

98 server = "whois.cymru.com" 

99 options = None 

100 

101 def resolve(self, 

102 *ips # type: str 

103 ): 

104 # type: (...) -> List[Tuple[str, Optional[str], str]] 

105 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

106 s.connect((self.server, self.port)) 

107 s.send( 

108 b"begin\r\n" + 

109 b"\r\n".join(ip.encode() for ip in ips) + 

110 b"\r\nend\r\n" 

111 ) 

112 r = b"" 

113 while True: 

114 line = s.recv(8192) 

115 if line == b"": 

116 break 

117 r += line 

118 s.close() 

119 

120 return self.parse(r) 

121 

122 def parse(self, data): 

123 # type: (bytes) -> List[Tuple[str, Optional[str], str]] 

124 """Parse bulk cymru data""" 

125 

126 ASNlist = [] # type: List[Tuple[str, Optional[str], str]] 

127 for line in plain_str(data).splitlines()[1:]: 

128 if "|" not in line: 

129 continue 

130 asn, ip, desc = [elt.strip() for elt in line.split('|')] 

131 if asn == "NA": 

132 continue 

133 asn = "AS%s" % asn 

134 ASNlist.append((ip, asn, desc)) 

135 return ASNlist 

136 

137 

138class AS_resolver_multi(AS_resolver): 

139 def __init__(self, *reslist): 

140 # type: (*AS_resolver) -> None 

141 AS_resolver.__init__(self) 

142 if reslist: 

143 self.resolvers_list = reslist 

144 else: 

145 self.resolvers_list = (AS_resolver_radb(), 

146 AS_resolver_cymru()) 

147 

148 def resolve(self, *ips): 

149 # type: (*Any) -> List[Tuple[str, Optional[str], str]] 

150 todo = ips 

151 ret = [] 

152 for ASres in self.resolvers_list: 

153 try: 

154 res = ASres.resolve(*todo) 

155 except socket.error: 

156 continue 

157 todo = tuple(ip for ip in todo if ip not in [r[0] for r in res]) 

158 ret += res 

159 if not todo: 

160 break 

161 return ret 

162 

163 

164conf.AS_resolver = AS_resolver_multi()