1"""Base class to manage a running kernel"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5import asyncio
6import functools
7import os
8import re
9import signal
10import sys
11import typing as t
12import uuid
13import warnings
14from asyncio.futures import Future
15from concurrent.futures import Future as CFuture
16from contextlib import contextmanager
17from enum import Enum
18
19import zmq
20from jupyter_core.utils import run_sync
21from traitlets import (
22 Any,
23 Bool,
24 Dict,
25 DottedObjectName,
26 Float,
27 Instance,
28 Type,
29 Unicode,
30 default,
31 observe,
32 observe_compat,
33)
34from traitlets.utils.importstring import import_item
35
36from . import kernelspec
37from .asynchronous import AsyncKernelClient
38from .blocking import BlockingKernelClient
39from .client import KernelClient
40from .connect import ConnectionFileMixin
41from .managerabc import KernelManagerABC
42from .provisioning import KernelProvisionerBase
43from .provisioning import KernelProvisionerFactory as KPF # noqa
44
45
46class _ShutdownStatus(Enum):
47 """
48
49 This is so far used only for testing in order to track the internal state of
50 the shutdown logic, and verifying which path is taken for which
51 missbehavior.
52
53 """
54
55 Unset = None
56 ShutdownRequest = "ShutdownRequest"
57 SigtermRequest = "SigtermRequest"
58 SigkillRequest = "SigkillRequest"
59
60
61F = t.TypeVar("F", bound=t.Callable[..., t.Any])
62
63
64def _get_future() -> t.Union[Future, CFuture]:
65 """Get an appropriate Future object"""
66 try:
67 asyncio.get_running_loop()
68 return Future()
69 except RuntimeError:
70 # No event loop running, use concurrent future
71 return CFuture()
72
73
74def in_pending_state(method: F) -> F:
75 """Sets the kernel to a pending state by
76 creating a fresh Future for the KernelManager's `ready`
77 attribute. Once the method is finished, set the Future's results.
78 """
79
80 @t.no_type_check
81 @functools.wraps(method)
82 async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
83 """Create a future for the decorated method."""
84 if self._attempted_start or not self._ready:
85 self._ready = _get_future()
86 try:
87 # call wrapped method, await, and set the result or exception.
88 out = await method(self, *args, **kwargs)
89 # Add a small sleep to ensure tests can capture the state before done
90 await asyncio.sleep(0.01)
91 if self.owns_kernel:
92 self._ready.set_result(None)
93 return out
94 except Exception as e:
95 self._ready.set_exception(e)
96 self.log.exception(self._ready.exception())
97 raise e
98
99 return t.cast(F, wrapper)
100
101
102class KernelManager(ConnectionFileMixin):
103 """Manages a single kernel in a subprocess on this host.
104
105 This version starts kernels with Popen.
106 """
107
108 _ready: t.Union[Future, CFuture] | None
109
110 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
111 """Initialize a kernel manager."""
112 if args:
113 warnings.warn(
114 "Passing positional only arguments to "
115 "`KernelManager.__init__` is deprecated since jupyter_client"
116 " 8.6, and will become an error on future versions. Positional "
117 " arguments have been ignored since jupyter_client 7.0",
118 DeprecationWarning,
119 stacklevel=2,
120 )
121 self._owns_kernel = kwargs.pop("owns_kernel", True)
122 super().__init__(**kwargs)
123 self._shutdown_status = _ShutdownStatus.Unset
124 self._attempted_start = False
125 self._ready = None
126
127 _created_context: Bool = Bool(False)
128
129 # The PyZMQ Context to use for communication with the kernel.
130 context: Instance = Instance(zmq.Context)
131
132 @default("context")
133 def _context_default(self) -> zmq.Context:
134 self._created_context = True
135 return zmq.Context()
136
137 # the class to create with our `client` method
138 client_class: DottedObjectName = DottedObjectName(
139 "jupyter_client.blocking.BlockingKernelClient", config=True
140 )
141 client_factory: Type = Type(klass=KernelClient, config=True)
142
143 @default("client_factory")
144 def _client_factory_default(self) -> Type:
145 return import_item(self.client_class)
146
147 @observe("client_class")
148 def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
149 self.client_factory = import_item(str(change["new"]))
150
151 kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)
152
153 # The kernel provisioner with which this KernelManager is communicating.
154 # This will generally be a LocalProvisioner instance unless the kernelspec
155 # indicates otherwise.
156 provisioner: KernelProvisionerBase | None = None
157
158 kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)
159
160 @default("kernel_spec_manager")
161 def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
162 return kernelspec.KernelSpecManager(data_dir=self.data_dir)
163
164 @observe("kernel_spec_manager")
165 @observe_compat
166 def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
167 self._kernel_spec = None
168
169 shutdown_wait_time: Float = Float(
170 5.0,
171 config=True,
172 help="Time to wait for a kernel to terminate before killing it, "
173 "in seconds. When a shutdown request is initiated, the kernel "
174 "will be immediately sent an interrupt (SIGINT), followed"
175 "by a shutdown_request message, after 1/2 of `shutdown_wait_time`"
176 "it will be sent a terminate (SIGTERM) request, and finally at "
177 "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
178 "and kill may be equivalent on windows. Note that this value can be"
179 "overridden by the in-use kernel provisioner since shutdown times may"
180 "vary by provisioned environment.",
181 )
182
183 kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)
184
185 @observe("kernel_name")
186 def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
187 self._kernel_spec = None
188 if change["new"] == "python":
189 self.kernel_name = kernelspec.NATIVE_KERNEL_NAME
190
191 _kernel_spec: kernelspec.KernelSpec | None = None
192
193 @property
194 def kernel_spec(self) -> kernelspec.KernelSpec | None:
195 if self._kernel_spec is None and self.kernel_name != "":
196 self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
197 return self._kernel_spec
198
199 cache_ports: Bool = Bool(
200 False,
201 config=True,
202 help="True if the MultiKernelManager should cache ports for this KernelManager instance",
203 )
204
205 @default("cache_ports")
206 def _default_cache_ports(self) -> bool:
207 return self.transport == "tcp"
208
209 @property
210 def ready(self) -> t.Union[CFuture, Future]:
211 """A future that resolves when the kernel process has started for the first time"""
212 if not self._ready:
213 self._ready = _get_future()
214 return self._ready
215
216 @property
217 def ipykernel(self) -> bool:
218 return self.kernel_name in {"python", "python2", "python3"}
219
220 # Protected traits
221 _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True)
222 _control_socket: Any = Any()
223
224 _restarter: Any = Any()
225
226 autorestart: Bool = Bool(
227 True, config=True, help="""Should we autorestart the kernel if it dies."""
228 )
229
230 shutting_down: bool = False
231
232 def __del__(self) -> None:
233 self._close_control_socket()
234 self.cleanup_connection_file()
235
236 # --------------------------------------------------------------------------
237 # Kernel restarter
238 # --------------------------------------------------------------------------
239
240 def start_restarter(self) -> None:
241 """Start the kernel restarter."""
242 pass
243
244 def stop_restarter(self) -> None:
245 """Stop the kernel restarter."""
246 pass
247
248 def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
249 """Register a callback to be called when a kernel is restarted"""
250 if self._restarter is None:
251 return
252 self._restarter.add_callback(callback, event)
253
254 def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
255 """Unregister a callback to be called when a kernel is restarted"""
256 if self._restarter is None:
257 return
258 self._restarter.remove_callback(callback, event)
259
260 # --------------------------------------------------------------------------
261 # create a Client connected to our Kernel
262 # --------------------------------------------------------------------------
263
264 def client(self, **kwargs: t.Any) -> BlockingKernelClient:
265 """Create a client configured to connect to our kernel"""
266 kw: dict = {}
267 kw.update(self.get_connection_info(session=True))
268 kw.update(
269 {
270 "connection_file": self.connection_file,
271 "parent": self,
272 }
273 )
274
275 # add kwargs last, for manual overrides
276 kw.update(kwargs)
277 return self.client_factory(**kw)
278
279 # --------------------------------------------------------------------------
280 # Kernel management
281 # --------------------------------------------------------------------------
282
283 def resolve_path(self, path: str) -> str | None:
284 """Resolve path to given file."""
285 assert self.provisioner is not None
286 return self.provisioner.resolve_path(path)
287
288 def update_env(self, *, env: t.Dict[str, str]) -> None:
289 """
290 Allow to update the environment of a kernel manager.
291
292 This will take effect only after kernel restart when the new env is
293 passed to the new kernel.
294
295 This is useful as some of the information of the current kernel reflect
296 the state of the session that started it, and those session information
297 (like the attach file path, or name), are mutable.
298
299 .. version-added: 8.5
300 """
301 # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
302 if (
303 isinstance(self._launch_args, dict)
304 and "env" in self._launch_args
305 and isinstance(self._launch_args["env"], dict) # type: ignore [unreachable]
306 ):
307 self._launch_args["env"].update(env) # type: ignore [unreachable]
308
309 def format_kernel_cmd(self, extra_arguments: t.List[str] | None = None) -> t.List[str]:
310 """Replace templated args (e.g. {connection_file})"""
311 extra_arguments = extra_arguments or []
312 assert self.kernel_spec is not None
313 cmd = self.kernel_spec.argv + extra_arguments
314
315 if cmd and cmd[0] in {
316 "python",
317 "python%i" % sys.version_info[0],
318 "python%i.%i" % sys.version_info[:2],
319 }:
320 # executable is 'python' or 'python3', use sys.executable.
321 # These will typically be the same,
322 # but if the current process is in an env
323 # and has been launched by abspath without
324 # activating the env, python on PATH may not be sys.executable,
325 # but it should be.
326 cmd[0] = sys.executable
327
328 # Make sure to use the realpath for the connection_file
329 # On windows, when running with the store python, the connection_file path
330 # is not usable by non python kernels because the path is being rerouted when
331 # inside of a store app.
332 # See this bug here: https://bugs.python.org/issue41196
333 ns: t.Dict[str, t.Any] = {
334 "connection_file": os.path.realpath(self.connection_file),
335 "prefix": sys.prefix,
336 }
337
338 if self.kernel_spec: # type:ignore[truthy-bool]
339 ns["resource_dir"] = self.kernel_spec.resource_dir
340 assert isinstance(self._launch_args, dict)
341
342 ns.update(self._launch_args)
343
344 pat = re.compile(r"\{([A-Za-z0-9_]+)\}")
345
346 def from_ns(match: t.Any) -> t.Any:
347 """Get the key out of ns if it's there, otherwise no change."""
348 return ns.get(match.group(1), match.group())
349
350 return [pat.sub(from_ns, arg) for arg in cmd]
351
352 async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
353 """actually launch the kernel
354
355 override in a subclass to launch kernel subprocesses differently
356 Note that provisioners can now be used to customize kernel environments
357 and
358 """
359 assert self.provisioner is not None
360 connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
361 assert self.provisioner.has_process
362 # Provisioner provides the connection information. Load into kernel manager
363 # and write the connection file, if not already done.
364 self._reconcile_connection_info(connection_info)
365
366 _launch_kernel = run_sync(_async_launch_kernel)
367
368 # Control socket used for polite kernel shutdown
369
370 def _connect_control_socket(self) -> None:
371 if self._control_socket is None:
372 self._control_socket = self._create_connected_socket("control")
373 self._control_socket.linger = 100
374
375 def _close_control_socket(self) -> None:
376 if self._control_socket is None:
377 return
378 self._control_socket.close()
379 self._control_socket = None
380
381 async def _async_pre_start_kernel(
382 self, **kw: t.Any
383 ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
384 """Prepares a kernel for startup in a separate process.
385
386 If random ports (port=0) are being used, this method must be called
387 before the channels are created.
388
389 Parameters
390 ----------
391 `**kw` : optional
392 keyword arguments that are passed down to build the kernel_cmd
393 and launching the kernel (e.g. Popen kwargs).
394 """
395 self.shutting_down = False
396 self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
397 # save kwargs for use in restart
398 # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
399 self._launch_args = kw.copy()
400 if self.provisioner is None: # will not be None on restarts
401 self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
402 self.kernel_id,
403 self.kernel_spec,
404 parent=self,
405 )
406 kw = await self.provisioner.pre_launch(**kw)
407 kernel_cmd = kw.pop("cmd")
408 return kernel_cmd, kw
409
410 pre_start_kernel = run_sync(_async_pre_start_kernel)
411
412 async def _async_post_start_kernel(self, **kw: t.Any) -> None:
413 """Performs any post startup tasks relative to the kernel.
414
415 Parameters
416 ----------
417 `**kw` : optional
418 keyword arguments that were used in the kernel process's launch.
419 """
420 self.start_restarter()
421 self._connect_control_socket()
422 assert self.provisioner is not None
423 await self.provisioner.post_launch(**kw)
424
425 post_start_kernel = run_sync(_async_post_start_kernel)
426
427 @in_pending_state
428 async def _async_start_kernel(self, **kw: t.Any) -> None:
429 """Starts a kernel on this host in a separate process.
430
431 If random ports (port=0) are being used, this method must be called
432 before the channels are created.
433
434 Parameters
435 ----------
436 `**kw` : optional
437 keyword arguments that are passed down to build the kernel_cmd
438 and launching the kernel (e.g. Popen kwargs).
439 """
440 self._attempted_start = True
441 kernel_cmd, kw = await self._async_pre_start_kernel(**kw)
442
443 # launch the kernel subprocess
444 self.log.debug("Starting kernel: %s", kernel_cmd)
445 await self._async_launch_kernel(kernel_cmd, **kw)
446 await self._async_post_start_kernel(**kw)
447
448 start_kernel = run_sync(_async_start_kernel)
449
450 async def _async_request_shutdown(self, restart: bool = False) -> None:
451 """Send a shutdown request via control channel"""
452 content = {"restart": restart}
453 msg = self.session.msg("shutdown_request", content=content)
454 # ensure control socket is connected
455 self._connect_control_socket()
456 self.session.send(self._control_socket, msg)
457 assert self.provisioner is not None
458 await self.provisioner.shutdown_requested(restart=restart)
459 self._shutdown_status = _ShutdownStatus.ShutdownRequest
460
461 request_shutdown = run_sync(_async_request_shutdown)
462
463 async def _async_finish_shutdown(
464 self,
465 waittime: float | None = None,
466 pollinterval: float = 0.1,
467 restart: bool = False,
468 ) -> None:
469 """Wait for kernel shutdown, then kill process if it doesn't shutdown.
470
471 This does not send shutdown requests - use :meth:`request_shutdown`
472 first.
473 """
474 if waittime is None:
475 waittime = max(self.shutdown_wait_time, 0)
476 if self.provisioner: # Allow provisioner to override
477 waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime)
478
479 try:
480 await asyncio.wait_for(
481 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
482 )
483 except asyncio.TimeoutError:
484 self.log.debug("Kernel is taking too long to finish, terminating")
485 self._shutdown_status = _ShutdownStatus.SigtermRequest
486 await self._async_send_kernel_sigterm()
487
488 try:
489 await asyncio.wait_for(
490 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
491 )
492 except asyncio.TimeoutError:
493 self.log.debug("Kernel is taking too long to finish, killing")
494 self._shutdown_status = _ShutdownStatus.SigkillRequest
495 await self._async_kill_kernel(restart=restart)
496 else:
497 # Process is no longer alive, wait and clear
498 if self.has_kernel:
499 assert self.provisioner is not None
500 await self.provisioner.wait()
501
502 finish_shutdown = run_sync(_async_finish_shutdown)
503
504 async def _async_cleanup_resources(self, restart: bool = False) -> None:
505 """Clean up resources when the kernel is shut down"""
506 if not restart:
507 self.cleanup_connection_file()
508
509 self.cleanup_ipc_files()
510 self._close_control_socket()
511 self.session.parent = None
512
513 if self._created_context and not restart:
514 self.context.destroy(linger=100)
515
516 if self.provisioner:
517 await self.provisioner.cleanup(restart=restart)
518
519 cleanup_resources = run_sync(_async_cleanup_resources)
520
521 @in_pending_state
522 async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
523 """Attempts to stop the kernel process cleanly.
524
525 This attempts to shutdown the kernels cleanly by:
526
527 1. Sending it a shutdown message over the control channel.
528 2. If that fails, the kernel is shutdown forcibly by sending it
529 a signal.
530
531 Parameters
532 ----------
533 now : bool
534 Should the kernel be forcible killed *now*. This skips the
535 first, nice shutdown attempt.
536 restart: bool
537 Will this kernel be restarted after it is shutdown. When this
538 is True, connection files will not be cleaned up.
539 """
540 if not self.owns_kernel:
541 return
542
543 self.shutting_down = True # Used by restarter to prevent race condition
544 # Stop monitoring for restarting while we shutdown.
545 self.stop_restarter()
546
547 if self.has_kernel:
548 await self._async_interrupt_kernel()
549
550 if now:
551 await self._async_kill_kernel()
552 else:
553 await self._async_request_shutdown(restart=restart)
554 # Don't send any additional kernel kill messages immediately, to give
555 # the kernel a chance to properly execute shutdown actions. Wait for at
556 # most 1s, checking every 0.1s.
557 await self._async_finish_shutdown(restart=restart)
558
559 await self._async_cleanup_resources(restart=restart)
560
561 shutdown_kernel = run_sync(_async_shutdown_kernel)
562
563 async def _async_restart_kernel(
564 self, now: bool = False, newports: bool = False, **kw: t.Any
565 ) -> None:
566 """Restarts a kernel with the arguments that were used to launch it.
567
568 Parameters
569 ----------
570 now : bool, optional
571 If True, the kernel is forcefully restarted *immediately*, without
572 having a chance to do any cleanup action. Otherwise the kernel is
573 given 1s to clean up before a forceful restart is issued.
574
575 In all cases the kernel is restarted, the only difference is whether
576 it is given a chance to perform a clean shutdown or not.
577
578 newports : bool, optional
579 If the old kernel was launched with random ports, this flag decides
580 whether the same ports and connection file will be used again.
581 If False, the same ports and connection file are used. This is
582 the default. If True, new random port numbers are chosen and a
583 new connection file is written. It is still possible that the newly
584 chosen random port numbers happen to be the same as the old ones.
585
586 `**kw` : optional
587 Any options specified here will overwrite those used to launch the
588 kernel.
589 """
590 if self._launch_args is None:
591 msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
592 raise RuntimeError(msg)
593
594 # Stop currently running kernel.
595 await self._async_shutdown_kernel(now=now, restart=True)
596
597 if newports:
598 self.cleanup_random_ports()
599
600 # Start new kernel.
601 self._launch_args.update(kw)
602 await self._async_start_kernel(**self._launch_args)
603
604 restart_kernel = run_sync(_async_restart_kernel)
605
606 @property
607 def owns_kernel(self) -> bool:
608 return self._owns_kernel
609
610 @property
611 def has_kernel(self) -> bool:
612 """Has a kernel process been started that we are actively managing."""
613 return self.provisioner is not None and self.provisioner.has_process
614
615 async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
616 """similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
617 if self.has_kernel:
618 assert self.provisioner is not None
619 await self.provisioner.terminate(restart=restart)
620
621 _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)
622
623 async def _async_kill_kernel(self, restart: bool = False) -> None:
624 """Kill the running kernel.
625
626 This is a private method, callers should use shutdown_kernel(now=True).
627 """
628 if self.has_kernel:
629 assert self.provisioner is not None
630 await self.provisioner.kill(restart=restart)
631
632 # Wait until the kernel terminates.
633 try:
634 await asyncio.wait_for(self._async_wait(), timeout=5.0)
635 except asyncio.TimeoutError:
636 # Wait timed out, just log warning but continue - not much more we can do.
637 self.log.warning("Wait for final termination of kernel timed out - continuing...")
638 pass
639 else:
640 # Process is no longer alive, wait and clear
641 if self.has_kernel:
642 await self.provisioner.wait()
643
644 _kill_kernel = run_sync(_async_kill_kernel)
645
646 async def _async_interrupt_kernel(self) -> None:
647 """Interrupts the kernel by sending it a signal.
648
649 Unlike ``signal_kernel``, this operation is well supported on all
650 platforms.
651 """
652 if not self.has_kernel and self._ready is not None:
653 if isinstance(self._ready, CFuture):
654 ready = asyncio.ensure_future(t.cast(Future[t.Any], self._ready))
655 else:
656 ready = self._ready
657 # Wait for a shutdown if one is in progress.
658 if self.shutting_down:
659 await ready
660 # Wait for a startup.
661 await ready
662
663 if self.has_kernel:
664 assert self.kernel_spec is not None
665 interrupt_mode = self.kernel_spec.interrupt_mode
666 if interrupt_mode == "signal":
667 await self._async_signal_kernel(signal.SIGINT)
668
669 elif interrupt_mode == "message":
670 msg = self.session.msg("interrupt_request", content={})
671 self._connect_control_socket()
672 self.session.send(self._control_socket, msg)
673 else:
674 msg = "Cannot interrupt kernel. No kernel is running!"
675 raise RuntimeError(msg)
676
677 interrupt_kernel = run_sync(_async_interrupt_kernel)
678
679 async def _async_signal_kernel(self, signum: int) -> None:
680 """Sends a signal to the process group of the kernel (this
681 usually includes the kernel and any subprocesses spawned by
682 the kernel).
683
684 Note that since only SIGTERM is supported on Windows, this function is
685 only useful on Unix systems.
686 """
687 if self.has_kernel:
688 assert self.provisioner is not None
689 await self.provisioner.send_signal(signum)
690 else:
691 msg = "Cannot signal kernel. No kernel is running!"
692 raise RuntimeError(msg)
693
694 signal_kernel = run_sync(_async_signal_kernel)
695
696 async def _async_is_alive(self) -> bool:
697 """Is the kernel process still running?"""
698 if not self.owns_kernel:
699 return True
700
701 if self.has_kernel:
702 assert self.provisioner is not None
703 ret = await self.provisioner.poll()
704 if ret is None:
705 return True
706 return False
707
708 is_alive = run_sync(_async_is_alive)
709
710 async def _async_wait(self, pollinterval: float = 0.1) -> None:
711 # Use busy loop at 100ms intervals, polling until the process is
712 # not alive. If we find the process is no longer alive, complete
713 # its cleanup via the blocking wait(). Callers are responsible for
714 # issuing calls to wait() using a timeout (see _kill_kernel()).
715 while await self._async_is_alive():
716 await asyncio.sleep(pollinterval)
717
718
719class AsyncKernelManager(KernelManager):
720 """An async kernel manager."""
721
722 # the class to create with our `client` method
723 client_class: DottedObjectName = DottedObjectName(
724 "jupyter_client.asynchronous.AsyncKernelClient", config=True
725 )
726 client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient", config=True)
727
728 # The PyZMQ Context to use for communication with the kernel.
729 context: Instance = Instance(zmq.asyncio.Context)
730
731 @default("context")
732 def _context_default(self) -> zmq.asyncio.Context:
733 self._created_context = True
734 return zmq.asyncio.Context()
735
736 def client( # type:ignore[override]
737 self, **kwargs: t.Any
738 ) -> AsyncKernelClient:
739 """Get a client for the manager."""
740 return super().client(**kwargs) # type:ignore[return-value]
741
742 _launch_kernel = KernelManager._async_launch_kernel # type:ignore[assignment]
743 start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel # type:ignore[assignment]
744 pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel # type:ignore[assignment]
745 post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel # type:ignore[assignment]
746 request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown # type:ignore[assignment]
747 finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown # type:ignore[assignment]
748 cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources # type:ignore[assignment]
749 shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel # type:ignore[assignment]
750 restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel # type:ignore[assignment]
751 _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm # type:ignore[assignment]
752 _kill_kernel = KernelManager._async_kill_kernel # type:ignore[assignment]
753 interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel # type:ignore[assignment]
754 signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel # type:ignore[assignment]
755 is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive # type:ignore[assignment]
756
757
758KernelManagerABC.register(KernelManager)
759
760
761def start_new_kernel(
762 startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
763) -> t.Tuple[KernelManager, BlockingKernelClient]:
764 """Start a new kernel, and return its Manager and Client"""
765 km = KernelManager(kernel_name=kernel_name)
766 km.start_kernel(**kwargs)
767 kc = km.client()
768 kc.start_channels()
769 try:
770 kc.wait_for_ready(timeout=startup_timeout)
771 except RuntimeError:
772 kc.stop_channels()
773 km.shutdown_kernel()
774 raise
775
776 return km, kc
777
778
779async def start_new_async_kernel(
780 startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
781) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
782 """Start a new kernel, and return its Manager and Client"""
783 km = AsyncKernelManager(kernel_name=kernel_name)
784 await km.start_kernel(**kwargs)
785 kc = km.client()
786 kc.start_channels()
787 try:
788 await kc.wait_for_ready(timeout=startup_timeout)
789 except RuntimeError:
790 kc.stop_channels()
791 await km.shutdown_kernel()
792 raise
793
794 return (km, kc)
795
796
797@contextmanager
798def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
799 """Context manager to create a kernel in a subprocess.
800
801 The kernel is shut down when the context exits.
802
803 Returns
804 -------
805 kernel_client: connected KernelClient instance
806 """
807 km, kc = start_new_kernel(**kwargs)
808 try:
809 yield kc
810 finally:
811 kc.stop_channels()
812 km.shutdown_kernel(now=True)