Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/jupyter_client/localinterfaces.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

201 statements  

1"""Utilities for identifying local IP addresses.""" 

2# Copyright (c) Jupyter Development Team. 

3# Distributed under the terms of the Modified BSD License. 

4from __future__ import annotations 

5 

6import os 

7import re 

8import socket 

9import subprocess 

10from subprocess import PIPE, Popen 

11from typing import Any, Callable, Iterable, Sequence 

12from warnings import warn 

13 

14LOCAL_IPS: list = [] 

15PUBLIC_IPS: list = [] 

16 

17LOCALHOST: str = "" 

18 

19 

20def _uniq_stable(elems: Iterable) -> list: 

21 """uniq_stable(elems) -> list 

22 

23 Return from an iterable, a list of all the unique elements in the input, 

24 maintaining the order in which they first appear. 

25 """ 

26 seen = set() 

27 value = [] 

28 for x in elems: 

29 if x not in seen: 

30 value.append(x) 

31 seen.add(x) 

32 return value 

33 

34 

35def _get_output(cmd: str | Sequence[str]) -> str: 

36 """Get output of a command, raising IOError if it fails""" 

37 startupinfo = None 

38 if os.name == "nt": 

39 startupinfo = subprocess.STARTUPINFO() # type:ignore[attr-defined] 

40 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type:ignore[attr-defined] 

41 p = Popen(cmd, stdout=PIPE, stderr=PIPE, startupinfo=startupinfo) # noqa 

42 stdout, stderr = p.communicate() 

43 if p.returncode: 

44 msg = "Failed to run {}: {}".format(cmd, stderr.decode("utf8", "replace")) 

45 raise OSError(msg) 

46 return stdout.decode("utf8", "replace") 

47 

48 

49def _only_once(f: Callable) -> Callable: 

50 """decorator to only run a function once""" 

51 f.called = False # type:ignore[attr-defined] 

52 

53 def wrapped(**kwargs: Any) -> Any: 

54 if f.called: # type:ignore[attr-defined] 

55 return 

56 ret = f(**kwargs) 

57 f.called = True # type:ignore[attr-defined] 

58 return ret 

59 

60 return wrapped 

61 

62 

63def _requires_ips(f: Callable) -> Callable: 

64 """decorator to ensure load_ips has been run before f""" 

65 

66 def ips_loaded(*args: Any, **kwargs: Any) -> Any: 

67 _load_ips() 

68 return f(*args, **kwargs) 

69 

70 return ips_loaded 

71 

72 

73# subprocess-parsing ip finders 

74class NoIPAddresses(Exception): # noqa 

75 pass 

76 

77 

78def _populate_from_list(addrs: Sequence[str] | None) -> None: 

79 """populate local and public IPs from flat list of all IPs""" 

80 if not addrs: 

81 raise NoIPAddresses 

82 

83 global LOCALHOST 

84 public_ips = [] 

85 local_ips = [] 

86 

87 for ip in addrs: 

88 local_ips.append(ip) 

89 if not ip.startswith("127."): 

90 public_ips.append(ip) 

91 elif not LOCALHOST: 

92 LOCALHOST = ip 

93 

94 if not LOCALHOST or LOCALHOST == "127.0.0.1": 

95 LOCALHOST = "127.0.0.1" 

96 local_ips.insert(0, LOCALHOST) 

97 

98 local_ips.extend(["0.0.0.0", ""]) # noqa 

99 

100 LOCAL_IPS[:] = _uniq_stable(local_ips) 

101 PUBLIC_IPS[:] = _uniq_stable(public_ips) 

102 

103 

104_ifconfig_ipv4_pat = re.compile(r"inet\b.*?(\d+\.\d+\.\d+\.\d+)", re.IGNORECASE) 

105 

106 

107def _load_ips_ifconfig() -> None: 

108 """load ip addresses from `ifconfig` output (posix)""" 

109 

110 try: 

111 out = _get_output("ifconfig") 

112 except OSError: 

113 # no ifconfig, it's usually in /sbin and /sbin is not on everyone's PATH 

114 out = _get_output("/sbin/ifconfig") 

115 

116 lines = out.splitlines() 

117 addrs = [] 

118 for line in lines: 

119 m = _ifconfig_ipv4_pat.match(line.strip()) 

120 if m: 

121 addrs.append(m.group(1)) 

122 _populate_from_list(addrs) 

123 

124 

125def _load_ips_ip() -> None: 

126 """load ip addresses from `ip addr` output (Linux)""" 

127 out = _get_output(["ip", "-f", "inet", "addr"]) 

128 

129 lines = out.splitlines() 

130 addrs = [] 

131 for line in lines: 

132 blocks = line.lower().split() 

133 if (len(blocks) >= 2) and (blocks[0] == "inet"): 

134 addrs.append(blocks[1].split("/")[0]) 

135 _populate_from_list(addrs) 

136 

137 

138_ipconfig_ipv4_pat = re.compile(r"ipv4.*?(\d+\.\d+\.\d+\.\d+)$", re.IGNORECASE) 

139 

140 

141def _load_ips_ipconfig() -> None: 

142 """load ip addresses from `ipconfig` output (Windows)""" 

143 out = _get_output("ipconfig") 

144 

145 lines = out.splitlines() 

146 addrs = [] 

147 for line in lines: 

148 m = _ipconfig_ipv4_pat.match(line.strip()) 

149 if m: 

150 addrs.append(m.group(1)) 

151 _populate_from_list(addrs) 

152 

153 

154def _load_ips_psutil() -> None: 

155 """load ip addresses with netifaces""" 

156 import psutil 

157 

158 global LOCALHOST 

159 local_ips = [] 

160 public_ips = [] 

161 

162 # dict of iface_name: address_list, eg 

163 # {"lo": [snicaddr(family=<AddressFamily.AF_INET>, address="127.0.0.1", 

164 # ...), snicaddr(family=<AddressFamily.AF_INET6>, ...)]} 

165 for iface, ifaddresses in psutil.net_if_addrs().items(): 

166 for address_data in ifaddresses: 

167 if address_data.family == socket.AF_INET: 

168 addr = address_data.address 

169 if not (iface.startswith("lo") or addr.startswith("127.")): 

170 public_ips.append(addr) 

171 elif not LOCALHOST: 

172 LOCALHOST = addr 

173 local_ips.append(addr) 

174 if not LOCALHOST: 

175 # we never found a loopback interface (can this ever happen?), assume common default 

176 LOCALHOST = "127.0.0.1" 

177 local_ips.insert(0, LOCALHOST) 

178 local_ips.extend(["0.0.0.0", ""]) # noqa 

179 LOCAL_IPS[:] = _uniq_stable(local_ips) 

180 PUBLIC_IPS[:] = _uniq_stable(public_ips) 

181 

182 

183def _load_ips_netifaces() -> None: 

184 """load ip addresses with netifaces""" 

185 import netifaces # type: ignore[import-not-found] 

186 

187 global LOCALHOST 

188 local_ips = [] 

189 public_ips = [] 

190 

191 # list of iface names, 'lo0', 'eth0', etc. 

192 for iface in netifaces.interfaces(): 

193 # list of ipv4 addrinfo dicts 

194 ipv4s = netifaces.ifaddresses(iface).get(netifaces.AF_INET, []) 

195 for entry in ipv4s: 

196 addr = entry.get("addr") 

197 if not addr: 

198 continue 

199 if not (iface.startswith("lo") or addr.startswith("127.")): 

200 public_ips.append(addr) 

201 elif not LOCALHOST: 

202 LOCALHOST = addr 

203 local_ips.append(addr) 

204 if not LOCALHOST: 

205 # we never found a loopback interface (can this ever happen?), assume common default 

206 LOCALHOST = "127.0.0.1" 

207 local_ips.insert(0, LOCALHOST) 

208 local_ips.extend(["0.0.0.0", ""]) # noqa 

209 LOCAL_IPS[:] = _uniq_stable(local_ips) 

210 PUBLIC_IPS[:] = _uniq_stable(public_ips) 

211 

212 

213def _load_ips_gethostbyname() -> None: 

214 """load ip addresses with socket.gethostbyname_ex 

215 

216 This can be slow. 

217 """ 

218 global LOCALHOST 

219 try: 

220 LOCAL_IPS[:] = socket.gethostbyname_ex("localhost")[2] 

221 except OSError: 

222 # assume common default 

223 LOCAL_IPS[:] = ["127.0.0.1"] 

224 

225 try: 

226 hostname = socket.gethostname() 

227 PUBLIC_IPS[:] = socket.gethostbyname_ex(hostname)[2] 

228 # try hostname.local, in case hostname has been short-circuited to loopback 

229 if not hostname.endswith(".local") and all(ip.startswith("127") for ip in PUBLIC_IPS): 

230 PUBLIC_IPS[:] = socket.gethostbyname_ex(socket.gethostname() + ".local")[2] 

231 except OSError: 

232 pass 

233 finally: 

234 PUBLIC_IPS[:] = _uniq_stable(PUBLIC_IPS) 

235 LOCAL_IPS.extend(PUBLIC_IPS) 

236 

237 # include all-interface aliases: 0.0.0.0 and '' 

238 LOCAL_IPS.extend(["0.0.0.0", ""]) # noqa 

239 

240 LOCAL_IPS[:] = _uniq_stable(LOCAL_IPS) 

241 

242 LOCALHOST = LOCAL_IPS[0] 

243 

244 

245def _load_ips_dumb() -> None: 

246 """Fallback in case of unexpected failure""" 

247 global LOCALHOST 

248 LOCALHOST = "127.0.0.1" 

249 LOCAL_IPS[:] = [LOCALHOST, "0.0.0.0", ""] # noqa 

250 PUBLIC_IPS[:] = [] 

251 

252 

253@_only_once 

254def _load_ips(suppress_exceptions: bool = True) -> None: 

255 """load the IPs that point to this machine 

256 

257 This function will only ever be called once. 

258 

259 If will use psutil to do it quickly if available. 

260 If not, it will use netifaces to do it quickly if available. 

261 Then it will fallback on parsing the output of ifconfig / ip addr / ipconfig, as appropriate. 

262 Finally, it will fallback on socket.gethostbyname_ex, which can be slow. 

263 """ 

264 

265 try: 

266 # first priority, use psutil 

267 try: 

268 return _load_ips_psutil() 

269 except ImportError: 

270 pass 

271 

272 # second priority, use netifaces 

273 try: 

274 return _load_ips_netifaces() 

275 except ImportError: 

276 pass 

277 

278 # second priority, parse subprocess output (how reliable is this?) 

279 

280 if os.name == "nt": 

281 try: 

282 return _load_ips_ipconfig() 

283 except (OSError, NoIPAddresses): 

284 pass 

285 else: 

286 try: 

287 return _load_ips_ip() 

288 except (OSError, NoIPAddresses): 

289 pass 

290 try: 

291 return _load_ips_ifconfig() 

292 except (OSError, NoIPAddresses): 

293 pass 

294 

295 # lowest priority, use gethostbyname 

296 

297 return _load_ips_gethostbyname() 

298 except Exception as e: 

299 if not suppress_exceptions: 

300 raise 

301 # unexpected error shouldn't crash, load dumb default values instead. 

302 warn("Unexpected error discovering local network interfaces: %s" % e, stacklevel=2) 

303 _load_ips_dumb() 

304 

305 

306@_requires_ips 

307def local_ips() -> list[str]: 

308 """return the IP addresses that point to this machine""" 

309 return LOCAL_IPS 

310 

311 

312@_requires_ips 

313def public_ips() -> list[str]: 

314 """return the IP addresses for this machine that are visible to other machines""" 

315 return PUBLIC_IPS 

316 

317 

318@_requires_ips 

319def localhost() -> str: 

320 """return ip for localhost (almost always 127.0.0.1)""" 

321 return LOCALHOST 

322 

323 

324@_requires_ips 

325def is_local_ip(ip: str) -> bool: 

326 """does `ip` point to this machine?""" 

327 return ip in LOCAL_IPS 

328 

329 

330@_requires_ips 

331def is_public_ip(ip: str) -> bool: 

332 """is `ip` a publicly visible address?""" 

333 return ip in PUBLIC_IPS