Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/nbclient/client.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

520 statements  

1"""nbclient implementation.""" 

2from __future__ import annotations 

3 

4import asyncio 

5import atexit 

6import base64 

7import collections 

8import datetime 

9import re 

10import signal 

11import typing as t 

12from contextlib import asynccontextmanager, contextmanager 

13from queue import Empty 

14from textwrap import dedent 

15from time import monotonic 

16 

17from jupyter_client.client import KernelClient 

18from jupyter_client.manager import KernelManager 

19from nbformat import NotebookNode 

20from nbformat.v4 import output_from_msg 

21from traitlets import Any, Bool, Callable, Dict, Enum, Integer, List, Type, Unicode, default 

22from traitlets.config.configurable import LoggingConfigurable 

23 

24from .exceptions import ( 

25 CellControlSignal, 

26 CellExecutionComplete, 

27 CellExecutionError, 

28 CellTimeoutError, 

29 DeadKernelError, 

30) 

31from .output_widget import OutputWidget 

32from .util import ensure_async, run_hook, run_sync 

33 

34_RGX_CARRIAGERETURN = re.compile(r".*\r(?=[^\n])") 

35_RGX_BACKSPACE = re.compile(r"[^\n]\b") 

36 

37# mypy: disable-error-code="no-untyped-call" 

38 

39 

def timestamp(msg: dict[str, t.Any] | None = None) -> str:
    """Get the timestamp for a message.

    Parameters
    ----------
    msg : dict, optional
        A Jupyter message. If ``msg["header"]["date"]`` is a
        ``datetime.datetime``, that time is used.

    Returns
    -------
    str
        The header date formatted as ``YYYY-MM-DDTHH:MM:SS.ffffffZ``.
        Timezone-aware dates are converted to UTC first so the ``Z`` suffix is
        accurate; naive dates are formatted as-is (assumed to already be UTC).
        If there is no usable header date, the current UTC time is returned as
        ``datetime.isoformat()`` output with a ``Z`` suffix.
    """
    if msg and "header" in msg:  # The test mocks don't provide a header, so tolerate that
        msg_header = msg["header"]
        if "date" in msg_header and isinstance(msg_header["date"], datetime.datetime):
            date = msg_header["date"]
            try:
                if date.utcoffset() is not None:
                    # "Z" means UTC, so normalise aware datetimes before formatting.
                    date = date.astimezone(datetime.timezone.utc)
                formatted_time = date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                # docs indicate strftime may return empty string, so let's catch that too
                if formatted_time:
                    return formatted_time
            except Exception:  # noqa
                pass  # best effort: fall back to the current time below

    # datetime.utcnow() is deprecated (Python 3.12); produce the same naive-UTC
    # ISO string from an aware "now" instead.
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z"

58 

59 

60class NotebookClient(LoggingConfigurable): 

61 """ 

62 Encompasses a Client for executing cells in a notebook 

63 """ 

64 

# --- Configurable options of NotebookClient ---------------------------------
timeout = Integer(
    None,
    allow_none=True,
    help=dedent(
        """
        The time to wait (in seconds) for output from executions.
        If a cell execution takes longer, a TimeoutError is raised.

        ``None``, ``0`` or a negative value such as ``-1`` will disable the
        timeout. If ``timeout_func`` is set, it overrides ``timeout``.
        """
    ),
).tag(config=True)

timeout_func: t.Callable[..., int | None] | None = Any(  # type:ignore[assignment]
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which, when given the cell as input,
        returns the time to wait (in seconds) for output from cell
        executions. If a cell execution takes longer, a TimeoutError
        is raised.

        Returning ``None``, ``0`` or a negative value such as ``-1`` will
        disable the timeout for the cell.
        Not setting ``timeout_func`` will cause the client to
        default to using the ``timeout`` trait for all cells. The
        ``timeout_func`` trait overrides ``timeout`` if it is not ``None``.
        """
    ),
).tag(config=True)

interrupt_on_timeout = Bool(
    False,
    help=dedent(
        """
        If execution of a cell times out, interrupt the kernel and
        continue executing other cells rather than throwing an error and
        stopping.
        """
    ),
).tag(config=True)

error_on_timeout = Dict(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        If a cell execution was interrupted after a timeout, don't wait for
        the execute_reply from the kernel (e.g. KeyboardInterrupt error).
        Instead, return an execute_reply with the given error, which should
        be of the following form::

            {
                'ename': str,  # Exception name, as a string
                'evalue': str,  # Exception value, as a string
                'traceback': list(str),  # traceback frames, as strings
            }
        """
    ),
).tag(config=True)

startup_timeout = Integer(
    60,
    help=dedent(
        """
        The time to wait (in seconds) for the kernel to start.
        If kernel startup takes longer, a RuntimeError is
        raised.
        """
    ),
).tag(config=True)

allow_errors = Bool(
    False,
    help=dedent(
        """
        If ``False`` (default), when a cell raises an error the
        execution is stopped and a ``CellExecutionError``
        is raised, except if the error name is in
        ``allow_error_names``.
        If ``True``, execution errors are ignored and the execution
        is continued until the end of the notebook. Output from
        exceptions is included in the cell output in both cases.
        """
    ),
).tag(config=True)

allow_error_names = List(
    Unicode(),
    help=dedent(
        """
        List of error names which won't stop the execution. Use this if the
        ``allow_errors`` option is too general and you want to allow only
        specific kinds of errors.
        """
    ),
).tag(config=True)

force_raise_errors = Bool(
    False,
    help=dedent(
        """
        If False (default), errors from executing the notebook can be
        allowed with a ``raises-exception`` tag on a single cell, or the
        ``allow_errors`` or ``allow_error_names`` configurable options for
        all cells. An allowed error will be recorded in notebook output, and
        execution will continue. If an error occurs when it is not
        explicitly allowed, a ``CellExecutionError`` will be raised.
        If True, ``CellExecutionError`` will be raised for any error that occurs
        while executing the notebook. This overrides the ``allow_errors``
        and ``allow_error_names`` options and the ``raises-exception`` cell
        tag.
        """
    ),
).tag(config=True)

skip_cells_with_tag = Unicode(
    "skip-execution",
    help=dedent(
        """
        Name of the cell tag to use to denote a cell that should be skipped.
        """
    ),
).tag(config=True)

extra_arguments = List(
    Unicode(),
    help=dedent(
        """
        Extra arguments passed as ``extra_arguments`` to the kernel manager's
        ``start_kernel()`` when a new kernel is started.
        """
    ),
).tag(config=True)

kernel_name = Unicode(
    "",
    help=dedent(
        """
        Name of kernel to use to execute the cells.
        If not set, use the kernel_spec embedded in the notebook.
        """
    ),
).tag(config=True)

raise_on_iopub_timeout = Bool(
    False,
    help=dedent(
        """
        If ``False`` (default), then the kernel will continue waiting for
        iopub messages until it receives a kernel idle message, or until a
        timeout occurs, at which point the currently executing cell will be
        skipped. If ``True``, then an error will be raised after the first
        timeout. This option generally does not need to be used, but may be
        useful in contexts where there is the possibility of executing
        notebooks with memory-consuming infinite loops.
        """
    ),
).tag(config=True)

store_widget_state = Bool(
    True,
    help=dedent(
        """
        If ``True`` (default), then the state of the Jupyter widgets created
        at the kernel will be stored in the metadata of the notebook.
        """
    ),
).tag(config=True)

record_timing = Bool(
    True,
    help=dedent(
        """
        If ``True`` (default), then the execution timings of each cell will
        be stored in the metadata of the notebook.
        """
    ),
).tag(config=True)

iopub_timeout = Integer(
    4,
    allow_none=False,
    help=dedent(
        """
        The time to wait (in seconds) for IOPub output. This generally
        doesn't need to be set, but on some slow networks (such as CI
        systems) the default timeout might not be long enough to get all
        messages.
        """
    ),
).tag(config=True)

shell_timeout_interval = Integer(
    5,
    allow_none=False,
    help=dedent(
        """
        The time to wait (in seconds) for Shell output before retrying.
        This generally doesn't need to be set, but if one needs to check
        for dead kernels at a faster rate this can help.
        """
    ),
).tag(config=True)

shutdown_kernel = Enum(
    ["graceful", "immediate"],
    default_value="graceful",
    help=dedent(
        """
        If ``graceful`` (default), then the kernel is given time to clean
        up after executing all cells, e.g., to execute its ``atexit`` hooks.
        If ``immediate``, then the kernel is signaled to immediately
        terminate.
        """
    ),
).tag(config=True)

ipython_hist_file = Unicode(
    default_value=":memory:",
    help="""Path to file to use for SQLite history database for an IPython kernel.

        The specific value ``:memory:`` (including the colon
        at both ends but not the back ticks), avoids creating a history file. Otherwise, IPython
        will create a history file for each kernel.

        When running kernels simultaneously (e.g. via multiprocessing) saving history to a single
        SQLite file can result in database errors, so using ``:memory:`` is recommended in
        non-interactive contexts.
        """,
).tag(config=True)

# Tagged like every other option instead of passing ``config=True`` to the constructor.
kernel_manager_class = Type(
    klass=KernelManager, help="The kernel manager class to use."
).tag(config=True)

# --- Hooks (each may be None; called via run_hook with the kwargs listed) ----
on_notebook_start = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after the kernel manager and kernel client are setup, and
        cells are about to execute.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

on_notebook_complete = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after the kernel is cleaned up.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

on_notebook_error = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes when the notebook encounters an error.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

on_cell_start = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes before a cell is executed and before non-executing cells
        are skipped.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

on_cell_execute = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes just before a code cell is executed.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

on_cell_complete = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after a cell execution is complete. It is
        called even when a cell results in a failure.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

on_cell_executed = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes just after a code cell is executed, whether
        or not it results in an error.
        Called with kwargs ``cell``, ``cell_index`` and ``execute_reply``.
        """
    ),
).tag(config=True)

on_cell_error = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes when a cell execution results in an error.
        This is executed even if errors are suppressed with ``cell_allows_errors``.
        Called with kwargs ``cell``, ``cell_index`` and ``execute_reply``.
        """
    ),
).tag(config=True)

@default("kernel_manager_class")
def _kernel_manager_class_default(self) -> type[KernelManager]:
    """Return ``AsyncKernelManager`` as the default ``kernel_manager_class``.

    The import is deferred until the default is first requested. Note that this
    module already imports ``jupyter_client`` at the top, so the package itself
    is loaded regardless.
    """
    from jupyter_client import AsyncKernelManager as default_manager  # type:ignore[attr-defined]

    return default_manager

393 

# Per-execution map of where outputs carrying a display_id live. It is reset to
# {} by reset_execution_trackers() and read by _update_display_id() to rewrite
# every output sharing that display_id.
_display_id_map: dict[str, t.Any] = Dict(  # type:ignore[assignment]
    help=dedent(
        """
        mapping of locations of outputs with a given display_id
        tracks cell index and output index within cell.outputs for
        each appearance of the display_id
        {
            'display_id': {
                cell_idx: [output_idx,]
            }
        }
        """
    )
)

# MIME types in order of preference (configurable).
display_data_priority = List(
    [
        "text/html",
        "application/pdf",
        "text/latex",
        "image/svg+xml",
        "image/png",
        "image/jpeg",
        "text/markdown",
        "text/plain",
    ],
    help="""
        An ordered list of preferred output type, the first
        encountered will usually be used when converting discarding
        the others.
        """,
).tag(config=True)

# Extra resources; async_start_new_kernel reads resources["metadata"]["path"]
# and uses it as the kernel's cwd when the caller did not pass one.
resources: dict[str, t.Any] = Dict(  # type:ignore[assignment]
    help=dedent(
        """
        Additional resources used in the conversion process. For example,
        passing ``{'metadata': {'path': run_path}}`` sets the
        execution path to ``run_path``.
        """
    )
)

# Unlike most options above, this trait is not tagged config=True.
coalesce_streams = Bool(
    help=dedent(
        """
        Merge all stream outputs with shared names into single streams.
        """
    )
)

444 

def __init__(self, nb: NotebookNode, km: KernelManager | None = None, **kw: t.Any) -> None:
    """Initializes the execution manager.

    Parameters
    ----------
    nb : NotebookNode
        Notebook being executed.
    km : KernelManager (optional)
        Optional kernel manager. If none is provided, a kernel manager will
        be created.
    **kw
        Forwarded to the ``LoggingConfigurable`` base class (trait values).
    """
    super().__init__(**kw)
    # The client only "owns" a manager it creates itself; setup_kernel and
    # async_setup_kernel use this as the default for cleaning up afterwards.
    manager_is_ours = km is None

    self.nb: NotebookNode = nb
    self.km: KernelManager | None = km
    self.owns_km: bool = manager_is_ours
    self.kc: KernelClient | None = None
    self.reset_execution_trackers()

    # Widget classes emulated by this client:
    # "@jupyter-widgets/output" -> "OutputModel" -> OutputWidget.
    self.widget_registry: dict[str, dict[str, t.Any]] = {
        "@jupyter-widgets/output": {"OutputModel": OutputWidget},
    }
    # comm_open_handlers should return an object with a .handle_msg(msg) method or None
    self.comm_open_handlers: dict[str, t.Any] = {
        "jupyter.widget": self.on_comm_open_jupyter_widget,
    }

469 

def reset_execution_trackers(self) -> None:
    """Reset every piece of state that is tracked per notebook execution.

    Clears the reply-polling task, the executed-code-cell counter, the
    display-id map, collected widget state and buffers, the output-hook
    stacks and the emulated comm objects.
    """
    self.task_poll_for_reply: asyncio.Future[t.Any] | None = None
    self.code_cells_executed = 0
    self._display_id_map = {}

    # Widget model state and binary buffers gathered during execution.
    self.widget_state: dict[str, dict[str, t.Any]] = {}
    self.widget_buffers: dict[str, dict[tuple[str, ...], dict[str, str]]] = {}

    # Each key maps to a list of hooks and the last one wins, which is what
    # makes nested Output widgets work.
    self.output_hook_stack: t.Any = collections.defaultdict(list)

    # Comm objects of the front-end-mimicking Output widgets.
    self.comm_objects: dict[str, t.Any] = {}

482 

def create_kernel_manager(self) -> KernelManager:
    """Creates a new kernel manager and stores it on ``self.km``.

    If ``kernel_name`` is empty, the notebook's ``kernelspec.name`` metadata
    (when present) is adopted as ``kernel_name`` first. The manager is built
    from ``kernel_manager_class`` with this client's ``config``, and with
    ``kernel_name`` only if one is known.

    Returns
    -------
    km : KernelManager
        Kernel manager whose client class is asynchronous.
    """
    if not self.kernel_name:
        spec_name = self.nb.metadata.get("kernelspec", {}).get("name")
        if spec_name is not None:
            self.kernel_name = spec_name

    manager_kwargs: dict[str, t.Any] = {}
    if self.kernel_name:
        manager_kwargs["kernel_name"] = self.kernel_name
    manager_kwargs["config"] = self.config

    self.km = self.kernel_manager_class(**manager_kwargs)
    assert self.km is not None
    return self.km

502 

async def _async_cleanup_kernel(self) -> None:
    """Shut down the kernel (if alive) and release the manager and client.

    ``shutdown_kernel == "immediate"`` requests ``now=True`` from the kernel
    manager; ``"graceful"`` requests ``now=False``. Afterwards the manager's
    resources are cleaned up, the client's channels are stopped, and
    ``self.kc`` / ``self.km`` are reset to ``None``. These steps run even if
    shutting down or cleaning up resources raised.

    Raises
    ------
    RuntimeError
        Re-raised from the kernel manager unless its message contains
        ``"No kernel is running!"`` (an already-dead kernel is not an error).
    """
    assert self.km is not None
    now = self.shutdown_kernel == "immediate"
    try:
        # Queue the manager to kill the process, and recover gracefully if it's already dead.
        if await ensure_async(self.km.is_alive()):
            await ensure_async(self.km.shutdown_kernel(now=now))
    except RuntimeError as e:
        # The error isn't specialized, so we have to check the message
        if "No kernel is running!" not in str(e):
            raise
    finally:
        try:
            # Remove any state left over even if we failed to stop the kernel
            await ensure_async(self.km.cleanup_resources())
        finally:
            # Stop the channels and drop references even if resource cleanup
            # failed, so no open client is left behind a discarded manager.
            if getattr(self, "kc", None) is not None:
                await ensure_async(self.kc.stop_channels())  # type:ignore[func-returns-value]
            self.kc = None
            self.km = None

_cleanup_kernel = run_sync(_async_cleanup_kernel)

523 

async def async_start_new_kernel(self, **kwargs: t.Any) -> None:
    """Start a new kernel through ``self.km``.

    When ``resources["metadata"]["path"]`` is set and no ``cwd`` was passed,
    it becomes the kernel's working directory. If the manager reports an
    IPython kernel (truthy ``km.ipykernel``) and ``ipython_hist_file`` is set,
    ``--HistoryManager.hist_file=<ipython_hist_file>`` is appended to
    ``extra_arguments``, unless such an argument is already present.

    Parameters
    ----------
    kwargs :
        Any options for ``self.kernel_manager_class.start_kernel()``. Because
        that defaults to AsyncKernelManager, this will likely include options
        accepted by ``AsyncKernelManager.start_kernel()``, which includes ``cwd``.
    """
    assert self.km is not None
    run_path = self.resources.get("metadata", {}).get("path") or None
    if run_path and "cwd" not in kwargs:
        kwargs["cwd"] = run_path

    user_chose_hist_file = any(
        arg.startswith("--HistoryManager.hist_file") for arg in self.extra_arguments
    )
    is_ipykernel = hasattr(self.km, "ipykernel") and self.km.ipykernel
    if is_ipykernel and self.ipython_hist_file and not user_chose_hist_file:
        # Extends the trait itself, so the option is already present (and not
        # re-added) on later starts.
        self.extra_arguments += [f"--HistoryManager.hist_file={self.ipython_hist_file}"]

    await ensure_async(self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs))

start_new_kernel = run_sync(async_start_new_kernel)

553 

async def async_start_new_kernel_client(self) -> KernelClient:
    """Create a kernel client, start its channels and wait for the kernel.

    On any failure the error is logged, the kernel is cleaned up with
    ``_async_cleanup_kernel`` and the exception is re-raised. On success,
    stdin is disabled on the client and the ``on_notebook_start`` hook runs.

    Returns
    -------
    kc : KernelClient
        Kernel client as created by the kernel manager ``km``.
    """
    assert self.km is not None
    try:
        self.kc = self.km.client()
        await ensure_async(self.kc.start_channels())  # type:ignore[func-returns-value]
        await ensure_async(self.kc.wait_for_ready(timeout=self.startup_timeout))
    except Exception as exc:
        kernel_id = getattr(self.km, "kernel_id", None)
        message = "Error occurred while starting new kernel client for kernel {}: {}".format(
            kernel_id, str(exc)
        )
        self.log.error(message)
        await self._async_cleanup_kernel()
        raise

    # Execution is unattended, so stdin is disabled on the client.
    self.kc.allow_stdin = False
    await run_hook(self.on_notebook_start, notebook=self.nb)
    return self.kc

start_new_kernel_client = run_sync(async_start_new_kernel_client)

580 

@contextmanager
def setup_kernel(self, **kwargs: t.Any) -> t.Generator[None, None, None]:
    """
    Context manager for setting up the kernel to execute a notebook.

    This assigns the Kernel Manager (``self.km``) if missing and the Kernel
    Client (``self.kc``), starting a kernel if the manager has none.

    When control returns from the yield, ``self._cleanup_kernel()`` stops the
    client's zmq channels and shuts down the kernel. This happens only if
    ``cleanup_kc`` (popped from ``kwargs``; defaults to ``self.owns_km``) is true.
    The remaining ``kwargs`` are passed to ``start_new_kernel``.
    """
    # By default, clean up only a kernel whose manager we own; keep others alive.
    should_cleanup = kwargs.pop("cleanup_kc", self.owns_km)

    # Synchronous on purpose: an asynccontextmanager can't be driven with
    # run_until_complete.
    if self.km is None:
        self.km = self.create_kernel_manager()
    if not self.km.has_kernel:
        self.start_new_kernel(**kwargs)
    if self.kc is None:
        self.start_new_kernel_client()

    try:
        yield
    finally:
        if should_cleanup:
            self._cleanup_kernel()

610 

@asynccontextmanager
async def async_setup_kernel(self, **kwargs: t.Any) -> t.AsyncGenerator[None, None]:
    """
    Context manager for setting up the kernel to execute a notebook.

    This assigns the Kernel Manager (``self.km``) if missing and Kernel Client(``self.kc``).

    When control returns from the yield it stops the client's zmq channels, and shuts
    down the kernel, if ``cleanup_kc`` is true. ``cleanup_kc`` is popped from
    ``kwargs`` and defaults to ``self.owns_km``; the remaining ``kwargs`` are passed
    to ``async_start_new_kernel``.

    Handlers for SIGINT and SIGTERM are also added to cleanup in case of unexpected
    shutdown, together with an ``atexit`` hook. They are removed again when the
    context exits, even if cleanup fails. If starting the kernel or its client
    fails, the signal handlers are removed as well.

    If the body raises ``RuntimeError``, the ``on_notebook_error`` hook runs before
    it propagates; ``on_notebook_complete`` always runs on exit.
    """
    # by default, cleanup the kernel client if we own the kernel manager
    # and keep it alive if we don't
    cleanup_kc = kwargs.pop("cleanup_kc", self.owns_km)
    if self.km is None:
        self.km = self.create_kernel_manager()

    # self._cleanup_kernel uses run_async, which ensures the ioloop is running again.
    # This is necessary as the ioloop has stopped once atexit fires.
    atexit.register(self._cleanup_kernel)

    def on_signal() -> None:
        """Handle signals."""
        self._async_cleanup_kernel_future = asyncio.ensure_future(self._async_cleanup_kernel())
        atexit.unregister(self._cleanup_kernel)

    # Inside a coroutine the running loop is the one to install handlers on.
    loop = asyncio.get_running_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, on_signal)
        loop.add_signal_handler(signal.SIGTERM, on_signal)
    except RuntimeError:
        # NotImplementedError (a RuntimeError subclass): Windows does not support signals.
        # RuntimeError: Raised when add_signal_handler is called outside the main thread
        pass

    def remove_signal_handlers() -> None:
        """Uninstall the SIGINT/SIGTERM handlers added above (best effort)."""
        try:
            loop.remove_signal_handler(signal.SIGINT)
            loop.remove_signal_handler(signal.SIGTERM)
        except RuntimeError:
            pass

    try:
        if not self.km.has_kernel:
            await self.async_start_new_kernel(**kwargs)

        if self.kc is None:
            await self.async_start_new_kernel_client()
    except BaseException:
        # Don't leave on_signal installed on the loop after a failed start: it
        # would keep intercepting SIGINT/SIGTERM for a kernel nobody is using.
        remove_signal_handlers()
        if self.km is None:
            # async_start_new_kernel_client already cleaned the kernel up, so
            # the atexit hook has nothing left to do.
            atexit.unregister(self._cleanup_kernel)
        raise

    try:
        yield
    except RuntimeError:
        await run_hook(self.on_notebook_error, notebook=self.nb)
        raise
    finally:
        try:
            if cleanup_kc:
                await self._async_cleanup_kernel()
            await run_hook(self.on_notebook_complete, notebook=self.nb)
        finally:
            # Undo the registrations even if cleanup or the hook raised.
            atexit.unregister(self._cleanup_kernel)
            remove_signal_handlers()

668 

async def async_execute(self, reset_kc: bool = False, **kwargs: t.Any) -> NotebookNode:
    """
    Executes each code cell.

    Parameters
    ----------
    reset_kc : bool
        If True and this client owns its kernel manager, any existing kernel
        and client are cleaned up first so that new ones are created
        (default: False). On a client that has not started a kernel yet this
        is a no-op.
    kwargs :
        Any option for ``self.kernel_manager_class.start_kernel()``. Because
        that defaults to AsyncKernelManager, this will likely include options
        accepted by ``jupyter_client.AsyncKernelManager.start_kernel()``,
        which includes ``cwd``.

    Returns
    -------
    nb : NotebookNode
        The executed notebook.

    Raises
    ------
    RuntimeError
        If the kernel_info reply has no ``language_info`` key.
    """
    # _async_cleanup_kernel asserts that a manager exists, so only reset when
    # there is something to reset (km is None before the first execution).
    if reset_kc and self.owns_km and self.km is not None:
        await self._async_cleanup_kernel()
    self.reset_execution_trackers()

    async with self.async_setup_kernel(**kwargs):
        assert self.kc is not None
        self.log.info("Executing notebook with kernel: %s", self.kernel_name)
        msg_id = await ensure_async(self.kc.kernel_info())
        info_msg = await self.async_wait_for_reply(msg_id)
        if info_msg is not None:
            if "language_info" in info_msg["content"]:
                self.nb.metadata["language_info"] = info_msg["content"]["language_info"]
            else:
                raise RuntimeError(
                    'Kernel info received message content has no "language_info" key. '
                    "Content is:\n" + str(info_msg["content"])
                )
        for index, cell in enumerate(self.nb.cells):
            # Ignore `'execution_count' in content` as it's always 1
            # when store_history is False
            await self.async_execute_cell(
                cell, index, execution_count=self.code_cells_executed + 1
            )
        self.set_widgets_metadata()

    return self.nb

execute = run_sync(async_execute)

717 

def set_widgets_metadata(self) -> None:
    """Write the collected widget state into ``nb.metadata.widgets``.

    Does nothing when no widget state was recorded. Only models whose state
    contains ``_model_name`` are written (after ``_serialize_widget_state``),
    and any binary buffers recorded for a model are attached to it as a
    ``"buffers"`` list.
    """
    if not self.widget_state:
        return

    mime = "application/vnd.jupyter.widget-state+json"
    serialized: dict[str, t.Any] = {}
    for model_id, state in self.widget_state.items():
        if "_model_name" in state:
            serialized[model_id] = self._serialize_widget_state(state)

    self.nb.metadata.widgets = {
        mime: {
            "state": serialized,
            "version_major": 2,
            "version_minor": 0,
        }
    }
    # Read back through the metadata so buffers land on the stored objects.
    stored_state = self.nb.metadata.widgets[mime]["state"]
    for model_id, widget in stored_state.items():
        model_buffers = self.widget_buffers.get(model_id)
        if model_buffers:
            widget["buffers"] = list(model_buffers.values())

738 

739 def _update_display_id(self, display_id: str, msg: dict[str, t.Any]) -> None: 

740 """Update outputs with a given display_id""" 

741 if display_id not in self._display_id_map: 

742 self.log.debug("display id %r not in %s", display_id, self._display_id_map) 

743 return 

744 

745 if msg["header"]["msg_type"] == "update_display_data": 

746 msg["header"]["msg_type"] = "display_data" 

747 

748 try: 

749 out = output_from_msg(msg) 

750 except ValueError: 

751 self.log.error(f"unhandled iopub msg: {msg['msg_type']}") 

752 return 

753 

754 for cell_idx, output_indices in self._display_id_map[display_id].items(): 

755 cell = self.nb["cells"][cell_idx] 

756 outputs = cell["outputs"] 

757 for output_idx in output_indices: 

758 outputs[output_idx]["data"] = out["data"] 

759 outputs[output_idx]["metadata"] = out["metadata"] 

760 

async def _async_poll_for_reply(
    self,
    msg_id: str,
    cell: NotebookNode,
    timeout: int | None,
    task_poll_output_msg: asyncio.Future[t.Any],
    task_poll_kernel_alive: asyncio.Future[t.Any],
) -> dict[str, t.Any]:
    """Wait for the shell reply to request ``msg_id`` and return it.

    Parameters
    ----------
    msg_id : str
        ``msg_id`` of the execute request; replies are matched on their
        ``parent_header["msg_id"]``.
    cell : NotebookNode
        The cell being executed (used for timing metadata and errors).
    timeout : int or None
        Seconds to wait for the reply; ``None`` waits without a limit.
    task_poll_output_msg : asyncio.Future
        Task collecting IOPub output for this execution; once the reply has
        arrived it is awaited for up to ``iopub_timeout`` seconds.
    task_poll_kernel_alive : asyncio.Future
        Background liveness-poll task; cancelled before returning, and as soon
        as the shell channel yields no message in time.

    Returns
    -------
    dict
        The matching reply, or the synthetic error reply returned by
        ``_async_handle_timeout`` when ``error_on_timeout`` is configured.

    Raises
    ------
    CellTimeoutError
        If IOPub output does not finish in time and ``raise_on_iopub_timeout``
        is set, or (via ``_async_handle_timeout``) if the reply times out and
        ``interrupt_on_timeout`` is not set.
    DeadKernelError
        Via ``_async_check_alive`` if the kernel died while waiting.
    """
    msg: dict[str, t.Any]
    assert self.kc is not None
    new_timeout: float | None = None
    if timeout is not None:
        # ``deadline`` only exists when a timeout is set; below it is read only
        # while ``new_timeout`` is not None, which implies the same condition.
        deadline = monotonic() + timeout
        new_timeout = float(timeout)
    error_on_timeout_execute_reply = None
    while True:
        try:
            if error_on_timeout_execute_reply:
                # Replay the synthetic reply from _async_handle_timeout as if it
                # were the kernel's answer to this request. (Presumably mypy only
                # sees the None initialisation above, hence the ignore.)
                msg = error_on_timeout_execute_reply  # type:ignore[unreachable]
                msg["parent_header"] = {"msg_id": msg_id}
            else:
                msg = await ensure_async(self.kc.shell_channel.get_msg(timeout=new_timeout))
            if msg["parent_header"].get("msg_id") == msg_id:
                if self.record_timing:
                    cell["metadata"]["execution"]["shell.execute_reply"] = timestamp(msg)
                # Give IOPub a bounded grace period to deliver the remaining outputs.
                try:
                    await asyncio.wait_for(task_poll_output_msg, self.iopub_timeout)
                except (asyncio.TimeoutError, Empty):
                    if self.raise_on_iopub_timeout:
                        task_poll_kernel_alive.cancel()
                        raise CellTimeoutError.error_from_timeout_and_cell(
                            "Timeout waiting for IOPub output", self.iopub_timeout, cell
                        ) from None
                    else:
                        self.log.warning("Timeout waiting for IOPub output")
                task_poll_kernel_alive.cancel()
                return msg
            else:
                # A reply to some other request: keep waiting, but only for what
                # is left until the original deadline.
                if new_timeout is not None:
                    new_timeout = max(0, deadline - monotonic())
        except Empty:
            # received no message, check if kernel is still alive
            assert timeout is not None
            task_poll_kernel_alive.cancel()
            await self._async_check_alive()
            # _async_handle_timeout raises CellTimeoutError unless interrupt_on_timeout
            # is set; otherwise it interrupts the kernel and returns a synthetic reply
            # (error_on_timeout) or None.
            # NOTE(review): when it returns None, new_timeout is not refreshed, so the
            # loop waits another new_timeout seconds and may interrupt the kernel again,
            # possibly after a very short wait if new_timeout had shrunk towards 0.
            # Confirm this is intended.
            error_on_timeout_execute_reply = await self._async_handle_timeout(timeout, cell)

807 

async def _async_poll_output_msg(
    self, parent_msg_id: str, cell: NotebookNode, cell_index: int
) -> None:
    """Consume IOPub messages for one execution until it signals completion.

    Messages whose parent ``msg_id`` is not ``parent_msg_id`` are read and
    skipped. Matching ones go to ``process_message``. That method raises
    ``CellExecutionComplete`` once the execution has finished, which ends this
    coroutine.
    """
    assert self.kc is not None
    while True:
        incoming = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None))
        if incoming["parent_header"].get("msg_id") != parent_msg_id:
            continue
        try:
            self.process_message(incoming, cell, cell_index)
        except CellExecutionComplete:
            return

820 

async def _async_poll_kernel_alive(self) -> None:
    """Check once per second that the kernel is still alive.

    When the kernel is found dead, the pending ``task_poll_for_reply`` task is
    cancelled and polling stops.
    """
    while True:
        await asyncio.sleep(1)
        try:
            await self._async_check_alive()
        except DeadKernelError:
            break
    assert self.task_poll_for_reply is not None
    self.task_poll_for_reply.cancel()

830 

831 def _get_timeout(self, cell: NotebookNode | None) -> int | None: 

832 if self.timeout_func is not None and cell is not None: 

833 timeout = self.timeout_func(cell) 

834 else: 

835 timeout = self.timeout 

836 

837 if not timeout or timeout < 0: 

838 timeout = None 

839 

840 return timeout 

841 

async def _async_handle_timeout(
    self, timeout: int, cell: NotebookNode | None = None
) -> None | dict[str, t.Any]:
    """React to a reply timeout of ``timeout`` seconds.

    The timeout is always logged. Without ``interrupt_on_timeout`` a
    ``CellTimeoutError`` is raised for ``cell``. Otherwise the kernel is
    interrupted, and the method returns a synthetic error reply
    ``{"content": {**error_on_timeout, "status": "error"}}`` when
    ``error_on_timeout`` is set, or ``None`` when it is not.
    """
    self.log.error("Timeout waiting for execute reply (%is)." % timeout)
    if not self.interrupt_on_timeout:
        assert cell is not None
        raise CellTimeoutError.error_from_timeout_and_cell(
            "Cell execution timed out", timeout, cell
        )

    self.log.error("Interrupting kernel")
    assert self.km is not None
    await ensure_async(self.km.interrupt_kernel())
    if not self.error_on_timeout:
        return None
    return {"content": {**self.error_on_timeout, "status": "error"}}

859 

async def _async_check_alive(self) -> None:
    """Raise ``DeadKernelError`` (after logging) if the kernel is not alive."""
    assert self.kc is not None
    alive = await ensure_async(self.kc.is_alive())  # type:ignore[attr-defined]
    if alive:
        return
    self.log.error("Kernel died while waiting for execute reply.")
    raise DeadKernelError("Kernel died")

865 

async def async_wait_for_reply(
    self, msg_id: str, cell: NotebookNode | None = None
) -> dict[str, t.Any] | None:
    """Wait for the shell reply whose parent ``msg_id`` is ``msg_id``.

    The shell channel is polled in ``shell_timeout_interval`` slices, and
    replies to other requests are skipped. Each slice that yields no message
    triggers a kernel liveness check and counts towards the timeout from
    ``_get_timeout(cell)``; only those empty slices are counted. Once that
    timeout is exceeded, ``_async_handle_timeout`` is called, which raises
    unless ``interrupt_on_timeout`` is set, and ``None`` is returned. Any
    synthetic reply it returns is not used here.
    """
    assert self.kc is not None
    timeout = self._get_timeout(cell)
    waited = 0
    while True:
        try:
            reply: dict[str, t.Any] = await ensure_async(
                self.kc.shell_channel.get_msg(timeout=self.shell_timeout_interval)
            )
        except Empty:
            await self._async_check_alive()
            waited += self.shell_timeout_interval
            if timeout and waited > timeout:
                await self._async_handle_timeout(timeout, cell)
                return None
            continue
        if reply["parent_header"].get("msg_id") == msg_id:
            return reply

wait_for_reply = run_sync(async_wait_for_reply)
# Backwards compatibility naming for papermill
_wait_for_reply = wait_for_reply

893 

894 def _passed_deadline(self, deadline: int | None) -> bool: 

895 if deadline is not None and deadline - monotonic() <= 0: 

896 return True 

897 return False 

898 

899 async def _check_raise_for_error( 

900 self, cell: NotebookNode, cell_index: int, exec_reply: dict[str, t.Any] | None 

901 ) -> None: 

902 if exec_reply is None: 

903 return None 

904 

905 exec_reply_content = exec_reply["content"] 

906 if exec_reply_content["status"] != "error": 

907 return None 

908 

909 cell_allows_errors = (not self.force_raise_errors) and ( 

910 self.allow_errors 

911 or exec_reply_content.get("ename") in self.allow_error_names 

912 or "raises-exception" in cell.metadata.get("tags", []) 

913 ) 

914 await run_hook( 

915 self.on_cell_error, cell=cell, cell_index=cell_index, execute_reply=exec_reply 

916 ) 

917 if not cell_allows_errors: 

918 raise CellExecutionError.from_cell_and_msg(cell, exec_reply_content) 

919 

async def async_execute_cell(
    self,
    cell: NotebookNode,
    cell_index: int,
    execution_count: int | None = None,
    store_history: bool = True,
) -> NotebookNode:
    """
    Executes a single code cell.

    To execute all cells see :meth:`execute`.

    Parameters
    ----------
    cell : nbformat.NotebookNode
        The cell which is currently being processed.
    cell_index : int
        The position of the cell within the notebook object.
    execution_count : int
        The execution count to be assigned to the cell (default: Use kernel response)
    store_history : bool
        Determines if history should be stored in the kernel (default: True).
        Specific to ipython kernels, which can store command histories.

    Returns
    -------
    cell : NotebookNode
        The cell which was just processed. Non-code cells, code cells whose
        source is blank, and cells tagged with ``skip_cells_with_tag`` are
        returned without being executed.

    Raises
    ------
    CellExecutionError
        If execution failed and should raise an exception, this will be raised
        with details about the failure.
    DeadKernelError
        If the reply poll is cancelled because the kernel died.
    """
    assert self.kc is not None

    await run_hook(self.on_cell_start, cell=cell, cell_index=cell_index)

    if cell.cell_type != "code" or not cell.source.strip():
        self.log.debug("Skipping non-executing cell %s", cell_index)
        return cell

    if self.skip_cells_with_tag in cell.metadata.get("tags", []):
        self.log.debug("Skipping tagged cell %s", cell_index)
        return cell

    if self.record_timing:  # clear execution metadata prior to execution
        cell["metadata"]["execution"] = {}

    self.log.debug("Executing cell:\n%s", cell.source)

    # allow_error_names can only be checked once the reply (and its ename) is
    # known, so it is not part of this pre-execution decision.
    cell_allows_errors = (not self.force_raise_errors) and (
        self.allow_errors or "raises-exception" in cell.metadata.get("tags", [])
    )

    await run_hook(self.on_cell_execute, cell=cell, cell_index=cell_index)
    parent_msg_id = await ensure_async(
        self.kc.execute(
            cell.source, store_history=store_history, stop_on_error=not cell_allows_errors
        )
    )
    # Note: this hook fires once the execute request has been sent, before the
    # reply and the outputs have been collected.
    await run_hook(self.on_cell_complete, cell=cell, cell_index=cell_index)
    # We launched a code cell to execute
    self.code_cells_executed += 1
    exec_timeout = self._get_timeout(cell)

    cell.outputs = []
    self.clear_before_next_output = False

    task_poll_kernel_alive = asyncio.ensure_future(self._async_poll_kernel_alive())
    task_poll_output_msg = asyncio.ensure_future(
        self._async_poll_output_msg(parent_msg_id, cell, cell_index)
    )
    self.task_poll_for_reply = asyncio.ensure_future(
        self._async_poll_for_reply(
            parent_msg_id, cell, exec_timeout, task_poll_output_msg, task_poll_kernel_alive
        )
    )
    try:
        exec_reply = await self.task_poll_for_reply
    except asyncio.CancelledError:
        # can only be cancelled by task_poll_kernel_alive when the kernel is dead
        task_poll_output_msg.cancel()
        raise DeadKernelError("Kernel died") from None
    except Exception as e:
        # Best effort to cancel request if it hasn't been resolved
        try:
            # Check if the task_poll_output is doing the raising for us
            if not isinstance(e, CellControlSignal):
                task_poll_output_msg.cancel()
        finally:
            raise

    if execution_count:
        cell["execution_count"] = execution_count
    await run_hook(
        self.on_cell_executed, cell=cell, cell_index=cell_index, execute_reply=exec_reply
    )

    if self.coalesce_streams and cell.outputs:
        cell.outputs = self._coalesce_stream_outputs(cell.outputs)

    await self._check_raise_for_error(cell, cell_index, exec_reply)

    self.nb["cells"][cell_index] = cell
    return cell


@staticmethod
def _coalesce_stream_outputs(outputs: list[NotebookNode]) -> list[NotebookNode]:
    """Merge stream outputs per stream name and apply ``\\b`` / ``\\r`` handling.

    The first output of each stream name is kept in place and the text of
    later outputs with the same name is appended to it. Those first outputs
    are mutated. Non-stream outputs keep their positions.

    Parameters
    ----------
    outputs : list of NotebookNode
        The outputs of a cell, in arrival order.

    Returns
    -------
    list of NotebookNode
        The coalesced outputs, with stdout placed before an adjacent stderr.
    """
    new_outputs: list[NotebookNode] = []
    streams: dict[str, NotebookNode] = {}
    for output in outputs:
        if output["output_type"] != "stream":
            new_outputs.append(output)
        elif output["name"] in streams:
            streams[output["name"]]["text"] += output["text"]
        else:
            new_outputs.append(output)
            streams[output["name"]] = output

    # The module-level _RGX_BACKSPACE pattern is r"[^\n]\b". In a regex, ``\b``
    # is a word boundary, not the backspace character, so U+0008 is matched
    # explicitly here.
    backspace = re.compile(r"[^\n]\x08")
    for output in streams.values():
        text = output["text"]
        # Cancel out anything-but-newline followed by backspace. Repeat until
        # nothing changes so that runs of backspaces are fully applied. The
        # check comes after the substitution; a pre-checked while loop would
        # never run.
        while True:
            reduced = backspace.sub("", text)
            if len(reduced) == len(text):
                break
            text = reduced
        # Replace all carriage returns not followed by newline
        output["text"] = _RGX_CARRIAGERETURN.sub("", text)

    # stdout and stderr arrive asynchronously, so their relative order is not
    # guaranteed. Keep stdout ahead of an adjacent stderr for stable results.
    for i in range(len(new_outputs) - 1):
        current, following = new_outputs[i], new_outputs[i + 1]
        if (
            current["output_type"] == "stream"
            and current["name"] == "stderr"
            and following["output_type"] == "stream"
            and following["name"] == "stdout"
        ):
            new_outputs[i], new_outputs[i + 1] = following, current
    return new_outputs

1066 

# ``execute_cell`` is ``async_execute_cell`` wrapped by the ``run_sync`` helper
# imported from ``.util``.
execute_cell = run_sync(async_execute_cell)

1068 

def process_message(
    self, msg: dict[str, t.Any], cell: NotebookNode, cell_index: int
) -> NotebookNode | None:
    """Apply one kernel (iopub) message to ``cell`` and return any new output.

    ``cell`` is updated in place, in these ways:

    * its ``execution_count``, when the message content carries one;
    * its ``metadata.execution`` timestamps, when ``record_timing`` is on;
    * its outputs, through ``clear_output``, ``handle_comm_msg`` or ``output``.

    Parameters
    ----------
    msg : dict
        The kernel message being processed.
    cell : nbformat.NotebookNode
        The cell which is currently being processed.
    cell_index : int
        The position of the cell within the notebook object.

    Returns
    -------
    output : NotebookNode or None
        Whatever ``output`` returned for the message, or ``None`` for
        ``status``, ``clear_output``, ``comm*``, ``execute_input`` and
        ``update_display_data`` messages.

    Raises
    ------
    CellExecutionComplete
        When a ``status`` message reports ``execution_state == "idle"``.
    """
    msg_type = msg["msg_type"]
    self.log.debug("msg_type: %s", msg_type)
    content = msg["content"]
    self.log.debug("content: %s", content)

    # `transient` may be present but explicitly None, so a chained
    # .get("transient", {}).get(...) would fail here.
    transient = content.get("transient")
    display_id = None
    if transient:
        display_id = transient.get("display_id")

    if display_id and msg_type in {"execute_result", "display_data", "update_display_data"}:
        self._update_display_id(display_id, msg)

    # set the prompt number for the input and the output
    if "execution_count" in content:
        cell["execution_count"] = content["execution_count"]

    if self.record_timing:
        timing_key = None
        if msg_type == "status":
            state = content["execution_state"]
            if state == "idle":
                timing_key = "iopub.status.idle"
            elif state == "busy":
                timing_key = "iopub.status.busy"
        elif msg_type == "execute_input":
            timing_key = "iopub.execute_input"
        if timing_key is not None:
            cell["metadata"]["execution"][timing_key] = timestamp(msg)

    if msg_type == "status":
        if content["execution_state"] == "idle":
            raise CellExecutionComplete()
        return None
    if msg_type == "clear_output":
        self.clear_output(cell.outputs, msg, cell_index)
        return None
    if msg_type.startswith("comm"):
        self.handle_comm_msg(cell.outputs, msg, cell_index)
        return None
    # Remaining message types that produce no output of their own
    if msg_type in ("execute_input", "update_display_data"):
        return None
    return self.output(cell.outputs, msg, display_id, cell_index)

1137 

def output(
    self,
    outs: list[NotebookNode],
    msg: dict[str, t.Any],
    display_id: str | None,
    cell_index: int,
) -> NotebookNode | None:
    """Convert ``msg`` into a notebook output, append it to ``outs`` and return it.

    If an output hook (e.g. an ``OutputWidget``) is registered for the
    message's parent ``msg_id``, the most recently registered hook handles the
    message instead and ``None`` is returned.

    Parameters
    ----------
    outs : list of NotebookNode
        Output list of the cell being executed; modified in place.
    msg : dict
        The iopub message to convert.
    display_id : str or None
        The message's transient display id. ``process_message`` passes
        ``None`` when the message has none.
    cell_index : int
        The position of the cell within the notebook object.

    Returns
    -------
    NotebookNode or None
        The appended output. ``None`` is returned when a hook handled the
        message or when ``output_from_msg`` rejects the message type.
    """
    msg_type = msg["msg_type"]

    parent_msg_id = msg["parent_header"].get("msg_id")
    # register_output_hook appends to output_hook_stack[msg_id] without
    # creating the key, so the stack is presumably a defaultdict(list).
    # .get() avoids inserting an empty entry for every parent msg id seen here.
    hooks = self.output_hook_stack.get(parent_msg_id)
    if hooks:
        # if we have a hook registered, it will override our
        # default output behaviour (e.g. OutputWidget)
        hooks[-1].output(outs, msg, display_id, cell_index)
        return None

    try:
        out: NotebookNode | None = output_from_msg(msg)
    except ValueError:
        self.log.error("unhandled iopub msg: %s", msg_type)
        return None

    if self.clear_before_next_output:
        self.log.debug("Executing delayed clear_output")
        outs[:] = []
        self.clear_display_id_mapping(cell_index)
        self.clear_before_next_output = False

    if out:
        if display_id:
            # record output index in _display_id_map[display_id][cell_index].
            # Recording happens only when the output is actually appended, so
            # the stored index always refers to an existing entry.
            cell_map = self._display_id_map.setdefault(display_id, {})
            cell_map.setdefault(cell_index, []).append(len(outs))
        outs.append(out)

    return out

1177 

def clear_output(
    self, outs: list[NotebookNode], msg: dict[str, t.Any], cell_index: int
) -> None:
    """Handle a ``clear_output`` message for the cell being executed.

    If an output hook is registered for the message's parent ``msg_id``, the
    most recently registered hook handles the clear. Otherwise, when the
    message content has a truthy ``wait``, clearing is deferred: it is
    recorded in ``clear_before_next_output`` for ``output`` to perform later.
    Without ``wait``, ``outs`` is emptied in place and the display-id
    mapping for ``cell_index`` is reset immediately.

    Parameters
    ----------
    outs : list of NotebookNode
        Output list of the cell being executed; may be emptied in place.
    msg : dict
        The ``clear_output`` message.
    cell_index : int
        The position of the cell within the notebook object.
    """
    content = msg["content"]

    parent_msg_id = msg["parent_header"].get("msg_id")
    # .get() instead of [] so that looking up a message without hooks does not
    # insert an empty entry. register_output_hook indexes the stack with []
    # without creating keys, so it is presumably a defaultdict(list).
    hooks = self.output_hook_stack.get(parent_msg_id)
    if hooks:
        # if we have a hook registered, it will override our
        # default clear_output behaviour (e.g. OutputWidget)
        hooks[-1].clear_output(outs, msg, cell_index)
        return

    if content.get("wait"):
        self.log.debug("Wait to clear output")
        self.clear_before_next_output = True
    else:
        self.log.debug("Immediate clear output")
        outs[:] = []
        self.clear_display_id_mapping(cell_index)

1199 

def clear_display_id_mapping(self, cell_index: int) -> None:
    """Forget the recorded output indices of ``cell_index`` for every display id.

    The cell's key is kept in each per-display-id map, but its list of
    output positions is replaced with an empty list. Display ids that never
    referenced this cell are left untouched.
    """
    for per_cell_indices in self._display_id_map.values():
        if cell_index not in per_cell_indices:
            continue
        per_cell_indices[cell_index] = []

1205 

def handle_comm_msg(
    self, outs: list[NotebookNode], msg: dict[str, t.Any], cell_index: int
) -> None:
    """Process a ``comm_*`` message from the kernel.

    When ``store_widget_state`` is on and the message data carries a
    ``state``, that state is merged into ``widget_state[comm_id]``. Any
    ``buffer_paths`` buffers are stored base64-encoded in
    ``widget_buffers[comm_id]``, keyed by path.

    Beyond that, a ``comm_open`` is dispatched to the handler registered in
    ``comm_open_handlers`` for its ``target_name``, and a truthy result is
    remembered in ``comm_objects``. A ``comm_msg`` is forwarded to the
    matching comm object, if one is known. ``outs`` and ``cell_index`` are
    not used by this method.
    """
    content = msg["content"]
    data = content["data"]

    # Only messages carrying widget "state" are recorded (custom messages are ignored)
    if self.store_widget_state and "state" in data:
        state_comm_id = content["comm_id"]
        self.widget_state.setdefault(state_comm_id, {}).update(data["state"])
        if data.get("buffer_paths"):
            comm_buffers = self.widget_buffers.setdefault(state_comm_id, {})
            # for each comm, the path uniquely identifies a buffer
            comm_buffers.update(
                {tuple(entry["path"]): entry for entry in self._get_buffer_data(msg)}
            )

    # Some comms need a frontend-like counterpart (e.g. the Output widget) to
    # behave as they do in Jupyter lab/notebook.
    kind = msg["msg_type"]
    if kind == "comm_open":
        target = content.get("target_name")
        handler = self.comm_open_handlers.get(target)
        if not handler:
            self.log.warning(f"No handler found for comm target {target!r}")
            return
        opened_id = content["comm_id"]
        comm_object = handler(msg)
        if comm_object:
            self.comm_objects[opened_id] = comm_object
    elif kind == "comm_msg":
        target_id = content["comm_id"]
        if target_id in self.comm_objects:
            self.comm_objects[target_id].handle_msg(msg)

1240 

1241 def _serialize_widget_state(self, state: dict[str, t.Any]) -> dict[str, t.Any]: 

1242 """Serialize a widget state, following format in @jupyter-widgets/schema.""" 

1243 return { 

1244 "model_name": state.get("_model_name"), 

1245 "model_module": state.get("_model_module"), 

1246 "model_module_version": state.get("_model_module_version"), 

1247 "state": state, 

1248 } 

1249 

1250 def _get_buffer_data(self, msg: dict[str, t.Any]) -> list[dict[str, str]]: 

1251 encoded_buffers = [] 

1252 paths = msg["content"]["data"]["buffer_paths"] 

1253 buffers = msg["buffers"] 

1254 for path, buffer in zip(paths, buffers): 

1255 encoded_buffers.append( 

1256 { 

1257 "data": base64.b64encode(buffer).decode("utf-8"), 

1258 "encoding": "base64", 

1259 "path": path, 

1260 } 

1261 ) 

1262 return encoded_buffers 

1263 

def register_output_hook(self, msg_id: str, hook: OutputWidget) -> None:
    """Push ``hook`` so it handles output/clear_output for ``msg_id`` instead.

    Hooks form a stack per ``msg_id``; the most recently registered one is
    the one used.
    """
    # mimics
    # https://jupyterlab.github.io/jupyterlab/services/interfaces/kernel.ikernelconnection.html#registermessagehook
    hooks_for_msg = self.output_hook_stack[msg_id]
    hooks_for_msg.append(hook)

1272 

def remove_output_hook(self, msg_id: str, hook: OutputWidget) -> None:
    """Pop the most recent output hook for ``msg_id``; it is expected to be ``hook``.

    The top of the stack is popped unconditionally. An ``assert`` then checks
    that the popped hook equals ``hook``.
    """
    # mimics
    # https://jupyterlab.github.io/jupyterlab/services/interfaces/kernel.ikernelconnection.html#removemessagehook
    hooks_for_msg = self.output_hook_stack[msg_id]
    popped = hooks_for_msg.pop()
    assert popped == hook

1279 

def on_comm_open_jupyter_widget(self, msg: dict[str, t.Any]) -> t.Any | None:
    """Instantiate a client-side widget object for a jupyter widget ``comm_open``.

    The class is looked up in ``widget_registry`` by the state's
    ``_model_module`` and then ``_model_name``. When found, it is called as
    ``cls(comm_id, state, self.kc, self)``. Returns ``None`` when either
    lookup misses.
    """
    content = msg["content"]
    state = content["data"]["state"]
    comm_id = content["comm_id"]
    module = self.widget_registry.get(state["_model_module"])
    widget_class = module.get(state["_model_name"]) if module else None
    if not widget_class:
        return None
    return widget_class(comm_id, state, self.kc, self)

1292 

1293 

def execute(
    nb: NotebookNode,
    cwd: str | None = None,
    km: KernelManager | None = None,
    **kwargs: t.Any,
) -> NotebookNode:
    """Run every cell of ``nb`` and return the notebook with fresh outputs.

    A convenience wrapper that builds a :class:`NotebookClient` and calls its
    ``execute`` method; the notebook object passed in is the one updated.

    Parameters
    ----------
    nb : NotebookNode
        The notebook object to be executed.
    cwd : str, optional
        Working directory for the kernel. It is passed to the client as
        ``resources["metadata"]["path"]``.
    km : KernelManager, optional
        Kernel manager to use for code execution instead of letting the
        client create one.
    kwargs :
        Any other options for NotebookClient, e.g. timeout, kernel_name.
    """
    resources: dict[str, t.Any] = {} if cwd is None else {"metadata": {"path": cwd}}
    client = NotebookClient(nb=nb, resources=resources, km=km, **kwargs)
    return client.execute()