Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/manager.py: 39%
380 statements
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
« prev ^ index » next coverage.py v7.3.3, created at 2023-12-15 06:13 +0000
1"""Base class to manage a running kernel"""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import asyncio
5import functools
6import os
7import re
8import signal
9import sys
10import typing as t
11import uuid
12import warnings
13from asyncio.futures import Future
14from concurrent.futures import Future as CFuture
15from contextlib import contextmanager
16from enum import Enum
18import zmq
19from jupyter_core.utils import run_sync
20from traitlets import (
21 Any,
22 Bool,
23 Dict,
24 DottedObjectName,
25 Float,
26 Instance,
27 Type,
28 Unicode,
29 default,
30 observe,
31 observe_compat,
32)
33from traitlets.utils.importstring import import_item
35from . import kernelspec
36from .asynchronous import AsyncKernelClient
37from .blocking import BlockingKernelClient
38from .client import KernelClient
39from .connect import ConnectionFileMixin
40from .managerabc import KernelManagerABC
41from .provisioning import KernelProvisionerBase
42from .provisioning import KernelProvisionerFactory as KPF # noqa
class _ShutdownStatus(Enum):
    """Record which stage of the shutdown sequence was last reached.

    This is so far used only for testing in order to track the internal state
    of the shutdown logic, and verifying which path is taken for which
    misbehavior.
    """

    # No shutdown step has been recorded yet.
    Unset = None
    # A shutdown_request message was sent on the control channel.
    ShutdownRequest = "ShutdownRequest"
    # The kernel did not exit in time and was asked to terminate.
    SigtermRequest = "SigtermRequest"
    # The kernel still did not exit and was killed.
    SigkillRequest = "SigkillRequest"
# Type variable for a decorated callable, so a decorator can be annotated as
# returning the same callable type it receives.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
def _get_future() -> t.Union[Future, CFuture]:
    """Get an appropriate Future object.

    Returns an ``asyncio`` future when called while an event loop is running,
    and a ``concurrent.futures`` future otherwise.
    """
    try:
        # Raises RuntimeError when no event loop is running in this thread.
        asyncio.get_running_loop()
        return Future()
    except RuntimeError:
        # No event loop running, use concurrent future
        return CFuture()
def in_pending_state(method: F) -> F:
    """Sets the kernel to a pending state by
    creating a fresh Future for the KernelManager's `ready`
    attribute. Once the method is finished, set the Future's results.

    The decorated ``method`` must be a coroutine function defined on an object
    exposing ``_attempted_start``, ``_ready``, ``owns_kernel`` and ``log``.
    If the method raises, the exception is stored on the future, logged, and
    re-raised to the caller.
    """

    @t.no_type_check
    @functools.wraps(method)
    async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
        """Create a future for the decorated method."""
        # Replace the future once a start has been attempted (the previous one
        # may already be resolved) or when none exists yet.
        if self._attempted_start or not self._ready:
            self._ready = _get_future()
        try:
            # call wrapped method, await, and set the result or exception.
            out = await method(self, *args, **kwargs)
            # Add a small sleep to ensure tests can capture the state before done
            await asyncio.sleep(0.01)
            if self.owns_kernel:
                self._ready.set_result(None)
            return out
        except Exception as e:
            self._ready.set_exception(e)
            self.log.exception(self._ready.exception())
            # Bare raise re-raises the active exception unchanged.
            raise

    return t.cast(F, wrapper)
class KernelManager(ConnectionFileMixin):
    """Manages a single kernel in a subprocess on this host.

    This version starts kernels with Popen.

    The lifecycle operations are implemented as ``_async_*`` coroutines and
    exposed synchronously by wrapping them with ``run_sync``.
    """

    _ready: t.Optional[t.Union[Future, CFuture]]

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Initialize a kernel manager.

        Positional arguments are ignored and trigger a ``DeprecationWarning``.
        The ``owns_kernel`` keyword (default ``True``) is consumed here; all
        other keywords are forwarded to the parent initializer.
        """
        if args:
            warnings.warn(
                "Passing positional only arguments to "
                "`KernelManager.__init__` is deprecated since jupyter_client"
                " 8.6, and will become an error on future versions. Positional "
                "arguments have been ignored since jupyter_client 7.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._owns_kernel = kwargs.pop("owns_kernel", True)
        super().__init__(**kwargs)
        self._shutdown_status = _ShutdownStatus.Unset
        self._attempted_start = False
        self._ready = None

    # Set to True when the default ``context`` below was created by this object.
    _created_context: Bool = Bool(False)

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.Context)

    @default("context")
    def _context_default(self) -> zmq.Context:
        """Create a new context and remember that this manager created it."""
        self._created_context = True
        return zmq.Context()

    # the class to create with our `client` method
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.blocking.BlockingKernelClient"
    )
    client_factory: Type = Type(klass=KernelClient)

    @default("client_factory")
    def _client_factory_default(self) -> Type:
        """Import the class named by ``client_class``."""
        return import_item(self.client_class)

    @observe("client_class")
    def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
        """Keep ``client_factory`` in sync with a new ``client_class`` value."""
        self.client_factory = import_item(str(change["new"]))

    kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)

    # The kernel provisioner with which this KernelManager is communicating.
    # This will generally be a LocalProvisioner instance unless the kernelspec
    # indicates otherwise.
    provisioner: t.Optional[KernelProvisionerBase] = None

    kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)

    @default("kernel_spec_manager")
    def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
        """Create a KernelSpecManager that uses this manager's ``data_dir``."""
        return kernelspec.KernelSpecManager(data_dir=self.data_dir)

    @observe("kernel_spec_manager")
    @observe_compat
    def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
        """Drop the cached kernel spec when the spec manager changes."""
        self._kernel_spec = None

    shutdown_wait_time: Float = Float(
        5.0,
        config=True,
        help="Time to wait for a kernel to terminate before killing it, "
        "in seconds. When a shutdown request is initiated, the kernel "
        "will be immediately sent an interrupt (SIGINT), followed "
        "by a shutdown_request message, after 1/2 of `shutdown_wait_time` "
        "it will be sent a terminate (SIGTERM) request, and finally at "
        "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
        "and kill may be equivalent on windows. Note that this value can be "
        "overridden by the in-use kernel provisioner since shutdown times may "
        "vary by provisioned environment.",
    )

    kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)

    @observe("kernel_name")
    def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
        """Drop the cached kernel spec and map "python" to the native kernel name."""
        self._kernel_spec = None
        if change["new"] == "python":
            self.kernel_name = kernelspec.NATIVE_KERNEL_NAME

    _kernel_spec: t.Optional[kernelspec.KernelSpec] = None

    @property
    def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]:
        """The kernel spec for ``kernel_name``, looked up on first access and cached.

        Stays ``None`` while ``kernel_name`` is the empty string.
        """
        if self._kernel_spec is None and self.kernel_name != "":
            self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
        return self._kernel_spec

    cache_ports: Bool = Bool(
        False,
        config=True,
        help="True if the MultiKernelManager should cache ports for this KernelManager instance",
    )

    @default("cache_ports")
    def _default_cache_ports(self) -> bool:
        """Default to caching ports only when the transport is "tcp"."""
        return self.transport == "tcp"

    @property
    def ready(self) -> t.Union[CFuture, Future]:
        """A future that resolves when the kernel process has started for the first time"""
        if not self._ready:
            self._ready = _get_future()
        return self._ready

    @property
    def ipykernel(self) -> bool:
        """Whether ``kernel_name`` is one of "python", "python2" or "python3"."""
        return self.kernel_name in {"python", "python2", "python3"}

    # Protected traits
    _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True)
    _control_socket: Any = Any()

    _restarter: Any = Any()

    autorestart: Bool = Bool(
        True, config=True, help="""Should we autorestart the kernel if it dies."""
    )

    shutting_down: bool = False

    def __del__(self) -> None:
        """Close the control socket and clean up the connection file."""
        self._close_control_socket()
        self.cleanup_connection_file()

    # --------------------------------------------------------------------------
    # Kernel restarter
    # --------------------------------------------------------------------------

    def start_restarter(self) -> None:
        """Start the kernel restarter."""
        pass

    def stop_restarter(self) -> None:
        """Stop the kernel restarter."""
        pass

    def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Register a callback to be called when a kernel is restarted"""
        if self._restarter is None:
            return
        self._restarter.add_callback(callback, event)

    def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Unregister a callback to be called when a kernel is restarted"""
        if self._restarter is None:
            return
        self._restarter.remove_callback(callback, event)

    # --------------------------------------------------------------------------
    # create a Client connected to our Kernel
    # --------------------------------------------------------------------------

    def client(self, **kwargs: t.Any) -> BlockingKernelClient:
        """Create a client configured to connect to our kernel"""
        kw: dict = {}
        kw.update(self.get_connection_info(session=True))
        kw.update(
            {
                "connection_file": self.connection_file,
                "parent": self,
            }
        )

        # add kwargs last, for manual overrides
        kw.update(kwargs)
        return self.client_factory(**kw)

    # --------------------------------------------------------------------------
    # Kernel management
    # --------------------------------------------------------------------------

    def update_env(self, *, env: t.Dict[str, str]) -> None:
        """
        Allow to update the environment of a kernel manager.

        This will take effect only after kernel restart when the new env is
        passed to the new kernel.

        This is useful as some of the information of the current kernel reflect
        the state of the session that started it, and those session information
        (like the attach file path, or name), are mutable.

        Nothing is changed unless the saved launch arguments already contain
        an ``env`` dictionary.

        .. versionadded:: 8.5
        """
        # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
        if (
            isinstance(self._launch_args, dict)
            and "env" in self._launch_args
            and isinstance(self._launch_args["env"], dict)  # type: ignore [unreachable]
        ):
            self._launch_args["env"].update(env)  # type: ignore [unreachable]

    def format_kernel_cmd(self, extra_arguments: t.Optional[t.List[str]] = None) -> t.List[str]:
        """Replace templated args (e.g. {connection_file})

        Builds the command from the kernel spec's ``argv`` plus
        ``extra_arguments``, then substitutes ``{name}`` placeholders from the
        connection file path, ``sys.prefix``, the kernel spec's resource
        directory and the saved launch arguments. Unknown placeholders are
        left unchanged.
        """
        extra_arguments = extra_arguments or []
        assert self.kernel_spec is not None
        cmd = self.kernel_spec.argv + extra_arguments

        if cmd and cmd[0] in {
            "python",
            "python%i" % sys.version_info[0],
            "python%i.%i" % sys.version_info[:2],
        }:
            # executable is 'python' or 'python3', use sys.executable.
            # These will typically be the same,
            # but if the current process is in an env
            # and has been launched by abspath without
            # activating the env, python on PATH may not be sys.executable,
            # but it should be.
            cmd[0] = sys.executable

        # Make sure to use the realpath for the connection_file
        # On windows, when running with the store python, the connection_file path
        # is not usable by non python kernels because the path is being rerouted when
        # inside of a store app.
        # See this bug here: https://bugs.python.org/issue41196
        ns: t.Dict[str, t.Any] = {
            "connection_file": os.path.realpath(self.connection_file),
            "prefix": sys.prefix,
        }

        if self.kernel_spec:  # type:ignore[truthy-bool]
            ns["resource_dir"] = self.kernel_spec.resource_dir
        assert isinstance(self._launch_args, dict)

        ns.update(self._launch_args)

        pat = re.compile(r"\{([A-Za-z0-9_]+)\}")

        def from_ns(match: t.Any) -> t.Any:
            """Get the key out of ns if it's there, otherwise no change."""
            return ns.get(match.group(1), match.group())

        return [pat.sub(from_ns, arg) for arg in cmd]

    async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
        """actually launch the kernel

        override in a subclass to launch kernel subprocesses differently
        Note that provisioners can now be used to customize kernel
        environments.
        """
        assert self.provisioner is not None
        connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
        assert self.provisioner.has_process
        # Provisioner provides the connection information.  Load into kernel manager
        # and write the connection file, if not already done.
        self._reconcile_connection_info(connection_info)

    _launch_kernel = run_sync(_async_launch_kernel)

    # Control socket used for polite kernel shutdown

    def _connect_control_socket(self) -> None:
        """Create the control socket if it does not exist yet."""
        if self._control_socket is None:
            self._control_socket = self._create_connected_socket("control")
            self._control_socket.linger = 100

    def _close_control_socket(self) -> None:
        """Close and forget the control socket, if one is open."""
        if self._control_socket is None:
            return
        self._control_socket.close()
        self._control_socket = None

    async def _async_pre_start_kernel(
        self, **kw: t.Any
    ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
        """Prepares a kernel for startup in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
             keyword arguments that are passed down to build the kernel_cmd
             and launching the kernel (e.g. Popen kwargs).

        Returns
        -------
        The kernel command and the remaining keyword arguments, both as
        returned by the provisioner's ``pre_launch``.
        """
        self.shutting_down = False
        self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
        # save kwargs for use in restart
        # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
        self._launch_args = kw.copy()  # type:ignore [assignment]
        if self.provisioner is None:  # will not be None on restarts
            self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
                self.kernel_id,
                self.kernel_spec,
                parent=self,
            )
        kw = await self.provisioner.pre_launch(**kw)
        kernel_cmd = kw.pop("cmd")
        return kernel_cmd, kw

    pre_start_kernel = run_sync(_async_pre_start_kernel)

    async def _async_post_start_kernel(self, **kw: t.Any) -> None:
        """Performs any post startup tasks relative to the kernel.

        Parameters
        ----------
        `**kw` : optional
             keyword arguments that were used in the kernel process's launch.
        """
        self.start_restarter()
        self._connect_control_socket()
        assert self.provisioner is not None
        await self.provisioner.post_launch(**kw)

    post_start_kernel = run_sync(_async_post_start_kernel)

    @in_pending_state
    async def _async_start_kernel(self, **kw: t.Any) -> None:
        """Starts a kernel on this host in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
             keyword arguments that are passed down to build the kernel_cmd
             and launching the kernel (e.g. Popen kwargs).
        """
        self._attempted_start = True
        kernel_cmd, kw = await self._async_pre_start_kernel(**kw)

        # launch the kernel subprocess
        self.log.debug("Starting kernel: %s", kernel_cmd)
        await self._async_launch_kernel(kernel_cmd, **kw)
        await self._async_post_start_kernel(**kw)

    start_kernel = run_sync(_async_start_kernel)

    async def _async_request_shutdown(self, restart: bool = False) -> None:
        """Send a shutdown request via control channel"""
        content = {"restart": restart}
        msg = self.session.msg("shutdown_request", content=content)
        # ensure control socket is connected
        self._connect_control_socket()
        self.session.send(self._control_socket, msg)
        assert self.provisioner is not None
        await self.provisioner.shutdown_requested(restart=restart)
        self._shutdown_status = _ShutdownStatus.ShutdownRequest

    request_shutdown = run_sync(_async_request_shutdown)

    async def _async_finish_shutdown(
        self,
        waittime: t.Optional[float] = None,
        pollinterval: float = 0.1,
        restart: bool = False,
    ) -> None:
        """Wait for kernel shutdown, then kill process if it doesn't shutdown.

        This does not send shutdown requests - use :meth:`request_shutdown`
        first.

        Parameters
        ----------
        waittime : float, optional
            Total time budget in seconds. Defaults to ``shutdown_wait_time``
            (clamped to be non-negative). The provisioner, when present, may
            override the value. Half of it is spent before terminating and
            half before killing.
        pollinterval : float
            Interval in seconds between liveness checks.
        restart : bool
            Forwarded to the terminate and kill requests.
        """
        if waittime is None:
            waittime = max(self.shutdown_wait_time, 0)
        if self.provisioner:  # Allow provisioner to override
            waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime)

        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, terminating")
            self._shutdown_status = _ShutdownStatus.SigtermRequest
            # Forward ``restart`` so terminate sees the same flag as kill below.
            await self._async_send_kernel_sigterm(restart=restart)

        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, killing")
            self._shutdown_status = _ShutdownStatus.SigkillRequest
            await self._async_kill_kernel(restart=restart)
        else:
            # Process is no longer alive, wait and clear
            if self.has_kernel:
                assert self.provisioner is not None
                await self.provisioner.wait()

    finish_shutdown = run_sync(_async_finish_shutdown)

    async def _async_cleanup_resources(self, restart: bool = False) -> None:
        """Clean up resources when the kernel is shut down

        When ``restart`` is True the connection file is kept and a context
        created by this manager is not destroyed.
        """
        if not restart:
            self.cleanup_connection_file()

        self.cleanup_ipc_files()
        self._close_control_socket()
        self.session.parent = None

        if self._created_context and not restart:
            self.context.destroy(linger=100)

        if self.provisioner:
            await self.provisioner.cleanup(restart=restart)

    cleanup_resources = run_sync(_async_cleanup_resources)

    @in_pending_state
    async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
        """Attempts to stop the kernel process cleanly.

        This attempts to shutdown the kernels cleanly by:

        1. Sending it a shutdown message over the control channel.
        2. If that fails, the kernel is shutdown forcibly by sending it
           a signal.

        Does nothing when this manager does not own the kernel.

        Parameters
        ----------
        now : bool
            Should the kernel be forcible killed *now*. This skips the
            first, nice shutdown attempt.
        restart: bool
            Will this kernel be restarted after it is shutdown. When this
            is True, connection files will not be cleaned up.
        """
        if not self.owns_kernel:
            return

        self.shutting_down = True  # Used by restarter to prevent race condition
        # Stop monitoring for restarting while we shutdown.
        self.stop_restarter()

        if self.has_kernel:
            await self._async_interrupt_kernel()

        if now:
            # Forward ``restart`` as the graceful path below does.
            await self._async_kill_kernel(restart=restart)
        else:
            await self._async_request_shutdown(restart=restart)
            # Don't send any additional kernel kill messages immediately, to give
            # the kernel a chance to properly execute shutdown actions. The wait
            # budget and polling interval are those of _async_finish_shutdown.
            await self._async_finish_shutdown(restart=restart)

        await self._async_cleanup_resources(restart=restart)

    shutdown_kernel = run_sync(_async_shutdown_kernel)

    async def _async_restart_kernel(
        self, now: bool = False, newports: bool = False, **kw: t.Any
    ) -> None:
        """Restarts a kernel with the arguments that were used to launch it.

        Parameters
        ----------
        now : bool, optional
            If True, the kernel is forcefully restarted *immediately*, without
            having a chance to do any cleanup action.  Otherwise the kernel is
            given the regular shutdown wait time to clean up before a forceful
            restart is issued.

            In all cases the kernel is restarted, the only difference is whether
            it is given a chance to perform a clean shutdown or not.

        newports : bool, optional
            If the old kernel was launched with random ports, this flag decides
            whether the same ports and connection file will be used again.
            If False, the same ports and connection file are used. This is
            the default. If True, new random port numbers are chosen and a
            new connection file is written. It is still possible that the newly
            chosen random port numbers happen to be the same as the old ones.

        `**kw` : optional
            Any options specified here will overwrite those used to launch the
            kernel.

        Raises
        ------
        RuntimeError
            If no launch arguments were saved by a previous start.
        """
        if self._launch_args is None:
            msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
            raise RuntimeError(msg)

        # Stop currently running kernel.
        await self._async_shutdown_kernel(now=now, restart=True)

        if newports:
            self.cleanup_random_ports()

        # Start new kernel.
        self._launch_args.update(kw)
        await self._async_start_kernel(**self._launch_args)

    restart_kernel = run_sync(_async_restart_kernel)

    @property
    def owns_kernel(self) -> bool:
        """The ``owns_kernel`` value given at construction (default True)."""
        return self._owns_kernel

    @property
    def has_kernel(self) -> bool:
        """Has a kernel process been started that we are actively managing."""
        return self.provisioner is not None and self.provisioner.has_process

    async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
        """similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.terminate(restart=restart)

    _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)

    async def _async_kill_kernel(self, restart: bool = False) -> None:
        """Kill the running kernel.

        This is a private method, callers should use shutdown_kernel(now=True).
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.kill(restart=restart)

            # Wait until the kernel terminates.
            try:
                await asyncio.wait_for(self._async_wait(), timeout=5.0)
            except asyncio.TimeoutError:
                # Wait timed out, just log warning but continue - not much more we can do.
                self.log.warning("Wait for final termination of kernel timed out - continuing...")
            else:
                # Process is no longer alive, wait and clear
                if self.has_kernel:
                    await self.provisioner.wait()

    _kill_kernel = run_sync(_async_kill_kernel)

    async def _async_interrupt_kernel(self) -> None:
        """Interrupts the kernel by sending it a signal.

        Unlike ``signal_kernel``, this operation is well supported on all
        platforms.

        If no kernel process exists yet but a ``ready`` future does, that
        future is awaited first. The interrupt is then delivered as SIGINT or
        as an ``interrupt_request`` message, depending on the kernel spec's
        ``interrupt_mode``.

        Raises
        ------
        RuntimeError
            If there is still no kernel process after waiting.
        """
        if not self.has_kernel and self._ready is not None:
            if isinstance(self._ready, CFuture):
                # A concurrent.futures.Future is not awaitable and is rejected
                # by asyncio.ensure_future; wrap_future is the bridge for it.
                ready = asyncio.wrap_future(self._ready)
            else:
                ready = self._ready
            # Wait for a shutdown if one is in progress.
            if self.shutting_down:
                await ready
            # Wait for a startup.
            await ready

        if self.has_kernel:
            assert self.kernel_spec is not None
            interrupt_mode = self.kernel_spec.interrupt_mode
            if interrupt_mode == "signal":
                await self._async_signal_kernel(signal.SIGINT)

            elif interrupt_mode == "message":
                msg = self.session.msg("interrupt_request", content={})
                self._connect_control_socket()
                self.session.send(self._control_socket, msg)
        else:
            msg = "Cannot interrupt kernel. No kernel is running!"
            raise RuntimeError(msg)

    interrupt_kernel = run_sync(_async_interrupt_kernel)

    async def _async_signal_kernel(self, signum: int) -> None:
        """Sends a signal to the process group of the kernel (this
        usually includes the kernel and any subprocesses spawned by
        the kernel).

        Note that since only SIGTERM is supported on Windows, this function is
        only useful on Unix systems.

        Raises
        ------
        RuntimeError
            If no kernel process is being managed.
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.send_signal(signum)
        else:
            msg = "Cannot signal kernel. No kernel is running!"
            raise RuntimeError(msg)

    signal_kernel = run_sync(_async_signal_kernel)

    async def _async_is_alive(self) -> bool:
        """Is the kernel process still running?

        Always True when this manager does not own the kernel. Otherwise True
        only when a process exists and the provisioner's ``poll`` returns None.
        """
        if not self.owns_kernel:
            return True

        if self.has_kernel:
            assert self.provisioner is not None
            ret = await self.provisioner.poll()
            if ret is None:
                return True
        return False

    is_alive = run_sync(_async_is_alive)

    async def _async_wait(self, pollinterval: float = 0.1) -> None:
        """Poll every ``pollinterval`` seconds until the kernel is not alive."""
        # Use busy loop at 100ms intervals, polling until the process is
        # not alive.  If we find the process is no longer alive, complete
        # its cleanup via the blocking wait().  Callers are responsible for
        # issuing calls to wait() using a timeout (see _kill_kernel()).
        while await self._async_is_alive():
            await asyncio.sleep(pollinterval)
class AsyncKernelManager(KernelManager):
    """An async kernel manager.

    The public lifecycle methods are bound directly to the ``_async_*``
    coroutines of :class:`KernelManager`, so callers must await them.
    """

    # the class to create with our `client` method
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.asynchronous.AsyncKernelClient"
    )
    client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient")

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.asyncio.Context)

    @default("context")
    def _context_default(self) -> zmq.asyncio.Context:
        """Create a new asyncio context and remember that this manager created it."""
        self._created_context = True
        return zmq.asyncio.Context()

    def client(  # type:ignore[override]
        self, **kwargs: t.Any
    ) -> AsyncKernelClient:
        """Get a client for the manager."""
        return super().client(**kwargs)  # type:ignore[return-value]

    # Expose the coroutine implementations under the public names instead of
    # the synchronous wrappers defined on the base class.
    _launch_kernel = KernelManager._async_launch_kernel  # type:ignore[assignment]
    start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel  # type:ignore[assignment]
    pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel  # type:ignore[assignment]
    post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel  # type:ignore[assignment]
    request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown  # type:ignore[assignment]
    finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown  # type:ignore[assignment]
    cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources  # type:ignore[assignment]
    shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel  # type:ignore[assignment]
    restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel  # type:ignore[assignment]
    _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm  # type:ignore[assignment]
    _kill_kernel = KernelManager._async_kill_kernel  # type:ignore[assignment]
    interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel  # type:ignore[assignment]
    signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel  # type:ignore[assignment]
    is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive  # type:ignore[assignment]
# Register KernelManager as a virtual subclass of the abstract interface.
KernelManagerABC.register(KernelManager)
def start_new_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[KernelManager, BlockingKernelClient]:
    """Start a new kernel, and return its Manager and Client

    Parameters
    ----------
    startup_timeout : float
        Passed as ``timeout`` to the client's ``wait_for_ready``.
    kernel_name : str
        Name of the kernel to start.
    `**kwargs` : optional
        Forwarded to ``KernelManager.start_kernel``.

    Raises
    ------
    RuntimeError
        Re-raised from ``wait_for_ready`` after the channels have been
        stopped and the kernel shut down.
    """
    km = KernelManager(kernel_name=kernel_name)
    km.start_kernel(**kwargs)
    kc = km.client()
    kc.start_channels()
    try:
        kc.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # Do not leave a kernel behind when it could not be reached.
        kc.stop_channels()
        km.shutdown_kernel()
        raise

    return km, kc
async def start_new_async_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
    """Start a new kernel, and return its Manager and Client

    Parameters
    ----------
    startup_timeout : float
        Passed as ``timeout`` to the client's ``wait_for_ready``.
    kernel_name : str
        Name of the kernel to start.
    `**kwargs` : optional
        Forwarded to ``AsyncKernelManager.start_kernel``.

    Raises
    ------
    RuntimeError
        Re-raised from ``wait_for_ready`` after the channels have been
        stopped and the kernel shut down.
    """
    km = AsyncKernelManager(kernel_name=kernel_name)
    await km.start_kernel(**kwargs)
    kc = km.client()
    kc.start_channels()
    try:
        await kc.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # Do not leave a kernel behind when it could not be reached.
        kc.stop_channels()
        await km.shutdown_kernel()
        raise

    return (km, kc)
@contextmanager
def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
    """Context manager to create a kernel in a subprocess.

    The kernel is shut down when the context exits.

    Parameters
    ----------
    `**kwargs` : optional
        Forwarded to ``start_new_kernel``.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    km, kc = start_new_kernel(**kwargs)
    try:
        yield kc
    finally:
        try:
            kc.stop_channels()
        finally:
            # Shut the kernel down even if stopping the channels failed, so
            # the subprocess is not left running.
            km.shutdown_kernel(now=True)