1"""Base class to manage a running kernel"""
2
3# Copyright (c) Jupyter Development Team.
4# Distributed under the terms of the Modified BSD License.
5import asyncio
6import functools
7import os
8import re
9import signal
10import sys
11import typing as t
12import uuid
13import warnings
14from asyncio.futures import Future
15from concurrent.futures import Future as CFuture
16from contextlib import contextmanager
17from enum import Enum
18
19import zmq
20from jupyter_core.utils import run_sync
21from traitlets import (
22 Any,
23 Bool,
24 Dict,
25 DottedObjectName,
26 Float,
27 Instance,
28 Type,
29 Unicode,
30 default,
31 observe,
32 observe_compat,
33)
34from traitlets.utils.importstring import import_item
35
36from . import kernelspec
37from .asynchronous import AsyncKernelClient
38from .blocking import BlockingKernelClient
39from .client import KernelClient
40from .connect import ConnectionFileMixin
41from .managerabc import KernelManagerABC
42from .provisioning import KernelProvisionerBase
43from .provisioning import KernelProvisionerFactory as KPF # noqa
44
45
class _ShutdownStatus(Enum):
    """Record of the most forceful shutdown step taken so far.

    So far this is only used by tests, to track the internal state of the
    shutdown logic and to verify which path is taken for which kind of
    kernel misbehavior.
    """

    # No shutdown step has been recorded yet.
    Unset = None
    # A ``shutdown_request`` message was sent over the control channel.
    ShutdownRequest = "ShutdownRequest"
    # The kernel did not exit in time and was asked to terminate.
    SigtermRequest = "SigtermRequest"
    # The kernel still did not exit and was killed.
    SigkillRequest = "SigkillRequest"
59
60
# Type variable bound to "any callable". ``in_pending_state`` is annotated as
# taking and returning ``F`` so that type checkers keep the decorated method's
# own signature.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])
62
63
def _get_future() -> t.Union[Future, CFuture]:
    """Return a new, unresolved future suited to the calling context.

    Returns
    -------
    An ``asyncio`` ``Future`` when called while an event loop is running,
    otherwise a ``concurrent.futures`` ``Future``.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running, use concurrent future
        return CFuture()
    return Future()
72
73
def in_pending_state(method: F) -> F:
    """Decorate a coroutine method so that it drives the manager's ``_ready`` future.

    Before the wrapped method runs, a fresh future is stored in
    ``self._ready`` if a start was already attempted or no future exists yet.
    When the method finishes, the future is resolved with ``None`` (only if
    the manager owns its kernel); when it raises, the exception is stored on
    the future, logged, and re-raised.
    """

    @t.no_type_check
    @functools.wraps(method)
    async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
        """Create a future for the decorated method."""
        needs_fresh_future = self._attempted_start or not self._ready
        if needs_fresh_future:
            self._ready = _get_future()
        try:
            # Run the wrapped coroutine to completion first.
            result = await method(self, *args, **kwargs)
            # Add a small sleep to ensure tests can capture the state before done
            await asyncio.sleep(0.01)
            if self.owns_kernel:
                self._ready.set_result(None)
        except Exception as err:
            self._ready.set_exception(err)
            self.log.exception(self._ready.exception())
            raise err
        return result

    return t.cast(F, wrapper)
100
101
class KernelManager(ConnectionFileMixin):
    """Manages a single kernel on this host.

    The kernel process is launched, signalled, polled and cleaned up through
    a kernel provisioner (``self.provisioner``), which is created on the
    first call to :meth:`_async_pre_start_kernel`.

    Each lifecycle operation is implemented as an ``_async_*`` coroutine; the
    public names (``start_kernel``, ``shutdown_kernel``, ...) are produced by
    wrapping those coroutines with ``run_sync``.
    """

    # Future tracking the pending start/shutdown operation. It is replaced by
    # the ``in_pending_state`` decorator and lazily created by ``ready``.
    _ready: t.Union[Future, CFuture] | None

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Initialize a kernel manager.

        Parameters
        ----------
        `*args`
            Ignored. Passing any positional argument emits a
            ``DeprecationWarning``.
        `**kwargs`
            ``owns_kernel`` (default ``True``) is consumed here; everything
            else is forwarded to the parent initializer.
        """
        if args:
            warnings.warn(
                "Passing positional only arguments to "
                "`KernelManager.__init__` is deprecated since jupyter_client"
                " 8.6, and will become an error on future versions. Positional "
                " arguments have been ignored since jupyter_client 7.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._owns_kernel = kwargs.pop("owns_kernel", True)
        super().__init__(**kwargs)
        self._shutdown_status = _ShutdownStatus.Unset
        self._attempted_start = False
        self._ready = None

    # Set to True once this manager has created its own ZMQ context, so that
    # cleanup knows it is responsible for destroying it.
    _created_context: Bool = Bool(False)

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.Context)

    @default("context")
    def _context_default(self) -> zmq.Context:
        """Create a new context and remember that this manager created it."""
        self._created_context = True
        return zmq.Context()

    # the class to create with our `client` method
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.blocking.BlockingKernelClient", config=True
    )
    client_factory: Type = Type(klass=KernelClient, config=True)

    @default("client_factory")
    def _client_factory_default(self) -> Type:
        """Import the class named by ``client_class``."""
        return import_item(self.client_class)

    @observe("client_class")
    def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
        """Keep ``client_factory`` in sync when ``client_class`` changes."""
        self.client_factory = import_item(str(change["new"]))

    kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)

    # The kernel provisioner with which this KernelManager is communicating.
    # This will generally be a LocalProvisioner instance unless the kernelspec
    # indicates otherwise.
    provisioner: KernelProvisionerBase | None = None

    kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)

    @default("kernel_spec_manager")
    def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
        """Build a KernelSpecManager rooted at this manager's ``data_dir``."""
        return kernelspec.KernelSpecManager(data_dir=self.data_dir)

    @observe("kernel_spec_manager")
    @observe_compat
    def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
        """Drop the cached kernel spec when the spec manager is replaced."""
        self._kernel_spec = None

    # NOTE: each fragment below ends with a space so that the implicit string
    # concatenation does not fuse adjacent words in the rendered help text.
    shutdown_wait_time: Float = Float(
        5.0,
        config=True,
        help="Time to wait for a kernel to terminate before killing it, "
        "in seconds. When a shutdown request is initiated, the kernel "
        "will be immediately sent an interrupt (SIGINT), followed "
        "by a shutdown_request message, after 1/2 of `shutdown_wait_time` "
        "it will be sent a terminate (SIGTERM) request, and finally at "
        "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
        "and kill may be equivalent on windows. Note that this value can be "
        "overridden by the in-use kernel provisioner since shutdown times may "
        "vary by provisioned environment.",
    )

    kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)

    @observe("kernel_name")
    def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
        """Drop the cached spec and map the ``"python"`` alias to the native kernel name."""
        self._kernel_spec = None
        if change["new"] == "python":
            self.kernel_name = kernelspec.NATIVE_KERNEL_NAME

    # Cache for the ``kernel_spec`` property; reset by the observers above.
    _kernel_spec: kernelspec.KernelSpec | None = None

    @property
    def kernel_spec(self) -> kernelspec.KernelSpec | None:
        """The kernel spec for ``kernel_name``, looked up once and then cached.

        Returns ``None`` when nothing is cached and ``kernel_name`` is empty.
        """
        if self._kernel_spec is None and self.kernel_name != "":
            self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
        return self._kernel_spec

    cache_ports: Bool = Bool(
        False,
        config=True,
        help="True if the MultiKernelManager should cache ports for this KernelManager instance",
    )

    @default("cache_ports")
    def _default_cache_ports(self) -> bool:
        """Default to caching ports only for the ``tcp`` transport."""
        return self.transport == "tcp"

    @property
    def ready(self) -> t.Union[CFuture, Future]:
        """A future that resolves when the kernel process has started for the first time.

        The future is created on first access if none exists yet.
        """
        if not self._ready:
            self._ready = _get_future()
        return self._ready

    @property
    def ipykernel(self) -> bool:
        """Whether ``kernel_name`` is one of ``python``, ``python2`` or ``python3``."""
        return self.kernel_name in {"python", "python2", "python3"}

    # Protected traits
    # Keyword arguments of the most recent start, kept for use in restart.
    _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True)
    # Socket connected to the kernel's control channel (see _connect_control_socket).
    _control_socket: Any = Any()

    _restarter: Any = Any()

    autorestart: Bool = Bool(
        True, config=True, help="""Should we autorestart the kernel if it dies."""
    )

    # True while a shutdown is in progress; reset at the start of the next launch.
    shutting_down: bool = False

    def __del__(self) -> None:
        """Close the control socket and remove the connection file on finalization."""
        self._close_control_socket()
        self.cleanup_connection_file()

    # --------------------------------------------------------------------------
    # Kernel restarter
    # --------------------------------------------------------------------------

    def start_restarter(self) -> None:
        """Start the kernel restarter (a no-op in this base class)."""
        pass

    def stop_restarter(self) -> None:
        """Stop the kernel restarter (a no-op in this base class)."""
        pass

    def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Register a callback to be called when a kernel is restarted.

        Does nothing when no restarter is set.
        """
        if self._restarter is None:
            return
        self._restarter.add_callback(callback, event)

    def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Unregister a callback to be called when a kernel is restarted.

        Does nothing when no restarter is set.
        """
        if self._restarter is None:
            return
        self._restarter.remove_callback(callback, event)

    # --------------------------------------------------------------------------
    # create a Client connected to our Kernel
    # --------------------------------------------------------------------------

    def client(self, **kwargs: t.Any) -> BlockingKernelClient:
        """Create a client configured to connect to our kernel.

        Parameters
        ----------
        `**kwargs` : optional
            Extra arguments for ``client_factory``; they override the
            connection info, ``connection_file`` and ``parent`` supplied here.
        """
        kw: dict = {
            **self.get_connection_info(session=True),
            "connection_file": self.connection_file,
            "parent": self,
            # add kwargs last, for manual overrides
            **kwargs,
        }
        return self.client_factory(**kw)

    # --------------------------------------------------------------------------
    # Kernel management
    # --------------------------------------------------------------------------

    def update_env(self, *, env: t.Dict[str, str]) -> None:
        """
        Allow to update the environment of a kernel manager.

        This will take effect only after kernel restart when the new env is
        passed to the new kernel.

        This is useful as some of the information of the current kernel reflect
        the state of the session that started it, and those session information
        (like the attach file path, or name), are mutable.

        The update is applied only when the saved launch arguments already
        contain an ``env`` dict; otherwise this call does nothing.

        .. versionadded:: 8.5
        """
        # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
        if (
            isinstance(self._launch_args, dict)
            and "env" in self._launch_args
            and isinstance(self._launch_args["env"], dict)  # type: ignore [unreachable]
        ):
            self._launch_args["env"].update(env)  # type: ignore [unreachable]

    def format_kernel_cmd(self, extra_arguments: t.List[str] | None = None) -> t.List[str]:
        """Replace templated args (e.g. {connection_file}).

        Parameters
        ----------
        extra_arguments : list of str, optional
            Arguments appended to the kernel spec's ``argv`` before
            substitution.

        Returns
        -------
        The command line, with every ``{name}`` placeholder whose name is
        known replaced by its value. Unknown placeholders are left untouched.
        """
        extra_arguments = extra_arguments or []
        assert self.kernel_spec is not None
        cmd = self.kernel_spec.argv + extra_arguments

        major, minor = sys.version_info[:2]
        if cmd and cmd[0] in {
            "python",
            f"python{major}",
            f"python{major}.{minor}",
        }:
            # executable is 'python' or 'python3', use sys.executable.
            # These will typically be the same,
            # but if the current process is in an env
            # and has been launched by abspath without
            # activating the env, python on PATH may not be sys.executable,
            # but it should be.
            cmd[0] = sys.executable

        # Make sure to use the realpath for the connection_file
        # On windows, when running with the store python, the connection_file path
        # is not usable by non python kernels because the path is being rerouted when
        # inside of a store app.
        # See this bug here: https://bugs.python.org/issue41196
        ns: t.Dict[str, t.Any] = {
            "connection_file": os.path.realpath(self.connection_file),
            "prefix": sys.prefix,
        }

        if self.kernel_spec:  # type:ignore[truthy-bool]
            ns["resource_dir"] = self.kernel_spec.resource_dir
        assert isinstance(self._launch_args, dict)

        # Saved launch arguments may also be referenced from argv templates.
        ns.update(self._launch_args)

        pat = re.compile(r"\{([A-Za-z0-9_]+)\}")

        def from_ns(match: t.Any) -> t.Any:
            """Get the key out of ns if it's there, otherwise no change."""
            return ns.get(match.group(1), match.group())

        return [pat.sub(from_ns, arg) for arg in cmd]

    async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
        """Actually launch the kernel.

        Override in a subclass to launch kernel subprocesses differently.
        Note that provisioners can now be used to customize how and where the
        kernel is launched.

        Parameters
        ----------
        kernel_cmd : list of str
            The command line handed to the provisioner.
        `**kw` : optional
            Keyword arguments forwarded to the provisioner's ``launch_kernel``.
        """
        assert self.provisioner is not None
        connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
        assert self.provisioner.has_process
        # Provisioner provides the connection information. Load into kernel manager
        # and write the connection file, if not already done.
        self._reconcile_connection_info(connection_info)

    _launch_kernel = run_sync(_async_launch_kernel)

    # Control socket used for polite kernel shutdown

    def _connect_control_socket(self) -> None:
        """Connect the control socket if it is not connected yet."""
        if self._control_socket is None:
            self._control_socket = self._create_connected_socket("control")
            self._control_socket.linger = 100

    def _close_control_socket(self) -> None:
        """Close and forget the control socket, if there is one."""
        if self._control_socket is None:
            return
        self._control_socket.close()
        self._control_socket = None

    async def _async_pre_start_kernel(
        self, **kw: t.Any
    ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
        """Prepares a kernel for startup in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
            keyword arguments that are passed down to build the kernel_cmd
            and launching the kernel (e.g. Popen kwargs).

        Returns
        -------
        A ``(kernel_cmd, kw)`` tuple: the command taken from the ``cmd`` entry
        of the provisioner's ``pre_launch`` result, and the remaining keyword
        arguments.
        """
        self.shutting_down = False
        self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
        # save kwargs for use in restart
        # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
        self._launch_args = kw.copy()
        if self.provisioner is None:  # will not be None on restarts
            self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
                self.kernel_id,
                self.kernel_spec,
                parent=self,
            )
        kw = await self.provisioner.pre_launch(**kw)
        kernel_cmd = kw.pop("cmd")
        return kernel_cmd, kw

    pre_start_kernel = run_sync(_async_pre_start_kernel)

    async def _async_post_start_kernel(self, **kw: t.Any) -> None:
        """Performs any post startup tasks relative to the kernel.

        Parameters
        ----------
        `**kw` : optional
            keyword arguments that were used in the kernel process's launch.
        """
        self.start_restarter()
        self._connect_control_socket()
        assert self.provisioner is not None
        await self.provisioner.post_launch(**kw)

    post_start_kernel = run_sync(_async_post_start_kernel)

    @in_pending_state
    async def _async_start_kernel(self, **kw: t.Any) -> None:
        """Starts a kernel on this host in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
            keyword arguments that are passed down to build the kernel_cmd
            and launching the kernel (e.g. Popen kwargs).
        """
        self._attempted_start = True
        kernel_cmd, kw = await self._async_pre_start_kernel(**kw)

        # launch the kernel subprocess
        self.log.debug("Starting kernel: %s", kernel_cmd)
        await self._async_launch_kernel(kernel_cmd, **kw)
        await self._async_post_start_kernel(**kw)

    start_kernel = run_sync(_async_start_kernel)

    async def _async_request_shutdown(self, restart: bool = False) -> None:
        """Send a shutdown request via control channel.

        Parameters
        ----------
        restart : bool
            Placed in the message content and passed to the provisioner's
            ``shutdown_requested`` hook.
        """
        content = {"restart": restart}
        msg = self.session.msg("shutdown_request", content=content)
        # ensure control socket is connected
        self._connect_control_socket()
        self.session.send(self._control_socket, msg)
        assert self.provisioner is not None
        await self.provisioner.shutdown_requested(restart=restart)
        self._shutdown_status = _ShutdownStatus.ShutdownRequest

    request_shutdown = run_sync(_async_request_shutdown)

    async def _async_finish_shutdown(
        self,
        waittime: float | None = None,
        pollinterval: float = 0.1,
        restart: bool = False,
    ) -> None:
        """Wait for kernel shutdown, then kill process if it doesn't shutdown.

        This does not send shutdown requests - use :meth:`request_shutdown`
        first.

        Parameters
        ----------
        waittime : float, optional
            Total time budget in seconds. Defaults to ``shutdown_wait_time``
            (clamped at 0); a provisioner may override the value either way.
        pollinterval : float
            Seconds between liveness checks.
        restart : bool
            Forwarded to the terminate and kill requests.
        """
        if waittime is None:
            waittime = max(self.shutdown_wait_time, 0)
        if self.provisioner:  # Allow provisioner to override
            waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime)

        # First half of the budget: wait for a voluntary exit, else terminate.
        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, terminating")
            self._shutdown_status = _ShutdownStatus.SigtermRequest
            # Forward ``restart`` like the kill request below does.
            await self._async_send_kernel_sigterm(restart=restart)

        # Second half of the budget: wait again, else kill.
        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, killing")
            self._shutdown_status = _ShutdownStatus.SigkillRequest
            await self._async_kill_kernel(restart=restart)
        else:
            # Process is no longer alive, wait and clear
            if self.has_kernel:
                assert self.provisioner is not None
                await self.provisioner.wait()

    finish_shutdown = run_sync(_async_finish_shutdown)

    async def _async_cleanup_resources(self, restart: bool = False) -> None:
        """Clean up resources when the kernel is shut down.

        Parameters
        ----------
        restart : bool
            When True, the connection file and a self-created ZMQ context are
            kept. The flag is also forwarded to the provisioner's ``cleanup``.
        """
        if not restart:
            self.cleanup_connection_file()

        self.cleanup_ipc_files()
        self._close_control_socket()
        self.session.parent = None

        # Only destroy a context this manager created itself.
        if self._created_context and not restart:
            self.context.destroy(linger=100)

        if self.provisioner:
            await self.provisioner.cleanup(restart=restart)

    cleanup_resources = run_sync(_async_cleanup_resources)

    @in_pending_state
    async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
        """Attempts to stop the kernel process cleanly.

        This attempts to shutdown the kernels cleanly by:

        1. Sending it a shutdown message over the control channel.
        2. If that fails, the kernel is shutdown forcibly by sending it
           a signal.

        Nothing is done when this manager does not own the kernel.

        Parameters
        ----------
        now : bool
            Should the kernel be forcible killed *now*. This skips the
            first, nice shutdown attempt.
        restart: bool
            Will this kernel be restarted after it is shutdown. When this
            is True, connection files will not be cleaned up.
        """
        if not self.owns_kernel:
            return

        self.shutting_down = True  # Used by restarter to prevent race condition
        # Stop monitoring for restarting while we shutdown.
        self.stop_restarter()

        if self.has_kernel:
            await self._async_interrupt_kernel()

        if now:
            # Tell the provisioner whether a relaunch follows, as the
            # graceful path below already does.
            await self._async_kill_kernel(restart=restart)
        else:
            await self._async_request_shutdown(restart=restart)
            # Don't send any additional kernel kill messages immediately, to give
            # the kernel a chance to properly execute shutdown actions.
            # finish_shutdown waits up to the shutdown wait time, checking
            # every 0.1s by default, before escalating.
            await self._async_finish_shutdown(restart=restart)

        await self._async_cleanup_resources(restart=restart)

    shutdown_kernel = run_sync(_async_shutdown_kernel)

    async def _async_restart_kernel(
        self, now: bool = False, newports: bool = False, **kw: t.Any
    ) -> None:
        """Restarts a kernel with the arguments that were used to launch it.

        Parameters
        ----------
        now : bool, optional
            If True, the kernel is forcefully restarted *immediately*, without
            having a chance to do any cleanup action. Otherwise the kernel is
            given up to the shutdown wait time to clean up before a forceful
            restart is issued.

            In all cases the kernel is restarted, the only difference is whether
            it is given a chance to perform a clean shutdown or not.

        newports : bool, optional
            If the old kernel was launched with random ports, this flag decides
            whether the same ports and connection file will be used again.
            If False, the same ports and connection file are used. This is
            the default. If True, new random port numbers are chosen and a
            new connection file is written. It is still possible that the newly
            chosen random port numbers happen to be the same as the old ones.

        `**kw` : optional
            Any options specified here will overwrite those used to launch the
            kernel.

        Raises
        ------
        RuntimeError
            If no launch arguments have been saved by a previous start.
        """
        if self._launch_args is None:
            msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
            raise RuntimeError(msg)

        # Stop currently running kernel.
        await self._async_shutdown_kernel(now=now, restart=True)

        if newports:
            self.cleanup_random_ports()

        # Start new kernel.
        self._launch_args.update(kw)
        await self._async_start_kernel(**self._launch_args)

    restart_kernel = run_sync(_async_restart_kernel)

    @property
    def owns_kernel(self) -> bool:
        """The ``owns_kernel`` flag given at construction (``True`` by default)."""
        return self._owns_kernel

    @property
    def has_kernel(self) -> bool:
        """Has a kernel process been started that we are actively managing."""
        return self.provisioner is not None and self.provisioner.has_process

    async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
        """Ask the provisioner to terminate the kernel, without waiting for it to exit.

        Similar to ``_kill_kernel``, but requests termination rather than a
        kill. Does nothing when there is no kernel.
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.terminate(restart=restart)

    _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)

    async def _async_kill_kernel(self, restart: bool = False) -> None:
        """Kill the running kernel.

        This is a private method, callers should use shutdown_kernel(now=True).

        After the kill request, waits up to 5 seconds for the process to
        stop. A timeout is logged as a warning and otherwise ignored.
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.kill(restart=restart)

            # Wait until the kernel terminates.
            try:
                await asyncio.wait_for(self._async_wait(), timeout=5.0)
            except asyncio.TimeoutError:
                # Wait timed out, just log warning but continue - not much more we can do.
                self.log.warning("Wait for final termination of kernel timed out - continuing...")
            else:
                # Process is no longer alive, wait and clear
                if self.has_kernel:
                    await self.provisioner.wait()

    _kill_kernel = run_sync(_async_kill_kernel)

    async def _async_interrupt_kernel(self) -> None:
        """Interrupts the kernel by sending it a signal.

        Unlike ``signal_kernel``, this operation is well supported on all
        platforms.

        Depending on the kernel spec's ``interrupt_mode``, either ``SIGINT``
        is sent (``"signal"``) or an ``interrupt_request`` message goes out
        on the control channel (``"message"``).

        Raises
        ------
        RuntimeError
            If no kernel is running once any pending operation has finished.
        """
        if not self.has_kernel and self._ready is not None:
            if isinstance(self._ready, CFuture):
                # A concurrent.futures.Future is not awaitable and
                # asyncio.ensure_future() rejects it with TypeError;
                # wrap_future() is the supported bridge into asyncio.
                ready = asyncio.wrap_future(self._ready)
            else:
                ready = self._ready
            # Wait for a shutdown if one is in progress.
            if self.shutting_down:
                await ready
            # Wait for a startup.
            await ready

        if self.has_kernel:
            assert self.kernel_spec is not None
            interrupt_mode = self.kernel_spec.interrupt_mode
            if interrupt_mode == "signal":
                await self._async_signal_kernel(signal.SIGINT)

            elif interrupt_mode == "message":
                msg = self.session.msg("interrupt_request", content={})
                self._connect_control_socket()
                self.session.send(self._control_socket, msg)
        else:
            msg = "Cannot interrupt kernel. No kernel is running!"
            raise RuntimeError(msg)

    interrupt_kernel = run_sync(_async_interrupt_kernel)

    async def _async_signal_kernel(self, signum: int) -> None:
        """Sends a signal to the process group of the kernel (this
        usually includes the kernel and any subprocesses spawned by
        the kernel).

        Note that since only SIGTERM is supported on Windows, this function is
        only useful on Unix systems.

        Raises
        ------
        RuntimeError
            If no kernel is running.
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.send_signal(signum)
        else:
            msg = "Cannot signal kernel. No kernel is running!"
            raise RuntimeError(msg)

    signal_kernel = run_sync(_async_signal_kernel)

    async def _async_is_alive(self) -> bool:
        """Is the kernel process still running?

        Always ``True`` when this manager does not own the kernel. Otherwise
        ``True`` only if a kernel exists and the provisioner's ``poll()``
        returns ``None``.
        """
        if not self.owns_kernel:
            return True

        if self.has_kernel:
            assert self.provisioner is not None
            return await self.provisioner.poll() is None
        return False

    is_alive = run_sync(_async_is_alive)

    async def _async_wait(self, pollinterval: float = 0.1) -> None:
        """Poll until the kernel is no longer alive.

        Parameters
        ----------
        pollinterval : float
            Seconds to sleep between liveness checks.
        """
        # Use busy loop at 100ms intervals, polling until the process is
        # not alive. If we find the process is no longer alive, complete
        # its cleanup via the blocking wait(). Callers are responsible for
        # issuing calls to wait() using a timeout (see _kill_kernel()).
        while await self._async_is_alive():
            await asyncio.sleep(pollinterval)
712
713
class AsyncKernelManager(KernelManager):
    """Kernel manager whose lifecycle methods are coroutines.

    The public method names are bound directly to the ``_async_*``
    coroutines of :class:`KernelManager` instead of their ``run_sync``
    wrappers, and the client and ZMQ context default to their asyncio
    variants.
    """

    # Class instantiated by ``client``: the asynchronous kernel client.
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.asynchronous.AsyncKernelClient", config=True
    )
    client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient", config=True)

    # ZMQ context used to talk to the kernel (asyncio flavour).
    context: Instance = Instance(zmq.asyncio.Context)

    @default("context")
    def _context_default(self) -> zmq.asyncio.Context:
        """Create an asyncio ZMQ context and remember that this manager created it."""
        self._created_context = True
        return zmq.asyncio.Context()

    def client(  # type:ignore[override]
        self, **kwargs: t.Any
    ) -> AsyncKernelClient:
        """Return a client connected to this manager's kernel.

        All keyword arguments are handed to ``KernelManager.client``.
        """
        async_client = super().client(**kwargs)
        return async_client  # type:ignore[return-value]

    # Expose the coroutine implementations under the public names.
    _launch_kernel = KernelManager._async_launch_kernel  # type:ignore[assignment]
    start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel  # type:ignore[assignment]
    pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel  # type:ignore[assignment]
    post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel  # type:ignore[assignment]
    request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown  # type:ignore[assignment]
    finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown  # type:ignore[assignment]
    cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources  # type:ignore[assignment]
    shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel  # type:ignore[assignment]
    restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel  # type:ignore[assignment]
    _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm  # type:ignore[assignment]
    _kill_kernel = KernelManager._async_kill_kernel  # type:ignore[assignment]
    interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel  # type:ignore[assignment]
    signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel  # type:ignore[assignment]
    is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive  # type:ignore[assignment]
751
752
# Register KernelManager with KernelManagerABC without inheriting from it.
# NOTE(review): presumably this is ``abc.ABCMeta.register``, making
# KernelManager a virtual subclass for isinstance/issubclass checks; confirm
# in ``managerabc``.
KernelManagerABC.register(KernelManager)
754
755
def start_new_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[KernelManager, BlockingKernelClient]:
    """Start a new kernel, and return its Manager and Client.

    Parameters
    ----------
    startup_timeout : float
        Timeout passed to the client's ``wait_for_ready``.
    kernel_name : str
        Name of the kernel to start.
    `**kwargs` : optional
        Forwarded to ``KernelManager.start_kernel``.

    Raises
    ------
    RuntimeError
        Re-raised from ``wait_for_ready`` after the channels have been
        stopped and the kernel shut down.
    """
    manager = KernelManager(kernel_name=kernel_name)
    manager.start_kernel(**kwargs)
    client = manager.client()
    client.start_channels()
    try:
        client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # The kernel never became ready: tear everything down again.
        client.stop_channels()
        manager.shutdown_kernel()
        raise
    else:
        return manager, client
772
773
async def start_new_async_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
    """Start a new kernel asynchronously, and return its Manager and Client.

    Parameters
    ----------
    startup_timeout : float
        Timeout passed to the client's ``wait_for_ready``.
    kernel_name : str
        Name of the kernel to start.
    `**kwargs` : optional
        Forwarded to ``AsyncKernelManager.start_kernel``.

    Raises
    ------
    RuntimeError
        Re-raised from ``wait_for_ready`` after the channels have been
        stopped and the kernel shut down.
    """
    manager = AsyncKernelManager(kernel_name=kernel_name)
    await manager.start_kernel(**kwargs)
    client = manager.client()
    client.start_channels()
    try:
        await client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # The kernel never became ready: tear everything down again.
        client.stop_channels()
        await manager.shutdown_kernel()
        raise
    else:
        return (manager, client)
790
791
@contextmanager
def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
    """Context manager that runs a kernel for the duration of the ``with`` block.

    The kernel is started through ``start_new_kernel`` with the given keyword
    arguments. On exit, the client's channels are stopped and the kernel is
    shut down immediately (``now=True``).

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    manager, client = start_new_kernel(**kwargs)
    try:
        yield client
    finally:
        # Runs whether or not the body raised.
        client.stop_channels()
        manager.shutdown_kernel(now=True)