Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/provisioning/local_provisioner.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

145 statements  

1"""Kernel Provisioner Classes""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import asyncio 

6import os 

7import signal 

8import sys 

9from typing import TYPE_CHECKING, Any 

10 

11from ..connect import KernelConnectionInfo, LocalPortCache 

12from ..launcher import launch_kernel 

13from ..localinterfaces import is_local_ip, local_ips 

14from .provisioner_base import KernelProvisionerBase 

15 

16 

17class LocalProvisioner(KernelProvisionerBase): 

18 """ 

19 :class:`LocalProvisioner` is a concrete class of ABC :py:class:`KernelProvisionerBase` 

20 and is the out-of-box default implementation used when no kernel provisioner is 

21 specified in the kernel specification (``kernel.json``). It provides functional 

22 parity to existing applications by launching the kernel locally and using 

23 :class:`subprocess.Popen` to manage its lifecycle. 

24 

25 This class is intended to be subclassed for customizing local kernel environments 

26 and serve as a reference implementation for other custom provisioners. 

27 """ 

28 

29 process = None 

30 _exit_future = None 

31 pid = None 

32 pgid = None 

33 ip = None 

34 ports_cached = False 

35 

36 @property 

37 def has_process(self) -> bool: 

38 return self.process is not None 

39 

40 async def poll(self) -> int | None: 

41 """Poll the provisioner.""" 

42 ret = 0 

43 if self.process: 

44 ret = self.process.poll() # type:ignore[unreachable] 

45 return ret 

46 

47 async def wait(self) -> int | None: 

48 """Wait for the provisioner process.""" 

49 ret = 0 

50 if self.process: 

51 # Use busy loop at 100ms intervals, polling until the process is 

52 # not alive. If we find the process is no longer alive, complete 

53 # its cleanup via the blocking wait(). Callers are responsible for 

54 # issuing calls to wait() using a timeout (see kill()). 

55 while await self.poll() is None: # type:ignore[unreachable] 

56 await asyncio.sleep(0.1) 

57 

58 # Process is no longer alive, wait and clear 

59 ret = self.process.wait() 

60 # Make sure all the fds get closed. 

61 for attr in ["stdout", "stderr", "stdin"]: 

62 fid = getattr(self.process, attr) 

63 if fid: 

64 fid.close() 

65 self.process = None # allow has_process to now return False 

66 return ret 

67 

68 async def send_signal(self, signum: int) -> None: 

69 """Sends a signal to the process group of the kernel (this 

70 usually includes the kernel and any subprocesses spawned by 

71 the kernel). 

72 

73 Note that since only SIGTERM is supported on Windows, we will 

74 check if the desired signal is for interrupt and apply the 

75 applicable code on Windows in that case. 

76 """ 

77 if self.process: 

78 if signum == signal.SIGINT and sys.platform == "win32": # type:ignore[unreachable] 

79 from ..win_interrupt import send_interrupt 

80 

81 send_interrupt(self.process.win32_interrupt_event) 

82 return 

83 

84 # Prefer process-group over process 

85 if self.pgid and hasattr(os, "killpg"): 

86 try: 

87 os.killpg(self.pgid, signum) 

88 return 

89 except OSError: 

90 pass # We'll retry sending the signal to only the process below 

91 

92 # If we're here, send the signal to the process and let caller handle exceptions 

93 self.process.send_signal(signum) 

94 return 

95 

96 async def kill(self, restart: bool = False) -> None: 

97 """Kill the provisioner and optionally restart.""" 

98 if self.process: 

99 if hasattr(signal, "SIGKILL"): # type:ignore[unreachable] 

100 # If available, give preference to signalling the process-group over `kill()`. 

101 try: 

102 await self.send_signal(signal.SIGKILL) 

103 return 

104 except OSError: 

105 pass 

106 try: 

107 self.process.kill() 

108 except OSError as e: 

109 LocalProvisioner._tolerate_no_process(e) 

110 

111 async def terminate(self, restart: bool = False) -> None: 

112 """Terminate the provisioner and optionally restart.""" 

113 if self.process: 

114 if hasattr(signal, "SIGTERM"): # type:ignore[unreachable] 

115 # If available, give preference to signalling the process group over `terminate()`. 

116 try: 

117 await self.send_signal(signal.SIGTERM) 

118 return 

119 except OSError: 

120 pass 

121 try: 

122 self.process.terminate() 

123 except OSError as e: 

124 LocalProvisioner._tolerate_no_process(e) 

125 

126 @staticmethod 

127 def _tolerate_no_process(os_error: OSError) -> None: 

128 # In Windows, we will get an Access Denied error if the process 

129 # has already terminated. Ignore it. 

130 if sys.platform == "win32": 

131 if os_error.winerror != 5: 

132 err_message = f"Invalid Error, expecting error number to be 5, got {os_error}" 

133 raise ValueError(err_message) 

134 

135 # On Unix, we may get an ESRCH error (or ProcessLookupError instance) if 

136 # the process has already terminated. Ignore it. 

137 else: 

138 from errno import ESRCH 

139 

140 if not isinstance(os_error, ProcessLookupError) or os_error.errno != ESRCH: 

141 err_message = ( 

142 f"Invalid Error, expecting ProcessLookupError or ESRCH, got {os_error}" 

143 ) 

144 raise ValueError(err_message) 

145 

146 async def cleanup(self, restart: bool = False) -> None: 

147 """Clean up the resources used by the provisioner and optionally restart.""" 

148 if self.ports_cached and not restart: 

149 # provisioner is about to be destroyed, return cached ports 

150 lpc = LocalPortCache.instance() 

151 ports = ( 

152 self.connection_info["shell_port"], 

153 self.connection_info["iopub_port"], 

154 self.connection_info["stdin_port"], 

155 self.connection_info["hb_port"], 

156 self.connection_info["control_port"], 

157 ) 

158 for port in ports: 

159 if TYPE_CHECKING: 

160 assert isinstance(port, int) 

161 lpc.return_port(port) 

162 

163 async def pre_launch(self, **kwargs: Any) -> dict[str, Any]: 

164 """Perform any steps in preparation for kernel process launch. 

165 

166 This includes applying additional substitutions to the kernel launch command and env. 

167 It also includes preparation of launch parameters. 

168 

169 Returns the updated kwargs. 

170 """ 

171 

172 # This should be considered temporary until a better division of labor can be defined. 

173 km = self.parent 

174 if km: 

175 if km.transport == "tcp" and not is_local_ip(km.ip): 

176 msg = ( 

177 "Can only launch a kernel on a local interface. " 

178 f"This one is not: {km.ip}." 

179 "Make sure that the '*_address' attributes are " 

180 "configured properly. " 

181 f"Currently valid addresses are: {local_ips()}" 

182 ) 

183 raise RuntimeError(msg) 

184 # build the Popen cmd 

185 extra_arguments = kwargs.pop("extra_arguments", []) 

186 

187 # write connection file / get default ports 

188 # TODO - change when handshake pattern is adopted 

189 if km.cache_ports and not self.ports_cached: 

190 lpc = LocalPortCache.instance() 

191 km.shell_port = lpc.find_available_port(km.ip) 

192 km.iopub_port = lpc.find_available_port(km.ip) 

193 km.stdin_port = lpc.find_available_port(km.ip) 

194 km.hb_port = lpc.find_available_port(km.ip) 

195 km.control_port = lpc.find_available_port(km.ip) 

196 self.ports_cached = True 

197 if "env" in kwargs: 

198 jupyter_session = kwargs["env"].get("JPY_SESSION_NAME", "") 

199 km.write_connection_file(jupyter_session=jupyter_session) 

200 else: 

201 km.write_connection_file() 

202 self.connection_info = km.get_connection_info() 

203 

204 kernel_cmd = km.format_kernel_cmd( 

205 extra_arguments=extra_arguments 

206 ) # This needs to remain here for b/c 

207 else: 

208 extra_arguments = kwargs.pop("extra_arguments", []) 

209 kernel_cmd = self.kernel_spec.argv + extra_arguments 

210 

211 return await super().pre_launch(cmd=kernel_cmd, **kwargs) 

212 

213 async def launch_kernel(self, cmd: list[str], **kwargs: Any) -> KernelConnectionInfo: 

214 """Launch a kernel with a command.""" 

215 scrubbed_kwargs = LocalProvisioner._scrub_kwargs(kwargs) 

216 self.process = launch_kernel(cmd, **scrubbed_kwargs) 

217 pgid = None 

218 if hasattr(os, "getpgid"): 

219 try: 

220 pgid = os.getpgid(self.process.pid) 

221 except OSError: 

222 pass 

223 

224 self.pid = self.process.pid 

225 self.pgid = pgid 

226 return self.connection_info 

227 

228 @staticmethod 

229 def _scrub_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]: 

230 """Remove any keyword arguments that Popen does not tolerate.""" 

231 keywords_to_scrub: list[str] = ["extra_arguments", "kernel_id"] 

232 scrubbed_kwargs = kwargs.copy() 

233 for kw in keywords_to_scrub: 

234 scrubbed_kwargs.pop(kw, None) 

235 return scrubbed_kwargs 

236 

237 async def get_provisioner_info(self) -> dict: 

238 """Captures the base information necessary for persistence relative to this instance.""" 

239 provisioner_info = await super().get_provisioner_info() 

240 provisioner_info.update({"pid": self.pid, "pgid": self.pgid, "ip": self.ip}) 

241 return provisioner_info 

242 

243 async def load_provisioner_info(self, provisioner_info: dict) -> None: 

244 """Loads the base information necessary for persistence relative to this instance.""" 

245 await super().load_provisioner_info(provisioner_info) 

246 self.pid = provisioner_info["pid"] 

247 self.pgid = provisioner_info["pgid"] 

248 self.ip = provisioner_info["ip"]