Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/provisioning/provisioner_base.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

70 statements  

1"""Kernel Provisioner Classes""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import os 

6from abc import ABC, ABCMeta, abstractmethod 

7from typing import Any, Union 

8 

9from traitlets.config import Instance, LoggingConfigurable, Unicode 

10 

11from ..connect import KernelConnectionInfo 

12 

13 

class KernelProvisionerMeta(ABCMeta, type(LoggingConfigurable)):  # type: ignore[misc]
    """Metaclass that merges ``ABCMeta`` with the metaclass of ``LoggingConfigurable``.

    ``KernelProvisionerBase`` derives from both ``ABC`` and ``LoggingConfigurable``.
    Python requires a class's metaclass to be a subclass of the metaclasses of all
    of its bases, so this combined metaclass is supplied explicitly.  It adds no
    behavior of its own.
    """

16 

17 

class KernelProvisionerBase(ABC, LoggingConfigurable, metaclass=KernelProvisionerMeta):  # type: ignore[metaclass]
    """
    Abstract base class defining methods for KernelProvisioner classes.

    A majority of methods are abstract (requiring implementations via a subclass) while
    some are optional and others provide implementations common to all instances.
    Subclasses should be aware of which methods require a call to the superclass.

    Many of these methods model those of :class:`subprocess.Popen` for parity with
    previous versions where the kernel process was managed directly.
    """

    # The kernel specification associated with this provisioner (may be None).
    kernel_spec: Any = Instance("jupyter_client.kernelspec.KernelSpec", allow_none=True)
    # Identifier of the kernel this provisioner manages (may be None).
    kernel_id: Union[str, Unicode] = Unicode(None, allow_none=True)
    # Plain class-level dict (not a traitlet): this single default object is shared
    # by every instance until an instance assigns its own value, as
    # load_provisioner_info() does.  Assign a new dict rather than mutating in place.
    connection_info: KernelConnectionInfo = {}

    @property
    @abstractmethod
    def has_process(self) -> bool:
        """
        Returns true if this provisioner is currently managing a process.

        This property is asserted to be True immediately following a call to
        the provisioner's :meth:`launch_kernel` method.
        """

    @abstractmethod
    async def poll(self) -> int | None:
        """
        Checks if kernel process is still running.

        If running, None is returned, otherwise the process's integer-valued exit code is returned.
        This method is called from :meth:`KernelManager.is_alive`.
        """

    @abstractmethod
    async def wait(self) -> int | None:
        """
        Waits for kernel process to terminate.

        This method is called from `KernelManager.finish_shutdown()` and
        `KernelManager.kill_kernel()` when terminating a kernel gracefully or
        immediately, respectively.
        """

    @abstractmethod
    async def send_signal(self, signum: int) -> None:
        """
        Sends signal identified by signum to the kernel process.

        This method is called from `KernelManager.signal_kernel()` to send the
        kernel process a signal.
        """

    @abstractmethod
    async def kill(self, restart: bool = False) -> None:
        """
        Kill the kernel process.

        This is typically accomplished via a SIGKILL signal, which cannot be caught.
        This method is called from `KernelManager.kill_kernel()` when terminating
        a kernel immediately.

        restart is True if this operation will precede a subsequent launch_kernel request.
        """

    @abstractmethod
    async def terminate(self, restart: bool = False) -> None:
        """
        Terminates the kernel process.

        This is typically accomplished via a SIGTERM signal, which can be caught, allowing
        the kernel provisioner to perform possible cleanup of resources.  This method is
        called indirectly from `KernelManager.finish_shutdown()` during a kernel's
        graceful termination.

        restart is True if this operation precedes a subsequent launch_kernel request.
        """

    @abstractmethod
    async def launch_kernel(self, cmd: list[str], **kwargs: Any) -> KernelConnectionInfo:
        """
        Launch the kernel process and return its connection information.

        This method is called from `KernelManager.launch_kernel()` during the
        kernel manager's start kernel sequence.
        """

    @abstractmethod
    async def cleanup(self, restart: bool = False) -> None:
        """
        Cleanup any resources allocated on behalf of the kernel provisioner.

        This method is called from `KernelManager.cleanup_resources()` as part of
        its shutdown kernel sequence.

        restart is True if this operation precedes a subsequent launch_kernel request.
        """

    async def shutdown_requested(self, restart: bool = False) -> None:
        """
        Allows the provisioner to determine if the kernel's shutdown has been requested.

        This method is called from `KernelManager.request_shutdown()` as part of
        its shutdown sequence.

        This method is optional and is primarily used in scenarios where the provisioner
        may need to perform other operations in preparation for a kernel's shutdown.
        The base implementation does nothing.
        """

    async def pre_launch(self, **kwargs: Any) -> dict[str, Any]:
        """
        Perform any steps in preparation for kernel process launch.

        This includes applying additional substitutions to the kernel launch command
        and environment.  It also includes preparation of launch parameters.

        NOTE: Subclass implementations are advised to call this method as it applies
        environment variable substitutions from the local environment and calls the
        provisioner's :meth:`_finalize_env()` method to allow each provisioner the
        ability to cleanup the environment variables that will be used by the kernel.

        This method is called from `KernelManager.pre_start_kernel()` as part of its
        start kernel sequence.

        Returns the (potentially updated) keyword arguments that are passed to
        :meth:`launch_kernel()`.
        """
        # Work on a copy so neither the caller's mapping nor os.environ is mutated.
        env = kwargs.pop("env", os.environ).copy()
        # Kernelspec env entries (with ${VAR} references resolved against env)
        # override same-named entries from the incoming environment.
        env.update(self.__apply_env_substitutions(env))
        self._finalize_env(env)
        kwargs["env"] = env

        return kwargs

    async def post_launch(self, **kwargs: Any) -> None:
        """
        Perform any steps following the kernel process launch.

        This method is called from `KernelManager.post_start_kernel()` as part of its
        start kernel sequence.  The base implementation does nothing.
        """

    async def get_provisioner_info(self) -> dict[str, Any]:
        """
        Captures the base information necessary for persistence relative to this instance.

        This enables applications that subclass `KernelManager` to persist a kernel provisioner's
        relevant information to accomplish functionality like disaster recovery or high availability
        by calling this method via the kernel manager's `provisioner` attribute.

        Returns a new dict with the keys ``"kernel_id"`` and ``"connection_info"``.

        NOTE: The superclass method must always be called first to ensure proper serialization.
        """
        provisioner_info: dict[str, Any] = {
            "kernel_id": self.kernel_id,
            "connection_info": self.connection_info,
        }
        return provisioner_info

    async def load_provisioner_info(self, provisioner_info: dict) -> None:
        """
        Loads the base information necessary for persistence relative to this instance.

        The inverse of `get_provisioner_info()`, this enables applications that subclass
        `KernelManager` to re-establish communication with a provisioner that is managing
        a (presumably) remote kernel from an entirely different process than the original
        provisioner.

        Raises KeyError if ``"kernel_id"`` or ``"connection_info"`` is missing from
        ``provisioner_info``.

        NOTE: The superclass method must always be called first to ensure proper deserialization.
        """
        self.kernel_id = provisioner_info["kernel_id"]
        self.connection_info = provisioner_info["connection_info"]

    def get_shutdown_wait_time(self, recommended: float = 5.0) -> float:
        """
        Returns the time allowed for a complete shutdown.  This may vary by provisioner.

        This method is called from `KernelManager.finish_shutdown()` during the graceful
        phase of its kernel shutdown sequence.

        The recommended value will typically be what is configured in the kernel manager.
        The base implementation returns ``recommended`` unchanged.
        """
        return recommended

    def get_stable_start_time(self, recommended: float = 10.0) -> float:
        """
        Returns the expected upper bound for a kernel (re-)start to complete.
        This may vary by provisioner.

        The recommended value will typically be what is configured in the kernel restarter.
        The base implementation returns ``recommended`` unchanged.
        """
        return recommended

    def _finalize_env(self, env: dict[str, str]) -> None:
        """
        Ensures env is appropriate prior to launch.

        ``env`` is modified in place.  This method is called from
        `KernelProvisionerBase.pre_launch()` during the kernel's start sequence.

        NOTE: Subclasses should be sure to call super()._finalize_env(env)
        """
        # kernel_spec is declared with allow_none=True, so guard against a missing
        # spec here just as __apply_env_substitutions() does; otherwise pre_launch()
        # would raise AttributeError when no kernel spec has been set.
        language = self.kernel_spec.language if self.kernel_spec else None
        if language and language.lower().startswith("python"):
            # Don't allow PYTHONEXECUTABLE to be passed to kernel process.
            # If set, it can bork all the things.
            env.pop("PYTHONEXECUTABLE", None)

    def __apply_env_substitutions(self, substitution_values: dict[str, str]) -> dict[str, str]:
        """
        Walks entries in the kernelspec's env stanza and applies substitutions from current env.

        This method is called from `KernelProvisionerBase.pre_launch()` during the kernel's
        start sequence.

        Returns a new dict of the kernelspec's env entries with substitutions applied;
        the dict is empty when no kernel spec is set.

        NOTE: This method is private and is not intended to be overridden by provisioners.
        """
        if not self.kernel_spec:
            return {}

        from string import Template

        # For each templated env entry, fill any templated references matching
        # names of env variables with those values.  safe_substitute() leaves
        # references it cannot resolve untouched instead of raising.
        return {
            name: Template(value).safe_substitute(substitution_values)
            for name, value in self.kernel_spec.env.items()
        }

250 # For each templated env entry, fill any templated references 

251 # matching names of env variables with those values and build 

252 # new dict with substitutions. 

253 templated_env = self.kernel_spec.env 

254 for k, v in templated_env.items(): 

255 substituted_env.update({k: Template(v).safe_substitute(substitution_values)}) 

256 return substituted_env