Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/nbclient/client.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""nbclient implementation."""
2from __future__ import annotations
4import asyncio
5import atexit
6import base64
7import collections
8import datetime
9import re
10import signal
11import typing as t
12from contextlib import asynccontextmanager, contextmanager
13from queue import Empty
14from textwrap import dedent
15from time import monotonic
17from jupyter_client.client import KernelClient
18from jupyter_client.manager import KernelManager
19from nbformat import NotebookNode
20from nbformat.v4 import output_from_msg
21from traitlets import Any, Bool, Callable, Dict, Enum, Integer, List, Type, Unicode, default
22from traitlets.config.configurable import LoggingConfigurable
24from .exceptions import (
25 CellControlSignal,
26 CellExecutionComplete,
27 CellExecutionError,
28 CellTimeoutError,
29 DeadKernelError,
30)
31from .output_widget import OutputWidget
32from .util import ensure_async, run_hook, run_sync
34_RGX_CARRIAGERETURN = re.compile(r".*\r(?=[^\n])")
35_RGX_BACKSPACE = re.compile(r"[^\n]\b")
37# mypy: disable-error-code="no-untyped-call"
def timestamp(msg: dict[str, t.Any] | None = None) -> str:
    """Get the timestamp for a message.

    Parameters
    ----------
    msg : dict, optional
        A message. When it has a ``header`` whose ``date`` entry is a
        ``datetime.datetime``, that date is used.

    Returns
    -------
    str
        The header date formatted as ``%Y-%m-%dT%H:%M:%S.%fZ`` (timezone-aware
        dates are converted to UTC first), or, when no usable date is
        available, the current UTC time in ISO format with a ``Z`` suffix.
    """
    if msg and "header" in msg:  # The test mocks don't provide a header, so tolerate that
        msg_header = msg["header"]
        if "date" in msg_header and isinstance(msg_header["date"], datetime.datetime):
            try:
                date = msg_header["date"]
                if date.utcoffset() is not None:
                    # The trailing "Z" claims UTC, so normalise aware datetimes
                    # before formatting instead of mislabelling a local offset.
                    date = date.astimezone(datetime.timezone.utc)
                # reformat datetime into expected format
                formatted_time = date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                if (
                    formatted_time
                ):  # docs indicate strftime may return empty string, so let's catch that too
                    return formatted_time
            except Exception:  # noqa
                # Deliberate best effort: any formatting problem falls through
                # to the current-time fallback below.
                pass

    # ``datetime.utcnow()`` is deprecated; take an aware UTC "now" and drop the
    # tzinfo so the rendered text keeps the historical ``...Z`` shape.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return now_utc.replace(tzinfo=None).isoformat() + "Z"
60class NotebookClient(LoggingConfigurable):
61 """
62 Encompasses a Client for executing cells in a notebook
63 """
# Per-cell execution timeout in seconds.  ``_get_timeout`` uses it when
# ``timeout_func`` is unset and maps falsy or negative values to ``None``.
timeout = Integer(
    None,
    allow_none=True,
    help=dedent(
        """
        The time to wait (in seconds) for output from executions.
        If a cell execution takes longer, a TimeoutError is raised.

        ``None`` or ``-1`` will disable the timeout. If ``timeout_func`` is set,
        it overrides ``timeout``.
        """
    ),
).tag(config=True)

# Optional callable computing the timeout per cell.
# NOTE(review): the help text says it receives the cell source, but
# ``_get_timeout`` calls ``self.timeout_func(cell)`` with the cell object.
timeout_func: t.Callable[..., int | None] | None = Any(  # type:ignore[assignment]
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which, when given the cell source as input,
        returns the time to wait (in seconds) for output from cell
        executions. If a cell execution takes longer, a TimeoutError
        is raised.

        Returning ``None`` or ``-1`` will disable the timeout for the cell.
        Not setting ``timeout_func`` will cause the client to
        default to using the ``timeout`` trait for all cells. The
        ``timeout_func`` trait overrides ``timeout`` if it is not ``None``.
        """
    ),
).tag(config=True)

# Read by ``_async_handle_timeout``: when true the kernel is interrupted on a
# timeout, otherwise a ``CellTimeoutError`` is raised.
interrupt_on_timeout = Bool(
    False,
    help=dedent(
        """
        If execution of a cell times out, interrupt the kernel and
        continue executing other cells rather than throwing an error and
        stopping.
        """
    ),
).tag(config=True)

# Read by ``_async_handle_timeout``: when set, its entries are merged with
# ``"status": "error"`` into a synthetic execute_reply after an interrupt.
error_on_timeout = Dict(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        If a cell execution was interrupted after a timeout, don't wait for
        the execute_reply from the kernel (e.g. KeyboardInterrupt error).
        Instead, return an execute_reply with the given error, which should
        be of the following form::

            {
                'ename': str,  # Exception name, as a string
                'evalue': str,  # Exception value, as a string
                'traceback': list(str),  # traceback frames, as strings
            }
        """
    ),
).tag(config=True)

# Passed as ``timeout`` to ``kc.wait_for_ready`` when a kernel client is started.
startup_timeout = Integer(
    60,
    help=dedent(
        """
        The time to wait (in seconds) for the kernel to start.
        If kernel startup takes longer, a RuntimeError is
        raised.
        """
    ),
).tag(config=True)

# Consulted both when submitting a cell (``stop_on_error``) and when deciding
# whether an error reply raises ``CellExecutionError``.
allow_errors = Bool(
    False,
    help=dedent(
        """
        If ``False`` (default), when a cell raises an error the
        execution is stopped and a ``CellExecutionError``
        is raised, except if the error name is in
        ``allow_error_names``.
        If ``True``, execution errors are ignored and the execution
        is continued until the end of the notebook. Output from
        exceptions is included in the cell output in both cases.
        """
    ),
).tag(config=True)
# Error names (the reply's ``ename``) that ``_check_raise_for_error`` tolerates
# without raising ``CellExecutionError``.
allow_error_names = List(
    Unicode(),
    help=dedent(
        """
        List of error names which won't stop the execution. Use this if the
        ``allow_errors`` option is too general and you want to allow only
        specific kinds of errors.
        """
    ),
).tag(config=True)
# When true, overrides ``allow_errors``, ``allow_error_names`` and the
# ``raises-exception`` tag wherever those are evaluated.
force_raise_errors = Bool(
    False,
    help=dedent(
        """
        If False (default), errors from executing the notebook can be
        allowed with a ``raises-exception`` tag on a single cell, or the
        ``allow_errors`` or ``allow_error_names`` configurable options for
        all cells. An allowed error will be recorded in notebook output, and
        execution will continue. If an error occurs when it is not
        explicitly allowed, a ``CellExecutionError`` will be raised.
        If True, ``CellExecutionError`` will be raised for any error that occurs
        while executing the notebook. This overrides the ``allow_errors``
        and ``allow_error_names`` options and the ``raises-exception`` cell
        tag.
        """
    ),
).tag(config=True)

# Cells carrying this tag are returned untouched by ``async_execute_cell``.
skip_cells_with_tag = Unicode(
    "skip-execution",
    help=dedent(
        """
        Name of the cell tag to use to denote a cell that should be skipped.
        """
    ),
).tag(config=True)

# Forwarded to ``km.start_kernel(extra_arguments=...)``; a history-file flag
# may be appended by ``async_start_new_kernel``.
extra_arguments = List(Unicode()).tag(config=True)

# When empty, ``create_kernel_manager`` fills it from the notebook's
# ``kernelspec`` metadata if a name is present there.
kernel_name = Unicode(
    "",
    help=dedent(
        """
        Name of kernel to use to execute the cells.
        If not set, use the kernel_spec embedded in the notebook.
        """
    ),
).tag(config=True)

# Chooses between raising ``CellTimeoutError`` and logging a warning when
# waiting for IOPub output times out after the execute reply arrived.
raise_on_iopub_timeout = Bool(
    False,
    help=dedent(
        """
        If ``False`` (default), then the kernel will continue waiting for
        iopub messages until it receives a kernel idle message, or until a
        timeout occurs, at which point the currently executing cell will be
        skipped. If ``True``, then an error will be raised after the first
        timeout. This option generally does not need to be used, but may be
        useful in contexts where there is the possibility of executing
        notebooks with memory-consuming infinite loops.
        """
    ),
).tag(config=True)

store_widget_state = Bool(
    True,
    help=dedent(
        """
        If ``True`` (default), then the state of the Jupyter widgets created
        at the kernel will be stored in the metadata of the notebook.
        """
    ),
).tag(config=True)

# When true, ``cell["metadata"]["execution"]`` is reset before execution and
# receives a ``shell.execute_reply`` timestamp when the reply arrives.
record_timing = Bool(
    True,
    help=dedent(
        """
        If ``True`` (default), then the execution timings of each cell will
        be stored in the metadata of the notebook.
        """
    ),
).tag(config=True)

# Timeout handed to ``asyncio.wait_for`` while draining IOPub output after
# the execute reply has been received.
iopub_timeout = Integer(
    4,
    allow_none=False,
    help=dedent(
        """
        The time to wait (in seconds) for IOPub output. This generally
        doesn't need to be set, but on some slow networks (such as CI
        systems) the default timeout might not be long enough to get all
        messages.
        """
    ),
).tag(config=True)

# Per-attempt timeout for shell-channel reads in ``async_wait_for_reply``.
shell_timeout_interval = Integer(
    5,
    allow_none=False,
    help=dedent(
        """
        The time to wait (in seconds) for Shell output before retrying.
        This generally doesn't need to be set, but if one needs to check
        for dead kernels at a faster rate this can help.
        """
    ),
).tag(config=True)

# ``"immediate"`` makes ``_async_cleanup_kernel`` call
# ``km.shutdown_kernel(now=True)``; ``"graceful"`` passes ``now=False``.
shutdown_kernel = Enum(
    ["graceful", "immediate"],
    default_value="graceful",
    help=dedent(
        """
        If ``graceful`` (default), then the kernel is given time to clean
        up after executing all cells, e.g., to execute its ``atexit`` hooks.
        If ``immediate``, then the kernel is signaled to immediately
        terminate.
        """
    ),
).tag(config=True)

# Turned into a ``--HistoryManager.hist_file=...`` kernel argument by
# ``async_start_new_kernel`` when the manager reports an ipykernel.
ipython_hist_file = Unicode(
    default_value=":memory:",
    help="""Path to file to use for SQLite history database for an IPython kernel.

        The specific value ``:memory:`` (including the colon
        at both end but not the back ticks), avoids creating a history file. Otherwise, IPython
        will create a history file for each kernel.

        When running kernels simultaneously (e.g. via multiprocessing) saving history a single
        SQLite file can result in database errors, so using ``:memory:`` is recommended in
        non-interactive contexts.
        """,
).tag(config=True)
# Class instantiated by ``create_kernel_manager``.
kernel_manager_class = Type(
    config=True, klass=KernelManager, help="The kernel manager class to use."
)

# --- Lifecycle hooks -------------------------------------------------------
# Each hook is optional (``None`` by default) and is invoked through
# ``run_hook`` with the keyword arguments named in its help text.

# Invoked at the end of ``async_start_new_kernel_client``.
on_notebook_start = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after the kernel manager and kernel client are setup, and
        cells are about to execute.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

# Invoked from the ``finally`` clause of ``async_setup_kernel``.
on_notebook_complete = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after the kernel is cleaned up.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

# Invoked by ``async_setup_kernel`` when a ``RuntimeError`` escapes the body.
on_notebook_error = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes when the notebook encounters an error.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

# First hook run by ``async_execute_cell``, before any skip checks.
on_cell_start = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes before a cell is executed and before non-executing cells
        are skipped.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

# Run immediately before the execute request is sent to the kernel.
on_cell_execute = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes just before a code cell is executed.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

# NOTE(review): ``async_execute_cell`` runs this hook right after the execute
# request is sent, before the reply is awaited - confirm this is intended.
on_cell_complete = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after a cell execution is complete. It is
        called even when a cell results in a failure.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

# Run once the execute reply has been received.
on_cell_executed = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes just after a code cell is executed, whether
        or not it results in an error.
        Called with kwargs ``cell``, ``cell_index`` and ``execute_reply``.
        """
    ),
).tag(config=True)
# Run by ``_check_raise_for_error`` for every error reply, before the decision
# whether to raise ``CellExecutionError``.
on_cell_error = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes when a cell execution results in an error.
        This is executed even if errors are suppressed with ``cell_allows_errors``.
        Called with kwargs ``cell``, ``cell_index`` and ``execute_reply``.
        """
    ),
).tag(config=True)
@default("kernel_manager_class")
def _kernel_manager_class_default(self) -> type[KernelManager]:
    """Provide ``AsyncKernelManager`` as the dynamic default for ``kernel_manager_class``."""
    # Resolved lazily, at the time the default is first requested.
    import jupyter_client

    return jupyter_client.AsyncKernelManager  # type:ignore[attr-defined]
# Internal bookkeeping (not tagged as configurable); reset to ``{}`` by
# ``reset_execution_trackers`` and read by ``_update_display_id``.
_display_id_map: dict[str, t.Any] = Dict(  # type:ignore[assignment]
    help=dedent(
        """
        mapping of locations of outputs with a given display_id
        tracks cell index and output index within cell.outputs for
        each appearance of the display_id
        {
            'display_id': {
            cell_idx: [output_idx,]
            }
        }
        """
    )
)

display_data_priority = List(
    [
        "text/html",
        "application/pdf",
        "text/latex",
        "image/svg+xml",
        "image/png",
        "image/jpeg",
        "text/markdown",
        "text/plain",
    ],
    help="""
            An ordered list of preferred output type, the first
            encountered will usually be used when converting discarding
            the others.
            """,
).tag(config=True)

# ``async_start_new_kernel`` reads ``resources["metadata"]["path"]`` and uses
# it as the kernel's ``cwd`` unless the caller supplied one.
resources: dict[str, t.Any] = Dict(  # type:ignore[assignment]
    help=dedent(
        """
        Additional resources used in the conversion process. For example,
        passing ``{'metadata': {'path': run_path}}`` sets the
        execution path to ``run_path``.
        """
    )
)

# Enables the stream-merging pass at the end of ``async_execute_cell``.
coalesce_streams = Bool(
    help=dedent(
        """
        Merge all stream outputs with shared names into single streams.
        """
    )
)
def __init__(self, nb: NotebookNode, km: KernelManager | None = None, **kw: t.Any) -> None:
    """Set up a client bound to the notebook ``nb``.

    Parameters
    ----------
    nb : NotebookNode
        Notebook being executed.
    km : KernelManager (optional)
        Kernel manager to use. When omitted the client records that it owns
        the kernel manager (``owns_km``) and one is created later on demand.
    **kw
        Forwarded unchanged to the parent class initializer.
    """
    super().__init__(**kw)
    self.nb: NotebookNode = nb
    self.km: KernelManager | None = km
    # Ownership is decided once, here: only a client that was not handed a
    # kernel manager is considered its owner.
    self.owns_km: bool = km is None
    self.kc: KernelClient | None = None
    self.reset_execution_trackers()

    output_widget_models: dict[str, t.Any] = {"OutputModel": OutputWidget}
    self.widget_registry: dict[str, dict[str, t.Any]] = {
        "@jupyter-widgets/output": output_widget_models
    }

    # A comm-open handler returns either None or an object exposing
    # ``.handle_msg(msg)``.
    self.comm_open_handlers: dict[str, t.Any] = {}
    self.comm_open_handlers["jupyter.widget"] = self.on_comm_open_jupyter_widget
def reset_execution_trackers(self) -> None:
    """Put every per-execution tracker back into its initial, empty state."""
    # Execution progress.
    self.code_cells_executed = 0
    self.task_poll_for_reply: asyncio.Future[t.Any] | None = None

    # Output bookkeeping.
    self._display_id_map = {}

    # Widget bookkeeping.
    self.widget_state: dict[str, dict[str, t.Any]] = {}
    self.widget_buffers: dict[str, dict[tuple[str, ...], dict[str, str]]] = {}
    # Each key holds a list of hooks of which the last one is the active one;
    # the list form is what allows output widgets to be nested.
    self.output_hook_stack: t.Any = collections.defaultdict(list)
    # Our front-end side stand-ins for Output widgets.
    self.comm_objects: dict[str, t.Any] = {}
def create_kernel_manager(self) -> KernelManager:
    """Instantiate ``kernel_manager_class`` and store it on ``self.km``.

    If ``kernel_name`` is empty it is first filled from the notebook's
    ``kernelspec`` metadata, when a name is present there. The kernel name is
    only passed to the manager class when it is non-empty.

    Returns
    -------
    km : KernelManager
        The newly created kernel manager (also available as ``self.km``).
    """
    if not self.kernel_name:
        spec_name = self.nb.metadata.get("kernelspec", {}).get("name")
        if spec_name is not None:
            self.kernel_name = spec_name

    manager_kwargs: dict[str, t.Any] = {}
    if self.kernel_name:
        manager_kwargs["kernel_name"] = self.kernel_name
    manager_kwargs["config"] = self.config

    self.km = self.kernel_manager_class(**manager_kwargs)
    assert self.km is not None
    return self.km
async def _async_cleanup_kernel(self) -> None:
    """Shut the kernel down and release manager and client resources.

    The kernel is shut down immediately when ``shutdown_kernel`` is
    ``"immediate"`` and gracefully otherwise. A ``RuntimeError`` whose message
    contains ``"No kernel is running!"`` is swallowed; any other is re-raised
    after the resource cleanup has run.
    """
    assert self.km is not None
    immediate = self.shutdown_kernel == "immediate"
    try:
        # Ask the manager to stop the process; tolerate a kernel that is
        # already gone.
        kernel_alive = await ensure_async(self.km.is_alive())
        if kernel_alive:
            await ensure_async(self.km.shutdown_kernel(now=immediate))
    except RuntimeError as err:
        # There is no dedicated exception type, so the message is inspected.
        if "No kernel is running!" not in str(err):
            raise
    finally:
        # Leftover state is removed even when stopping the kernel failed.
        await ensure_async(self.km.cleanup_resources())
        if getattr(self, "kc", None) and self.kc is not None:
            await ensure_async(self.kc.stop_channels())  # type:ignore[func-returns-value]
            self.kc = None
            self.km = None


_cleanup_kernel = run_sync(_async_cleanup_kernel)
async def async_start_new_kernel(self, **kwargs: t.Any) -> None:
    """Start a kernel through the kernel manager ``self.km``.

    Parameters
    ----------
    kwargs :
        Any options for ``self.kernel_manager_class.start_kernel()``. Because
        that defaults to AsyncKernelManager, this will likely include options
        accepted by ``AsyncKernelManager.start_kernel()``, which includes ``cwd``.
        When ``cwd`` is absent and ``resources["metadata"]["path"]`` is set,
        that path is used as ``cwd``.
    """
    assert self.km is not None

    run_path = self.resources.get("metadata", {}).get("path") or None
    if run_path:
        # Only fills the gap; an explicit ``cwd`` from the caller wins.
        kwargs.setdefault("cwd", run_path)

    hist_flag = "--HistoryManager.hist_file"
    already_configured = any(arg.startswith(hist_flag) for arg in self.extra_arguments)
    wants_hist_file = getattr(self.km, "ipykernel", False) and self.ipython_hist_file
    if wants_hist_file and not already_configured:
        self.extra_arguments += [f"--HistoryManager.hist_file={self.ipython_hist_file}"]

    await ensure_async(self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs))


start_new_kernel = run_sync(async_start_new_kernel)
async def async_start_new_kernel_client(self) -> KernelClient:
    """Create a kernel client, start its channels and wait until it is ready.

    If any of those steps fails, the error is logged, the kernel is cleaned
    up and the exception is re-raised. On success stdin is disallowed on the
    client and the ``on_notebook_start`` hook is run.

    Returns
    -------
    kc : KernelClient
        Kernel client as created by the kernel manager ``km``.
    """
    assert self.km is not None
    try:
        self.kc = self.km.client()
        await ensure_async(self.kc.start_channels())  # type:ignore[func-returns-value]
        await ensure_async(self.kc.wait_for_ready(timeout=self.startup_timeout))
    except Exception as err:
        kernel_id = getattr(self.km, "kernel_id", None)
        self.log.error(
            f"Error occurred while starting new kernel client for kernel {kernel_id}: {err}"
        )
        await self._async_cleanup_kernel()
        raise

    self.kc.allow_stdin = False
    await run_hook(self.on_notebook_start, notebook=self.nb)
    return self.kc


start_new_kernel_client = run_sync(async_start_new_kernel_client)
@contextmanager
def setup_kernel(self, **kwargs: t.Any) -> t.Generator[None, None, None]:
    """
    Context manager for setting up the kernel to execute a notebook.

    This assigns the Kernel Manager (``self.km``) if missing and Kernel Client(``self.kc``).

    On exit the kernel is cleaned up through ``self._cleanup_kernel()`` when
    the ``cleanup_kc`` keyword is true; it defaults to ``self.owns_km``. All
    remaining keyword arguments are passed on to ``start_new_kernel``.
    """
    # Default policy: clean up only a kernel manager this client owns.
    should_cleanup = kwargs.pop("cleanup_kc", self.owns_km)

    # Can't use run_until_complete on an asynccontextmanager function :(
    if self.km is None:
        self.km = self.create_kernel_manager()

    kernel_running = self.km.has_kernel
    if not kernel_running:
        self.start_new_kernel(**kwargs)

    if self.kc is None:
        self.start_new_kernel_client()

    try:
        yield
    finally:
        if should_cleanup:
            self._cleanup_kernel()
@asynccontextmanager
async def async_setup_kernel(self, **kwargs: t.Any) -> t.AsyncGenerator[None, None]:
    """
    Context manager for setting up the kernel to execute a notebook.

    This assigns the Kernel Manager (``self.km``) if missing and Kernel Client(``self.kc``).

    When control returns from the yield, the kernel is cleaned up if the
    ``cleanup_kc`` keyword is true (default: ``self.owns_km``) and the
    ``on_notebook_complete`` hook is run. If a ``RuntimeError`` escapes the
    body, the ``on_notebook_error`` hook is run before it is re-raised.

    Handlers for SIGINT and SIGTERM are also added to cleanup in case of unexpected shutdown.
    All other keyword arguments are passed to ``async_start_new_kernel``.
    """
    # by default, cleanup the kernel client if we own the kernel manager
    # and keep it alive if we don't
    cleanup_kc = kwargs.pop("cleanup_kc", self.owns_km)
    if self.km is None:
        self.km = self.create_kernel_manager()

    # self._cleanup_kernel is the run_sync wrapper, which ensures the ioloop is running again.
    # This is necessary as the ioloop has stopped once atexit fires.
    atexit.register(self._cleanup_kernel)

    def on_signal() -> None:
        """Handle signals."""
        # Schedule the asynchronous cleanup on the loop and keep a reference
        # to the future on the instance; the atexit fallback is dropped since
        # cleanup is now already under way.
        self._async_cleanup_kernel_future = asyncio.ensure_future(self._async_cleanup_kernel())
        atexit.unregister(self._cleanup_kernel)

    loop = asyncio.get_event_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, on_signal)
        loop.add_signal_handler(signal.SIGTERM, on_signal)
    except RuntimeError:
        # NotImplementedError: Windows does not support signals.
        # (NotImplementedError is a subclass of RuntimeError, so it is caught here too.)
        # RuntimeError: Raised when add_signal_handler is called outside the main thread
        pass

    if not self.km.has_kernel:
        await self.async_start_new_kernel(**kwargs)

    if self.kc is None:
        await self.async_start_new_kernel_client()

    try:
        yield
    except RuntimeError as e:
        await run_hook(self.on_notebook_error, notebook=self.nb)
        raise e
    finally:
        if cleanup_kc:
            await self._async_cleanup_kernel()
        await run_hook(self.on_notebook_complete, notebook=self.nb)
        # Undo the registrations made above; they are only reached here when
        # kernel startup succeeded and the body has finished.
        atexit.unregister(self._cleanup_kernel)
        try:
            loop.remove_signal_handler(signal.SIGINT)
            loop.remove_signal_handler(signal.SIGTERM)
        except RuntimeError:
            pass
async def async_execute(self, reset_kc: bool = False, **kwargs: t.Any) -> NotebookNode:
    """
    Execute every cell of the notebook, in order.

    Parameters
    ----------
    reset_kc : bool
        If True and this client owns the kernel manager, the kernel is
        cleaned up first so that a new client is created (default: False).
    kwargs :
        Any option for ``self.kernel_manager_class.start_kernel()``. Because
        that defaults to AsyncKernelManager, this will likely include options
        accepted by ``jupyter_client.AsyncKernelManager.start_kernel()``,
        which includes ``cwd``.

    Returns
    -------
    nb : NotebookNode
        The executed notebook.

    Raises
    ------
    RuntimeError
        If the kernel info reply carries no ``language_info`` entry.
    """
    if reset_kc and self.owns_km:
        await self._async_cleanup_kernel()
    self.reset_execution_trackers()

    async with self.async_setup_kernel(**kwargs):
        assert self.kc is not None
        self.log.info("Executing notebook with kernel: %s", self.kernel_name)

        info_request_id = await ensure_async(self.kc.kernel_info())
        info_msg = await self.async_wait_for_reply(info_request_id)
        if info_msg is not None:
            if "language_info" not in info_msg["content"]:
                raise RuntimeError(
                    'Kernel info received message content has no "language_info" key. '
                    "Content is:\n" + str(info_msg["content"])
                )
            self.nb.metadata["language_info"] = info_msg["content"]["language_info"]

        for index, cell in enumerate(self.nb.cells):
            # The reply's own `execution_count` is not used: it is always 1
            # when store_history is False.
            next_count = self.code_cells_executed + 1
            await self.async_execute_cell(cell, index, execution_count=next_count)
        self.set_widgets_metadata()

    return self.nb


execute = run_sync(async_execute)
def set_widgets_metadata(self) -> None:
    """Store the collected widget state, and any buffers, in the notebook metadata.

    Nothing is written when no widget state was collected. Only states that
    contain a ``_model_name`` entry are stored.
    """
    if not self.widget_state:
        return

    mime_type = "application/vnd.jupyter.widget-state+json"
    serialized_states = {}
    for model_id, state in self.widget_state.items():
        if "_model_name" in state:
            serialized_states[model_id] = self._serialize_widget_state(state)

    self.nb.metadata.widgets = {
        mime_type: {
            "state": serialized_states,
            "version_major": 2,
            "version_minor": 0,
        }
    }

    # Buffers are attached to what is read back from the metadata, not to the
    # local dict built above.
    stored_states = self.nb.metadata.widgets[mime_type]["state"]
    for key, widget in stored_states.items():
        buffers = self.widget_buffers.get(key)
        if buffers:
            widget["buffers"] = list(buffers.values())
739 def _update_display_id(self, display_id: str, msg: dict[str, t.Any]) -> None:
740 """Update outputs with a given display_id"""
741 if display_id not in self._display_id_map:
742 self.log.debug("display id %r not in %s", display_id, self._display_id_map)
743 return
745 if msg["header"]["msg_type"] == "update_display_data":
746 msg["header"]["msg_type"] = "display_data"
748 try:
749 out = output_from_msg(msg)
750 except ValueError:
751 self.log.error(f"unhandled iopub msg: {msg['msg_type']}")
752 return
754 for cell_idx, output_indices in self._display_id_map[display_id].items():
755 cell = self.nb["cells"][cell_idx]
756 outputs = cell["outputs"]
757 for output_idx in output_indices:
758 outputs[output_idx]["data"] = out["data"]
759 outputs[output_idx]["metadata"] = out["metadata"]
async def _async_poll_for_reply(
    self,
    msg_id: str,
    cell: NotebookNode,
    timeout: int | None,
    task_poll_output_msg: asyncio.Future[t.Any],
    task_poll_kernel_alive: asyncio.Future[t.Any],
) -> dict[str, t.Any]:
    """Wait on the shell channel for the reply to ``msg_id`` and return it.

    Parameters
    ----------
    msg_id : str
        Id of the request whose reply is awaited.
    cell : NotebookNode
        Cell being executed; receives the timing metadata.
    timeout : int or None
        Overall time budget in seconds, or ``None`` for no limit.
    task_poll_output_msg : asyncio.Future
        Task collecting IOPub output; awaited for at most ``iopub_timeout``
        seconds once the reply has arrived.
    task_poll_kernel_alive : asyncio.Future
        Liveness watcher; cancelled before this coroutine returns or raises.

    Raises
    ------
    CellTimeoutError
        If IOPub output does not finish in time and ``raise_on_iopub_timeout``
        is set, or (via ``_async_handle_timeout``) when the shell reply times out.
    """
    msg: dict[str, t.Any]
    assert self.kc is not None
    new_timeout: float | None = None
    if timeout is not None:
        # ``deadline`` is only defined on this branch; every later use is
        # guarded by ``new_timeout is not None``.
        deadline = monotonic() + timeout
        new_timeout = float(timeout)
    error_on_timeout_execute_reply = None
    while True:
        try:
            if error_on_timeout_execute_reply:
                # A synthetic reply was produced by the timeout handler on the
                # previous iteration; address it to this request.
                msg = error_on_timeout_execute_reply  # type:ignore[unreachable]
                msg["parent_header"] = {"msg_id": msg_id}
            else:
                msg = await ensure_async(self.kc.shell_channel.get_msg(timeout=new_timeout))
            if msg["parent_header"].get("msg_id") == msg_id:
                if self.record_timing:
                    cell["metadata"]["execution"]["shell.execute_reply"] = timestamp(msg)
                try:
                    # Give the output poller a bounded amount of time to finish.
                    await asyncio.wait_for(task_poll_output_msg, self.iopub_timeout)
                except (asyncio.TimeoutError, Empty):
                    if self.raise_on_iopub_timeout:
                        task_poll_kernel_alive.cancel()
                        raise CellTimeoutError.error_from_timeout_and_cell(
                            "Timeout waiting for IOPub output", self.iopub_timeout, cell
                        ) from None
                    else:
                        self.log.warning("Timeout waiting for IOPub output")
                task_poll_kernel_alive.cancel()
                return msg
            else:
                # Reply to some other request: keep waiting with whatever is
                # left of the time budget.
                if new_timeout is not None:
                    new_timeout = max(0, deadline - monotonic())
        except Empty:
            # received no message, check if kernel is still alive
            assert timeout is not None
            task_poll_kernel_alive.cancel()
            await self._async_check_alive()
            # Either raises, or returns a synthetic reply (or None) that is
            # picked up at the top of the next iteration.
            error_on_timeout_execute_reply = await self._async_handle_timeout(timeout, cell)
async def _async_poll_output_msg(
    self, parent_msg_id: str, cell: NotebookNode, cell_index: int
) -> None:
    """Process IOPub messages for ``parent_msg_id`` until execution completes.

    Messages belonging to other requests are ignored. The coroutine returns
    once ``process_message`` raises ``CellExecutionComplete``.
    """
    assert self.kc is not None
    while True:
        msg = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None))
        if msg["parent_header"].get("msg_id") != parent_msg_id:
            continue
        try:
            # Completion is signalled by a CellExecutionComplete exception.
            self.process_message(msg, cell, cell_index)
        except CellExecutionComplete:
            return
async def _async_poll_kernel_alive(self) -> None:
    """Check once per second that the kernel is alive.

    When the check raises ``DeadKernelError`` the pending
    ``task_poll_for_reply`` task is cancelled and this coroutine returns.
    """
    kernel_dead = False
    while not kernel_dead:
        await asyncio.sleep(1)
        try:
            await self._async_check_alive()
        except DeadKernelError:
            kernel_dead = True

    assert self.task_poll_for_reply is not None
    self.task_poll_for_reply.cancel()
    return
831 def _get_timeout(self, cell: NotebookNode | None) -> int | None:
832 if self.timeout_func is not None and cell is not None:
833 timeout = self.timeout_func(cell)
834 else:
835 timeout = self.timeout
837 if not timeout or timeout < 0:
838 timeout = None
840 return timeout
async def _async_handle_timeout(
    self, timeout: int, cell: NotebookNode | None = None
) -> None | dict[str, t.Any]:
    """React to a timed-out execute reply.

    Without ``interrupt_on_timeout`` a ``CellTimeoutError`` is raised for
    ``cell``. With it, the kernel is interrupted and the return value is a
    synthetic error reply built from ``error_on_timeout`` if that is set, or
    ``None`` otherwise.
    """
    self.log.error("Timeout waiting for execute reply (%is).", timeout)

    if not self.interrupt_on_timeout:
        assert cell is not None
        raise CellTimeoutError.error_from_timeout_and_cell(
            "Cell execution timed out", timeout, cell
        )

    self.log.error("Interrupting kernel")
    assert self.km is not None
    await ensure_async(self.km.interrupt_kernel())
    if not self.error_on_timeout:
        return None
    return {"content": {**self.error_on_timeout, "status": "error"}}
async def _async_check_alive(self) -> None:
    """Raise ``DeadKernelError`` (after logging) if the kernel client reports it is not alive."""
    assert self.kc is not None
    alive = await ensure_async(self.kc.is_alive())  # type:ignore[attr-defined]
    if alive:
        return
    self.log.error("Kernel died while waiting for execute reply.")
    raise DeadKernelError("Kernel died")
async def async_wait_for_reply(
    self, msg_id: str, cell: NotebookNode | None = None
) -> dict[str, t.Any] | None:
    """Wait for the shell reply to ``msg_id``.

    The shell channel is read in slices of ``shell_timeout_interval`` seconds.
    After each empty slice the kernel's liveness is checked and the waited
    time is accumulated.

    Returns
    -------
    dict or None
        The matching reply, or ``None`` when the timeout was exceeded and
        the timeout handler did not raise.
    """
    assert self.kc is not None
    # wait for finish, with timeout
    limit = self._get_timeout(cell)
    waited = 0
    while True:
        try:
            reply: dict[str, t.Any] = await ensure_async(
                self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
            )
        except Empty:
            await self._async_check_alive()
            waited += self.shell_timeout_interval
            if limit and waited > limit:
                await self._async_handle_timeout(limit, cell)
                return None
            continue
        if reply["parent_header"].get("msg_id") == msg_id:
            return reply


wait_for_reply = run_sync(async_wait_for_reply)
# Backwards compatibility naming for papermill
_wait_for_reply = wait_for_reply
894 def _passed_deadline(self, deadline: int | None) -> bool:
895 if deadline is not None and deadline - monotonic() <= 0:
896 return True
897 return False
899 async def _check_raise_for_error(
900 self, cell: NotebookNode, cell_index: int, exec_reply: dict[str, t.Any] | None
901 ) -> None:
902 if exec_reply is None:
903 return None
905 exec_reply_content = exec_reply["content"]
906 if exec_reply_content["status"] != "error":
907 return None
909 cell_allows_errors = (not self.force_raise_errors) and (
910 self.allow_errors
911 or exec_reply_content.get("ename") in self.allow_error_names
912 or "raises-exception" in cell.metadata.get("tags", [])
913 )
914 await run_hook(
915 self.on_cell_error, cell=cell, cell_index=cell_index, execute_reply=exec_reply
916 )
917 if not cell_allows_errors:
918 raise CellExecutionError.from_cell_and_msg(cell, exec_reply_content)
920 async def async_execute_cell(
921 self,
922 cell: NotebookNode,
923 cell_index: int,
924 execution_count: int | None = None,
925 store_history: bool = True,
926 ) -> NotebookNode:
927 """
928 Executes a single code cell.
930 To execute all cells see :meth:`execute`.
932 Parameters
933 ----------
934 cell : nbformat.NotebookNode
935 The cell which is currently being processed.
936 cell_index : int
937 The position of the cell within the notebook object.
938 execution_count : int
939 The execution count to be assigned to the cell (default: Use kernel response)
940 store_history : bool
941 Determines if history should be stored in the kernel (default: False).
942 Specific to ipython kernels, which can store command histories.
944 Returns
945 -------
946 output : dict
947 The execution output payload (or None for no output).
949 Raises
950 ------
951 CellExecutionError
952 If execution failed and should raise an exception, this will be raised
953 with defaults about the failure.
955 Returns
956 -------
957 cell : NotebookNode
958 The cell which was just processed.
959 """
960 assert self.kc is not None
962 await run_hook(self.on_cell_start, cell=cell, cell_index=cell_index)
964 if cell.cell_type != "code" or not cell.source.strip():
965 self.log.debug("Skipping non-executing cell %s", cell_index)
966 return cell
968 if self.skip_cells_with_tag in cell.metadata.get("tags", []):
969 self.log.debug("Skipping tagged cell %s", cell_index)
970 return cell
972 if self.record_timing: # clear execution metadata prior to execution
973 cell["metadata"]["execution"] = {}
975 self.log.debug("Executing cell:\n%s", cell.source)
977 cell_allows_errors = (not self.force_raise_errors) and (
978 self.allow_errors or "raises-exception" in cell.metadata.get("tags", [])
979 )
981 await run_hook(self.on_cell_execute, cell=cell, cell_index=cell_index)
982 parent_msg_id = await ensure_async(
983 self.kc.execute(
984 cell.source, store_history=store_history, stop_on_error=not cell_allows_errors
985 )
986 )
987 await run_hook(self.on_cell_complete, cell=cell, cell_index=cell_index)
988 # We launched a code cell to execute
989 self.code_cells_executed += 1
990 exec_timeout = self._get_timeout(cell)
992 cell.outputs = []
993 self.clear_before_next_output = False
995 task_poll_kernel_alive = asyncio.ensure_future(self._async_poll_kernel_alive())
996 task_poll_output_msg = asyncio.ensure_future(
997 self._async_poll_output_msg(parent_msg_id, cell, cell_index)
998 )
999 self.task_poll_for_reply = asyncio.ensure_future(
1000 self._async_poll_for_reply(
1001 parent_msg_id, cell, exec_timeout, task_poll_output_msg, task_poll_kernel_alive
1002 )
1003 )
1004 try:
1005 exec_reply = await self.task_poll_for_reply
1006 except asyncio.CancelledError:
1007 # can only be cancelled by task_poll_kernel_alive when the kernel is dead
1008 task_poll_output_msg.cancel()
1009 raise DeadKernelError("Kernel died") from None
1010 except Exception as e:
1011 # Best effort to cancel request if it hasn't been resolved
1012 try:
1013 # Check if the task_poll_output is doing the raising for us
1014 if not isinstance(e, CellControlSignal):
1015 task_poll_output_msg.cancel()
1016 finally:
1017 raise
1019 if execution_count:
1020 cell["execution_count"] = execution_count
1021 await run_hook(
1022 self.on_cell_executed, cell=cell, cell_index=cell_index, execute_reply=exec_reply
1023 )
1025 if self.coalesce_streams and cell.outputs:
1026 new_outputs = []
1027 streams: dict[str, NotebookNode] = {}
1028 for output in cell.outputs:
1029 if output["output_type"] == "stream":
1030 if output["name"] in streams:
1031 streams[output["name"]]["text"] += output["text"]
1032 else:
1033 new_outputs.append(output)
1034 streams[output["name"]] = output
1035 else:
1036 new_outputs.append(output)
1038 # process \r and \b characters
1039 for output in streams.values():
1040 old = output["text"]
1041 while len(output["text"]) < len(old):
1042 old = output["text"]
1043 # Cancel out anything-but-newline followed by backspace
1044 output["text"] = _RGX_BACKSPACE.sub("", output["text"])
1045 # Replace all carriage returns not followed by newline
1046 output["text"] = _RGX_CARRIAGERETURN.sub("", output["text"])
1048 # We also want to ensure stdout and stderr are always in the same consecutive order,
1049 # because they are asynchronous, so order isn't guaranteed.
1050 for i, output in enumerate(new_outputs):
1051 if output["output_type"] == "stream" and output["name"] == "stderr":
1052 if (
1053 len(new_outputs) >= i + 2
1054 and new_outputs[i + 1]["output_type"] == "stream"
1055 and new_outputs[i + 1]["name"] == "stdout"
1056 ):
1057 stdout = new_outputs.pop(i + 1)
1058 new_outputs.insert(i, stdout)
1060 cell.outputs = new_outputs
1062 await self._check_raise_for_error(cell, cell_index, exec_reply)
1064 self.nb["cells"][cell_index] = cell
1065 return cell
# Class attribute exposing async_execute_cell through run_sync, a helper imported from
# .util — presumably yielding a blocking (non-async) variant; confirm against run_sync.
1067 execute_cell = run_sync(async_execute_cell)
def process_message(
    self, msg: dict[str, t.Any], cell: NotebookNode, cell_index: int
) -> NotebookNode | None:
    """
    Dispatch a single kernel message and apply its effect to *cell*.

    *cell* is mutated in place: its ``execution_count`` is updated when the
    message content carries one, timing metadata is recorded when
    ``record_timing`` is enabled, and its ``outputs`` list is handed to the
    output / clear_output / comm handlers.

    Parameters
    ----------
    msg : dict
        The kernel message to handle; ``msg_type`` and ``content`` are read.
    cell : nbformat.NotebookNode
        The cell the message belongs to.
    cell_index : int
        Index of that cell within the notebook.

    Returns
    -------
    output : NotebookNode
        The value returned by ``self.output`` for output-bearing messages,
        or None for every other message type.

    Raises
    ------
    CellExecutionComplete
        When a ``status`` message reports the ``idle`` execution state.
    """
    kind = msg["msg_type"]
    self.log.debug("msg_type: %s", kind)
    payload = msg["content"]
    self.log.debug("content: %s", payload)

    # ``transient`` can be present but explicitly None, so a chained
    # ``.get("transient", {}).get("display_id")`` would break here.
    transient = payload.get("transient")
    display_id = None
    if transient:
        display_id = transient.get("display_id")

    if display_id and kind in {"execute_result", "display_data", "update_display_data"}:
        self._update_display_id(display_id, msg)

    # Keep the cell's prompt number in sync with the message.
    if "execution_count" in payload:
        cell["execution_count"] = payload["execution_count"]

    if self.record_timing:
        timing_key = None
        if kind == "status":
            state = payload["execution_state"]
            if state == "idle":
                timing_key = "iopub.status.idle"
            elif state == "busy":
                timing_key = "iopub.status.busy"
        elif kind == "execute_input":
            timing_key = "iopub.execute_input"
        if timing_key is not None:
            cell["metadata"]["execution"][timing_key] = timestamp(msg)

    if kind == "status":
        if payload["execution_state"] == "idle":
            raise CellExecutionComplete()
        return None
    if kind == "clear_output":
        self.clear_output(cell.outputs, msg, cell_index)
        return None
    if kind.startswith("comm"):
        self.handle_comm_msg(cell.outputs, msg, cell_index)
        return None
    if kind in ("execute_input", "update_display_data"):
        # Nothing further to do for these message types.
        return None
    # Everything else is treated as an output and becomes our "result".
    return self.output(cell.outputs, msg, display_id, cell_index)
def output(
    self, outs: list[NotebookNode], msg: dict[str, t.Any], display_id: str, cell_index: int
) -> NotebookNode | None:
    """Convert an output-bearing message into an entry appended to *outs*.

    If an output hook is registered for the message's parent ``msg_id``, the
    most recently registered hook handles the message instead and nothing is
    appended here. If ``output_from_msg`` raises ``ValueError`` the message
    is logged as unhandled and dropped.

    Parameters
    ----------
    outs : list
        The output list to append to; modified in place.
    msg : dict
        The kernel message to convert.
    display_id : str
        Display id taken from the message, if any (falsy when absent).
    cell_index : int
        Index of the cell that owns *outs*.

    Returns
    -------
    output : NotebookNode
        The appended output, or None when a hook took over or the message
        could not be converted.
    """
    kind = msg["msg_type"]

    parent_id = msg["parent_header"].get("msg_id")
    hooks = self.output_hook_stack[parent_id]
    if hooks:
        # A registered hook (e.g. OutputWidget) overrides the default
        # behaviour; the top of the stack wins.
        hooks[-1].output(outs, msg, display_id, cell_index)
        return None

    try:
        node: NotebookNode | None = output_from_msg(msg)
    except ValueError:
        self.log.error(f"unhandled iopub msg: {kind}")
        return None

    if self.clear_before_next_output:
        # A clear_output with wait=True was received earlier; apply it now.
        self.log.debug("Executing delayed clear_output")
        outs.clear()
        self.clear_display_id_mapping(cell_index)
        self.clear_before_next_output = False

    if display_id:
        # Remember where this output will land:
        #   _display_id_map[display_id][cell_index] -> list of output indices
        positions = self._display_id_map.setdefault(display_id, {}).setdefault(cell_index, [])
        positions.append(len(outs))

    if node:
        outs.append(node)

    return node
def clear_output(
    self, outs: list[NotebookNode], msg: dict[str, t.Any], cell_index: int
) -> None:
    """Handle a ``clear_output`` message for the outputs in *outs*.

    If an output hook is registered for the message's parent ``msg_id``, the
    most recently registered hook handles the request. Otherwise the outputs
    are emptied immediately, or, when the message content has a truthy
    ``wait``, the clear is deferred by setting ``clear_before_next_output``.

    Parameters
    ----------
    outs : list
        The output list to clear; modified in place.
    msg : dict
        The ``clear_output`` kernel message.
    cell_index : int
        Index of the cell that owns *outs*.
    """
    content = msg["content"]

    parent_id = msg["parent_header"].get("msg_id")
    hooks = self.output_hook_stack[parent_id]
    if hooks:
        # A registered hook (e.g. OutputWidget) overrides the default
        # clear_output behaviour; the top of the stack wins.
        hooks[-1].clear_output(outs, msg, cell_index)
        return

    if not content.get("wait"):
        self.log.debug("Immediate clear output")
        outs.clear()
        self.clear_display_id_mapping(cell_index)
        return

    self.log.debug("Wait to clear output")
    self.clear_before_next_output = True
def clear_display_id_mapping(self, cell_index: int) -> None:
    """Reset the recorded output positions of one cell for every display id."""
    for per_cell in self._display_id_map.values():
        if cell_index in per_cell:
            # Rebind to a fresh list instead of emptying the existing one.
            per_cell[cell_index] = []
def handle_comm_msg(
    self, outs: list[NotebookNode], msg: dict[str, t.Any], cell_index: int
) -> None:
    """Process a comm message: record widget state and route it to comm objects.

    When ``store_widget_state`` is set and the message data carries a
    ``state`` entry, that state (and any buffers listed in ``buffer_paths``)
    is merged into ``widget_state`` / ``widget_buffers`` under the comm id.
    A ``comm_open`` message is then passed to the handler registered for its
    target name, and a ``comm_msg`` message is forwarded to the comm object
    previously stored for its comm id, if there is one.

    Parameters
    ----------
    outs : list
        Outputs of the current cell (not used by this method).
    msg : dict
        The comm kernel message.
    cell_index : int
        Index of the current cell (not used by this method).
    """
    content = msg["content"]
    data = content["data"]

    # Messages without a "state" entry are custom messages and are ignored here.
    if self.store_widget_state and "state" in data:
        self.widget_state.setdefault(content["comm_id"], {}).update(data["state"])
        if data.get("buffer_paths"):
            per_comm = self.widget_buffers.setdefault(content["comm_id"], {})
            # Within one comm, the path uniquely identifies a buffer.
            by_path: dict[tuple[str, ...], dict[str, str]] = {
                tuple(entry["path"]): entry for entry in self._get_buffer_data(msg)
            }
            per_comm.update(by_path)

    # In some cases we have to mimic a frontend to get behaviour similar to
    # the Output widget in Jupyter lab/notebook.
    msg_type = msg["msg_type"]
    if msg_type == "comm_open":
        target = content.get("target_name")
        handler = self.comm_open_handlers.get(target)
        if not handler:
            self.log.warning(f"No handler found for comm target {target!r}")
            return
        comm_id = content["comm_id"]
        comm_object = handler(msg)
        if comm_object:
            self.comm_objects[comm_id] = comm_object
    elif msg_type == "comm_msg":
        comm_id = content["comm_id"]
        if comm_id in self.comm_objects:
            self.comm_objects[comm_id].handle_msg(msg)
1241 def _serialize_widget_state(self, state: dict[str, t.Any]) -> dict[str, t.Any]:
1242 """Serialize a widget state, following format in @jupyter-widgets/schema."""
1243 return {
1244 "model_name": state.get("_model_name"),
1245 "model_module": state.get("_model_module"),
1246 "model_module_version": state.get("_model_module_version"),
1247 "state": state,
1248 }
1250 def _get_buffer_data(self, msg: dict[str, t.Any]) -> list[dict[str, str]]:
1251 encoded_buffers = []
1252 paths = msg["content"]["data"]["buffer_paths"]
1253 buffers = msg["buffers"]
1254 for path, buffer in zip(paths, buffers):
1255 encoded_buffers.append(
1256 {
1257 "data": base64.b64encode(buffer).decode("utf-8"),
1258 "encoding": "base64",
1259 "path": path,
1260 }
1261 )
1262 return encoded_buffers
def register_output_hook(self, msg_id: str, hook: OutputWidget) -> None:
    """Push *hook* onto the stack of output overrides for *msg_id*.

    The hook handles output/clear_output in place of the default behaviour.
    Several hooks may be registered for one message id; the one registered
    last is the one that gets used (stack based).
    """
    # mimics
    # https://jupyterlab.github.io/jupyterlab/services/interfaces/kernel.ikernelconnection.html#registermessagehook
    hooks = self.output_hook_stack[msg_id]
    hooks.append(hook)
def remove_output_hook(self, msg_id: str, hook: OutputWidget) -> None:
    """Pop the most recently registered output override for *msg_id*.

    The popped hook is asserted to equal *hook*, i.e. hooks are expected to
    be removed in the reverse of the order they were registered.
    """
    # mimics
    # https://jupyterlab.github.io/jupyterlab/services/interfaces/kernel.ikernelconnection.html#removemessagehook
    hooks = self.output_hook_stack[msg_id]
    popped = hooks.pop()
    assert popped == hook
def on_comm_open_jupyter_widget(self, msg: dict[str, t.Any]) -> t.Any | None:
    """Instantiate the registered widget class for a widget ``comm_open`` message.

    The widget state in the message names a model module and a model name;
    both are looked up in ``widget_registry``.

    Returns
    -------
    The object built by the registered class, called with the comm id, the
    state, ``self.kc`` and this client, or None when either the module or
    the class is not registered.
    """
    content = msg["content"]
    state = content["data"]["state"]
    comm_id = content["comm_id"]

    module = self.widget_registry.get(state["_model_module"])
    if not module:
        return None

    widget_class = module.get(state["_model_name"])
    if not widget_class:
        return None

    return widget_class(comm_id, state, self.kc, self)
def execute(
    nb: NotebookNode,
    cwd: str | None = None,
    km: KernelManager | None = None,
    **kwargs: t.Any,
) -> NotebookNode:
    """Run a notebook through a NotebookClient and return the result.

    This is a convenience wrapper: it builds a NotebookClient for *nb* and
    returns whatever its ``execute()`` method returns.

    Parameters
    ----------
    nb : NotebookNode
        The notebook object to be executed.
    cwd : str, optional
        If supplied, it is passed to the client as
        ``resources["metadata"]["path"]``.
    km : KernelManager, optional
        If supplied, this kernel manager is handed to the client.
    kwargs :
        Any other options for NotebookClient, e.g. timeout, kernel_name.

    Returns
    -------
    NotebookNode
        The result of ``NotebookClient.execute()``.
    """
    # Only set a metadata path when a working directory was requested.
    resources = {"metadata": {"path": cwd}} if cwd is not None else {}
    client = NotebookClient(nb=nb, resources=resources, km=km, **kwargs)
    return client.execute()