1"""Base class for a kernel that talks to frontends over 0MQ."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5from __future__ import annotations
6
7import asyncio
8import inspect
9import itertools
10import logging
11import os
12import socket
13import sys
14import threading
15import time
16import typing as t
17import uuid
18import warnings
19from datetime import datetime
20from functools import partial
21from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal
22
23from .thread import CONTROL_THREAD_NAME
24
25if sys.platform != "win32":
26 from signal import SIGKILL
27else:
28 SIGKILL = "windown-SIGKILL-sentinel"
29
30
31try:
32 # jupyter_client >= 5, use tz-aware now
33 from jupyter_client.session import utcnow as now
34except ImportError:
35 # jupyter_client < 5, use local now()
36 now = datetime.now
37
38import psutil
39import zmq
40from IPython.core.error import StdinNotImplementedError
41from jupyter_client.session import Session
42from tornado import ioloop
43from tornado.queues import Queue, QueueEmpty
44from traitlets.config.configurable import SingletonConfigurable
45from traitlets.traitlets import (
46 Any,
47 Bool,
48 Dict,
49 Float,
50 Instance,
51 Integer,
52 List,
53 Set,
54 Unicode,
55 default,
56 observe,
57)
58from zmq.eventloop.zmqstream import ZMQStream
59
60from ipykernel.jsonutil import json_clean
61
62from ._version import kernel_protocol_version
63from .iostream import OutStream
64
65
66def _accepts_parameters(meth, param_names):
67 parameters = inspect.signature(meth).parameters
68 accepts = {param: False for param in param_names}
69
70 for param in param_names:
71 param_spec = parameters.get(param)
72 accepts[param] = (
73 param_spec
74 and param_spec.kind in [param_spec.KEYWORD_ONLY, param_spec.POSITIONAL_OR_KEYWORD]
75 ) or any(p.kind == p.VAR_KEYWORD for p in parameters.values())
76
77 return accepts
78
79
80class Kernel(SingletonConfigurable):
81 """The base kernel class."""
82
83 # ---------------------------------------------------------------------------
84 # Kernel interface
85 # ---------------------------------------------------------------------------
86
    # attribute to override with a GUI
    eventloop = Any(None)

    # Class-level cache of psutil.Process handles keyed by string id.
    # NOTE(review): shared across instances (class attribute); populated
    # outside this chunk — confirm against the usage/resource handlers.
    processes: dict[str, psutil.Process] = {}
91
92 @observe("eventloop")
93 def _update_eventloop(self, change):
94 """schedule call to eventloop from IOLoop"""
95 loop = ioloop.IOLoop.current()
96 if change.new is not None:
97 loop.add_callback(self.enter_eventloop)
98
    # Jupyter session used to (de)serialize and send messages; None until configured.
    session = Instance(Session, allow_none=True)
    profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True)
    # The single shell-channel ZMQ stream (see deprecated shell_streams below).
    shell_stream = Instance(ZMQStream, allow_none=True)
102
    shell_streams: List[t.Any] = List(
        help="""Deprecated shell_streams alias. Use shell_stream

        .. versionchanged:: 6.0
            shell_streams is deprecated. Use shell_stream.
        """
    )

    # To be filled in by concrete kernel subclasses (see kernel_info).
    implementation: str
    implementation_version: str
    banner: str
114
115 @default("shell_streams")
116 def _shell_streams_default(self): # pragma: no cover
117 warnings.warn(
118 "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
119 DeprecationWarning,
120 stacklevel=2,
121 )
122 if self.shell_stream is not None:
123 return [self.shell_stream]
124 return []
125
126 @observe("shell_streams")
127 def _shell_streams_changed(self, change): # pragma: no cover
128 warnings.warn(
129 "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
130 DeprecationWarning,
131 stacklevel=2,
132 )
133 if len(change.new) > 1:
134 warnings.warn(
135 "Kernel only supports one shell stream. Additional streams will be ignored.",
136 RuntimeWarning,
137 stacklevel=2,
138 )
139 if change.new:
140 self.shell_stream = change.new[0]
141
    # ZMQ stream for the control channel.
    control_stream = Instance(ZMQStream, allow_none=True)

    # NOTE(review): presumably the debugger's direct shell socket — confirm.
    debug_shell_socket = Any()

    # Optional dedicated threads and sockets for the various channels.
    control_thread = Any()
    shell_channel_thread = Any()
    iopub_socket = Any()
    iopub_thread = Any()
    stdin_socket = Any()
    log: logging.Logger = Instance(logging.Logger, allow_none=True)  # type:ignore[assignment]

    # identities:
    int_id = Integer(-1)
    ident = Unicode()
156
    @default("ident")
    def _default_ident(self):
        """Default kernel identity: a fresh UUID4 string."""
        return str(uuid.uuid4())
160
    # This should be overridden by wrapper kernels that implement any real
    # language.
    language_info: dict[str, object] = {}

    # any links that should go in the help menu
    help_links: List[dict[str, str]] = List()

    # Experimental option to break in non-user code.
    # The ipykernel source is in the call stack, so the user
    # has to manipulate the step-over and step-into in a wise way.
    debug_just_my_code = Bool(
        True,
        help="""Set to False if you want to debug python standard and dependent libraries.
        """,
    ).tag(config=True)
176
    # Private interface

    _darwin_app_nap = Bool(
        True,
        help="""Whether to use appnope for compatibility with OS X App Nap.

        Only affects OS X >= 10.9.
        """,
    ).tag(config=True)

    # track associations with current request
    _allow_stdin = Bool(False)
    # Most recent parent message per channel; see set_parent()/get_parent().
    _parents: Dict[str, t.Any] = Dict({"shell": {}, "control": {}})
    # zmq routing identities of the most recent request per channel.
    _parent_ident = Dict({"shell": b"", "control": b""})
192
    @property
    def _parent_header(self):
        """Deprecated alias for get_parent(); emits a DeprecationWarning."""
        warnings.warn(
            "Kernel._parent_header is deprecated in ipykernel 6. Use .get_parent()",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.get_parent()
201
    # Time to sleep after flushing the stdout/err buffers in each execute
    # cycle. While this introduces a hard limit on the minimal latency of the
    # execute cycle, it helps prevent output synchronization problems for
    # clients.
    # Units are in seconds. The minimum zmq latency on local host is probably
    # ~150 microseconds, set this to 500us for now. We may need to increase it
    # a little if it's not enough after more interactive testing.
    _execute_sleep = Float(0.0005).tag(config=True)

    # Frequency of the kernel's event loop.
    # Units are in seconds, kernel subclasses for GUI toolkits may need to
    # adapt to milliseconds.
    _poll_interval = Float(0.01).tag(config=True)

    stop_on_error_timeout = Float(
        0.0,
        config=True,
        help="""time (in seconds) to wait for messages to arrive
        when aborting queued requests after an error.

        Requests that arrive within this window after an error
        will be cancelled.

        Increase in the event of unusually slow network
        causing significant delays,
        which can manifest as e.g. "Run all" in a notebook
        aborting some, but not all, messages after an error.
        """,
    )

    # If the shutdown was requested over the network, we leave here the
    # necessary reply message so it can be sent by our registered atexit
    # handler. This ensures that the reply is only sent to clients truly at
    # the end of our shutdown process (which happens after the underlying
    # IPython shell's own shutdown).
    _shutdown_message = None

    # This is a dict of port number that the kernel is listening on. It is set
    # by record_ports and used by connect_request.
    _recorded_ports = Dict()

    # set of aborted msg_ids
    aborted = Set()

    # Track execution count here. For IPython, we override this to use the
    # execution count we store in the shell.
    execution_count = 0
249
    # Message types handled on the shell channel; __init__ maps each to the
    # same-named handler method.
    msg_types = [
        "execute_request",
        "complete_request",
        "inspect_request",
        "history_request",
        "comm_info_request",
        "kernel_info_request",
        "connect_request",
        "shutdown_request",
        "is_complete_request",
        "interrupt_request",
        # deprecated:
        "apply_request",
    ]
    # add deprecated ipyparallel control messages
    control_msg_types = [
        *msg_types,
        "clear_request",
        "abort_request",
        "debug_request",
        "usage_request",
        "create_subshell_request",
        "delete_subshell_request",
        "list_subshell_request",
    ]
275
276 def __init__(self, **kwargs):
277 """Initialize the kernel."""
278 super().__init__(**kwargs)
279
280 # Kernel application may swap stdout and stderr to OutStream,
281 # which is the case in `IPKernelApp.init_io`, hence `sys.stdout`
282 # can already by different from TextIO at initialization time.
283 self._stdout: OutStream | t.TextIO = sys.stdout
284 self._stderr: OutStream | t.TextIO = sys.stderr
285
286 # Build dict of handlers for message types
287 self.shell_handlers = {}
288 for msg_type in self.msg_types:
289 self.shell_handlers[msg_type] = getattr(self, msg_type)
290
291 self.control_handlers = {}
292 for msg_type in self.control_msg_types:
293 self.control_handlers[msg_type] = getattr(self, msg_type)
294
295 # Storing the accepted parameters for do_execute, used in execute_request
296 self._do_exec_accepted_params = _accepts_parameters(
297 self.do_execute, ["cell_meta", "cell_id"]
298 )
299
300 async def dispatch_control(self, msg):
301 # Ensure only one control message is processed at a time
302 async with asyncio.Lock():
303 await self.process_control(msg)
304
    async def process_control(self, msg):
        """dispatch control requests

        Deserializes the wire message, publishes busy/idle status around the
        handler call, and never lets a handler exception propagate.
        """
        if not self.session:
            return
        # Split zmq routing identities from the message frames before decoding.
        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.deserialize(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Control Message", exc_info=True)  # noqa: G201
            return

        self.log.debug("Control received: %s", msg)

        # Set the parent message for side effects.
        self.set_parent(idents, msg, channel="control")
        self._publish_status("busy", "control")

        header = msg["header"]
        msg_type = header["msg_type"]

        handler = self.control_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
        else:
            try:
                # Handlers may be sync or async; await only when needed.
                result = handler(self.control_stream, idents, msg)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                self.log.error("Exception in control handler:", exc_info=True)  # noqa: G201

        # Flush any output the handler produced, then signal idle so clients
        # know this control request is fully finished.
        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status_and_flush("idle", "control", self.control_stream)
339
340 def should_handle(self, stream, msg, idents):
341 """Check whether a shell-channel message should be handled
342
343 Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
344 """
345 msg_id = msg["header"]["msg_id"]
346 if msg_id in self.aborted:
347 # is it safe to assume a msg_id will not be resubmitted?
348 self.aborted.remove(msg_id)
349 self._send_abort_reply(stream, msg, idents)
350 return False
351 return True
352
    async def dispatch_shell(self, msg, /, subshell_id: str | None = None):
        """dispatch shell requests

        Deserializes the wire message, publishes busy/idle status around the
        handler call, and handles aborting of queued execute requests.
        """
        if len(msg) == 1 and msg[0].buffer == b"stop aborting":
            # Dummy "stop aborting" message to stop aborting execute requests on this subshell.
            # This dummy message implementation allows the subshell to abort messages that are
            # already queued in the zmq sockets/streams without having to know any of their
            # details in advance.
            if subshell_id is None:
                self._aborting = False
            else:
                self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, False)
            return

        if not self.session:
            return

        if self._supports_kernel_subshells:
            # Shell messages must be handled on a subshell (or main) thread,
            # never on the control or shell-channel threads.
            assert threading.current_thread() not in (
                self.control_thread,
                self.shell_channel_thread,
            )

        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.deserialize(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Message", exc_info=True)  # noqa: G201
            return

        # Set the parent message for side effects.
        self.set_parent(idents, msg, channel="shell")
        self._publish_status("busy", "shell")

        msg_type = msg["header"]["msg_type"]
        assert msg["header"].get("subshell_id") == subshell_id

        # Replies go to the per-subshell pair socket when subshells are
        # supported, otherwise directly to the single shell stream.
        if self._supports_kernel_subshells:
            stream = self.shell_channel_thread.manager.get_subshell_to_shell_channel_socket(
                subshell_id
            )
        else:
            stream = self.shell_stream

        # Only abort execute requests
        if msg_type == "execute_request":
            if subshell_id is None:
                aborting = self._aborting  # type:ignore[unreachable]
            else:
                aborting = self.shell_channel_thread.manager.get_subshell_aborting(subshell_id)
            if aborting:
                self._send_abort_reply(stream, msg, idents)
                self._publish_status_and_flush("idle", "shell", stream)
                return

        # Print some info about this message and leave a '--->' marker, so it's
        # easier to trace visually the message chain when debugging. Each
        # handler prints its message at the end.
        self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type)
        self.log.debug(" Content: %s\n --->\n ", msg["content"])

        if not self.should_handle(stream, msg, idents):
            self._publish_status_and_flush("idle", "shell", stream)
            return

        handler = self.shell_handlers.get(msg_type, None)
        if handler is None:
            self.log.warning("Unknown message type: %r", msg_type)
        else:
            self.log.debug("%s: %s", msg_type, msg)
            try:
                # ensure default_int_handler is active while the handler runs
                self.pre_handler_hook()
            except Exception:
                self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
            try:
                # Handlers may be sync or async; await only when needed.
                result = handler(stream, idents, msg)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                self.log.error("Exception in message handler:", exc_info=True)  # noqa: G201
            except KeyboardInterrupt:
                # Ctrl-c shouldn't crash the kernel here.
                self.log.error("KeyboardInterrupt caught in kernel.")
            finally:
                try:
                    self.post_handler_hook()
                except Exception:
                    self.log.debug("Unable to signal in post_handler_hook:", exc_info=True)

        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status_and_flush("idle", "shell", stream)
444
    def pre_handler_hook(self):
        """Hook to execute before calling message handler"""
        # ensure default_int_handler during handler call; the previous handler
        # is saved so post_handler_hook can restore it.
        self.saved_sigint_handler = signal(SIGINT, default_int_handler)
449
    def post_handler_hook(self):
        """Hook to execute after calling message handler"""
        # Restore the SIGINT handler saved by pre_handler_hook.
        signal(SIGINT, self.saved_sigint_handler)
453
    def enter_eventloop(self):
        """enter eventloop

        Repeatedly schedules the GUI eventloop hook via the message queue so
        GUI event processing interleaves with kernel message handling.
        """
        self.log.info("Entering eventloop %s", self.eventloop)
        # record handle, so we can check when this changes
        eventloop = self.eventloop
        if eventloop is None:
            self.log.info("Exiting as there is no eventloop")
            return

        async def advance_eventloop():
            # check if eventloop changed:
            if self.eventloop is not eventloop:
                self.log.info("exiting eventloop %s", eventloop)
                return
            if self.msg_queue.qsize():
                self.log.debug("Delaying eventloop due to waiting messages")
                # still messages to process, make the eventloop wait
                schedule_next()
                return
            self.log.debug("Advancing eventloop %s", eventloop)
            try:
                # Run the GUI eventloop hook; it returns when it yields control.
                eventloop(self)
            except KeyboardInterrupt:
                # Ctrl-C shouldn't crash the kernel
                self.log.error("KeyboardInterrupt caught in kernel")
            if self.eventloop is eventloop:
                # schedule advance again
                schedule_next()

        def schedule_next():
            """Schedule the next advance of the eventloop"""
            # call_later allows the io_loop to process other events if needed.
            # Going through schedule_dispatch ensures all other dispatches on msg_queue
            # are processed before we enter the eventloop, even if the previous dispatch was
            # already consumed from the queue by process_one and the queue is
            # technically empty.
            self.log.debug("Scheduling eventloop advance")
            self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))

        # begin polling the eventloop
        schedule_next()
495
496 async def do_one_iteration(self):
497 """Process a single shell message
498
499 Any pending control messages will be flushed as well
500
501 .. versionchanged:: 5
502 This is now a coroutine
503 """
504 # flush messages off of shell stream into the message queue
505 if self.shell_stream:
506 self.shell_stream.flush()
507 # process at most one shell message per iteration
508 await self.process_one(wait=False)
509
    async def process_one(self, wait=True):
        """Process one request

        Returns None if no message was handled.
        """
        if wait:
            t, dispatch, args = await self.msg_queue.get()
        else:
            try:
                t, dispatch, args = self.msg_queue.get_nowait()
            except (asyncio.QueueEmpty, QueueEmpty):
                # Nothing queued and we were asked not to wait.
                return

        if self.control_thread is None and self.control_stream is not None:
            # If there isn't a separate control thread then this main thread handles both shell
            # and control messages. Before processing a shell message we need to flush all control
            # messages and allow them all to be processed.
            await asyncio.sleep(0)
            self.control_stream.flush()

            socket = self.control_stream.socket
            while socket.poll(1):
                # Keep yielding to the event loop until no more control
                # messages are pending on the socket.
                await asyncio.sleep(0)
                self.control_stream.flush()

        await dispatch(*args)
536
    async def dispatch_queue(self):
        """Coroutine to preserve order of message handling

        Ensures that only one message is processing at a time,
        even when the handler is async
        """

        while True:
            try:
                await self.process_one()
            except Exception:
                # Never let a handler error kill the dispatch loop.
                self.log.exception("Error in message handler")
549
    # Monotonic counter used to tag queued dispatches in FIFO order.
    _message_counter = Any(
        help="""Monotonic counter of messages
        """,
    )

    @default("_message_counter")
    def _message_counter_default(self):
        # itertools.count() yields 0, 1, 2, ... on successive next() calls.
        return itertools.count()
558
559 def schedule_dispatch(self, dispatch, *args):
560 """schedule a message for dispatch"""
561 idx = next(self._message_counter)
562
563 self.msg_queue.put_nowait(
564 (
565 idx,
566 dispatch,
567 args,
568 )
569 )
570 # ensure the eventloop wakes up
571 self.io_loop.add_callback(lambda: None)
572
    def start(self):
        """register dispatchers for streams"""
        self.io_loop = ioloop.IOLoop.current()
        # FIFO queue of (index, dispatch, args) filled by schedule_dispatch.
        self.msg_queue: Queue[t.Any] = Queue()
        if not self.shell_channel_thread:
            # Without a dedicated shell-channel thread, consume the queue here.
            self.io_loop.add_callback(self.dispatch_queue)

        if self.control_stream:
            self.control_stream.on_recv(self.dispatch_control, copy=False)

        if self.shell_stream:
            if self.shell_channel_thread:
                # Route incoming shell messages through the shell-channel
                # thread, which forwards them to per-subshell handlers.
                self.shell_channel_thread.manager.set_on_recv_callback(self.shell_main)
                self.shell_stream.on_recv(self.shell_channel_thread_main, copy=False)
            else:
                self.shell_stream.on_recv(
                    partial(
                        self.schedule_dispatch,
                        self.dispatch_shell,
                    ),
                    copy=False,
                )

        # publish starting status
        self._publish_status("starting", "shell")
598
    async def shell_channel_thread_main(self, msg):
        """Handler for shell messages received on shell_channel_thread"""
        assert threading.current_thread() == self.shell_channel_thread

        if self.session is None:
            return

        # deserialize only the header to get subshell_id
        # Keep original message to send to subshell_id unmodified.
        _, msg2 = self.session.feed_identities(msg, copy=False)
        try:
            msg3 = self.session.deserialize(msg2, content=False, copy=False)
            subshell_id = msg3["header"].get("subshell_id")

            # Find inproc pair socket to use to send message to correct subshell.
            subshell_manager = self.shell_channel_thread.manager
            socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id)
            assert socket is not None
            # Forward the untouched wire message to the subshell's socket.
            socket.send_multipart(msg, copy=False)
        except Exception:
            self.log.error("Invalid message", exc_info=True)  # noqa: G201

        if self.shell_stream:
            self.shell_stream.flush()
623
    async def shell_main(self, subshell_id: str | None, msg):
        """Handler of shell messages for a single subshell"""
        if self._supports_kernel_subshells:
            # The parent subshell runs on the main thread; child subshells run
            # on their own threads (never the shell-channel or main thread).
            if subshell_id is None:
                assert threading.current_thread() == threading.main_thread()
            else:
                assert threading.current_thread() not in (
                    self.shell_channel_thread,
                    threading.main_thread(),
                )
            socket_pair = self.shell_channel_thread.manager.get_shell_channel_to_subshell_pair(
                subshell_id
            )
        else:
            assert subshell_id is None
            assert threading.current_thread() == threading.main_thread()
            socket_pair = None

        try:
            # Whilst executing a shell message, do not accept any other shell messages on the
            # same subshell, so that cells are run sequentially. Without this we can run multiple
            # async cells at the same time which would be a nice feature to have but is an API
            # change.
            if socket_pair:
                socket_pair.pause_on_recv()
            await self.dispatch_shell(msg, subshell_id=subshell_id)
        finally:
            if socket_pair:
                socket_pair.resume_on_recv()
653
    def record_ports(self, ports):
        """Record the ports that this kernel is using.

        The creator of the Kernel instance must call this method if they
        want the :meth:`connect_request` method to return the port numbers.
        """
        self._recorded_ports = ports
661
662 # ---------------------------------------------------------------------------
663 # Kernel request handlers
664 # ---------------------------------------------------------------------------
665
    def _publish_execute_input(self, code, parent, execution_count):
        """Publish the code request on the iopub stream."""
        if not self.session:
            return
        # Re-broadcast the submitted code so all connected clients see it.
        self.session.send(
            self.iopub_socket,
            "execute_input",
            {"code": code, "execution_count": execution_count},
            parent=parent,
            ident=self._topic("execute_input"),
        )
677
    def _publish_status(self, status, channel, parent=None):
        """send status (busy/idle) on IOPub

        Uses the given parent message, or the most recent parent recorded for
        *channel* when none is supplied.
        """
        if not self.session:
            return
        self.session.send(
            self.iopub_socket,
            "status",
            {"execution_state": status},
            parent=parent or self.get_parent(channel),
            ident=self._topic("status"),
        )
689
690 def _publish_status_and_flush(self, status, channel, stream, parent=None):
691 """send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply"""
692 self._publish_status(status, channel, parent)
693 if stream and hasattr(stream, "flush"):
694 stream.flush(zmq.POLLOUT)
695
    def _publish_debug_event(self, event):
        """Publish a debug_event message on IOPub."""
        if not self.session:
            return
        self.session.send(
            self.iopub_socket,
            "debug_event",
            event,
            parent=self.get_parent(),
            ident=self._topic("debug_event"),
        )
706
    def set_parent(self, ident, parent, channel="shell"):
        """Set the current parent request

        Side effects (IOPub messages) and replies are associated with
        the request that caused them via the parent_header.

        The parent identity is used to route input_request messages
        on the stdin channel.
        """
        # Stored per channel so shell and control requests don't clobber
        # each other's parent context.
        self._parent_ident[channel] = ident
        self._parents[channel] = parent
718
719 def get_parent(self, channel=None):
720 """Get the parent request associated with a channel.
721
722 .. versionadded:: 6
723
724 Parameters
725 ----------
726 channel : str
727 the name of the channel ('shell' or 'control')
728
729 Returns
730 -------
731 message : dict
732 the parent message for the most recent request on the channel.
733 """
734
735 if channel is None:
736 # If a channel is not specified, get information from current thread
737 if threading.current_thread().name == CONTROL_THREAD_NAME:
738 channel = "control"
739 else:
740 channel = "shell"
741
742 return self._parents.get(channel, {})
743
    def send_response(
        self,
        stream,
        msg_or_type,
        content=None,
        ident=None,
        buffers=None,
        track=False,
        header=None,
        metadata=None,
        channel=None,
    ):
        """Send a response to the message we're currently processing.

        This accepts all the parameters of :meth:`jupyter_client.session.Session.send`
        except ``parent``.

        This relies on :meth:`set_parent` having been called for the current
        message.
        """
        if not self.session:
            return None
        # The parent for the reply is looked up from the channel (or the
        # current thread's channel when channel is None).
        return self.session.send(
            stream,
            msg_or_type,
            content,
            self.get_parent(channel),
            ident,
            buffers,
            track,
            header,
            metadata,
        )
777
    def init_metadata(self, parent):
        """Initialize metadata.

        Run at the beginning of execution requests.
        """
        # FIXME: `started` is part of ipyparallel
        # Remove for ipykernel 5.0
        return {
            "started": now(),
        }
788
    def finish_metadata(self, parent, metadata, reply_content):
        """Finish populating metadata.

        Run after completing an execution request.  The base implementation
        returns *metadata* unchanged; subclasses may enrich it.
        """
        return metadata
795
796 async def execute_request(self, stream, ident, parent):
797 """handle an execute_request"""
798 if not self.session:
799 return
800 try:
801 content = parent["content"]
802 code = content["code"]
803 silent = content.get("silent", False)
804 store_history = content.get("store_history", not silent)
805 user_expressions = content.get("user_expressions", {})
806 allow_stdin = content.get("allow_stdin", False)
807 cell_meta = parent.get("metadata", {})
808 cell_id = cell_meta.get("cellId")
809 except Exception:
810 self.log.error("Got bad msg: ")
811 self.log.error("%s", parent)
812 return
813
814 stop_on_error = content.get("stop_on_error", True)
815
816 metadata = self.init_metadata(parent)
817
818 # Re-broadcast our input for the benefit of listening clients, and
819 # start computing output
820 if not silent:
821 self.execution_count += 1
822 self._publish_execute_input(code, parent, self.execution_count)
823
824 # Arguments based on the do_execute signature
825 do_execute_args = {
826 "code": code,
827 "silent": silent,
828 "store_history": store_history,
829 "user_expressions": user_expressions,
830 "allow_stdin": allow_stdin,
831 }
832
833 if self._do_exec_accepted_params["cell_meta"]:
834 do_execute_args["cell_meta"] = cell_meta
835 if self._do_exec_accepted_params["cell_id"]:
836 do_execute_args["cell_id"] = cell_id
837
838 subshell_id = parent["header"].get("subshell_id")
839
840 # Call do_execute with the appropriate arguments
841 reply_content = self.do_execute(**do_execute_args)
842
843 if inspect.isawaitable(reply_content):
844 reply_content = await reply_content
845
846 # Flush output before sending the reply.
847 sys.stdout.flush()
848 sys.stderr.flush()
849 # FIXME: on rare occasions, the flush doesn't seem to make it to the
850 # clients... This seems to mitigate the problem, but we definitely need
851 # to better understand what's going on.
852 if self._execute_sleep:
853 time.sleep(self._execute_sleep)
854
855 # Send the reply.
856 reply_content = json_clean(reply_content)
857 metadata = self.finish_metadata(parent, metadata, reply_content)
858
859 reply_msg: dict[str, t.Any] = self.session.send( # type:ignore[assignment]
860 stream,
861 "execute_reply",
862 reply_content,
863 parent,
864 metadata=metadata,
865 ident=ident,
866 )
867
868 self.log.debug("%s", reply_msg)
869
870 if not silent and reply_msg["content"]["status"] == "error" and stop_on_error:
871 subshell_id = parent["header"].get("subshell_id")
872 self._abort_queues(subshell_id)
873
    def do_execute(
        self,
        code,
        silent,
        store_history=True,
        user_expressions=None,
        allow_stdin=False,
        *,
        cell_meta=None,
        cell_id=None,
    ):
        """Execute user code. Must be overridden by subclasses.

        Parameters
        ----------
        code : str
            The code submitted by the frontend.
        silent : bool
            When True, execute_request does not bump execution_count or
            re-broadcast the input on IOPub.
        store_history : bool
            Whether the request should be recorded in execution history.
        user_expressions : dict | None
            Expressions to evaluate after the code runs.
        allow_stdin : bool
            Whether input requests to the frontend are permitted.
        cell_meta : dict | None
            Cell metadata; only passed when the override accepts it.
        cell_id : str | None
            Frontend cell id; only passed when the override accepts it.
        """
        raise NotImplementedError
887
888 async def complete_request(self, stream, ident, parent):
889 """Handle a completion request."""
890 if not self.session:
891 return
892 content = parent["content"]
893 code = content["code"]
894 cursor_pos = content["cursor_pos"]
895
896 matches = self.do_complete(code, cursor_pos)
897 if inspect.isawaitable(matches):
898 matches = await matches
899
900 matches = json_clean(matches)
901 self.session.send(stream, "complete_reply", matches, parent, ident)
902
903 def do_complete(self, code, cursor_pos):
904 """Override in subclasses to find completions."""
905 return {
906 "matches": [],
907 "cursor_end": cursor_pos,
908 "cursor_start": cursor_pos,
909 "metadata": {},
910 "status": "ok",
911 }
912
    async def inspect_request(self, stream, ident, parent):
        """Handle an inspect request."""
        if not self.session:
            return
        content = parent["content"]

        # do_inspect may be sync or async depending on the subclass.
        reply_content = self.do_inspect(
            content["code"],
            content["cursor_pos"],
            content.get("detail_level", 0),
            set(content.get("omit_sections", [])),
        )
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content

        # Before we send this object over, we scrub it for JSON usage
        reply_content = json_clean(reply_content)
        msg = self.session.send(stream, "inspect_reply", reply_content, parent, ident)
        self.log.debug("%s", msg)
932
933 def do_inspect(self, code, cursor_pos, detail_level=0, omit_sections=()):
934 """Override in subclasses to allow introspection."""
935 return {"status": "ok", "data": {}, "metadata": {}, "found": False}
936
937 async def history_request(self, stream, ident, parent):
938 """Handle a history request."""
939 if not self.session:
940 return
941 content = parent["content"]
942
943 reply_content = self.do_history(**content)
944 if inspect.isawaitable(reply_content):
945 reply_content = await reply_content
946
947 reply_content = json_clean(reply_content)
948 msg = self.session.send(stream, "history_reply", reply_content, parent, ident)
949 self.log.debug("%s", msg)
950
    def do_history(
        self,
        hist_access_type,
        output,
        raw,
        session=None,
        start=None,
        stop=None,
        n=None,
        pattern=None,
        unique=False,
    ):
        """Override in subclasses to access history.

        The base implementation reports success with an empty history list.
        """
        return {"status": "ok", "history": []}
965
966 async def connect_request(self, stream, ident, parent):
967 """Handle a connect request."""
968 if not self.session:
969 return
970 content = self._recorded_ports.copy() if self._recorded_ports else {}
971 content["status"] = "ok"
972 msg = self.session.send(stream, "connect_reply", content, parent, ident)
973 self.log.debug("%s", msg)
974
975 @property
976 def kernel_info(self):
977 info = {
978 "protocol_version": kernel_protocol_version,
979 "implementation": self.implementation,
980 "implementation_version": self.implementation_version,
981 "language_info": self.language_info,
982 "banner": self.banner,
983 "help_links": self.help_links,
984 "supported_features": [],
985 }
986 if self._supports_kernel_subshells:
987 info["supported_features"] = ["kernel subshells"]
988 return info
989
990 async def kernel_info_request(self, stream, ident, parent):
991 """Handle a kernel info request."""
992 if not self.session:
993 return
994 content = {"status": "ok"}
995 content.update(self.kernel_info)
996 msg = self.session.send(stream, "kernel_info_reply", content, parent, ident)
997 self.log.debug("%s", msg)
998
    async def comm_info_request(self, stream, ident, parent):
        """Handle a comm info request."""
        if not self.session:
            return
        content = parent["content"]
        target_name = content.get("target_name", None)

        # Should this be moved to ipkernel?
        if hasattr(self, "comm_manager"):
            # A target_name of None matches every open comm.
            comms = {
                k: dict(target_name=v.target_name)
                for (k, v) in self.comm_manager.comms.items()
                if v.target_name == target_name or target_name is None
            }
        else:
            comms = {}
        reply_content = dict(comms=comms, status="ok")
        msg = self.session.send(stream, "comm_info_reply", reply_content, parent, ident)
        self.log.debug("%s", msg)
1018
    def _send_interrupt_children(self):
        """Deliver SIGINT to this process (or its process group, if leader).

        Raises OSError when signalling the process group fails, after first
        falling back to signalling just this process.
        """
        if os.name == "nt":
            self.log.error("Interrupt message not supported on Windows")
        else:
            pid = os.getpid()
            pgid = os.getpgid(pid)
            # Prefer process-group over process
            # but only if the kernel is the leader of the process group
            if pgid and pgid == pid and hasattr(os, "killpg"):
                try:
                    os.killpg(pgid, SIGINT)
                except OSError:
                    # Fall back to signalling only this process, but still
                    # surface the original failure to the caller.
                    os.kill(pid, SIGINT)
                    raise
            else:
                os.kill(pid, SIGINT)
1035
1036 async def interrupt_request(self, stream, ident, parent):
1037 """Handle an interrupt request."""
1038 if not self.session:
1039 return
1040 content: dict[str, t.Any] = {"status": "ok"}
1041 try:
1042 self._send_interrupt_children()
1043 except OSError as err:
1044 import traceback
1045
1046 content = {
1047 "status": "error",
1048 "traceback": traceback.format_stack(),
1049 "ename": str(type(err).__name__),
1050 "evalue": str(err),
1051 }
1052
1053 self.session.send(stream, "interrupt_reply", content, parent, ident=ident)
1054 return
1055
    async def shutdown_request(self, stream, ident, parent):
        """Handle a shutdown request.

        Replies on the control/shell stream, broadcasts the same reply on
        IOPub (via ``_at_shutdown``), then stops the kernel's event loops.
        """
        if not self.session:
            return
        content = self.do_shutdown(parent["content"]["restart"])
        if inspect.isawaitable(content):
            content = await content
        self.session.send(stream, "shutdown_reply", content, parent, ident=ident)
        # same content, but different msg_id for broadcasting on IOPub
        self._shutdown_message = self.session.msg("shutdown_reply", content, parent)

        # Terminate child processes and publish the shutdown message
        # before stopping the loops below.
        await self._at_shutdown()

        self.log.debug("Stopping control ioloop")
        if self.control_stream:
            control_io_loop = self.control_stream.io_loop
            control_io_loop.add_callback(control_io_loop.stop)

        self.log.debug("Stopping shell ioloop")
        self.io_loop.add_callback(self.io_loop.stop)
        # The shell stream may run on a different loop; stop that one too
        # if it is distinct from the main io_loop.
        if self.shell_stream and self.shell_stream.io_loop != self.io_loop:
            shell_io_loop = self.shell_stream.io_loop
            shell_io_loop.add_callback(shell_io_loop.stop)
1079
1080 def do_shutdown(self, restart):
1081 """Override in subclasses to do things when the frontend shuts down the
1082 kernel.
1083 """
1084 return {"status": "ok", "restart": restart}
1085
1086 async def is_complete_request(self, stream, ident, parent):
1087 """Handle an is_complete request."""
1088 if not self.session:
1089 return
1090 content = parent["content"]
1091 code = content["code"]
1092
1093 reply_content = self.do_is_complete(code)
1094 if inspect.isawaitable(reply_content):
1095 reply_content = await reply_content
1096 reply_content = json_clean(reply_content)
1097 reply_msg = self.session.send(stream, "is_complete_reply", reply_content, parent, ident)
1098 self.log.debug("%s", reply_msg)
1099
1100 def do_is_complete(self, code):
1101 """Override in subclasses to find completions."""
1102 return {"status": "unknown"}
1103
1104 async def debug_request(self, stream, ident, parent):
1105 """Handle a debug request."""
1106 if not self.session:
1107 return
1108 content = parent["content"]
1109 reply_content = self.do_debug_request(content)
1110 if inspect.isawaitable(reply_content):
1111 reply_content = await reply_content
1112 reply_content = json_clean(reply_content)
1113 reply_msg = self.session.send(stream, "debug_reply", reply_content, parent, ident)
1114 self.log.debug("%s", reply_msg)
1115
1116 def get_process_metric_value(self, process, name, attribute=None):
1117 """Get the process metric value."""
1118 try:
1119 metric_value = getattr(process, name)()
1120 if attribute is not None: # ... a named tuple
1121 return getattr(metric_value, attribute)
1122 # ... or a number
1123 return metric_value
1124 # Avoid littering logs with stack traces
1125 # complaining about dead processes
1126 except BaseException:
1127 return 0
1128
1129 async def usage_request(self, stream, ident, parent):
1130 """Handle a usage request."""
1131 if not self.session:
1132 return
1133 reply_content = {"hostname": socket.gethostname(), "pid": os.getpid()}
1134 current_process = psutil.Process()
1135 all_processes = [current_process, *current_process.children(recursive=True)]
1136 # Ensure 1) self.processes is updated to only current subprocesses
1137 # and 2) we reuse processes when possible (needed for accurate CPU)
1138 self.processes = {
1139 process.pid: self.processes.get(process.pid, process) # type:ignore[misc,call-overload]
1140 for process in all_processes
1141 }
1142 reply_content["kernel_cpu"] = sum(
1143 [
1144 self.get_process_metric_value(process, "cpu_percent", None)
1145 for process in self.processes.values()
1146 ]
1147 )
1148 mem_info_type = "pss" if hasattr(current_process.memory_full_info(), "pss") else "rss"
1149 reply_content["kernel_memory"] = sum(
1150 [
1151 self.get_process_metric_value(process, "memory_full_info", mem_info_type)
1152 for process in self.processes.values()
1153 ]
1154 )
1155 cpu_percent = psutil.cpu_percent()
1156 # https://psutil.readthedocs.io/en/latest/index.html?highlight=cpu#psutil.cpu_percent
1157 # The first time cpu_percent is called it will return a meaningless 0.0 value which you are supposed to ignore.
1158 if cpu_percent is not None and cpu_percent != 0.0: # type:ignore[redundant-expr]
1159 reply_content["host_cpu_percent"] = cpu_percent
1160 reply_content["cpu_count"] = psutil.cpu_count(logical=True)
1161 reply_content["host_virtual_memory"] = dict(psutil.virtual_memory()._asdict())
1162 reply_msg = self.session.send(stream, "usage_reply", reply_content, parent, ident)
1163 self.log.debug("%s", reply_msg)
1164
    async def do_debug_request(self, msg):
        """Override in subclasses to handle a debug request; the base kernel
        does not implement debugging."""
        raise NotImplementedError
1167
1168 async def create_subshell_request(self, socket, ident, parent) -> None:
1169 if not self.session:
1170 return
1171 if not self._supports_kernel_subshells:
1172 self.log.error("Subshells are not supported by this kernel")
1173 return
1174
1175 assert threading.current_thread().name == CONTROL_THREAD_NAME
1176
1177 # This should only be called in the control thread if it exists.
1178 # Request is passed to shell channel thread to process.
1179 control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket
1180 control_socket.send_json({"type": "create"})
1181 reply = control_socket.recv_json()
1182 self.session.send(socket, "create_subshell_reply", reply, parent, ident)
1183
1184 async def delete_subshell_request(self, socket, ident, parent) -> None:
1185 if not self.session:
1186 return
1187 if not self._supports_kernel_subshells:
1188 self.log.error("KERNEL SUBSHELLS NOT SUPPORTED")
1189 return
1190
1191 assert threading.current_thread().name == CONTROL_THREAD_NAME
1192
1193 try:
1194 content = parent["content"]
1195 subshell_id = content["subshell_id"]
1196 except Exception:
1197 self.log.error("Got bad msg from parent: %s", parent)
1198 return
1199
1200 # This should only be called in the control thread if it exists.
1201 # Request is passed to shell channel thread to process.
1202 control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket
1203 control_socket.send_json({"type": "delete", "subshell_id": subshell_id})
1204 reply = control_socket.recv_json()
1205
1206 self.session.send(socket, "delete_subshell_reply", reply, parent, ident)
1207
1208 async def list_subshell_request(self, socket, ident, parent) -> None:
1209 if not self.session:
1210 return
1211 if not self._supports_kernel_subshells:
1212 self.log.error("Subshells are not supported by this kernel")
1213 return
1214
1215 assert threading.current_thread().name == CONTROL_THREAD_NAME
1216
1217 # This should only be called in the control thread if it exists.
1218 # Request is passed to shell channel thread to process.
1219 control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket
1220 control_socket.send_json({"type": "list"})
1221 reply = control_socket.recv_json()
1222
1223 self.session.send(socket, "list_subshell_reply", reply, parent, ident)
1224
1225 # ---------------------------------------------------------------------------
1226 # Engine methods (DEPRECATED)
1227 # ---------------------------------------------------------------------------
1228
    async def apply_request(self, stream, ident, parent):  # pragma: no cover
        """Handle an apply request (DEPRECATED here; moving to ipyparallel)."""
        self.log.warning("apply_request is deprecated in kernel_base, moving to ipyparallel.")
        try:
            content = parent["content"]
            bufs = parent["buffers"]
            msg_id = parent["header"]["msg_id"]
        except Exception:
            # Malformed request: log and drop it rather than crash.
            self.log.error("Got bad msg: %s", parent, exc_info=True)  # noqa: G201
            return

        md = self.init_metadata(parent)

        reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)

        # flush i/o
        sys.stdout.flush()
        sys.stderr.flush()

        md = self.finish_metadata(parent, md, reply_content)
        if not self.session:
            return
        self.session.send(
            stream,
            "apply_reply",
            reply_content,
            parent=parent,
            ident=ident,
            buffers=result_buf,
            metadata=md,
        )
1260
    def do_apply(self, content, bufs, msg_id, reply_metadata):
        """DEPRECATED: implemented in ipyparallel; the base kernel does not
        support apply requests."""
        raise NotImplementedError
1264
1265 # ---------------------------------------------------------------------------
1266 # Control messages (DEPRECATED)
1267 # ---------------------------------------------------------------------------
1268
1269 async def abort_request(self, stream, ident, parent): # pragma: no cover
1270 """abort a specific msg by id"""
1271 self.log.warning(
1272 "abort_request is deprecated in kernel_base. It is only part of IPython parallel"
1273 )
1274 msg_ids = parent["content"].get("msg_ids", None)
1275 if isinstance(msg_ids, str):
1276 msg_ids = [msg_ids]
1277 if not msg_ids:
1278 subshell_id = parent["header"].get("subshell_id")
1279 self._abort_queues(subshell_id)
1280
1281 for mid in msg_ids:
1282 self.aborted.add(str(mid))
1283
1284 content = dict(status="ok")
1285 if not self.session:
1286 return
1287 reply_msg = self.session.send(
1288 stream, "abort_reply", content=content, parent=parent, ident=ident
1289 )
1290 self.log.debug("%s", reply_msg)
1291
1292 async def clear_request(self, stream, idents, parent): # pragma: no cover
1293 """Clear our namespace."""
1294 self.log.warning(
1295 "clear_request is deprecated in kernel_base. It is only part of IPython parallel"
1296 )
1297 content = self.do_clear()
1298 if self.session:
1299 self.session.send(stream, "clear_reply", ident=idents, parent=parent, content=content)
1300
    def do_clear(self):
        """DEPRECATED since 4.0.3; only used by the IPython parallel code path."""
        raise NotImplementedError
1304
1305 # ---------------------------------------------------------------------------
1306 # Protected interface
1307 # ---------------------------------------------------------------------------
1308
1309 def _topic(self, topic):
1310 """prefixed topic for IOPub messages"""
1311 base = "kernel.%s" % self.ident
1312
1313 return (f"{base}.{topic}").encode()
1314
    # While this flag is true, execute requests will be aborted (see
    # _abort_queues, which sets it and later schedules clearing it).
    _aborting = Bool(False)
1316
1317 def _post_dummy_stop_aborting_message(self, subshell_id: str | None) -> None:
1318 """Post a dummy message to the correct subshell that when handled will unset
1319 the _aborting flag.
1320 """
1321 subshell_manager = self.shell_channel_thread.manager
1322 socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id)
1323 assert socket is not None
1324
1325 msg = b"stop aborting" # Magic string for dummy message.
1326 socket.send(msg, copy=False)
1327
    def _abort_queues(self, subshell_id: str | None = None):
        """Abort queued execute requests, for one subshell or the whole kernel.

        Sets the aborting flag, then arranges for it to be cleared only after
        all messages already waiting in the queue have been processed (and
        therefore aborted).
        """
        # while this flag is true,
        # execute requests will be aborted
        if subshell_id is None:
            self._aborting = True
        else:
            self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, True)
        self.log.info("Aborting queue")

        if self.shell_channel_thread:
            # Only really need to do this if there are messages already queued
            self.shell_channel_thread.io_loop.add_callback(
                self._post_dummy_stop_aborting_message, subshell_id
            )
            return

        # flush streams, so all currently waiting messages
        # are added to the queue
        if self.shell_stream:
            self.shell_stream.flush()

        # Callback to signal that we are done aborting
        # dispatch functions _must_ be async
        async def stop_aborting():
            self.log.info("Finishing abort")
            self._aborting = False

        # put the stop-aborting event on the message queue
        # so that all messages already waiting in the queue are aborted
        # before we reset the flag
        schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)

        if self.stop_on_error_timeout:
            # if we have a delay, give messages this long to arrive on the queue
            # before we stop aborting requests
            self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting)
            # If we have an eventloop, it may interfere with the call_later above.
            # If the loop has a _schedule_exit method, we call that so the loop exits
            # after stop_on_error_timeout, returning to the main io_loop and letting
            # the call_later fire.
            if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
                self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)
        else:
            schedule_stop_aborting()
1373
1374 def _send_abort_reply(self, stream, msg, idents):
1375 """Send a reply to an aborted request"""
1376 if not self.session:
1377 return
1378 self.log.info("Aborting %s: %s", msg["header"]["msg_id"], msg["header"]["msg_type"])
1379 reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply"
1380 status = {"status": "aborted"}
1381 md = self.init_metadata(msg)
1382 md = self.finish_metadata(msg, md, status)
1383 md.update(status)
1384
1385 self.session.send(
1386 stream,
1387 reply_type,
1388 metadata=md,
1389 content=status,
1390 parent=msg,
1391 ident=idents,
1392 )
1393
1394 def _no_raw_input(self):
1395 """Raise StdinNotImplementedError if active frontend doesn't support
1396 stdin."""
1397 msg = "raw_input was called, but this frontend does not support stdin."
1398 raise StdinNotImplementedError(msg)
1399
1400 def getpass(self, prompt="", stream=None):
1401 """Forward getpass to frontends
1402
1403 Raises
1404 ------
1405 StdinNotImplementedError if active frontend doesn't support stdin.
1406 """
1407 if not self._allow_stdin:
1408 msg = "getpass was called, but this frontend does not support input requests."
1409 raise StdinNotImplementedError(msg)
1410 if stream is not None:
1411 import warnings
1412
1413 warnings.warn(
1414 "The `stream` parameter of `getpass.getpass` will have no effect when using ipykernel",
1415 UserWarning,
1416 stacklevel=2,
1417 )
1418 return self._input_request(
1419 prompt,
1420 self._parent_ident["shell"],
1421 self.get_parent("shell"),
1422 password=True,
1423 )
1424
1425 def raw_input(self, prompt=""):
1426 """Forward raw_input to frontends
1427
1428 Raises
1429 ------
1430 StdinNotImplementedError if active frontend doesn't support stdin.
1431 """
1432 if not self._allow_stdin:
1433 msg = "raw_input was called, but this frontend does not support input requests."
1434 raise StdinNotImplementedError(msg)
1435 return self._input_request(
1436 str(prompt),
1437 self._parent_ident["shell"],
1438 self.get_parent("shell"),
1439 password=False,
1440 )
1441
    def _input_request(self, prompt, ident, parent, password=False):
        """Send an input_request over the stdin socket and block for the reply.

        Returns the text the user entered. Raises EOFError when the reply is
        the EOF control character, and KeyboardInterrupt if interrupted while
        waiting.
        """
        # Flush output before making the request.
        sys.stderr.flush()
        sys.stdout.flush()

        # flush the stdin socket, to purge stale replies
        while True:
            try:
                self.stdin_socket.recv_multipart(zmq.NOBLOCK)
            except zmq.ZMQError as e:
                if e.errno == zmq.EAGAIN:
                    break
                raise

        # Send the input request.
        assert self.session is not None
        content = json_clean(dict(prompt=prompt, password=password))
        self.session.send(self.stdin_socket, "input_request", content, parent, ident=ident)

        # Await a response.
        while True:
            try:
                # Use polling with select() so KeyboardInterrupts can get
                # through; doing a blocking recv() means stdin reads are
                # uninterruptible on Windows. We need a timeout because
                # zmq.select() is also uninterruptible, but at least this
                # way reads get noticed immediately and KeyboardInterrupts
                # get noticed fairly quickly by human response time standards.
                rlist, _, xlist = zmq.select([self.stdin_socket], [], [self.stdin_socket], 0.01)
                if rlist or xlist:
                    ident, reply = self.session.recv(self.stdin_socket)
                    if (ident, reply) != (None, None):
                        break
            except KeyboardInterrupt:
                # re-raise KeyboardInterrupt, to truncate traceback
                msg = "Interrupted by user"
                raise KeyboardInterrupt(msg) from None
            except Exception:
                self.log.warning("Invalid Message:", exc_info=True)

        # Extract the value; fall back to "" on a malformed reply.
        try:
            value = reply["content"]["value"]  # type:ignore[index]
        except Exception:
            self.log.error("Bad input_reply: %s", parent)
            value = ""
        if value == "\x04":
            # EOF
            raise EOFError
        return value
1491
1492 def _signal_children(self, signum):
1493 """
1494 Send a signal to all our children
1495
1496 Like `killpg`, but does not include the current process
1497 (or possible parents).
1498 """
1499 sig_rep = f"{Signals(signum)!r}"
1500 for p in self._process_children():
1501 self.log.debug("Sending %s to subprocess %s", sig_rep, p)
1502 try:
1503 if signum == SIGTERM:
1504 p.terminate()
1505 elif signum == SIGKILL:
1506 p.kill()
1507 else:
1508 p.send_signal(signum)
1509 except psutil.NoSuchProcess:
1510 pass
1511
1512 def _process_children(self):
1513 """Retrieve child processes in the kernel's process group
1514
1515 Avoids:
1516 - including parents and self with killpg
1517 - including all children that may have forked-off a new group
1518 """
1519 kernel_process = psutil.Process()
1520 all_children = kernel_process.children(recursive=True)
1521 if os.name == "nt":
1522 return all_children
1523 kernel_pgid = os.getpgrp()
1524 process_group_children = []
1525 for child in all_children:
1526 try:
1527 child_pgid = os.getpgid(child.pid)
1528 except OSError:
1529 pass
1530 else:
1531 if child_pgid == kernel_pgid:
1532 process_group_children.append(child)
1533 return process_group_children
1534
1535 async def _progressively_terminate_all_children(self):
1536 sleeps = (0.01, 0.03, 0.1, 0.3, 1, 3, 10)
1537 if not self._process_children():
1538 self.log.debug("Kernel has no children.")
1539 return
1540
1541 for signum in (SIGTERM, SIGKILL):
1542 for delay in sleeps:
1543 children = self._process_children()
1544 if not children:
1545 self.log.debug("No more children, continuing shutdown routine.")
1546 return
1547 # signals only children, not current process
1548 self._signal_children(signum)
1549 self.log.debug(
1550 "Will sleep %s sec before checking for children and retrying. %s",
1551 delay,
1552 children,
1553 )
1554 await asyncio.sleep(delay)
1555
    async def _at_shutdown(self):
        """Actions taken at shutdown by the kernel, called by python's atexit."""
        try:
            await self._progressively_terminate_all_children()
        except Exception as e:
            # Best-effort: log but continue with the rest of shutdown.
            self.log.exception("Exception during subprocesses termination %s", e)

        finally:
            # Broadcast the shutdown_reply prepared by shutdown_request (if
            # any) on IOPub so frontends see it.
            if self._shutdown_message is not None and self.session:
                self.session.send(
                    self.iopub_socket,
                    self._shutdown_message,
                    ident=self._topic("shutdown"),
                )
                self.log.debug("%s", self._shutdown_message)
            if self.control_stream:
                # Flush pending outgoing control messages before exiting.
                self.control_stream.flush(zmq.POLLOUT)
1573
1574 @property
1575 def _supports_kernel_subshells(self):
1576 return self.shell_channel_thread is not None