Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nbclient/client.py: 19%

512 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""nbclient implementation.""" 

2import asyncio 

3import atexit 

4import base64 

5import collections 

6import datetime 

7import re 

8import signal 

9import typing as t 

10from contextlib import asynccontextmanager, contextmanager 

11from queue import Empty 

12from textwrap import dedent 

13from time import monotonic 

14 

15from jupyter_client import KernelManager 

16from jupyter_client.client import KernelClient 

17from nbformat import NotebookNode 

18from nbformat.v4 import output_from_msg 

19from traitlets import Any, Bool, Callable, Dict, Enum, Integer, List, Type, Unicode, default 

20from traitlets.config.configurable import LoggingConfigurable 

21 

22from .exceptions import ( 

23 CellControlSignal, 

24 CellExecutionComplete, 

25 CellExecutionError, 

26 CellTimeoutError, 

27 DeadKernelError, 

28) 

29from .output_widget import OutputWidget 

30from .util import ensure_async, run_hook, run_sync 

31 

# Matches the text of a line up to (and including) its last carriage return,
# provided more text follows on the same line.  Substituting matches with ""
# emulates a terminal overwriting the line after a "\r".
_RGX_CARRIAGERETURN = re.compile(r".*\r(?=[^\n])")
# Matches one non-newline character followed by a backspace control character
# (U+0008).  Substituting matches with "" emulates the backspace erasing that
# character.  NB: this must be the control character -- in a regular
# expression, ``\b`` outside a character class is a *word boundary*, which
# would make the substitution erase ordinary text.
_RGX_BACKSPACE = re.compile(r"[^\n]\x08")

34 

35 

def timestamp(msg: t.Optional[t.Dict] = None) -> str:
    """Return a UTC timestamp ``YYYY-MM-DDTHH:MM:SS.ffffffZ`` for a message.

    If ``msg`` has a ``header`` whose ``date`` is a ``datetime``, that time is
    used.  Timezone-aware dates are converted to UTC first so the trailing
    ``Z`` designator is accurate; naive dates are assumed to already be UTC.

    In every other case -- no message, no header (test mocks omit it), no
    usable date, or a formatting failure -- the current UTC time is returned
    in the same format.
    """
    fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
    if msg and 'header' in msg:
        msg_header = msg['header']
        msg_date = msg_header.get('date') if 'date' in msg_header else None
        if isinstance(msg_date, datetime.datetime):
            try:
                if msg_date.tzinfo is not None:
                    msg_date = msg_date.astimezone(datetime.timezone.utc)
                formatted_time = msg_date.strftime(fmt)
                # docs indicate strftime may return an empty string, so catch that too
                if formatted_time:
                    return formatted_time
            except Exception:  # noqa
                pass  # fall back to the current time
    # ``utcnow()`` is deprecated; also, unlike ``isoformat()``, this format
    # always includes microseconds, matching the header-based branch.
    return datetime.datetime.now(datetime.timezone.utc).strftime(fmt)

54 

55 

56class NotebookClient(LoggingConfigurable): 

57 """ 

58 Encompasses a Client for executing cells in a notebook 

59 """ 

60 

# Configurable options.  Every trait below is tagged ``config=True`` (or, for
# ``kernel_manager_class``, passes ``config=True`` directly), so it can be set
# through traitlets configuration as well as by keyword argument.

timeout: int = Integer(
    None,
    allow_none=True,
    help=dedent(
        """
        The time to wait (in seconds) for output from executions.
        If a cell execution takes longer, a TimeoutError is raised.

        ``None`` or ``-1`` will disable the timeout. If ``timeout_func`` is set,
        it overrides ``timeout``.
        """
    ),
).tag(config=True)

timeout_func: t.Callable[..., t.Optional[int]] = Any(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which, when given the cell as input,
        returns the time to wait (in seconds) for output from cell
        executions. If a cell execution takes longer, a TimeoutError
        is raised.

        Returning ``None`` or ``-1`` will disable the timeout for the cell.
        Not setting ``timeout_func`` will cause the client to
        default to using the ``timeout`` trait for all cells. The
        ``timeout_func`` trait overrides ``timeout`` if it is not ``None``.
        """
    ),
).tag(config=True)

interrupt_on_timeout: bool = Bool(
    False,
    help=dedent(
        """
        If execution of a cell times out, interrupt the kernel and
        continue executing other cells rather than throwing an error and
        stopping.
        """
    ),
).tag(config=True)

error_on_timeout: t.Optional[t.Dict] = Dict(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        If a cell execution was interrupted after a timeout, don't wait for
        the execute_reply from the kernel (e.g. KeyboardInterrupt error).
        Instead, return an execute_reply with the given error, which should
        be of the following form::

            {
                'ename': str,  # Exception name, as a string
                'evalue': str,  # Exception value, as a string
                'traceback': list(str),  # traceback frames, as strings
            }
        """
    ),
).tag(config=True)

startup_timeout: int = Integer(
    60,
    help=dedent(
        """
        The time to wait (in seconds) for the kernel to start.
        If kernel startup takes longer, a RuntimeError is
        raised.
        """
    ),
).tag(config=True)

allow_errors: bool = Bool(
    False,
    help=dedent(
        """
        If ``False`` (default), when a cell raises an error the
        execution is stopped and a ``CellExecutionError``
        is raised, except if the error name is in
        ``allow_error_names``.
        If ``True``, execution errors are ignored and the execution
        is continued until the end of the notebook. Output from
        exceptions is included in the cell output in both cases.
        """
    ),
).tag(config=True)

allow_error_names: t.List[str] = List(
    Unicode(),
    help=dedent(
        """
        List of error names which won't stop the execution. Use this if the
        ``allow_errors`` option is too general and you want to allow only
        specific kinds of errors.
        """
    ),
).tag(config=True)

force_raise_errors: bool = Bool(
    False,
    help=dedent(
        """
        If False (default), errors from executing the notebook can be
        allowed with a ``raises-exception`` tag on a single cell, or the
        ``allow_errors`` or ``allow_error_names`` configurable options for
        all cells. An allowed error will be recorded in notebook output, and
        execution will continue. If an error occurs when it is not
        explicitly allowed, a ``CellExecutionError`` will be raised.
        If True, ``CellExecutionError`` will be raised for any error that occurs
        while executing the notebook. This overrides the ``allow_errors``
        and ``allow_error_names`` options and the ``raises-exception`` cell
        tag.
        """
    ),
).tag(config=True)

skip_cells_with_tag: str = Unicode(
    'skip-execution',
    help=dedent(
        """
        Name of the cell tag to use to denote a cell that should be skipped.
        """
    ),
).tag(config=True)

extra_arguments: t.List = List(
    Unicode(),
    help="Extra command-line arguments passed to the kernel when it is started.",
).tag(config=True)

kernel_name: str = Unicode(
    '',
    help=dedent(
        """
        Name of kernel to use to execute the cells.
        If not set, use the kernel_spec embedded in the notebook.
        """
    ),
).tag(config=True)

raise_on_iopub_timeout: bool = Bool(
    False,
    help=dedent(
        """
        If ``False`` (default), then the kernel will continue waiting for
        iopub messages until it receives a kernel idle message, or until a
        timeout occurs, at which point the currently executing cell will be
        skipped. If ``True``, then an error will be raised after the first
        timeout. This option generally does not need to be used, but may be
        useful in contexts where there is the possibility of executing
        notebooks with memory-consuming infinite loops.
        """
    ),
).tag(config=True)

store_widget_state: bool = Bool(
    True,
    help=dedent(
        """
        If ``True`` (default), then the state of the Jupyter widgets created
        at the kernel will be stored in the metadata of the notebook.
        """
    ),
).tag(config=True)

record_timing: bool = Bool(
    True,
    help=dedent(
        """
        If ``True`` (default), then the execution timings of each cell will
        be stored in the metadata of the notebook.
        """
    ),
).tag(config=True)

iopub_timeout: int = Integer(
    4,
    allow_none=False,
    help=dedent(
        """
        The time to wait (in seconds) for IOPub output. This generally
        doesn't need to be set, but on some slow networks (such as CI
        systems) the default timeout might not be long enough to get all
        messages.
        """
    ),
).tag(config=True)

shell_timeout_interval: int = Integer(
    5,
    allow_none=False,
    help=dedent(
        """
        The time to wait (in seconds) for Shell output before retrying.
        This generally doesn't need to be set, but if one needs to check
        for dead kernels at a faster rate this can help.
        """
    ),
).tag(config=True)

shutdown_kernel = Enum(
    ['graceful', 'immediate'],
    default_value='graceful',
    help=dedent(
        """
        If ``graceful`` (default), then the kernel is given time to clean
        up after executing all cells, e.g., to execute its ``atexit`` hooks.
        If ``immediate``, then the kernel is signaled to immediately
        terminate.
        """
    ),
).tag(config=True)

ipython_hist_file: str = Unicode(
    default_value=':memory:',
    help="""Path to file to use for SQLite history database for an IPython kernel.

    The specific value ``:memory:`` (including the colon
    at both ends but not the backticks) avoids creating a history file. Otherwise, IPython
    will create a history file for each kernel.

    When running kernels simultaneously (e.g. via multiprocessing) saving history to a single
    SQLite file can result in database errors, so using ``:memory:`` is recommended in
    non-interactive contexts.
    """,
).tag(config=True)

kernel_manager_class = Type(
    config=True, klass=KernelManager, help='The kernel manager class to use.'
)

# Lifecycle hooks.  Each is optional and invoked through ``run_hook`` with
# the keyword arguments listed in its help text.

on_notebook_start: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after the kernel manager and kernel client are setup, and
        cells are about to execute.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

on_notebook_complete: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after the kernel is cleaned up.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

on_notebook_error: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes when the notebook encounters an error.
        Called with kwargs ``notebook``.
        """
    ),
).tag(config=True)

on_cell_start: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes before a cell is executed and before non-executing cells
        are skipped.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

on_cell_execute: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes just before a code cell is executed.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

on_cell_complete: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes after a cell execution is complete. It is
        called even when a cell results in a failure.
        Called with kwargs ``cell`` and ``cell_index``.
        """
    ),
).tag(config=True)

on_cell_executed: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes just after a code cell is executed, whether
        or not it results in an error.
        Called with kwargs ``cell``, ``cell_index`` and ``execute_reply``.
        """
    ),
).tag(config=True)

on_cell_error: t.Optional[t.Callable] = Callable(
    default_value=None,
    allow_none=True,
    help=dedent(
        """
        A callable which executes when a cell execution results in an error.
        This is executed even if the error is allowed (via ``allow_errors``,
        ``allow_error_names`` or a ``raises-exception`` cell tag).
        Called with kwargs ``cell``, ``cell_index`` and ``execute_reply``.
        """
    ),
).tag(config=True)

382 

@default('kernel_manager_class')  # type:ignore[misc]
def _kernel_manager_class_default(self) -> t.Type[KernelManager]:
    """Dynamic traitlets default for ``kernel_manager_class``: ``AsyncKernelManager``.

    The class is looked up lazily, only when the default is first needed
    (the original intent being to avoid importing jupyter_client at startup).
    """
    from jupyter_client import AsyncKernelManager as default_manager_cls

    return default_manager_cls

389 

# Internal bookkeeping and conversion-related traits.

_display_id_map = Dict(
    help=dedent(
        """
        mapping of locations of outputs with a given display_id
        tracks cell index and output index within cell.outputs for
        each appearance of the display_id
        {
            'display_id': {
                cell_idx: [output_idx,]
            }
        }
        """
    )
)

# MIME types, most preferred first.
display_data_priority: t.List = List(
    [
        'text/html',
        'application/pdf',
        'text/latex',
        'image/svg+xml',
        'image/png',
        'image/jpeg',
        'text/markdown',
        'text/plain',
    ],
    help="""
        An ordered list of preferred output types; the first type
        encountered will usually be used when converting, and the
        others discarded.
        """,
).tag(config=True)

# ``resources['metadata']['path']`` is used as the kernel's default working
# directory when a new kernel is started.
resources = Dict(
    help=dedent(
        """
        Additional resources used in the conversion process. For example,
        passing ``{'metadata': {'path': run_path}}`` sets the
        execution path to ``run_path``.
        """
    )
)

coalesce_streams = Bool(
    help=dedent(
        """
        Merge all stream outputs with shared names into single streams.
        """
    )
)

440 

def __init__(self, nb: NotebookNode, km: t.Optional[KernelManager] = None, **kw: t.Any) -> None:
    """Prepare a client that will execute the cells of ``nb``.

    Parameters
    ----------
    nb : NotebookNode
        Notebook being executed.
    km : KernelManager, optional
        Kernel manager to use.  When omitted, one is created on demand during
        kernel setup and this client is recorded as its owner (``owns_km``).
    **kw
        Passed through to ``LoggingConfigurable.__init__``.
    """
    super().__init__(**kw)
    self.nb: NotebookNode = nb
    # Only a manager this client creates itself is "owned" (and, by default,
    # torn down when setup finishes).
    self.owns_km: bool = km is None
    self.km: t.Optional[KernelManager] = km
    self.kc: t.Optional[KernelClient] = None
    self.reset_execution_trackers()

    # Widget models this client emulates on the front-end side.
    output_models = {'OutputModel': OutputWidget}
    self.widget_registry: t.Dict[str, t.Dict] = {'@jupyter-widgets/output': output_models}
    # comm target name -> handler; a handler returns an object exposing
    # .handle_msg(msg), or None.
    self.comm_open_handlers: t.Dict[str, t.Any] = {}
    self.comm_open_handlers['jupyter.widget'] = self.on_comm_open_jupyter_widget

465 

def reset_execution_trackers(self) -> None:
    """Reset every piece of state that is tracked per notebook execution."""
    # Display-id bookkeeping and the widget state/buffers collected so far.
    self._display_id_map = {}
    self.widget_state: t.Dict[str, t.Dict] = {}
    self.widget_buffers: t.Dict[str, t.Dict[t.Tuple[str, ...], t.Dict[str, str]]] = {}
    # Per-key stacks of output hooks; the last hook on a stack is the active
    # one, which is what makes nested Output widgets work.
    self.output_hook_stack: t.Any = collections.defaultdict(list)
    # Front-end-mimicking Output widget objects.
    self.comm_objects: t.Dict[str, t.Any] = {}
    # Execution progress.
    self.task_poll_for_reply: t.Optional[asyncio.Future] = None
    self.code_cells_executed = 0

478 

def create_kernel_manager(self) -> "KernelManager":
    """Build a kernel manager, store it on ``self.km`` and return it.

    If ``kernel_name`` is unset, it is first filled in from the notebook's
    ``kernelspec`` metadata when that provides a name.  The manager is created
    from ``kernel_manager_class`` with this client's ``config``; ``kernel_name``
    is passed only when one is known.

    Returns
    -------
    km : KernelManager
        Kernel manager whose client class is asynchronous.
    """
    if not self.kernel_name:
        spec_name = self.nb.metadata.get('kernelspec', {}).get('name')
        if spec_name is not None:
            self.kernel_name = spec_name

    manager_kwargs: t.Dict[str, t.Any] = {'config': self.config}
    if self.kernel_name:
        manager_kwargs['kernel_name'] = self.kernel_name
    self.km = self.kernel_manager_class(**manager_kwargs)
    assert self.km is not None
    return self.km

498 

async def _async_cleanup_kernel(self) -> None:
    """Shut down the kernel (if still alive) and release all kernel state.

    The kernel is stopped immediately when ``shutdown_kernel == "immediate"``
    and gracefully otherwise.  Resource cleanup, stopping the client's
    channels and clearing ``self.kc``/``self.km`` happen even if shutdown (or
    resource cleanup) fails.

    Calling this when ``self.km`` is already ``None`` is a no-op.  That
    happens, for example, when the atexit hook or the signal handler runs
    after a normal cleanup, or when ``reset_kc`` is requested before any
    kernel was started.
    """
    km = self.km
    if km is None:
        return
    now = self.shutdown_kernel == "immediate"
    try:
        # Queue the manager to kill the process, and recover gracefully if it's already dead.
        if await ensure_async(km.is_alive()):
            await ensure_async(km.shutdown_kernel(now=now))
    except RuntimeError as e:
        # The error isn't specialized, so we have to check the message
        if 'No kernel is running!' not in str(e):
            raise
    finally:
        try:
            # Remove any state left over even if we failed to stop the kernel
            await ensure_async(km.cleanup_resources())
        finally:
            # Stop the client even if resource cleanup raised, so its channels don't leak.
            kc = getattr(self, "kc", None)
            if kc is not None:
                await ensure_async(kc.stop_channels())  # type:ignore[func-returns-value]
            self.kc = None
            self.km = None

_cleanup_kernel = run_sync(_async_cleanup_kernel)

519 

async def async_start_new_kernel(self, **kwargs: t.Any) -> None:
    """Start a new kernel through ``self.km``.

    Parameters
    ----------
    kwargs :
        Any options for ``self.kernel_manager_class.start_kernel()``. Because
        that defaults to AsyncKernelManager, this will likely include options
        accepted by ``AsyncKernelManager.start_kernel()``, which includes ``cwd``.

    Notes
    -----
    ``cwd`` defaults to ``resources['metadata']['path']`` when that is set and
    no ``cwd`` was given.  For IPython kernels, a
    ``--HistoryManager.hist_file`` argument pointing at ``ipython_hist_file``
    is appended to ``extra_arguments`` (persistently), unless one is already
    present.
    """
    assert self.km is not None
    run_path = self.resources.get('metadata', {}).get('path') or None
    if run_path and 'cwd' not in kwargs:
        kwargs["cwd"] = run_path

    hist_flag_given = any(
        arg.startswith('--HistoryManager.hist_file') for arg in self.extra_arguments
    )
    is_ipython = getattr(self.km, 'ipykernel', False)
    if is_ipython and self.ipython_hist_file and not hist_flag_given:
        self.extra_arguments += [f'--HistoryManager.hist_file={self.ipython_hist_file}']

    await ensure_async(self.km.start_kernel(extra_arguments=self.extra_arguments, **kwargs))

start_new_kernel = run_sync(async_start_new_kernel)

549 

async def async_start_new_kernel_client(self) -> KernelClient:
    """Create a kernel client from ``self.km``, start its channels and wait for readiness.

    On any failure the error is logged, the kernel is cleaned up and the
    exception is re-raised.  On success stdin requests are disabled and the
    ``on_notebook_start`` hook runs.

    Returns
    -------
    kc : KernelClient
        Kernel client as created by the kernel manager ``km``.
    """
    assert self.km is not None
    try:
        self.kc = self.km.client()
        await ensure_async(self.kc.start_channels())  # type:ignore[func-returns-value]
        await ensure_async(self.kc.wait_for_ready(timeout=self.startup_timeout))
    except Exception as err:
        kernel_id = getattr(self.km, 'kernel_id', None)
        self.log.error(
            f"Error occurred while starting new kernel client for kernel {kernel_id}: {err}"
        )
        # Tear the kernel down rather than leaving a half-started one behind.
        await self._async_cleanup_kernel()
        raise
    # Execution is non-interactive, so the kernel may not ask for input.
    self.kc.allow_stdin = False
    await run_hook(self.on_notebook_start, notebook=self.nb)
    return self.kc

start_new_kernel_client = run_sync(async_start_new_kernel_client)

576 

@contextmanager
def setup_kernel(self, **kwargs: t.Any) -> t.Generator:
    """Synchronous context manager making a kernel and kernel client available.

    Creates ``self.km`` if missing, starts a kernel if the manager has none,
    and creates ``self.kc`` if missing.  On exit (normal or exceptional) the
    kernel is cleaned up -- channels stopped, kernel shut down -- when
    ``cleanup_kc`` is true.

    Parameters
    ----------
    cleanup_kc : bool, optional (keyword)
        Whether to clean up on exit.  Defaults to ``self.owns_km``: kernels
        whose manager this client created are torn down, others kept alive.
    **kwargs
        Remaining options, forwarded to ``start_new_kernel``.
    """
    should_cleanup = kwargs.pop('cleanup_kc', self.owns_km)

    # Can't use run_until_complete on an asynccontextmanager function, so the
    # sync wrappers are called directly here.
    if self.km is None:
        self.km = self.create_kernel_manager()
    if not self.km.has_kernel:
        self.start_new_kernel(**kwargs)
    if self.kc is None:
        self.start_new_kernel_client()

    try:
        yield
    finally:
        if should_cleanup:
            self._cleanup_kernel()

606 

@asynccontextmanager
async def async_setup_kernel(self, **kwargs: t.Any) -> t.AsyncGenerator:
    """
    Context manager for setting up the kernel to execute a notebook.

    This assigns the Kernel Manager (``self.km``) if missing and Kernel Client (``self.kc``).

    When control returns from the yield it stops the client's zmq channels, and shuts
    down the kernel, if ``cleanup_kc`` is true.

    Handlers for SIGINT and SIGTERM, plus an ``atexit`` hook, are also added to clean up
    in case of unexpected shutdown.  The signal handlers are always removed again --
    also when starting the kernel or its client fails, and when cleanup itself
    raises -- so a failed setup cannot leave SIGINT/SIGTERM redirected to a stale
    client.

    Parameters
    ----------
    cleanup_kc : bool, optional (keyword)
        Whether to clean up on exit; defaults to ``self.owns_km``.
    **kwargs
        Remaining options, forwarded to ``async_start_new_kernel``.
    """
    # by default, cleanup the kernel client if we own the kernel manager
    # and keep it alive if we don't
    cleanup_kc = kwargs.pop('cleanup_kc', self.owns_km)
    if self.km is None:
        self.km = self.create_kernel_manager()

    # self._cleanup_kernel uses run_async, which ensures the ioloop is running again.
    # This is necessary as the ioloop has stopped once atexit fires.
    atexit.register(self._cleanup_kernel)

    def on_signal():
        """Handle signals."""
        self._async_cleanup_kernel_future = asyncio.ensure_future(self._async_cleanup_kernel())
        atexit.unregister(self._cleanup_kernel)

    # We are inside a coroutine, so a loop is necessarily running.
    loop = asyncio.get_running_loop()

    def remove_signal_handlers() -> None:
        """Undo the add_signal_handler calls below (best effort)."""
        try:
            loop.remove_signal_handler(signal.SIGINT)
            loop.remove_signal_handler(signal.SIGTERM)
        except RuntimeError:
            pass

    try:
        loop.add_signal_handler(signal.SIGINT, on_signal)
        loop.add_signal_handler(signal.SIGTERM, on_signal)
    except RuntimeError:
        # NotImplementedError (a RuntimeError subclass): Windows does not support signals.
        # RuntimeError: Raised when add_signal_handler is called outside the main thread
        pass

    try:
        if not self.km.has_kernel:
            await self.async_start_new_kernel(**kwargs)
        if self.kc is None:
            await self.async_start_new_kernel_client()
    except BaseException:
        remove_signal_handlers()
        if self.km is None:
            # A failed client start already cleaned the kernel up, so the
            # atexit hook has nothing left to do.
            atexit.unregister(self._cleanup_kernel)
        raise

    try:
        yield
    except RuntimeError:
        await run_hook(self.on_notebook_error, notebook=self.nb)
        raise
    finally:
        try:
            if cleanup_kc:
                await self._async_cleanup_kernel()
            await run_hook(self.on_notebook_complete, notebook=self.nb)
        finally:
            atexit.unregister(self._cleanup_kernel)
            remove_signal_handlers()

664 

async def async_execute(self, reset_kc: bool = False, **kwargs: t.Any) -> NotebookNode:
    """
    Executes each code cell.

    Parameters
    ----------
    reset_kc : bool
        If True, and this client owns its kernel manager, the existing kernel
        is cleaned up so that a new one (and a new client) is created
        (default: False).
    kwargs :
        Any option for ``self.kernel_manager_class.start_kernel()``. Because
        that defaults to AsyncKernelManager, this will likely include options
        accepted by ``jupyter_client.AsyncKernelManager.start_kernel()``,
        which includes ``cwd``.

    Returns
    -------
    nb : NotebookNode
        The executed notebook.

    Raises
    ------
    RuntimeError
        If the kernel_info reply has no ``language_info``.
    """
    # Nothing to reset before the first run (no manager yet).
    if reset_kc and self.owns_km and self.km is not None:
        await self._async_cleanup_kernel()
    self.reset_execution_trackers()

    async with self.async_setup_kernel(**kwargs):
        assert self.kc is not None
        self.log.info("Executing notebook with kernel: %s", self.kernel_name)
        msg_id = await ensure_async(self.kc.kernel_info())
        info_msg = await self.async_wait_for_reply(msg_id)
        if info_msg is None:
            self.log.warning(
                "No kernel_info reply received; notebook language_info metadata not updated."
            )
        elif 'language_info' in info_msg['content']:
            self.nb.metadata['language_info'] = info_msg['content']['language_info']
        else:
            raise RuntimeError(
                'Kernel info received message content has no "language_info" key. '
                'Content is:\n' + str(info_msg['content'])
            )
        for index, cell in enumerate(self.nb.cells):
            # Ignore `'execution_count' in content` as it's always 1
            # when store_history is False
            await self.async_execute_cell(
                cell, index, execution_count=self.code_cells_executed + 1
            )
        self.set_widgets_metadata()

    return self.nb

execute = run_sync(async_execute)

713 

def set_widgets_metadata(self) -> None:
    """Write collected widget state (and binary buffers) into the notebook metadata.

    Only widget states containing ``_model_name`` are stored.  Nothing is
    written when no widget state was collected.
    """
    if not self.widget_state:
        return
    mime = 'application/vnd.jupyter.widget-state+json'
    metadata = self.nb.metadata
    metadata.widgets = {
        mime: {
            'state': {
                model_id: self._serialize_widget_state(state)
                for model_id, state in self.widget_state.items()
                if '_model_name' in state
            },
            'version_major': 2,
            'version_minor': 0,
        }
    }
    # Re-read from the notebook: assignment may store converted copies
    # (e.g. NotebookNode), and the buffers must land on what was stored.
    stored_states = metadata.widgets[mime]['state']
    for model_id, widget in stored_states.items():
        model_buffers = self.widget_buffers.get(model_id)
        if model_buffers:
            widget['buffers'] = list(model_buffers.values())

734 

def _update_display_id(self, display_id: str, msg: t.Dict) -> None:
    """Refresh every recorded output carrying ``display_id`` with the data in ``msg``.

    ``update_display_data`` messages have their header ``msg_type`` rewritten
    in place (this mutates ``msg``) to ``display_data`` before being passed to
    ``output_from_msg``.  Unknown display ids are logged at debug level and
    ignored; messages ``output_from_msg`` rejects are logged as errors.
    """
    if display_id not in self._display_id_map:
        self.log.debug("display id %r not in %s", display_id, self._display_id_map)
        return

    header = msg['header']
    if header['msg_type'] == 'update_display_data':
        header['msg_type'] = 'display_data'

    try:
        new_output = output_from_msg(msg)
    except ValueError:
        self.log.error(f"unhandled iopub msg: {msg['msg_type']}")
        return

    for cell_idx, output_indices in self._display_id_map[display_id].items():
        cell_outputs = self.nb['cells'][cell_idx]['outputs']
        for output_idx in output_indices:
            target = cell_outputs[output_idx]
            target['data'] = new_output['data']
            target['metadata'] = new_output['metadata']

756 

async def _async_poll_for_reply(
    self,
    msg_id: str,
    cell: NotebookNode,
    timeout: t.Optional[int],
    task_poll_output_msg: asyncio.Future,
    task_poll_kernel_alive: asyncio.Future,
) -> t.Dict:
    """Wait for the shell reply to request ``msg_id`` and return it.

    Shell messages whose parent ``msg_id`` does not match are skipped.  When
    the matching reply arrives, its timestamp is recorded (if
    ``record_timing``) and ``task_poll_output_msg`` is given up to
    ``iopub_timeout`` seconds to finish.  If that times out, a
    ``CellTimeoutError`` is raised when ``raise_on_iopub_timeout`` is set,
    otherwise a warning is logged.  ``task_poll_kernel_alive`` is cancelled
    before returning or raising.

    ``timeout`` (seconds) bounds the wait for the reply; ``None`` waits
    indefinitely.  When it elapses, the kernel's liveness is checked
    (``_async_check_alive`` raises if it died).  Then ``_async_handle_timeout``
    either raises, returns a synthetic error reply (used as the reply on the
    next iteration), or returns ``None`` (keep waiting).
    """
    msg: t.Dict
    assert self.kc is not None
    # Remaining time for the next get_msg; ``deadline`` is only defined (and
    # only used) when a timeout was given.
    new_timeout: t.Optional[float] = None
    if timeout is not None:
        deadline = monotonic() + timeout
        new_timeout = float(timeout)
    error_on_timeout_execute_reply = None
    while True:
        try:
            if error_on_timeout_execute_reply:
                # Replay the synthetic error reply produced by a timeout,
                # addressed to our request so it matches below.
                msg = error_on_timeout_execute_reply
                msg['parent_header'] = {'msg_id': msg_id}
            else:
                msg = await ensure_async(self.kc.shell_channel.get_msg(timeout=new_timeout))
            if msg['parent_header'].get('msg_id') == msg_id:
                if self.record_timing:
                    cell['metadata']['execution']['shell.execute_reply'] = timestamp(msg)
                try:
                    # Give the IOPub consumer a bounded time to drain the cell's output.
                    await asyncio.wait_for(task_poll_output_msg, self.iopub_timeout)
                except (asyncio.TimeoutError, Empty):
                    if self.raise_on_iopub_timeout:
                        task_poll_kernel_alive.cancel()
                        raise CellTimeoutError.error_from_timeout_and_cell(
                            "Timeout waiting for IOPub output", self.iopub_timeout, cell
                        ) from None
                    else:
                        self.log.warning("Timeout waiting for IOPub output")
                task_poll_kernel_alive.cancel()
                return msg
            else:
                # Not our reply: shrink the remaining wait so unrelated
                # messages do not extend the overall deadline.
                if new_timeout is not None:
                    new_timeout = max(0, deadline - monotonic())
        except Empty:
            # received no message, check if kernel is still alive
            # (Empty here implies get_msg was given a timeout, asserted below.)
            assert timeout is not None
            task_poll_kernel_alive.cancel()
            await self._async_check_alive()
            # NOTE(review): new_timeout is not recomputed after this branch,
            # so when _async_handle_timeout returns None the next get_msg
            # waits up to the previous new_timeout again.
            error_on_timeout_execute_reply = await self._async_handle_timeout(timeout, cell)

803 

async def _async_poll_output_msg(
    self, parent_msg_id: str, cell: NotebookNode, cell_index: int
) -> None:
    """Consume IOPub messages until the execution of ``parent_msg_id`` completes.

    Messages belonging to other requests are discarded.  Matching messages go
    to ``process_message``, which signals completion by raising
    ``CellExecutionComplete``.
    """
    assert self.kc is not None
    while True:
        incoming = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None))
        if incoming['parent_header'].get('msg_id') != parent_msg_id:
            continue  # output of some other request
        try:
            self.process_message(incoming, cell, cell_index)
        except CellExecutionComplete:
            return

816 

async def _async_poll_kernel_alive(self) -> None:
    """Check once per second that the kernel is alive; once it dies, cancel the reply poller."""
    kernel_dead = False
    while not kernel_dead:
        await asyncio.sleep(1)
        try:
            await self._async_check_alive()
        except DeadKernelError:
            kernel_dead = True
    assert self.task_poll_for_reply is not None
    self.task_poll_for_reply.cancel()

826 

def _get_timeout(self, cell: "t.Optional[NotebookNode]") -> t.Optional[int]:
    """Return the timeout in seconds for executing ``cell``, or ``None`` for no timeout.

    ``timeout_func(cell)`` takes precedence over the ``timeout`` trait when it
    is set and a cell is given.  Any falsy (``None``/``0``) or negative result
    disables the timeout.
    """
    use_func = self.timeout_func is not None and cell is not None
    seconds = self.timeout_func(cell) if use_func else self.timeout
    if seconds and seconds >= 0:
        return seconds
    return None

837 

async def _async_handle_timeout(
    self, timeout: int, cell: t.Optional[NotebookNode] = None
) -> t.Union[None, t.Dict]:
    """Handle a request that got no reply within ``timeout`` seconds.

    Parameters
    ----------
    timeout : int
        The timeout that elapsed, in seconds (used in messages).
    cell : NotebookNode, optional
        The cell being executed.  ``async_wait_for_reply`` passes ``None``
        when waiting for a reply that is not tied to a cell (e.g. kernel_info).

    Returns
    -------
    dict or None
        Only with ``interrupt_on_timeout``: after interrupting the kernel, a
        synthetic error ``execute_reply`` built from ``error_on_timeout`` when
        that is set, otherwise ``None``.

    Raises
    ------
    CellTimeoutError
        When ``interrupt_on_timeout`` is false.
    """
    self.log.error("Timeout waiting for execute reply (%is).", timeout)
    if self.interrupt_on_timeout:
        self.log.error("Interrupting kernel")
        assert self.km is not None
        await ensure_async(self.km.interrupt_kernel())
        if self.error_on_timeout:
            return {"content": {**self.error_on_timeout, "status": "error"}}
        return None

    if cell is None:
        # No cell to report on: raise a plain timeout error instead of
        # failing an assertion on the missing cell.
        raise CellTimeoutError(f"Timeout waiting for reply from the kernel ({timeout}s).")
    raise CellTimeoutError.error_from_timeout_and_cell("Cell execution timed out", timeout, cell)

855 

async def _async_check_alive(self) -> None:
    """Raise ``DeadKernelError`` (after logging) if the kernel client reports the kernel dead."""
    assert self.kc is not None
    kernel_alive = await ensure_async(self.kc.is_alive())  # type:ignore[attr-defined]
    if kernel_alive:
        return
    self.log.error("Kernel died while waiting for execute reply.")
    raise DeadKernelError("Kernel died")

861 

async def async_wait_for_reply(
    self, msg_id: str, cell: t.Optional[NotebookNode] = None
) -> t.Optional[t.Dict]:
    """Wait for the shell reply whose parent ``msg_id`` matches and return it.

    The shell channel is polled in slices of at most ``shell_timeout_interval``
    seconds.  After each empty poll the kernel's liveness is checked
    (``DeadKernelError`` if it died).  Replies to other requests are discarded.

    The timeout for ``cell`` (see ``_get_timeout``) is measured as wall-clock
    time against a monotonic deadline.  Time spent on unrelated replies
    therefore counts toward it, and the wait is not rounded up to a multiple
    of the polling interval.  When it elapses, ``_async_handle_timeout`` runs,
    which raises unless ``interrupt_on_timeout`` is set, and ``None`` is
    returned.
    """
    assert self.kc is not None
    # wait for finish, with timeout
    timeout = self._get_timeout(cell)
    deadline = None if timeout is None else monotonic() + timeout
    while True:
        poll_interval = self.shell_timeout_interval
        if deadline is not None:
            # Never sleep past the deadline.
            poll_interval = min(poll_interval, max(0, deadline - monotonic()))
        try:
            msg: t.Dict = await ensure_async(
                self.kc.shell_channel.get_msg(timeout=poll_interval)
            )
        except Empty:
            await self._async_check_alive()
            if timeout is not None and self._passed_deadline(deadline):
                await self._async_handle_timeout(timeout, cell)
                break
        else:
            if msg['parent_header'].get('msg_id') == msg_id:
                return msg
    return None

wait_for_reply = run_sync(async_wait_for_reply)
# Backwards compatibility naming for papermill
_wait_for_reply = wait_for_reply

889 

def _passed_deadline(self, deadline: t.Optional[float]) -> bool:
    """Return ``True`` once ``deadline`` (a ``time.monotonic()`` value) has been reached.

    ``None`` means "no deadline" and never expires.
    """
    return deadline is not None and deadline - monotonic() <= 0

894 

async def _check_raise_for_error(
    self, cell: "NotebookNode", cell_index: int, exec_reply: t.Optional[t.Dict]
) -> None:
    """Run ``on_cell_error`` for an error reply and raise unless the error is allowed.

    Nothing happens when ``exec_reply`` is ``None`` or its status is not
    ``'error'``.  Unless ``force_raise_errors`` is set, an error is allowed
    when ``allow_errors`` is true, its ``ename`` is in ``allow_error_names``,
    or the cell is tagged ``raises-exception``.  The hook runs either way.

    Raises
    ------
    CellExecutionError
        If the reply reports an error that is not allowed.
    """
    if exec_reply is None:
        return None
    content = exec_reply['content']
    if content['status'] != 'error':
        return None

    error_allowed = (not self.force_raise_errors) and (
        self.allow_errors
        or content.get('ename') in self.allow_error_names
        or "raises-exception" in cell.metadata.get("tags", [])
    )
    await run_hook(
        self.on_cell_error, cell=cell, cell_index=cell_index, execute_reply=exec_reply
    )
    if error_allowed:
        return None
    raise CellExecutionError.from_cell_and_msg(cell, content)

915 

async def async_execute_cell(
    self,
    cell: NotebookNode,
    cell_index: int,
    execution_count: t.Optional[int] = None,
    store_history: bool = True,
) -> NotebookNode:
    """
    Executes a single code cell.

    To execute all cells see :meth:`execute`.

    Parameters
    ----------
    cell : nbformat.NotebookNode
        The cell which is currently being processed.
    cell_index : int
        The position of the cell within the notebook object.
    execution_count : int
        The execution count to be assigned to the cell (default: Use kernel response)
    store_history : bool
        Determines if history should be stored in the kernel (default: True).
        Specific to ipython kernels, which can store command histories.

    Returns
    -------
    cell : NotebookNode
        The cell which was just processed. Non-code cells, empty code cells and
        cells tagged with ``skip_cells_with_tag`` are returned without being run.

    Raises
    ------
    CellExecutionError
        If execution failed and should raise an exception, this will be raised
        with defaults about the failure.
    DeadKernelError
        If waiting for the execute reply is cancelled (per the in-code comment
        below, that only happens when the kernel has died).
    """
    assert self.kc is not None

    await run_hook(self.on_cell_start, cell=cell, cell_index=cell_index)

    if cell.cell_type != 'code' or not cell.source.strip():
        self.log.debug("Skipping non-executing cell %s", cell_index)
        return cell

    if self.skip_cells_with_tag in cell.metadata.get("tags", []):
        self.log.debug("Skipping tagged cell %s", cell_index)
        return cell

    if self.record_timing:  # clear execution metadata prior to execution
        cell['metadata']['execution'] = {}

    self.log.debug("Executing cell:\n%s", cell.source)

    # The kernel is told to stop on error unless this cell may fail.
    cell_allows_errors = (not self.force_raise_errors) and (
        self.allow_errors or "raises-exception" in cell.metadata.get("tags", [])
    )

    await run_hook(self.on_cell_execute, cell=cell, cell_index=cell_index)
    parent_msg_id = await ensure_async(
        self.kc.execute(
            cell.source, store_history=store_history, stop_on_error=not cell_allows_errors
        )
    )
    await run_hook(self.on_cell_complete, cell=cell, cell_index=cell_index)
    # We launched a code cell to execute
    self.code_cells_executed += 1
    exec_timeout = self._get_timeout(cell)

    cell.outputs = []
    self.clear_before_next_output = False

    task_poll_kernel_alive = asyncio.ensure_future(self._async_poll_kernel_alive())
    task_poll_output_msg = asyncio.ensure_future(
        self._async_poll_output_msg(parent_msg_id, cell, cell_index)
    )
    self.task_poll_for_reply = asyncio.ensure_future(
        self._async_poll_for_reply(
            parent_msg_id, cell, exec_timeout, task_poll_output_msg, task_poll_kernel_alive
        )
    )
    try:
        exec_reply = await self.task_poll_for_reply
    except asyncio.CancelledError:
        # can only be cancelled by task_poll_kernel_alive when the kernel is dead
        task_poll_output_msg.cancel()
        raise DeadKernelError("Kernel died") from None
    except Exception as e:
        # Best effort to cancel request if it hasn't been resolved
        try:
            # Check if the task_poll_output is doing the raising for us
            if not isinstance(e, CellControlSignal):
                task_poll_output_msg.cancel()
        finally:
            raise

    if execution_count:
        cell['execution_count'] = execution_count
    await run_hook(
        self.on_cell_executed, cell=cell, cell_index=cell_index, execute_reply=exec_reply
    )

    if self.coalesce_streams and cell.outputs:
        # Merge all stream outputs with the same name into the first one of that name.
        new_outputs = []
        streams: t.Dict[str, NotebookNode] = {}
        for output in cell.outputs:
            if output["output_type"] == "stream":
                if output["name"] in streams:
                    streams[output["name"]]["text"] += output["text"]
                else:
                    new_outputs.append(output)
                    streams[output["name"]] = output
            else:
                new_outputs.append(output)

        # process \r and \b characters
        # The backspace pattern is spelled "\x08" on purpose: inside a regular
        # expression r"\b" means "word boundary", not the backspace character.
        # Using it here would delete the last character of every word.
        backspace_rgx = re.compile(r"[^\n]\x08")
        for output in streams.values():
            text = output["text"]
            # Cancel out anything-but-newline followed by backspace, repeating until
            # nothing changes so that runs of backspaces are fully applied.
            # Each pass strictly shortens the text, so the loop terminates.
            while True:
                reduced = backspace_rgx.sub("", text)
                if len(reduced) == len(text):
                    break
                text = reduced
            # Replace all carriage returns not followed by newline
            output["text"] = _RGX_CARRIAGERETURN.sub("", text)

        # We also want to ensure stdout and stderr are always in the same consecutive order,
        # because they are asynchronous, so order isn't guaranteed.
        for i, output in enumerate(new_outputs):
            if output["output_type"] == "stream" and output["name"] == "stderr":
                if (
                    len(new_outputs) >= i + 2
                    and new_outputs[i + 1]["output_type"] == "stream"
                    and new_outputs[i + 1]["name"] == "stdout"
                ):
                    stdout = new_outputs.pop(i + 1)
                    new_outputs.insert(i, stdout)

        cell.outputs = new_outputs

    await self._check_raise_for_error(cell, cell_index, exec_reply)

    self.nb['cells'][cell_index] = cell
    return cell

# Synchronous counterpart of async_execute_cell, built with run_sync.
execute_cell = run_sync(async_execute_cell)

1064 

def process_message(
    self, msg: t.Dict, cell: NotebookNode, cell_index: int
) -> t.Optional[NotebookNode]:
    """
    Apply one kernel message to *cell* and return the output it produced, if any.

    *cell* is modified in place. Its execution count may change, as may its timing
    metadata (when ``record_timing`` is on) and its outputs.

    Parameters
    ----------
    msg : dict
        The kernel message being processed.
    cell : nbformat.NotebookNode
        The cell which is currently being processed.
    cell_index : int
        The position of the cell within the notebook object.

    Returns
    -------
    output : NotebookNode
        Whatever ``self.output`` returned for this message. This is None for status,
        clear_output, comm*, execute_input and update_display_data messages.

    Raises
    ------
    CellExecutionComplete
        When a ``status`` message with ``execution_state == 'idle'`` arrives.
    """
    msg_type = msg['msg_type']
    self.log.debug("msg_type: %s", msg_type)
    content = msg['content']
    self.log.debug("content: %s", content)

    transient = content.get('transient', {})
    display_id = transient.get('display_id', None)
    carries_display = msg_type in {'execute_result', 'display_data', 'update_display_data'}
    if display_id and carries_display:
        self._update_display_id(display_id, msg)

    # set the prompt number for the input and the output
    if 'execution_count' in content:
        cell['execution_count'] = content['execution_count']

    if self.record_timing:
        timing_key = None
        if msg_type == 'status':
            state = content['execution_state']
            if state == 'idle':
                timing_key = 'iopub.status.idle'
            elif state == 'busy':
                timing_key = 'iopub.status.busy'
        elif msg_type == 'execute_input':
            timing_key = 'iopub.execute_input'
        if timing_key is not None:
            cell['metadata']['execution'][timing_key] = timestamp(msg)

    if msg_type == 'status':
        if content['execution_state'] == 'idle':
            raise CellExecutionComplete()
        return None
    if msg_type == 'clear_output':
        self.clear_output(cell.outputs, msg, cell_index)
        return None
    if msg_type.startswith('comm'):
        self.handle_comm_msg(cell.outputs, msg, cell_index)
        return None
    if msg_type in ('execute_input', 'update_display_data'):
        # Remaining messages we don't turn into outputs
        return None
    # Assign output as our processed "result"
    return self.output(cell.outputs, msg, display_id, cell_index)

1128 

def output(
    self, outs: t.List, msg: t.Dict, display_id: t.Optional[str], cell_index: int
) -> t.Optional[NotebookNode]:
    """Turn an output message into a notebook output and append it to *outs*.

    Parameters
    ----------
    outs : list
        The output list of the cell being processed; modified in place.
    msg : dict
        The iopub message to convert.
    display_id : str or None
        The message's ``transient.display_id``, if any. When set, the index the new
        output occupies in *outs* is recorded in
        ``_display_id_map[display_id][cell_index]``.
    cell_index : int
        The position of the cell within the notebook object.

    Returns
    -------
    NotebookNode or None
        The appended output. Returns None if a registered output hook handled the
        message, or if ``output_from_msg`` could not convert it.
    """
    msg_type = msg['msg_type']

    parent_msg_id = msg['parent_header'].get('msg_id')
    if self.output_hook_stack[parent_msg_id]:
        # if we have a hook registered, it will override our
        # default output behaviour (e.g. OutputWidget)
        hook = self.output_hook_stack[parent_msg_id][-1]
        hook.output(outs, msg, display_id, cell_index)
        return None

    try:
        out = output_from_msg(msg)
    except ValueError:
        # Lazy %-style arguments: the message is only formatted if it is emitted.
        self.log.error("unhandled iopub msg: %s", msg_type)
        return None

    if self.clear_before_next_output:
        # A clear_output with wait=True was deferred until new output arrived.
        self.log.debug('Executing delayed clear_output')
        outs[:] = []
        self.clear_display_id_mapping(cell_index)
        self.clear_before_next_output = False

    if display_id:
        # record output index in:
        # _display_id_map[display_id][cell_idx]
        cell_map = self._display_id_map.setdefault(display_id, {})
        output_idx_list = cell_map.setdefault(cell_index, [])
        output_idx_list.append(len(outs))

    outs.append(out)

    return out

1167 

def clear_output(self, outs: t.List, msg: t.Dict, cell_index: int) -> None:
    """Apply a ``clear_output`` message to a cell's outputs.

    If an output hook is registered for the message's parent, the request goes to
    the most recently registered hook. Otherwise the behaviour depends on the
    content's ``wait`` flag:

    - truthy: the clear is deferred by setting ``clear_before_next_output``;
    - falsy or absent: *outs* is emptied in place immediately and the cell's
      display-id mapping is reset.
    """
    content = msg['content']

    hooks = self.output_hook_stack[msg['parent_header'].get('msg_id')]
    if hooks:
        # A registered hook (e.g. OutputWidget) overrides the default behaviour.
        hooks[-1].clear_output(outs, msg, cell_index)
        return

    if not content.get('wait'):
        self.log.debug('Immediate clear output')
        outs[:] = []
        self.clear_display_id_mapping(cell_index)
        return

    self.log.debug('Wait to clear output')
    self.clear_before_next_output = True

1187 

def clear_display_id_mapping(self, cell_index: int) -> None:
    """Clear a display id mapping for a cell.

    ``_display_id_map`` maps ``display_id -> {cell_index: [output indices]}``.
    For every display id that has entries for *cell_index*, that cell's index list
    is reset to empty. Other cells are left untouched.
    """
    # Only the per-display-id cell maps are needed, not the display ids themselves.
    for cell_map in self._display_id_map.values():
        if cell_index in cell_map:
            cell_map[cell_index] = []

1193 

def handle_comm_msg(self, outs: t.List, msg: t.Dict, cell_index: int) -> None:
    """Handle a comm message.

    When ``store_widget_state`` is on and the message data carries a ``state``,
    that state is merged into ``widget_state[comm_id]``. Any binary buffers
    listed in ``buffer_paths`` are stored in ``widget_buffers[comm_id]``, keyed by
    path. The frontend side of the comm is then mimicked:

    - ``comm_open`` is passed to the handler registered for its ``target_name`` in
      ``comm_open_handlers``; a truthy return value is kept in ``comm_objects``;
    - ``comm_msg`` is forwarded to the matching object in ``comm_objects``.

    Parameters
    ----------
    outs : list
        Outputs of the cell being processed (unused here).
    msg : dict
        The comm message.
    cell_index : int
        The position of the cell within the notebook object (unused here).
    """
    content = msg['content']
    data = content['data']
    if self.store_widget_state and 'state' in data:  # ignore custom msg'es
        comm_id = content['comm_id']
        self.widget_state.setdefault(comm_id, {}).update(data['state'])
        if data.get('buffer_paths'):
            comm_buffers = self.widget_buffers.setdefault(comm_id, {})
            # for each comm, the path uniquely identifies a buffer
            new_buffers: t.Dict[t.Tuple[str, ...], t.Dict[str, str]] = {
                tuple(k["path"]): k for k in self._get_buffer_data(msg)
            }
            comm_buffers.update(new_buffers)

    # There are cases where we need to mimic a frontend, to get similar behaviour as
    # when using the Output widget from Jupyter lab/notebook
    msg_type = msg['msg_type']
    if msg_type == 'comm_open':
        target = content.get('target_name')
        handler = self.comm_open_handlers.get(target)
        if not handler:
            self.log.warning('No handler found for comm target %r', target)
            return
        comm_id = content['comm_id']
        comm_object = handler(msg)
        if comm_object:
            self.comm_objects[comm_id] = comm_object
    elif msg_type == 'comm_msg':
        comm_id = content['comm_id']
        if comm_id in self.comm_objects:
            self.comm_objects[comm_id].handle_msg(msg)

1226 

def _serialize_widget_state(self, state: t.Dict) -> t.Dict[str, t.Any]:
    """Wrap a raw widget *state* in the layout of @jupyter-widgets/schema.

    The ``_model_name``, ``_model_module`` and ``_model_module_version`` entries of
    *state* become top-level keys (None when absent). The untouched *state* itself
    is kept under ``'state'``.
    """
    serialized: t.Dict[str, t.Any] = {}
    serialized['model_name'] = state.get('_model_name')
    serialized['model_module'] = state.get('_model_module')
    serialized['model_module_version'] = state.get('_model_module_version')
    serialized['state'] = state
    return serialized

1235 

def _get_buffer_data(self, msg: t.Dict) -> t.List[t.Dict[str, str]]:
    """Pair each ``buffer_paths`` entry of *msg* with its binary buffer, base64-encoded.

    Paths and buffers are matched positionally, stopping at the shorter of the two.
    Each result is ``{'data': <base64 text>, 'encoding': 'base64', 'path': <path>}``.
    """
    paths = msg['content']['data']['buffer_paths']
    raw_buffers = msg['buffers']
    return [
        {
            'data': base64.b64encode(raw).decode('utf-8'),
            'encoding': 'base64',
            'path': buffer_path,
        }
        for buffer_path, raw in zip(paths, raw_buffers)
    ]

1249 

def register_output_hook(self, msg_id: str, hook: OutputWidget) -> None:
    """Push *hook* onto the output-hook stack for *msg_id*.

    While it is on top of the stack, ``output`` and ``clear_output`` handling for
    messages whose parent is *msg_id* is delegated to *hook* instead of the cell.
    The stack is last-registered-wins.
    """
    # mimics
    # https://jupyterlab.github.io/jupyterlab/services/interfaces/kernel.ikernelconnection.html#registermessagehook
    hooks_for_msg = self.output_hook_stack[msg_id]
    hooks_for_msg.append(hook)

1258 

def remove_output_hook(self, msg_id: str, hook: OutputWidget) -> None:
    """Unregisters an override object that handles output/clear_output instead.

    *hook* must be the most recently registered hook for *msg_id*.

    Raises
    ------
    IndexError
        If no hook is registered for *msg_id*.
    AssertionError
        If the most recently registered hook for *msg_id* is not *hook*. The stack
        is left unchanged in that case.
    """
    # mimics
    # https://jupyterlab.github.io/jupyterlab/services/interfaces/kernel.ikernelconnection.html#removemessagehook
    hooks_for_msg = self.output_hook_stack[msg_id]
    # Checked explicitly rather than with ``assert`` so the check survives
    # ``python -O``. It runs before popping so a mismatch cannot drop another hook.
    if hooks_for_msg and hooks_for_msg[-1] != hook:
        raise AssertionError(
            f"Output hook {hook!r} is not the most recently registered hook "
            f"for message {msg_id!r}"
        )
    hooks_for_msg.pop()

1265 

def on_comm_open_jupyter_widget(self, msg: t.Dict) -> t.Optional[t.Any]:
    """Instantiate the widget class registered for a ``comm_open`` message.

    The class is looked up in ``widget_registry`` by the state's ``_model_module``,
    then by its ``_model_name``. It is constructed as
    ``widget_class(comm_id, state, self.kc, self)``. Returns None when either
    lookup finds nothing.
    """
    content = msg['content']
    state = content['data']['state']
    comm_id = content['comm_id']

    module = self.widget_registry.get(state['_model_module'])
    if not module:
        return None

    widget_class = module.get(state['_model_name'])
    if not widget_class:
        return None

    return widget_class(comm_id, state, self.kc, self)

1278 

1279 

def execute(
    nb: NotebookNode,
    cwd: t.Optional[str] = None,
    km: t.Optional[KernelManager] = None,
    **kwargs: t.Any,
) -> NotebookNode:
    """Execute a notebook's code, updating outputs within the notebook object.

    A convenience wrapper that builds a :class:`NotebookClient`, runs it, and
    returns the modified notebook object.

    Parameters
    ----------
    nb : NotebookNode
        The notebook object to be executed.
    cwd : str, optional
        If supplied, the kernel will run in this directory. It is passed to the
        client as ``resources['metadata']['path']``.
    km : KernelManager, optional
        If supplied, the specified kernel manager will be used for code execution.
    kwargs :
        Any other options for NotebookClient, e.g. timeout, kernel_name

    Returns
    -------
    NotebookNode
        The result of ``NotebookClient.execute()``.
    """
    resources: t.Dict[str, t.Any] = {} if cwd is None else {'metadata': {'path': cwd}}
    client = NotebookClient(nb=nb, resources=resources, km=km, **kwargs)
    return client.execute()