Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/multikernelmanager.py: 43%
222 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-01 06:54 +0000
1"""A kernel manager for multiple kernels"""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import asyncio
5import os
6import socket
7import typing as t
8import uuid
9from functools import wraps
11import zmq
12from traitlets import Any, Bool, Dict, DottedObjectName, Instance, Unicode, default, observe
13from traitlets.config.configurable import LoggingConfigurable
14from traitlets.utils.importstring import import_item
16from .kernelspec import NATIVE_KERNEL_NAME, KernelSpecManager
17from .manager import KernelManager
18from .utils import ensure_async, run_sync
21class DuplicateKernelError(Exception):
22 pass
25def kernel_method(f: t.Callable) -> t.Callable:
26 """decorator for proxying MKM.method(kernel_id) to individual KMs by ID"""
28 @wraps(f)
29 def wrapped(
30 self: t.Any, kernel_id: str, *args: t.Any, **kwargs: t.Any
31 ) -> t.Union[t.Callable, t.Awaitable]:
32 # get the kernel
33 km = self.get_kernel(kernel_id)
34 method = getattr(km, f.__name__)
35 # call the kernel's method
36 r = method(*args, **kwargs)
37 # last thing, call anything defined in the actual class method
38 # such as logging messages
39 f(self, kernel_id, *args, **kwargs)
40 # return the method result
41 return r
43 return wrapped
46class MultiKernelManager(LoggingConfigurable):
47 """A class for managing multiple kernels."""
49 default_kernel_name = Unicode(
50 NATIVE_KERNEL_NAME, help="The name of the default kernel to start"
51 ).tag(config=True)
53 kernel_spec_manager = Instance(KernelSpecManager, allow_none=True)
55 kernel_manager_class = DottedObjectName(
56 "jupyter_client.ioloop.IOLoopKernelManager",
57 help="""The kernel manager class. This is configurable to allow
58 subclassing of the KernelManager for customized behavior.
59 """,
60 ).tag(config=True)
62 @observe("kernel_manager_class")
63 def _kernel_manager_class_changed(self, change):
64 self.kernel_manager_factory = self._create_kernel_manager_factory()
66 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
68 @default("kernel_manager_factory")
69 def _kernel_manager_factory_default(self):
70 return self._create_kernel_manager_factory()
72 def _create_kernel_manager_factory(self) -> t.Callable:
73 kernel_manager_ctor = import_item(self.kernel_manager_class)
75 def create_kernel_manager(*args: t.Any, **kwargs: t.Any) -> KernelManager:
76 if self.shared_context:
77 if self.context.closed:
78 # recreate context if closed
79 self.context = self._context_default()
80 kwargs.setdefault("context", self.context)
81 km = kernel_manager_ctor(*args, **kwargs)
82 return km
84 return create_kernel_manager
86 shared_context = Bool(
87 True,
88 help="Share a single zmq.Context to talk to all my kernels",
89 ).tag(config=True)
91 context = Instance("zmq.Context")
93 _created_context = Bool(False)
95 _pending_kernels = Dict()
97 @property
98 def _starting_kernels(self):
99 """A shim for backwards compatibility."""
100 return self._pending_kernels
102 @default("context") # type:ignore[misc]
103 def _context_default(self) -> zmq.Context:
104 self._created_context = True
105 return zmq.Context()
107 connection_dir = Unicode("")
109 _kernels = Dict()
111 def __del__(self):
112 """Handle garbage collection. Destroy context if applicable."""
113 if self._created_context and self.context and not self.context.closed:
114 if self.log:
115 self.log.debug("Destroying zmq context for %s", self)
116 self.context.destroy()
117 try:
118 super_del = super().__del__ # type:ignore[misc]
119 except AttributeError:
120 pass
121 else:
122 super_del()
124 def list_kernel_ids(self) -> t.List[str]:
125 """Return a list of the kernel ids of the active kernels."""
126 # Create a copy so we can iterate over kernels in operations
127 # that delete keys.
128 return list(self._kernels.keys())
130 def __len__(self) -> int:
131 """Return the number of running kernels."""
132 return len(self.list_kernel_ids())
134 def __contains__(self, kernel_id: str) -> bool:
135 return kernel_id in self._kernels
137 def pre_start_kernel(
138 self, kernel_name: t.Optional[str], kwargs: t.Any
139 ) -> t.Tuple[KernelManager, str, str]:
140 # kwargs should be mutable, passing it as a dict argument.
141 kernel_id = kwargs.pop("kernel_id", self.new_kernel_id(**kwargs))
142 if kernel_id in self:
143 raise DuplicateKernelError("Kernel already exists: %s" % kernel_id)
145 if kernel_name is None:
146 kernel_name = self.default_kernel_name
147 # kernel_manager_factory is the constructor for the KernelManager
148 # subclass we are using. It can be configured as any Configurable,
149 # including things like its transport and ip.
150 constructor_kwargs = {}
151 if self.kernel_spec_manager:
152 constructor_kwargs["kernel_spec_manager"] = self.kernel_spec_manager
153 km = self.kernel_manager_factory(
154 connection_file=os.path.join(self.connection_dir, "kernel-%s.json" % kernel_id),
155 parent=self,
156 log=self.log,
157 kernel_name=kernel_name,
158 **constructor_kwargs,
159 )
160 return km, kernel_name, kernel_id
162 async def _add_kernel_when_ready(
163 self, kernel_id: str, km: KernelManager, kernel_awaitable: t.Awaitable
164 ) -> None:
165 try:
166 await kernel_awaitable
167 self._kernels[kernel_id] = km
168 self._pending_kernels.pop(kernel_id, None)
169 except Exception as e:
170 self.log.exception(e)
172 async def _remove_kernel_when_ready(
173 self, kernel_id: str, kernel_awaitable: t.Awaitable
174 ) -> None:
175 try:
176 await kernel_awaitable
177 self.remove_kernel(kernel_id)
178 self._pending_kernels.pop(kernel_id, None)
179 except Exception as e:
180 self.log.exception(e)
182 def _using_pending_kernels(self):
183 """Returns a boolean; a clearer method for determining if
184 this multikernelmanager is using pending kernels or not
185 """
186 return getattr(self, 'use_pending_kernels', False)
188 async def _async_start_kernel(
189 self, *, kernel_name: t.Optional[str] = None, **kwargs: t.Any
190 ) -> str:
191 """Start a new kernel.
193 The caller can pick a kernel_id by passing one in as a keyword arg,
194 otherwise one will be generated using new_kernel_id().
196 The kernel ID for the newly started kernel is returned.
197 """
198 km, kernel_name, kernel_id = self.pre_start_kernel(kernel_name, kwargs)
199 if not isinstance(km, KernelManager):
200 self.log.warning(
201 "Kernel manager class ({km_class}) is not an instance of 'KernelManager'!".format(
202 km_class=self.kernel_manager_class.__class__
203 )
204 )
205 kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner
207 starter = ensure_async(km.start_kernel(**kwargs))
208 task = asyncio.create_task(self._add_kernel_when_ready(kernel_id, km, starter))
209 self._pending_kernels[kernel_id] = task
210 # Handling a Pending Kernel
211 if self._using_pending_kernels():
212 # If using pending kernels, do not block
213 # on the kernel start.
214 self._kernels[kernel_id] = km
215 else:
216 await task
217 # raise an exception if one occurred during kernel startup.
218 if km.ready.exception():
219 raise km.ready.exception() # type: ignore
221 return kernel_id
223 start_kernel = run_sync(_async_start_kernel)
225 async def _async_shutdown_kernel(
226 self,
227 kernel_id: str,
228 now: t.Optional[bool] = False,
229 restart: t.Optional[bool] = False,
230 ) -> None:
231 """Shutdown a kernel by its kernel uuid.
233 Parameters
234 ==========
235 kernel_id : uuid
236 The id of the kernel to shutdown.
237 now : bool
238 Should the kernel be shutdown forcibly using a signal.
239 restart : bool
240 Will the kernel be restarted?
241 """
242 self.log.info("Kernel shutdown: %s", kernel_id)
243 # If the kernel is still starting, wait for it to be ready.
244 if kernel_id in self._pending_kernels:
245 task = self._pending_kernels[kernel_id]
246 try:
247 await task
248 km = self.get_kernel(kernel_id)
249 await t.cast(asyncio.Future, km.ready)
250 except asyncio.CancelledError:
251 pass
252 except Exception:
253 self.remove_kernel(kernel_id)
254 return
255 km = self.get_kernel(kernel_id)
256 # If a pending kernel raised an exception, remove it.
257 if not km.ready.cancelled() and km.ready.exception():
258 self.remove_kernel(kernel_id)
259 return
260 stopper = ensure_async(km.shutdown_kernel(now, restart))
261 fut = asyncio.ensure_future(self._remove_kernel_when_ready(kernel_id, stopper))
262 self._pending_kernels[kernel_id] = fut
263 # Await the kernel if not using pending kernels.
264 if not self._using_pending_kernels():
265 await fut
266 # raise an exception if one occurred during kernel shutdown.
267 if km.ready.exception():
268 raise km.ready.exception() # type: ignore
270 shutdown_kernel = run_sync(_async_shutdown_kernel)
272 @kernel_method
273 def request_shutdown(self, kernel_id: str, restart: t.Optional[bool] = False) -> None:
274 """Ask a kernel to shut down by its kernel uuid"""
276 @kernel_method
277 def finish_shutdown(
278 self,
279 kernel_id: str,
280 waittime: t.Optional[float] = None,
281 pollinterval: t.Optional[float] = 0.1,
282 ) -> None:
283 """Wait for a kernel to finish shutting down, and kill it if it doesn't"""
284 self.log.info("Kernel shutdown: %s", kernel_id)
286 @kernel_method
287 def cleanup_resources(self, kernel_id: str, restart: bool = False) -> None:
288 """Clean up a kernel's resources"""
290 def remove_kernel(self, kernel_id: str) -> KernelManager:
291 """remove a kernel from our mapping.
293 Mainly so that a kernel can be removed if it is already dead,
294 without having to call shutdown_kernel.
296 The kernel object is returned, or `None` if not found.
297 """
298 return self._kernels.pop(kernel_id, None)
300 async def _async_shutdown_all(self, now: bool = False) -> None:
301 """Shutdown all kernels."""
302 kids = self.list_kernel_ids()
303 kids += list(self._pending_kernels)
304 kms = list(self._kernels.values())
305 futs = [self._async_shutdown_kernel(kid, now=now) for kid in set(kids)]
306 await asyncio.gather(*futs)
307 # If using pending kernels, the kernels will not have been fully shut down.
308 if self._using_pending_kernels():
309 for km in kms:
310 try:
311 await km.ready
312 except asyncio.CancelledError:
313 self._pending_kernels[km.kernel_id].cancel()
314 except Exception:
315 # Will have been logged in _add_kernel_when_ready
316 pass
318 shutdown_all = run_sync(_async_shutdown_all)
320 def interrupt_kernel(self, kernel_id: str) -> None:
321 """Interrupt (SIGINT) the kernel by its uuid.
323 Parameters
324 ==========
325 kernel_id : uuid
326 The id of the kernel to interrupt.
327 """
328 kernel = self.get_kernel(kernel_id)
329 if not kernel.ready.done():
330 msg = "Kernel is in a pending state. Cannot interrupt."
331 raise RuntimeError(msg)
332 out = kernel.interrupt_kernel()
333 self.log.info("Kernel interrupted: %s", kernel_id)
334 return out
336 @kernel_method
337 def signal_kernel(self, kernel_id: str, signum: int) -> None:
338 """Sends a signal to the kernel by its uuid.
340 Note that since only SIGTERM is supported on Windows, this function
341 is only useful on Unix systems.
343 Parameters
344 ==========
345 kernel_id : uuid
346 The id of the kernel to signal.
347 signum : int
348 Signal number to send kernel.
349 """
350 self.log.info("Signaled Kernel %s with %s", kernel_id, signum)
352 async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None:
353 """Restart a kernel by its uuid, keeping the same ports.
355 Parameters
356 ==========
357 kernel_id : uuid
358 The id of the kernel to interrupt.
359 now : bool, optional
360 If True, the kernel is forcefully restarted *immediately*, without
361 having a chance to do any cleanup action. Otherwise the kernel is
362 given 1s to clean up before a forceful restart is issued.
364 In all cases the kernel is restarted, the only difference is whether
365 it is given a chance to perform a clean shutdown or not.
366 """
367 kernel = self.get_kernel(kernel_id)
368 if self._using_pending_kernels() and not kernel.ready.done():
369 msg = "Kernel is in a pending state. Cannot restart."
370 raise RuntimeError(msg)
371 await ensure_async(kernel.restart_kernel(now=now))
372 self.log.info("Kernel restarted: %s", kernel_id)
374 restart_kernel = run_sync(_async_restart_kernel)
376 @kernel_method
377 def is_alive(self, kernel_id: str) -> bool: # type:ignore[empty-body]
378 """Is the kernel alive.
380 This calls KernelManager.is_alive() which calls Popen.poll on the
381 actual kernel subprocess.
383 Parameters
384 ==========
385 kernel_id : uuid
386 The id of the kernel.
387 """
389 def _check_kernel_id(self, kernel_id: str) -> None:
390 """check that a kernel id is valid"""
391 if kernel_id not in self:
392 raise KeyError("Kernel with id not found: %s" % kernel_id)
394 def get_kernel(self, kernel_id: str) -> KernelManager:
395 """Get the single KernelManager object for a kernel by its uuid.
397 Parameters
398 ==========
399 kernel_id : uuid
400 The id of the kernel.
401 """
402 self._check_kernel_id(kernel_id)
403 return self._kernels[kernel_id]
405 @kernel_method
406 def add_restart_callback(
407 self, kernel_id: str, callback: t.Callable, event: str = "restart"
408 ) -> None:
409 """add a callback for the KernelRestarter"""
411 @kernel_method
412 def remove_restart_callback(
413 self, kernel_id: str, callback: t.Callable, event: str = "restart"
414 ) -> None:
415 """remove a callback for the KernelRestarter"""
417 @kernel_method
418 def get_connection_info(self, kernel_id: str) -> t.Dict[str, t.Any]: # type:ignore[empty-body]
419 """Return a dictionary of connection data for a kernel.
421 Parameters
422 ==========
423 kernel_id : uuid
424 The id of the kernel.
426 Returns
427 =======
428 connection_dict : dict
429 A dict of the information needed to connect to a kernel.
430 This includes the ip address and the integer port
431 numbers of the different channels (stdin_port, iopub_port,
432 shell_port, hb_port).
433 """
435 @kernel_method
436 def connect_iopub( # type:ignore[empty-body]
437 self, kernel_id: str, identity: t.Optional[bytes] = None
438 ) -> socket.socket:
439 """Return a zmq Socket connected to the iopub channel.
441 Parameters
442 ==========
443 kernel_id : uuid
444 The id of the kernel
445 identity : bytes (optional)
446 The zmq identity of the socket
448 Returns
449 =======
450 stream : zmq Socket or ZMQStream
451 """
453 @kernel_method
454 def connect_shell( # type:ignore[empty-body]
455 self, kernel_id: str, identity: t.Optional[bytes] = None
456 ) -> socket.socket:
457 """Return a zmq Socket connected to the shell channel.
459 Parameters
460 ==========
461 kernel_id : uuid
462 The id of the kernel
463 identity : bytes (optional)
464 The zmq identity of the socket
466 Returns
467 =======
468 stream : zmq Socket or ZMQStream
469 """
471 @kernel_method
472 def connect_control( # type:ignore[empty-body]
473 self, kernel_id: str, identity: t.Optional[bytes] = None
474 ) -> socket.socket:
475 """Return a zmq Socket connected to the control channel.
477 Parameters
478 ==========
479 kernel_id : uuid
480 The id of the kernel
481 identity : bytes (optional)
482 The zmq identity of the socket
484 Returns
485 =======
486 stream : zmq Socket or ZMQStream
487 """
489 @kernel_method
490 def connect_stdin( # type:ignore[empty-body]
491 self, kernel_id: str, identity: t.Optional[bytes] = None
492 ) -> socket.socket:
493 """Return a zmq Socket connected to the stdin channel.
495 Parameters
496 ==========
497 kernel_id : uuid
498 The id of the kernel
499 identity : bytes (optional)
500 The zmq identity of the socket
502 Returns
503 =======
504 stream : zmq Socket or ZMQStream
505 """
507 @kernel_method
508 def connect_hb( # type:ignore[empty-body]
509 self, kernel_id: str, identity: t.Optional[bytes] = None
510 ) -> socket.socket:
511 """Return a zmq Socket connected to the hb channel.
513 Parameters
514 ==========
515 kernel_id : uuid
516 The id of the kernel
517 identity : bytes (optional)
518 The zmq identity of the socket
520 Returns
521 =======
522 stream : zmq Socket or ZMQStream
523 """
525 def new_kernel_id(self, **kwargs: t.Any) -> str:
526 """
527 Returns the id to associate with the kernel for this request. Subclasses may override
528 this method to substitute other sources of kernel ids.
529 :param kwargs:
530 :return: string-ized version 4 uuid
531 """
532 return str(uuid.uuid4())
535class AsyncMultiKernelManager(MultiKernelManager):
536 kernel_manager_class = DottedObjectName(
537 "jupyter_client.ioloop.AsyncIOLoopKernelManager",
538 config=True,
539 help="""The kernel manager class. This is configurable to allow
540 subclassing of the AsyncKernelManager for customized behavior.
541 """,
542 )
544 use_pending_kernels = Bool(
545 False,
546 help="""Whether to make kernels available before the process has started. The
547 kernel has a `.ready` future which can be awaited before connecting""",
548 ).tag(config=True)
550 context = Instance("zmq.asyncio.Context")
552 @default("context") # type:ignore[misc]
553 def _context_default(self) -> zmq.asyncio.Context:
554 self._created_context = True
555 return zmq.asyncio.Context()
557 start_kernel: t.Callable[
558 ..., t.Awaitable
559 ] = MultiKernelManager._async_start_kernel # type:ignore[assignment]
560 restart_kernel: t.Callable[
561 ..., t.Awaitable
562 ] = MultiKernelManager._async_restart_kernel # type:ignore[assignment]
563 shutdown_kernel: t.Callable[
564 ..., t.Awaitable
565 ] = MultiKernelManager._async_shutdown_kernel # type:ignore[assignment]
566 shutdown_all: t.Callable[
567 ..., t.Awaitable
568 ] = MultiKernelManager._async_shutdown_all # type:ignore[assignment]