Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/as_resolvers.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Resolve Autonomous Systems (AS). 

8""" 

9 

10 

11import socket 

12from scapy.config import conf 

13from scapy.compat import plain_str 

14 

15from typing import ( 

16 Any, 

17 Optional, 

18 Tuple, 

19 List, 

20) 

21 

22 

class AS_resolver:
    """Resolve IP addresses to Autonomous Systems over a whois-style TCP service.

    Subclasses normally only override the ``server`` and ``options`` class
    attributes. One TCP connection is opened per :meth:`resolve` call and
    reused for every address passed to that call.
    """
    # Host name of the whois server; must be set (by a subclass or through
    # the constructor) before resolve() is called.
    server = None  # type: Optional[str]
    # Line sent once right after connecting; nothing is sent when falsy.
    options = "-k"  # type: Optional[str]

    def __init__(self, server=None, port=43, options=None):
        # type: (Optional[str], int, Optional[str]) -> None
        """
        :param server: whois server host; ``None`` keeps the class default.
        :param port: TCP port of the whois service.
        :param options: option line sent after connecting; ``None`` keeps
            the class default.
        """
        if server is not None:
            self.server = server
        self.port = port
        if options is not None:
            self.options = options

    def _start(self):
        # type: () -> None
        """Open the connection and, if options are set, send them and
        discard the server's first reply."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.s.connect((self.server, self.port))
            if self.options:
                # sendall: a plain send() may transmit only part of the data.
                self.s.sendall(self.options.encode("utf8") + b"\n")
                self.s.recv(8192)
        except Exception:
            # Do not leak the descriptor when the handshake fails.
            self.s.close()
            raise

    def _stop(self):
        # type: () -> None
        """Close the connection opened by :meth:`_start`."""
        self.s.close()

    def _parse_whois(self, txt):
        # type: (bytes) -> Tuple[Optional[str], str]
        """Extract ``(asn, description)`` from a raw whois answer.

        The first ``origin:`` line provides the ASN (``None`` when there is
        none). ``descr:`` lines are accumulated, newline-separated, until
        both an ASN and a non-empty description have been seen.
        """
        asn, desc = None, b""
        for line in txt.splitlines():
            if not asn and line.startswith(b"origin:"):
                asn = plain_str(line[7:].strip())
            if line.startswith(b"descr:"):
                if desc:
                    desc += b"\n"
                desc += line[6:].strip()
            if asn is not None and desc:
                break
        return asn, plain_str(desc.strip())

    def _resolve_one(self, ip):
        # type: (str) -> Tuple[str, Optional[str], str]
        """Query the open connection for one address.

        Reads until the answer contains ``b"%"`` or ``b"source"``, or until
        the server closes the connection.

        :returns: ``(ip, asn, description)``; ``asn`` is ``None`` when the
            answer has no ``origin:`` line.
        """
        self.s.sendall(("%s\n" % ip).encode("utf8"))
        x = b""
        while not (b"%" in x or b"source" in x):
            chunk = self.s.recv(8192)
            if not chunk:
                # recv() returns b"" once the peer has closed the connection;
                # without this check the loop would spin forever.
                break
            x += chunk
        asn, desc = self._parse_whois(x)
        return ip, asn, desc

    def resolve(self,
                *ips  # type: str
                ):
        # type: (...) -> List[Tuple[str, Optional[str], str]]
        """Resolve every address in ``ips`` over a single connection.

        :returns: one ``(ip, asn, description)`` tuple per address for which
            an ASN was found; addresses without an ASN are omitted.
        :raises socket.error: on connection or transmission failure.
        """
        self._start()
        ret = []  # type: List[Tuple[str, Optional[str], str]]
        try:
            for ip in ips:
                ip, asn, desc = self._resolve_one(ip)
                if asn is not None:
                    ret.append((ip, asn, desc))
        finally:
            # Always release the socket, even when a lookup fails midway.
            self._stop()
        return ret

82 

83 

class AS_resolver_riswhois(AS_resolver):
    """AS resolver preconfigured for the whois service at riswhois.ripe.net."""
    # Option line sent to the server right after connecting.
    options = "-k -M -1"
    server = "riswhois.ripe.net"

87 

88 

class AS_resolver_radb(AS_resolver):
    """AS resolver preconfigured for the whois service at whois.ra.net."""
    # Option line sent to the server right after connecting.
    options = "-k -M"
    server = "whois.ra.net"

92 

93 

class AS_resolver_cymru(AS_resolver):
    """AS resolver using the bulk query interface of whois.cymru.com.

    All addresses are submitted in one ``begin`` ... ``end`` request and the
    whole reply is read until the server closes the connection.
    """
    server = "whois.cymru.com"
    options = None

    def resolve(self,
                *ips  # type: str
                ):
        # type: (...) -> List[Tuple[str, Optional[str], str]]
        """Resolve all ``ips`` with a single bulk request.

        :returns: ``(ip, "AS<number>", description)`` tuples as produced by
            :meth:`parse`.
        :raises socket.error: on connection or transmission failure.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((self.server, self.port))
            # sendall: the request grows with the number of addresses and a
            # plain send() may transmit only part of it.
            s.sendall(
                b"begin\r\n" +
                b"\r\n".join(ip.encode() for ip in ips) +
                b"\r\nend\r\n"
            )
            chunks = []  # type: List[bytes]
            while True:
                chunk = s.recv(8192)
                if not chunk:
                    # Empty read: the server closed the connection.
                    break
                chunks.append(chunk)
        finally:
            # Release the socket even when connecting or reading fails.
            s.close()

        return self.parse(b"".join(chunks))

    def parse(self, data):
        # type: (bytes) -> List[Tuple[str, Optional[str], str]]
        """Parse bulk cymru data.

        The first line of ``data`` is skipped. Every remaining line is
        expected to look like ``ASN | IP | description``; lines with fewer
        than three ``|``-separated fields and lines whose ASN is ``NA`` are
        ignored. Any additional ``|`` is kept as part of the description.
        """
        ASNlist = []  # type: List[Tuple[str, Optional[str], str]]
        for line in plain_str(data).splitlines()[1:]:
            if "|" not in line:
                continue
            fields = [elt.strip() for elt in line.split("|", 2)]
            if len(fields) != 3:
                # Malformed line: skip it instead of failing the whole batch.
                continue
            asn, ip, desc = fields
            if asn == "NA":
                continue
            ASNlist.append((ip, "AS%s" % asn, desc))
        return ASNlist

133 

134 

class AS_resolver_multi(AS_resolver):
    """Chain several resolvers, asking each one only for the addresses the
    previous ones could not resolve."""

    def __init__(self, *reslist):
        # type: (*AS_resolver) -> None
        """
        :param reslist: resolvers to query, in order. When empty, a RADB
            resolver followed by a Cymru resolver is used.
        """
        AS_resolver.__init__(self)
        if reslist:
            self.resolvers_list = reslist
        else:
            self.resolvers_list = (AS_resolver_radb(),
                                   AS_resolver_cymru())

    def resolve(self, *ips):
        # type: (*Any) -> List[Tuple[str, Optional[str], str]]
        """Resolve ``ips`` using each configured resolver in turn.

        A resolver that raises ``socket.error`` is skipped. The loop stops
        as soon as no address is left unresolved.

        :returns: the concatenated results of all resolvers that answered.
        """
        todo = ips
        ret = []  # type: List[Tuple[str, Optional[str], str]]
        for ASres in self.resolvers_list:
            try:
                res = ASres.resolve(*todo)
            except socket.error:
                continue
            # Build the lookup set once per resolver instead of rebuilding a
            # list for every pending address (addresses must be hashable).
            resolved = {r[0] for r in res}
            todo = tuple(ip for ip in todo if ip not in resolved)
            ret += res
            if not todo:
                break
        return ret

159 

160 

# Register a multi-resolver built with its default resolver chain as the
# AS resolver stored in the global configuration object.
conf.AS_resolver = AS_resolver_multi()