Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/localinterfaces.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1"""Utilities for identifying local IP addresses.""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import os 

8import re 

9import socket 

10import subprocess 

11from collections.abc import Callable, Iterable, Mapping, Sequence 

12from subprocess import PIPE, Popen 

13from typing import Any 

14from warnings import warn 

15 

16LOCAL_IPS: list[str] = [] 

17PUBLIC_IPS: list[str] = [] 

18 

19LOCALHOST: str = "" 

20 

21 

22def _uniq_stable(elems: Iterable) -> list: 

23 """uniq_stable(elems) -> list 

24 

25 Return from an iterable, a list of all the unique elements in the input, 

26 maintaining the order in which they first appear. 

27 """ 

28 seen = set() 

29 value = [] 

30 for x in elems: 

31 if x not in seen: 

32 value.append(x) 

33 seen.add(x) 

34 return value 

35 

36 

37def _get_output(cmd: str | Sequence[str]) -> str: 

38 """Get output of a command, raising IOError if it fails""" 

39 startupinfo = None 

40 if os.name == "nt": 

41 startupinfo = subprocess.STARTUPINFO() # type:ignore[attr-defined] 

42 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type:ignore[attr-defined] 

43 p = Popen(cmd, stdout=PIPE, stderr=PIPE, startupinfo=startupinfo) # noqa 

44 stdout, stderr = p.communicate() 

45 if p.returncode: 

46 msg = "Failed to run {}: {}".format(cmd, stderr.decode("utf8", "replace")) 

47 raise OSError(msg) 

48 return stdout.decode("utf8", "replace") 

49 

50 

51def _only_once(f: Callable) -> Callable: 

52 """decorator to only run a function once""" 

53 f.called = False # type:ignore[attr-defined] 

54 

55 def wrapped(**kwargs: Any) -> Any: 

56 if f.called: # type:ignore[attr-defined] 

57 return 

58 ret = f(**kwargs) 

59 f.called = True # type:ignore[attr-defined] 

60 return ret 

61 

62 return wrapped 

63 

64 

65def _requires_ips(f: Callable) -> Callable: 

66 """decorator to ensure load_ips has been run before f""" 

67 

68 def ips_loaded(*args: Any, **kwargs: Any) -> Any: 

69 _load_ips() 

70 return f(*args, **kwargs) 

71 

72 return ips_loaded 

73 

74 

75# subprocess-parsing ip finders 

76class NoIPAddresses(Exception): # noqa 

77 pass 

78 

79 

80def _populate_from_list(addrs: Sequence[str]) -> None: 

81 """populate local and public IPs from flat list of all IPs""" 

82 _populate_from_dict({"all": addrs}) 

83 

84 

85def _populate_from_dict(addrs: Mapping[str, Sequence[str]]) -> None: 

86 """populate local and public IPs from dict of {'en0': 'ip'}""" 

87 if not addrs: 

88 raise NoIPAddresses() 

89 

90 global LOCALHOST 

91 public_ips = [] 

92 local_ips = [] 

93 

94 for iface, ip_list in addrs.items(): 

95 for ip in ip_list: 

96 local_ips.append(ip) 

97 if not LOCALHOST and (iface.startswith("lo") or ip.startswith("127.")): 

98 LOCALHOST = ip 

99 if not iface.startswith("lo") and not ip.startswith(("127.", "169.254.")): 

100 # don't include link-local address in public_ips 

101 public_ips.append(ip) 

102 

103 if not LOCALHOST or LOCALHOST == "127.0.0.1": 

104 LOCALHOST = "127.0.0.1" 

105 local_ips.insert(0, LOCALHOST) 

106 

107 local_ips.extend(["0.0.0.0", ""]) # noqa: S104 

108 

109 LOCAL_IPS[:] = _uniq_stable(local_ips) 

110 PUBLIC_IPS[:] = _uniq_stable(public_ips) 

111 

112 

113_ifconfig_ipv4_pat = re.compile(r"inet\b.*?(\d+\.\d+\.\d+\.\d+)", re.IGNORECASE) 

114 

115 

116def _load_ips_ifconfig() -> None: 

117 """load ip addresses from `ifconfig` output (posix)""" 

118 

119 try: 

120 out = _get_output("ifconfig") 

121 except OSError: 

122 # no ifconfig, it's usually in /sbin and /sbin is not on everyone's PATH 

123 out = _get_output("/sbin/ifconfig") 

124 

125 lines = out.splitlines() 

126 addrs = [] 

127 for line in lines: 

128 m = _ifconfig_ipv4_pat.match(line.strip()) 

129 if m: 

130 addrs.append(m.group(1)) 

131 _populate_from_list(addrs) 

132 

133 

134def _load_ips_ip() -> None: 

135 """load ip addresses from `ip addr` output (Linux)""" 

136 out = _get_output(["ip", "-f", "inet", "addr"]) 

137 

138 lines = out.splitlines() 

139 addrs = [] 

140 for line in lines: 

141 blocks = line.lower().split() 

142 if (len(blocks) >= 2) and (blocks[0] == "inet"): 

143 addrs.append(blocks[1].split("/")[0]) 

144 _populate_from_list(addrs) 

145 

146 

147_ipconfig_ipv4_pat = re.compile(r"ipv4.*?(\d+\.\d+\.\d+\.\d+)$", re.IGNORECASE) 

148 

149 

150def _load_ips_ipconfig() -> None: 

151 """load ip addresses from `ipconfig` output (Windows)""" 

152 out = _get_output("ipconfig") 

153 

154 lines = out.splitlines() 

155 addrs = [] 

156 for line in lines: 

157 m = _ipconfig_ipv4_pat.match(line.strip()) 

158 if m: 

159 addrs.append(m.group(1)) 

160 _populate_from_list(addrs) 

161 

162 

163def _load_ips_psutil() -> None: 

164 """load ip addresses with psutil""" 

165 import psutil 

166 

167 addr_dict: dict[str, list[str]] = {} 

168 

169 # dict of iface_name: address_list, eg 

170 # {"lo": [snicaddr(family=<AddressFamily.AF_INET>, address="127.0.0.1", 

171 # ...), snicaddr(family=<AddressFamily.AF_INET6>, ...)]} 

172 for iface, ifaddresses in psutil.net_if_addrs().items(): 

173 addr_dict[iface] = [ 

174 address_data.address 

175 for address_data in ifaddresses 

176 if address_data.family == socket.AF_INET 

177 ] 

178 

179 _populate_from_dict(addr_dict) 

180 

181 

182def _load_ips_netifaces() -> None: 

183 """load ip addresses with netifaces""" 

184 import netifaces 

185 

186 addr_dict: dict[str, list[str]] = {} 

187 

188 # list of iface names, 'lo0', 'eth0', etc. 

189 for iface in netifaces.interfaces(): 

190 # list of ipv4 addrinfo dicts 

191 addr_dict[iface] = [] 

192 

193 ipv4s = netifaces.ifaddresses(iface).get(netifaces.AF_INET, []) 

194 for entry in ipv4s: 

195 addr = entry.get("addr") 

196 if addr: 

197 addr_dict[iface].append(addr) 

198 _populate_from_dict(addr_dict) 

199 

200 

201def _load_ips_gethostbyname() -> None: 

202 """load ip addresses with socket.gethostbyname_ex 

203 

204 This can be slow. 

205 """ 

206 global LOCALHOST 

207 try: 

208 LOCAL_IPS[:] = socket.gethostbyname_ex("localhost")[2] 

209 except OSError: 

210 # assume common default 

211 LOCAL_IPS[:] = ["127.0.0.1"] 

212 

213 try: 

214 hostname = socket.gethostname() 

215 PUBLIC_IPS[:] = socket.gethostbyname_ex(hostname)[2] 

216 # try hostname.local, in case hostname has been short-circuited to loopback 

217 if not hostname.endswith(".local") and all(ip.startswith("127") for ip in PUBLIC_IPS): 

218 PUBLIC_IPS[:] = socket.gethostbyname_ex(socket.gethostname() + ".local")[2] 

219 except OSError: 

220 pass 

221 finally: 

222 PUBLIC_IPS[:] = _uniq_stable(PUBLIC_IPS) 

223 LOCAL_IPS.extend(PUBLIC_IPS) 

224 

225 # include all-interface aliases: 0.0.0.0 and '' 

226 LOCAL_IPS.extend(["0.0.0.0", ""]) # noqa 

227 

228 LOCAL_IPS[:] = _uniq_stable(LOCAL_IPS) 

229 

230 LOCALHOST = LOCAL_IPS[0] 

231 

232 

233def _load_ips_dumb() -> None: 

234 """Fallback in case of unexpected failure""" 

235 global LOCALHOST 

236 LOCALHOST = "127.0.0.1" 

237 LOCAL_IPS[:] = [LOCALHOST, "0.0.0.0", ""] # noqa 

238 PUBLIC_IPS[:] = [] 

239 

240 

241@_only_once 

242def _load_ips(suppress_exceptions: bool = True) -> None: 

243 """load the IPs that point to this machine 

244 

245 This function will only ever be called once. 

246 

247 If will use psutil to do it quickly if available. 

248 If not, it will use netifaces to do it quickly if available. 

249 Then it will fallback on parsing the output of ifconfig / ip addr / ipconfig, as appropriate. 

250 Finally, it will fallback on socket.gethostbyname_ex, which can be slow. 

251 """ 

252 

253 try: 

254 # first priority, use psutil 

255 try: 

256 return _load_ips_psutil() 

257 except ImportError: 

258 pass 

259 

260 # second priority, use netifaces 

261 try: 

262 return _load_ips_netifaces() 

263 except ImportError: 

264 pass 

265 

266 # second priority, parse subprocess output (how reliable is this?) 

267 

268 if os.name == "nt": 

269 try: 

270 return _load_ips_ipconfig() 

271 except (OSError, NoIPAddresses): 

272 pass 

273 else: 

274 try: 

275 return _load_ips_ip() 

276 except (OSError, NoIPAddresses): 

277 pass 

278 try: 

279 return _load_ips_ifconfig() 

280 except (OSError, NoIPAddresses): 

281 pass 

282 

283 # lowest priority, use gethostbyname 

284 

285 return _load_ips_gethostbyname() 

286 except Exception as e: 

287 if not suppress_exceptions: 

288 raise 

289 # unexpected error shouldn't crash, load dumb default values instead. 

290 warn("Unexpected error discovering local network interfaces: %s" % e, stacklevel=2) 

291 _load_ips_dumb() 

292 

293 

294@_requires_ips 

295def local_ips() -> list[str]: 

296 """return the IP addresses that point to this machine""" 

297 return LOCAL_IPS 

298 

299 

300@_requires_ips 

301def public_ips() -> list[str]: 

302 """return the IP addresses for this machine that are visible to other machines""" 

303 return PUBLIC_IPS 

304 

305 

306@_requires_ips 

307def localhost() -> str: 

308 """return ip for localhost (almost always 127.0.0.1)""" 

309 return LOCALHOST 

310 

311 

312@_requires_ips 

313def is_local_ip(ip: str) -> bool: 

314 """does `ip` point to this machine?""" 

315 return ip in LOCAL_IPS 

316 

317 

318@_requires_ips 

319def is_public_ip(ip: str) -> bool: 

320 """is `ip` a publicly visible address?""" 

321 return ip in PUBLIC_IPS