Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/provisioning/local_provisioner.py: 23%
140 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""Kernel Provisioner Classes"""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import asyncio
5import os
6import signal
7import sys
8from typing import Any, Dict, List, Optional
10from ..connect import KernelConnectionInfo, LocalPortCache
11from ..launcher import launch_kernel
12from ..localinterfaces import is_local_ip, local_ips
13from .provisioner_base import KernelProvisionerBase
16class LocalProvisioner(KernelProvisionerBase): # type:ignore[misc]
17 """
18 :class:`LocalProvisioner` is a concrete class of ABC :py:class:`KernelProvisionerBase`
19 and is the out-of-box default implementation used when no kernel provisioner is
20 specified in the kernel specification (``kernel.json``). It provides functional
21 parity to existing applications by launching the kernel locally and using
22 :class:`subprocess.Popen` to manage its lifecycle.
24 This class is intended to be subclassed for customizing local kernel environments
25 and serve as a reference implementation for other custom provisioners.
26 """
28 process = None
29 _exit_future = None
30 pid = None
31 pgid = None
32 ip = None
33 ports_cached = False
35 @property
36 def has_process(self) -> bool:
37 return self.process is not None
39 async def poll(self) -> Optional[int]:
40 """Poll the provisioner."""
41 ret = 0
42 if self.process:
43 ret = self.process.poll()
44 return ret
46 async def wait(self) -> Optional[int]:
47 """Wait for the provisioner process."""
48 ret = 0
49 if self.process:
50 # Use busy loop at 100ms intervals, polling until the process is
51 # not alive. If we find the process is no longer alive, complete
52 # its cleanup via the blocking wait(). Callers are responsible for
53 # issuing calls to wait() using a timeout (see kill()).
54 while await self.poll() is None:
55 await asyncio.sleep(0.1)
57 # Process is no longer alive, wait and clear
58 ret = self.process.wait()
59 # Make sure all the fds get closed.
60 for attr in ['stdout', 'stderr', 'stdin']:
61 fid = getattr(self.process, attr)
62 if fid:
63 fid.close()
64 self.process = None # allow has_process to now return False
65 return ret
67 async def send_signal(self, signum: int) -> None:
68 """Sends a signal to the process group of the kernel (this
69 usually includes the kernel and any subprocesses spawned by
70 the kernel).
72 Note that since only SIGTERM is supported on Windows, we will
73 check if the desired signal is for interrupt and apply the
74 applicable code on Windows in that case.
75 """
76 if self.process:
77 if signum == signal.SIGINT and sys.platform == 'win32':
78 from ..win_interrupt import send_interrupt
80 send_interrupt(self.process.win32_interrupt_event)
81 return
83 # Prefer process-group over process
84 if self.pgid and hasattr(os, "killpg"):
85 try:
86 os.killpg(self.pgid, signum)
87 return
88 except OSError:
89 pass # We'll retry sending the signal to only the process below
91 # If we're here, send the signal to the process and let caller handle exceptions
92 self.process.send_signal(signum)
93 return
95 async def kill(self, restart: bool = False) -> None:
96 """Kill the provisioner and optionally restart."""
97 if self.process:
98 if hasattr(signal, "SIGKILL"):
99 # If available, give preference to signalling the process-group over `kill()`.
100 try:
101 await self.send_signal(signal.SIGKILL)
102 return
103 except OSError:
104 pass
105 try:
106 self.process.kill()
107 except OSError as e:
108 LocalProvisioner._tolerate_no_process(e)
110 async def terminate(self, restart: bool = False) -> None:
111 """Terminate the provisioner and optionally restart."""
112 if self.process:
113 if hasattr(signal, "SIGTERM"):
114 # If available, give preference to signalling the process group over `terminate()`.
115 try:
116 await self.send_signal(signal.SIGTERM)
117 return
118 except OSError:
119 pass
120 try:
121 self.process.terminate()
122 except OSError as e:
123 LocalProvisioner._tolerate_no_process(e)
125 @staticmethod
126 def _tolerate_no_process(os_error: OSError) -> None:
127 # In Windows, we will get an Access Denied error if the process
128 # has already terminated. Ignore it.
129 if sys.platform == 'win32':
130 if os_error.winerror != 5:
131 raise
132 # On Unix, we may get an ESRCH error (or ProcessLookupError instance) if
133 # the process has already terminated. Ignore it.
134 else:
135 from errno import ESRCH
137 if not isinstance(os_error, ProcessLookupError) or os_error.errno != ESRCH:
138 raise
140 async def cleanup(self, restart: bool = False) -> None:
141 """Clean up the resources used by the provisioner and optionally restart."""
142 if self.ports_cached and not restart:
143 # provisioner is about to be destroyed, return cached ports
144 lpc = LocalPortCache.instance()
145 ports = (
146 self.connection_info['shell_port'],
147 self.connection_info['iopub_port'],
148 self.connection_info['stdin_port'],
149 self.connection_info['hb_port'],
150 self.connection_info['control_port'],
151 )
152 for port in ports:
153 lpc.return_port(port)
155 async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
156 """Perform any steps in preparation for kernel process launch.
158 This includes applying additional substitutions to the kernel launch command and env.
159 It also includes preparation of launch parameters.
161 Returns the updated kwargs.
162 """
164 # This should be considered temporary until a better division of labor can be defined.
165 km = self.parent
166 if km:
167 if km.transport == 'tcp' and not is_local_ip(km.ip):
168 msg = (
169 "Can only launch a kernel on a local interface. "
170 "This one is not: {}."
171 "Make sure that the '*_address' attributes are "
172 "configured properly. "
173 "Currently valid addresses are: {}".format(km.ip, local_ips())
174 )
175 raise RuntimeError(msg)
176 # build the Popen cmd
177 extra_arguments = kwargs.pop('extra_arguments', [])
179 # write connection file / get default ports
180 # TODO - change when handshake pattern is adopted
181 if km.cache_ports and not self.ports_cached:
182 lpc = LocalPortCache.instance()
183 km.shell_port = lpc.find_available_port(km.ip)
184 km.iopub_port = lpc.find_available_port(km.ip)
185 km.stdin_port = lpc.find_available_port(km.ip)
186 km.hb_port = lpc.find_available_port(km.ip)
187 km.control_port = lpc.find_available_port(km.ip)
188 self.ports_cached = True
189 if 'env' in kwargs:
190 jupyter_session = kwargs['env'].get("JPY_SESSION_NAME", "")
191 km.write_connection_file(jupyter_session=jupyter_session)
192 else:
193 km.write_connection_file()
194 self.connection_info = km.get_connection_info()
196 kernel_cmd = km.format_kernel_cmd(
197 extra_arguments=extra_arguments
198 ) # This needs to remain here for b/c
199 else:
200 extra_arguments = kwargs.pop('extra_arguments', [])
201 kernel_cmd = self.kernel_spec.argv + extra_arguments
203 return await super().pre_launch(cmd=kernel_cmd, **kwargs)
205 async def launch_kernel(self, cmd: List[str], **kwargs: Any) -> KernelConnectionInfo:
206 """Launch a kernel with a command."""
207 scrubbed_kwargs = LocalProvisioner._scrub_kwargs(kwargs)
208 self.process = launch_kernel(cmd, **scrubbed_kwargs)
209 pgid = None
210 if hasattr(os, "getpgid"):
211 try:
212 pgid = os.getpgid(self.process.pid)
213 except OSError:
214 pass
216 self.pid = self.process.pid
217 self.pgid = pgid
218 return self.connection_info
220 @staticmethod
221 def _scrub_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
222 """Remove any keyword arguments that Popen does not tolerate."""
223 keywords_to_scrub: List[str] = ['extra_arguments', 'kernel_id']
224 scrubbed_kwargs = kwargs.copy()
225 for kw in keywords_to_scrub:
226 scrubbed_kwargs.pop(kw, None)
227 return scrubbed_kwargs
229 async def get_provisioner_info(self) -> Dict:
230 """Captures the base information necessary for persistence relative to this instance."""
231 provisioner_info = await super().get_provisioner_info()
232 provisioner_info.update({'pid': self.pid, 'pgid': self.pgid, 'ip': self.ip})
233 return provisioner_info
235 async def load_provisioner_info(self, provisioner_info: Dict) -> None:
236 """Loads the base information necessary for persistence relative to this instance."""
237 await super().load_provisioner_info(provisioner_info)
238 self.pid = provisioner_info['pid']
239 self.pgid = provisioner_info['pgid']
240 self.ip = provisioner_info['ip']