1"""Base class to manage a running kernel"""
2# Copyright (c) Jupyter Development Team.
3# Distributed under the terms of the Modified BSD License.
4import asyncio
5import functools
6import os
7import re
8import signal
9import sys
10import typing as t
11import uuid
12import warnings
13from asyncio.futures import Future
14from concurrent.futures import Future as CFuture
15from contextlib import contextmanager
16from enum import Enum
17
18import zmq
19from jupyter_core.utils import run_sync
20from traitlets import (
21 Any,
22 Bool,
23 Dict,
24 DottedObjectName,
25 Float,
26 Instance,
27 Type,
28 Unicode,
29 default,
30 observe,
31 observe_compat,
32)
33from traitlets.utils.importstring import import_item
34
35from . import kernelspec
36from .asynchronous import AsyncKernelClient
37from .blocking import BlockingKernelClient
38from .client import KernelClient
39from .connect import ConnectionFileMixin
40from .managerabc import KernelManagerABC
41from .provisioning import KernelProvisionerBase
42from .provisioning import KernelProvisionerFactory as KPF # noqa
43
44
45class _ShutdownStatus(Enum):
46 """
47
48 This is so far used only for testing in order to track the internal state of
49 the shutdown logic, and verifying which path is taken for which
50 missbehavior.
51
52 """
53
54 Unset = None
55 ShutdownRequest = "ShutdownRequest"
56 SigtermRequest = "SigtermRequest"
57 SigkillRequest = "SigkillRequest"
58
59
60F = t.TypeVar("F", bound=t.Callable[..., t.Any])
61
62
63def _get_future() -> t.Union[Future, CFuture]:
64 """Get an appropriate Future object"""
65 try:
66 asyncio.get_running_loop()
67 return Future()
68 except RuntimeError:
69 # No event loop running, use concurrent future
70 return CFuture()
71
72
73def in_pending_state(method: F) -> F:
74 """Sets the kernel to a pending state by
75 creating a fresh Future for the KernelManager's `ready`
76 attribute. Once the method is finished, set the Future's results.
77 """
78
79 @t.no_type_check
80 @functools.wraps(method)
81 async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
82 """Create a future for the decorated method."""
83 if self._attempted_start or not self._ready:
84 self._ready = _get_future()
85 try:
86 # call wrapped method, await, and set the result or exception.
87 out = await method(self, *args, **kwargs)
88 # Add a small sleep to ensure tests can capture the state before done
89 await asyncio.sleep(0.01)
90 if self.owns_kernel:
91 self._ready.set_result(None)
92 return out
93 except Exception as e:
94 self._ready.set_exception(e)
95 self.log.exception(self._ready.exception())
96 raise e
97
98 return t.cast(F, wrapper)
99
100
101class KernelManager(ConnectionFileMixin):
102 """Manages a single kernel in a subprocess on this host.
103
104 This version starts kernels with Popen.
105 """
106
107 _ready: t.Optional[t.Union[Future, CFuture]]
108
109 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
110 """Initialize a kernel manager."""
111 if args:
112 warnings.warn(
113 "Passing positional only arguments to "
114 "`KernelManager.__init__` is deprecated since jupyter_client"
115 " 8.6, and will become an error on future versions. Positional "
116 " arguments have been ignored since jupyter_client 7.0",
117 DeprecationWarning,
118 stacklevel=2,
119 )
120 self._owns_kernel = kwargs.pop("owns_kernel", True)
121 super().__init__(**kwargs)
122 self._shutdown_status = _ShutdownStatus.Unset
123 self._attempted_start = False
124 self._ready = None
125
126 _created_context: Bool = Bool(False)
127
128 # The PyZMQ Context to use for communication with the kernel.
129 context: Instance = Instance(zmq.Context)
130
131 @default("context")
132 def _context_default(self) -> zmq.Context:
133 self._created_context = True
134 return zmq.Context()
135
136 # the class to create with our `client` method
137 client_class: DottedObjectName = DottedObjectName(
138 "jupyter_client.blocking.BlockingKernelClient"
139 )
140 client_factory: Type = Type(klass=KernelClient)
141
142 @default("client_factory")
143 def _client_factory_default(self) -> Type:
144 return import_item(self.client_class)
145
146 @observe("client_class")
147 def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
148 self.client_factory = import_item(str(change["new"]))
149
150 kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)
151
152 # The kernel provisioner with which this KernelManager is communicating.
153 # This will generally be a LocalProvisioner instance unless the kernelspec
154 # indicates otherwise.
155 provisioner: t.Optional[KernelProvisionerBase] = None
156
157 kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)
158
159 @default("kernel_spec_manager")
160 def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
161 return kernelspec.KernelSpecManager(data_dir=self.data_dir)
162
163 @observe("kernel_spec_manager")
164 @observe_compat
165 def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
166 self._kernel_spec = None
167
168 shutdown_wait_time: Float = Float(
169 5.0,
170 config=True,
171 help="Time to wait for a kernel to terminate before killing it, "
172 "in seconds. When a shutdown request is initiated, the kernel "
173 "will be immediately sent an interrupt (SIGINT), followed"
174 "by a shutdown_request message, after 1/2 of `shutdown_wait_time`"
175 "it will be sent a terminate (SIGTERM) request, and finally at "
176 "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
177 "and kill may be equivalent on windows. Note that this value can be"
178 "overridden by the in-use kernel provisioner since shutdown times may"
179 "vary by provisioned environment.",
180 )
181
182 kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)
183
184 @observe("kernel_name")
185 def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
186 self._kernel_spec = None
187 if change["new"] == "python":
188 self.kernel_name = kernelspec.NATIVE_KERNEL_NAME
189
190 _kernel_spec: t.Optional[kernelspec.KernelSpec] = None
191
192 @property
193 def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]:
194 if self._kernel_spec is None and self.kernel_name != "":
195 self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
196 return self._kernel_spec
197
198 cache_ports: Bool = Bool(
199 False,
200 config=True,
201 help="True if the MultiKernelManager should cache ports for this KernelManager instance",
202 )
203
204 @default("cache_ports")
205 def _default_cache_ports(self) -> bool:
206 return self.transport == "tcp"
207
208 @property
209 def ready(self) -> t.Union[CFuture, Future]:
210 """A future that resolves when the kernel process has started for the first time"""
211 if not self._ready:
212 self._ready = _get_future()
213 return self._ready
214
215 @property
216 def ipykernel(self) -> bool:
217 return self.kernel_name in {"python", "python2", "python3"}
218
219 # Protected traits
220 _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True)
221 _control_socket: Any = Any()
222
223 _restarter: Any = Any()
224
225 autorestart: Bool = Bool(
226 True, config=True, help="""Should we autorestart the kernel if it dies."""
227 )
228
229 shutting_down: bool = False
230
231 def __del__(self) -> None:
232 self._close_control_socket()
233 self.cleanup_connection_file()
234
235 # --------------------------------------------------------------------------
236 # Kernel restarter
237 # --------------------------------------------------------------------------
238
239 def start_restarter(self) -> None:
240 """Start the kernel restarter."""
241 pass
242
243 def stop_restarter(self) -> None:
244 """Stop the kernel restarter."""
245 pass
246
247 def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
248 """Register a callback to be called when a kernel is restarted"""
249 if self._restarter is None:
250 return
251 self._restarter.add_callback(callback, event)
252
253 def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
254 """Unregister a callback to be called when a kernel is restarted"""
255 if self._restarter is None:
256 return
257 self._restarter.remove_callback(callback, event)
258
259 # --------------------------------------------------------------------------
260 # create a Client connected to our Kernel
261 # --------------------------------------------------------------------------
262
263 def client(self, **kwargs: t.Any) -> BlockingKernelClient:
264 """Create a client configured to connect to our kernel"""
265 kw: dict = {}
266 kw.update(self.get_connection_info(session=True))
267 kw.update(
268 {
269 "connection_file": self.connection_file,
270 "parent": self,
271 }
272 )
273
274 # add kwargs last, for manual overrides
275 kw.update(kwargs)
276 return self.client_factory(**kw)
277
278 # --------------------------------------------------------------------------
279 # Kernel management
280 # --------------------------------------------------------------------------
281
282 def update_env(self, *, env: t.Dict[str, str]) -> None:
283 """
284 Allow to update the environment of a kernel manager.
285
286 This will take effect only after kernel restart when the new env is
287 passed to the new kernel.
288
289 This is useful as some of the information of the current kernel reflect
290 the state of the session that started it, and those session information
291 (like the attach file path, or name), are mutable.
292
293 .. version-added: 8.5
294 """
295 # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
296 if (
297 isinstance(self._launch_args, dict)
298 and "env" in self._launch_args
299 and isinstance(self._launch_args["env"], dict) # type: ignore [unreachable]
300 ):
301 self._launch_args["env"].update(env) # type: ignore [unreachable]
302
303 def format_kernel_cmd(self, extra_arguments: t.Optional[t.List[str]] = None) -> t.List[str]:
304 """Replace templated args (e.g. {connection_file})"""
305 extra_arguments = extra_arguments or []
306 assert self.kernel_spec is not None
307 cmd = self.kernel_spec.argv + extra_arguments
308
309 if cmd and cmd[0] in {
310 "python",
311 "python%i" % sys.version_info[0],
312 "python%i.%i" % sys.version_info[:2],
313 }:
314 # executable is 'python' or 'python3', use sys.executable.
315 # These will typically be the same,
316 # but if the current process is in an env
317 # and has been launched by abspath without
318 # activating the env, python on PATH may not be sys.executable,
319 # but it should be.
320 cmd[0] = sys.executable
321
322 # Make sure to use the realpath for the connection_file
323 # On windows, when running with the store python, the connection_file path
324 # is not usable by non python kernels because the path is being rerouted when
325 # inside of a store app.
326 # See this bug here: https://bugs.python.org/issue41196
327 ns: t.Dict[str, t.Any] = {
328 "connection_file": os.path.realpath(self.connection_file),
329 "prefix": sys.prefix,
330 }
331
332 if self.kernel_spec: # type:ignore[truthy-bool]
333 ns["resource_dir"] = self.kernel_spec.resource_dir
334 assert isinstance(self._launch_args, dict)
335
336 ns.update(self._launch_args)
337
338 pat = re.compile(r"\{([A-Za-z0-9_]+)\}")
339
340 def from_ns(match: t.Any) -> t.Any:
341 """Get the key out of ns if it's there, otherwise no change."""
342 return ns.get(match.group(1), match.group())
343
344 return [pat.sub(from_ns, arg) for arg in cmd]
345
346 async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
347 """actually launch the kernel
348
349 override in a subclass to launch kernel subprocesses differently
350 Note that provisioners can now be used to customize kernel environments
351 and
352 """
353 assert self.provisioner is not None
354 connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
355 assert self.provisioner.has_process
356 # Provisioner provides the connection information. Load into kernel manager
357 # and write the connection file, if not already done.
358 self._reconcile_connection_info(connection_info)
359
360 _launch_kernel = run_sync(_async_launch_kernel)
361
362 # Control socket used for polite kernel shutdown
363
364 def _connect_control_socket(self) -> None:
365 if self._control_socket is None:
366 self._control_socket = self._create_connected_socket("control")
367 self._control_socket.linger = 100
368
369 def _close_control_socket(self) -> None:
370 if self._control_socket is None:
371 return
372 self._control_socket.close()
373 self._control_socket = None
374
375 async def _async_pre_start_kernel(
376 self, **kw: t.Any
377 ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
378 """Prepares a kernel for startup in a separate process.
379
380 If random ports (port=0) are being used, this method must be called
381 before the channels are created.
382
383 Parameters
384 ----------
385 `**kw` : optional
386 keyword arguments that are passed down to build the kernel_cmd
387 and launching the kernel (e.g. Popen kwargs).
388 """
389 self.shutting_down = False
390 self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
391 # save kwargs for use in restart
392 # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
393 self._launch_args = kw.copy() # type:ignore [assignment]
394 if self.provisioner is None: # will not be None on restarts
395 self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
396 self.kernel_id,
397 self.kernel_spec,
398 parent=self,
399 )
400 kw = await self.provisioner.pre_launch(**kw)
401 kernel_cmd = kw.pop("cmd")
402 return kernel_cmd, kw
403
404 pre_start_kernel = run_sync(_async_pre_start_kernel)
405
406 async def _async_post_start_kernel(self, **kw: t.Any) -> None:
407 """Performs any post startup tasks relative to the kernel.
408
409 Parameters
410 ----------
411 `**kw` : optional
412 keyword arguments that were used in the kernel process's launch.
413 """
414 self.start_restarter()
415 self._connect_control_socket()
416 assert self.provisioner is not None
417 await self.provisioner.post_launch(**kw)
418
419 post_start_kernel = run_sync(_async_post_start_kernel)
420
421 @in_pending_state
422 async def _async_start_kernel(self, **kw: t.Any) -> None:
423 """Starts a kernel on this host in a separate process.
424
425 If random ports (port=0) are being used, this method must be called
426 before the channels are created.
427
428 Parameters
429 ----------
430 `**kw` : optional
431 keyword arguments that are passed down to build the kernel_cmd
432 and launching the kernel (e.g. Popen kwargs).
433 """
434 self._attempted_start = True
435 kernel_cmd, kw = await self._async_pre_start_kernel(**kw)
436
437 # launch the kernel subprocess
438 self.log.debug("Starting kernel: %s", kernel_cmd)
439 await self._async_launch_kernel(kernel_cmd, **kw)
440 await self._async_post_start_kernel(**kw)
441
442 start_kernel = run_sync(_async_start_kernel)
443
444 async def _async_request_shutdown(self, restart: bool = False) -> None:
445 """Send a shutdown request via control channel"""
446 content = {"restart": restart}
447 msg = self.session.msg("shutdown_request", content=content)
448 # ensure control socket is connected
449 self._connect_control_socket()
450 self.session.send(self._control_socket, msg)
451 assert self.provisioner is not None
452 await self.provisioner.shutdown_requested(restart=restart)
453 self._shutdown_status = _ShutdownStatus.ShutdownRequest
454
455 request_shutdown = run_sync(_async_request_shutdown)
456
457 async def _async_finish_shutdown(
458 self,
459 waittime: t.Optional[float] = None,
460 pollinterval: float = 0.1,
461 restart: bool = False,
462 ) -> None:
463 """Wait for kernel shutdown, then kill process if it doesn't shutdown.
464
465 This does not send shutdown requests - use :meth:`request_shutdown`
466 first.
467 """
468 if waittime is None:
469 waittime = max(self.shutdown_wait_time, 0)
470 if self.provisioner: # Allow provisioner to override
471 waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime)
472
473 try:
474 await asyncio.wait_for(
475 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
476 )
477 except asyncio.TimeoutError:
478 self.log.debug("Kernel is taking too long to finish, terminating")
479 self._shutdown_status = _ShutdownStatus.SigtermRequest
480 await self._async_send_kernel_sigterm()
481
482 try:
483 await asyncio.wait_for(
484 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
485 )
486 except asyncio.TimeoutError:
487 self.log.debug("Kernel is taking too long to finish, killing")
488 self._shutdown_status = _ShutdownStatus.SigkillRequest
489 await self._async_kill_kernel(restart=restart)
490 else:
491 # Process is no longer alive, wait and clear
492 if self.has_kernel:
493 assert self.provisioner is not None
494 await self.provisioner.wait()
495
496 finish_shutdown = run_sync(_async_finish_shutdown)
497
498 async def _async_cleanup_resources(self, restart: bool = False) -> None:
499 """Clean up resources when the kernel is shut down"""
500 if not restart:
501 self.cleanup_connection_file()
502
503 self.cleanup_ipc_files()
504 self._close_control_socket()
505 self.session.parent = None
506
507 if self._created_context and not restart:
508 self.context.destroy(linger=100)
509
510 if self.provisioner:
511 await self.provisioner.cleanup(restart=restart)
512
513 cleanup_resources = run_sync(_async_cleanup_resources)
514
515 @in_pending_state
516 async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
517 """Attempts to stop the kernel process cleanly.
518
519 This attempts to shutdown the kernels cleanly by:
520
521 1. Sending it a shutdown message over the control channel.
522 2. If that fails, the kernel is shutdown forcibly by sending it
523 a signal.
524
525 Parameters
526 ----------
527 now : bool
528 Should the kernel be forcible killed *now*. This skips the
529 first, nice shutdown attempt.
530 restart: bool
531 Will this kernel be restarted after it is shutdown. When this
532 is True, connection files will not be cleaned up.
533 """
534 if not self.owns_kernel:
535 return
536
537 self.shutting_down = True # Used by restarter to prevent race condition
538 # Stop monitoring for restarting while we shutdown.
539 self.stop_restarter()
540
541 if self.has_kernel:
542 await self._async_interrupt_kernel()
543
544 if now:
545 await self._async_kill_kernel()
546 else:
547 await self._async_request_shutdown(restart=restart)
548 # Don't send any additional kernel kill messages immediately, to give
549 # the kernel a chance to properly execute shutdown actions. Wait for at
550 # most 1s, checking every 0.1s.
551 await self._async_finish_shutdown(restart=restart)
552
553 await self._async_cleanup_resources(restart=restart)
554
555 shutdown_kernel = run_sync(_async_shutdown_kernel)
556
557 async def _async_restart_kernel(
558 self, now: bool = False, newports: bool = False, **kw: t.Any
559 ) -> None:
560 """Restarts a kernel with the arguments that were used to launch it.
561
562 Parameters
563 ----------
564 now : bool, optional
565 If True, the kernel is forcefully restarted *immediately*, without
566 having a chance to do any cleanup action. Otherwise the kernel is
567 given 1s to clean up before a forceful restart is issued.
568
569 In all cases the kernel is restarted, the only difference is whether
570 it is given a chance to perform a clean shutdown or not.
571
572 newports : bool, optional
573 If the old kernel was launched with random ports, this flag decides
574 whether the same ports and connection file will be used again.
575 If False, the same ports and connection file are used. This is
576 the default. If True, new random port numbers are chosen and a
577 new connection file is written. It is still possible that the newly
578 chosen random port numbers happen to be the same as the old ones.
579
580 `**kw` : optional
581 Any options specified here will overwrite those used to launch the
582 kernel.
583 """
584 if self._launch_args is None:
585 msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
586 raise RuntimeError(msg)
587
588 # Stop currently running kernel.
589 await self._async_shutdown_kernel(now=now, restart=True)
590
591 if newports:
592 self.cleanup_random_ports()
593
594 # Start new kernel.
595 self._launch_args.update(kw)
596 await self._async_start_kernel(**self._launch_args)
597
598 restart_kernel = run_sync(_async_restart_kernel)
599
600 @property
601 def owns_kernel(self) -> bool:
602 return self._owns_kernel
603
604 @property
605 def has_kernel(self) -> bool:
606 """Has a kernel process been started that we are actively managing."""
607 return self.provisioner is not None and self.provisioner.has_process
608
609 async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
610 """similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
611 if self.has_kernel:
612 assert self.provisioner is not None
613 await self.provisioner.terminate(restart=restart)
614
615 _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)
616
617 async def _async_kill_kernel(self, restart: bool = False) -> None:
618 """Kill the running kernel.
619
620 This is a private method, callers should use shutdown_kernel(now=True).
621 """
622 if self.has_kernel:
623 assert self.provisioner is not None
624 await self.provisioner.kill(restart=restart)
625
626 # Wait until the kernel terminates.
627 try:
628 await asyncio.wait_for(self._async_wait(), timeout=5.0)
629 except asyncio.TimeoutError:
630 # Wait timed out, just log warning but continue - not much more we can do.
631 self.log.warning("Wait for final termination of kernel timed out - continuing...")
632 pass
633 else:
634 # Process is no longer alive, wait and clear
635 if self.has_kernel:
636 await self.provisioner.wait()
637
638 _kill_kernel = run_sync(_async_kill_kernel)
639
640 async def _async_interrupt_kernel(self) -> None:
641 """Interrupts the kernel by sending it a signal.
642
643 Unlike ``signal_kernel``, this operation is well supported on all
644 platforms.
645 """
646 if not self.has_kernel and self._ready is not None:
647 if isinstance(self._ready, CFuture):
648 ready = asyncio.ensure_future(t.cast(Future[t.Any], self._ready))
649 else:
650 ready = self._ready
651 # Wait for a shutdown if one is in progress.
652 if self.shutting_down:
653 await ready
654 # Wait for a startup.
655 await ready
656
657 if self.has_kernel:
658 assert self.kernel_spec is not None
659 interrupt_mode = self.kernel_spec.interrupt_mode
660 if interrupt_mode == "signal":
661 await self._async_signal_kernel(signal.SIGINT)
662
663 elif interrupt_mode == "message":
664 msg = self.session.msg("interrupt_request", content={})
665 self._connect_control_socket()
666 self.session.send(self._control_socket, msg)
667 else:
668 msg = "Cannot interrupt kernel. No kernel is running!"
669 raise RuntimeError(msg)
670
671 interrupt_kernel = run_sync(_async_interrupt_kernel)
672
673 async def _async_signal_kernel(self, signum: int) -> None:
674 """Sends a signal to the process group of the kernel (this
675 usually includes the kernel and any subprocesses spawned by
676 the kernel).
677
678 Note that since only SIGTERM is supported on Windows, this function is
679 only useful on Unix systems.
680 """
681 if self.has_kernel:
682 assert self.provisioner is not None
683 await self.provisioner.send_signal(signum)
684 else:
685 msg = "Cannot signal kernel. No kernel is running!"
686 raise RuntimeError(msg)
687
688 signal_kernel = run_sync(_async_signal_kernel)
689
690 async def _async_is_alive(self) -> bool:
691 """Is the kernel process still running?"""
692 if not self.owns_kernel:
693 return True
694
695 if self.has_kernel:
696 assert self.provisioner is not None
697 ret = await self.provisioner.poll()
698 if ret is None:
699 return True
700 return False
701
702 is_alive = run_sync(_async_is_alive)
703
704 async def _async_wait(self, pollinterval: float = 0.1) -> None:
705 # Use busy loop at 100ms intervals, polling until the process is
706 # not alive. If we find the process is no longer alive, complete
707 # its cleanup via the blocking wait(). Callers are responsible for
708 # issuing calls to wait() using a timeout (see _kill_kernel()).
709 while await self._async_is_alive():
710 await asyncio.sleep(pollinterval)
711
712
713class AsyncKernelManager(KernelManager):
714 """An async kernel manager."""
715
716 # the class to create with our `client` method
717 client_class: DottedObjectName = DottedObjectName(
718 "jupyter_client.asynchronous.AsyncKernelClient"
719 )
720 client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient")
721
722 # The PyZMQ Context to use for communication with the kernel.
723 context: Instance = Instance(zmq.asyncio.Context)
724
725 @default("context")
726 def _context_default(self) -> zmq.asyncio.Context:
727 self._created_context = True
728 return zmq.asyncio.Context()
729
730 def client( # type:ignore[override]
731 self, **kwargs: t.Any
732 ) -> AsyncKernelClient:
733 """Get a client for the manager."""
734 return super().client(**kwargs) # type:ignore[return-value]
735
736 _launch_kernel = KernelManager._async_launch_kernel # type:ignore[assignment]
737 start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel # type:ignore[assignment]
738 pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel # type:ignore[assignment]
739 post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel # type:ignore[assignment]
740 request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown # type:ignore[assignment]
741 finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown # type:ignore[assignment]
742 cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources # type:ignore[assignment]
743 shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel # type:ignore[assignment]
744 restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel # type:ignore[assignment]
745 _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm # type:ignore[assignment]
746 _kill_kernel = KernelManager._async_kill_kernel # type:ignore[assignment]
747 interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel # type:ignore[assignment]
748 signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel # type:ignore[assignment]
749 is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive # type:ignore[assignment]
750
751
752KernelManagerABC.register(KernelManager)
753
754
755def start_new_kernel(
756 startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
757) -> t.Tuple[KernelManager, BlockingKernelClient]:
758 """Start a new kernel, and return its Manager and Client"""
759 km = KernelManager(kernel_name=kernel_name)
760 km.start_kernel(**kwargs)
761 kc = km.client()
762 kc.start_channels()
763 try:
764 kc.wait_for_ready(timeout=startup_timeout)
765 except RuntimeError:
766 kc.stop_channels()
767 km.shutdown_kernel()
768 raise
769
770 return km, kc
771
772
773async def start_new_async_kernel(
774 startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
775) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
776 """Start a new kernel, and return its Manager and Client"""
777 km = AsyncKernelManager(kernel_name=kernel_name)
778 await km.start_kernel(**kwargs)
779 kc = km.client()
780 kc.start_channels()
781 try:
782 await kc.wait_for_ready(timeout=startup_timeout)
783 except RuntimeError:
784 kc.stop_channels()
785 await km.shutdown_kernel()
786 raise
787
788 return (km, kc)
789
790
791@contextmanager
792def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
793 """Context manager to create a kernel in a subprocess.
794
795 The kernel is shut down when the context exits.
796
797 Returns
798 -------
799 kernel_client: connected KernelClient instance
800 """
801 km, kc = start_new_kernel(**kwargs)
802 try:
803 yield kc
804 finally:
805 kc.stop_channels()
806 km.shutdown_kernel(now=True)