Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/provisioning/local_provisioner.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

143 statements  

1"""Kernel Provisioner Classes""" 

2# Copyright (c) Jupyter Development Team. 

3# Distributed under the terms of the Modified BSD License. 

4import asyncio 

5import os 

6import signal 

7import sys 

8from typing import TYPE_CHECKING, Any, Dict, List, Optional 

9 

10from ..connect import KernelConnectionInfo, LocalPortCache 

11from ..launcher import launch_kernel 

12from ..localinterfaces import is_local_ip, local_ips 

13from .provisioner_base import KernelProvisionerBase 

14 

15 

16class LocalProvisioner(KernelProvisionerBase): # type:ignore[misc] 

17 """ 

18 :class:`LocalProvisioner` is a concrete class of ABC :py:class:`KernelProvisionerBase` 

19 and is the out-of-box default implementation used when no kernel provisioner is 

20 specified in the kernel specification (``kernel.json``). It provides functional 

21 parity to existing applications by launching the kernel locally and using 

22 :class:`subprocess.Popen` to manage its lifecycle. 

23 

24 This class is intended to be subclassed for customizing local kernel environments 

25 and serve as a reference implementation for other custom provisioners. 

26 """ 

27 

28 process = None 

29 _exit_future = None 

30 pid = None 

31 pgid = None 

32 ip = None 

33 ports_cached = False 

34 

35 @property 

36 def has_process(self) -> bool: 

37 return self.process is not None 

38 

39 async def poll(self) -> Optional[int]: 

40 """Poll the provisioner.""" 

41 ret = 0 

42 if self.process: 

43 ret = self.process.poll() # type:ignore[unreachable] 

44 return ret 

45 

46 async def wait(self) -> Optional[int]: 

47 """Wait for the provisioner process.""" 

48 ret = 0 

49 if self.process: 

50 # Use busy loop at 100ms intervals, polling until the process is 

51 # not alive. If we find the process is no longer alive, complete 

52 # its cleanup via the blocking wait(). Callers are responsible for 

53 # issuing calls to wait() using a timeout (see kill()). 

54 while await self.poll() is None: # type:ignore[unreachable] 

55 await asyncio.sleep(0.1) 

56 

57 # Process is no longer alive, wait and clear 

58 ret = self.process.wait() 

59 # Make sure all the fds get closed. 

60 for attr in ["stdout", "stderr", "stdin"]: 

61 fid = getattr(self.process, attr) 

62 if fid: 

63 fid.close() 

64 self.process = None # allow has_process to now return False 

65 return ret 

66 

67 async def send_signal(self, signum: int) -> None: 

68 """Sends a signal to the process group of the kernel (this 

69 usually includes the kernel and any subprocesses spawned by 

70 the kernel). 

71 

72 Note that since only SIGTERM is supported on Windows, we will 

73 check if the desired signal is for interrupt and apply the 

74 applicable code on Windows in that case. 

75 """ 

76 if self.process: 

77 if signum == signal.SIGINT and sys.platform == "win32": # type:ignore[unreachable] 

78 from ..win_interrupt import send_interrupt 

79 

80 send_interrupt(self.process.win32_interrupt_event) 

81 return 

82 

83 # Prefer process-group over process 

84 if self.pgid and hasattr(os, "killpg"): 

85 try: 

86 os.killpg(self.pgid, signum) 

87 return 

88 except OSError: 

89 pass # We'll retry sending the signal to only the process below 

90 

91 # If we're here, send the signal to the process and let caller handle exceptions 

92 self.process.send_signal(signum) 

93 return 

94 

95 async def kill(self, restart: bool = False) -> None: 

96 """Kill the provisioner and optionally restart.""" 

97 if self.process: 

98 if hasattr(signal, "SIGKILL"): # type:ignore[unreachable] 

99 # If available, give preference to signalling the process-group over `kill()`. 

100 try: 

101 await self.send_signal(signal.SIGKILL) 

102 return 

103 except OSError: 

104 pass 

105 try: 

106 self.process.kill() 

107 except OSError as e: 

108 LocalProvisioner._tolerate_no_process(e) 

109 

110 async def terminate(self, restart: bool = False) -> None: 

111 """Terminate the provisioner and optionally restart.""" 

112 if self.process: 

113 if hasattr(signal, "SIGTERM"): # type:ignore[unreachable] 

114 # If available, give preference to signalling the process group over `terminate()`. 

115 try: 

116 await self.send_signal(signal.SIGTERM) 

117 return 

118 except OSError: 

119 pass 

120 try: 

121 self.process.terminate() 

122 except OSError as e: 

123 LocalProvisioner._tolerate_no_process(e) 

124 

125 @staticmethod 

126 def _tolerate_no_process(os_error: OSError) -> None: 

127 # In Windows, we will get an Access Denied error if the process 

128 # has already terminated. Ignore it. 

129 if sys.platform == "win32": 

130 if os_error.winerror != 5: 

131 raise 

132 # On Unix, we may get an ESRCH error (or ProcessLookupError instance) if 

133 # the process has already terminated. Ignore it. 

134 else: 

135 from errno import ESRCH 

136 

137 if not isinstance(os_error, ProcessLookupError) or os_error.errno != ESRCH: 

138 raise 

139 

140 async def cleanup(self, restart: bool = False) -> None: 

141 """Clean up the resources used by the provisioner and optionally restart.""" 

142 if self.ports_cached and not restart: 

143 # provisioner is about to be destroyed, return cached ports 

144 lpc = LocalPortCache.instance() 

145 ports = ( 

146 self.connection_info["shell_port"], 

147 self.connection_info["iopub_port"], 

148 self.connection_info["stdin_port"], 

149 self.connection_info["hb_port"], 

150 self.connection_info["control_port"], 

151 ) 

152 for port in ports: 

153 if TYPE_CHECKING: 

154 assert isinstance(port, int) 

155 lpc.return_port(port) 

156 

157 async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]: 

158 """Perform any steps in preparation for kernel process launch. 

159 

160 This includes applying additional substitutions to the kernel launch command and env. 

161 It also includes preparation of launch parameters. 

162 

163 Returns the updated kwargs. 

164 """ 

165 

166 # This should be considered temporary until a better division of labor can be defined. 

167 km = self.parent 

168 if km: 

169 if km.transport == "tcp" and not is_local_ip(km.ip): 

170 msg = ( 

171 "Can only launch a kernel on a local interface. " 

172 f"This one is not: {km.ip}." 

173 "Make sure that the '*_address' attributes are " 

174 "configured properly. " 

175 f"Currently valid addresses are: {local_ips()}" 

176 ) 

177 raise RuntimeError(msg) 

178 # build the Popen cmd 

179 extra_arguments = kwargs.pop("extra_arguments", []) 

180 

181 # write connection file / get default ports 

182 # TODO - change when handshake pattern is adopted 

183 if km.cache_ports and not self.ports_cached: 

184 lpc = LocalPortCache.instance() 

185 km.shell_port = lpc.find_available_port(km.ip) 

186 km.iopub_port = lpc.find_available_port(km.ip) 

187 km.stdin_port = lpc.find_available_port(km.ip) 

188 km.hb_port = lpc.find_available_port(km.ip) 

189 km.control_port = lpc.find_available_port(km.ip) 

190 self.ports_cached = True 

191 if "env" in kwargs: 

192 jupyter_session = kwargs["env"].get("JPY_SESSION_NAME", "") 

193 km.write_connection_file(jupyter_session=jupyter_session) 

194 else: 

195 km.write_connection_file() 

196 self.connection_info = km.get_connection_info() 

197 

198 kernel_cmd = km.format_kernel_cmd( 

199 extra_arguments=extra_arguments 

200 ) # This needs to remain here for b/c 

201 else: 

202 extra_arguments = kwargs.pop("extra_arguments", []) 

203 kernel_cmd = self.kernel_spec.argv + extra_arguments 

204 

205 return await super().pre_launch(cmd=kernel_cmd, **kwargs) 

206 

207 async def launch_kernel(self, cmd: List[str], **kwargs: Any) -> KernelConnectionInfo: 

208 """Launch a kernel with a command.""" 

209 scrubbed_kwargs = LocalProvisioner._scrub_kwargs(kwargs) 

210 self.process = launch_kernel(cmd, **scrubbed_kwargs) 

211 pgid = None 

212 if hasattr(os, "getpgid"): 

213 try: 

214 pgid = os.getpgid(self.process.pid) 

215 except OSError: 

216 pass 

217 

218 self.pid = self.process.pid 

219 self.pgid = pgid 

220 return self.connection_info 

221 

222 @staticmethod 

223 def _scrub_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]: 

224 """Remove any keyword arguments that Popen does not tolerate.""" 

225 keywords_to_scrub: List[str] = ["extra_arguments", "kernel_id"] 

226 scrubbed_kwargs = kwargs.copy() 

227 for kw in keywords_to_scrub: 

228 scrubbed_kwargs.pop(kw, None) 

229 return scrubbed_kwargs 

230 

231 async def get_provisioner_info(self) -> Dict: 

232 """Captures the base information necessary for persistence relative to this instance.""" 

233 provisioner_info = await super().get_provisioner_info() 

234 provisioner_info.update({"pid": self.pid, "pgid": self.pgid, "ip": self.ip}) 

235 return provisioner_info 

236 

237 async def load_provisioner_info(self, provisioner_info: Dict) -> None: 

238 """Loads the base information necessary for persistence relative to this instance.""" 

239 await super().load_provisioner_info(provisioner_info) 

240 self.pid = provisioner_info["pid"] 

241 self.pgid = provisioner_info["pgid"] 

242 self.ip = provisioner_info["ip"]