Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/manager.py: 40%

364 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""Base class to manage a running kernel""" 

2# Copyright (c) Jupyter Development Team. 

3# Distributed under the terms of the Modified BSD License. 

4import asyncio 

5import functools 

6import os 

7import re 

8import signal 

9import sys 

10import typing as t 

11import uuid 

12from asyncio.futures import Future 

13from concurrent.futures import Future as CFuture 

14from contextlib import contextmanager 

15from enum import Enum 

16 

17import zmq 

18from jupyter_core.utils import run_sync 

19from traitlets import ( 

20 Any, 

21 Bool, 

22 DottedObjectName, 

23 Float, 

24 Instance, 

25 Type, 

26 Unicode, 

27 default, 

28 observe, 

29 observe_compat, 

30) 

31from traitlets.utils.importstring import import_item 

32 

33from . import kernelspec 

34from .asynchronous import AsyncKernelClient 

35from .blocking import BlockingKernelClient 

36from .client import KernelClient 

37from .connect import ConnectionFileMixin 

38from .managerabc import KernelManagerABC 

39from .provisioning import KernelProvisionerBase 

40from .provisioning import KernelProvisionerFactory as KPF # noqa 

41 

42 

43class _ShutdownStatus(Enum): 

44 """ 

45 

46 This is so far used only for testing in order to track the internal state of 

47 the shutdown logic, and verifying which path is taken for which 

48 missbehavior. 

49 

50 """ 

51 

52 Unset = None 

53 ShutdownRequest = "ShutdownRequest" 

54 SigtermRequest = "SigtermRequest" 

55 SigkillRequest = "SigkillRequest" 

56 

57 

# Type variable standing for "any callable"; it lets a decorator declare that
# it returns the same callable type it was given.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])

59 

60 

61def _get_future() -> t.Union[Future, CFuture]: 

62 """Get an appropriate Future object""" 

63 try: 

64 asyncio.get_running_loop() 

65 return Future() 

66 except RuntimeError: 

67 # No event loop running, use concurrent future 

68 return CFuture() 

69 

70 

def in_pending_state(method: F) -> F:
    """Decorate a coroutine method so the manager's ``_ready`` future tracks it.

    While the wrapped method runs, ``self._ready`` holds an unresolved future.
    It is resolved with ``None`` once the method returns, or with the raised
    exception (which is also logged and re-raised) when the method fails.
    """

    @t.no_type_check
    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        """Create a future for the decorated method."""
        # A future that already exists is reused only when no start has been
        # attempted yet; in every other case a new pending future is created.
        needs_new_future = self._attempted_start or not self._ready
        if needs_new_future:
            self._ready = _get_future()
        try:
            result = await method(self, *args, **kwargs)
            # Add a small sleep to ensure tests can capture the state before done
            await asyncio.sleep(0.01)
            self._ready.set_result(None)
        except Exception as exc:
            self._ready.set_exception(exc)
            self.log.exception(self._ready.exception())
            raise exc
        return result

    return t.cast(F, wrapper)

96 

97 

98class KernelManager(ConnectionFileMixin): 

99 """Manages a single kernel in a subprocess on this host. 

100 

101 This version starts kernels with Popen. 

102 """ 

103 

104 _ready: t.Optional[t.Union[Future, CFuture]] 

105 

def __init__(self, *args, **kwargs):
    """Initialize a kernel manager.

    Only the keyword arguments are forwarded to the parent initializer;
    positional arguments are accepted but not used.
    """
    super().__init__(**kwargs)
    # No readiness future exists and no start has been attempted yet.
    self._ready = None
    self._attempted_start = False
    self._shutdown_status = _ShutdownStatus.Unset

112 

# Becomes True when this manager creates its own ZMQ context through the
# default below; resource cleanup only destroys a context it created.
_created_context: Bool = Bool(False)

# The PyZMQ Context to use for communication with the kernel.
context: Instance = Instance(zmq.Context)


@default("context")  # type:ignore[misc]
def _context_default(self) -> zmq.Context:
    """Create a new ZMQ context and record that this manager made it."""
    self._created_context = True
    return zmq.Context()

122 

# Dotted name of the class that the `client` method instantiates.
client_class: DottedObjectName = DottedObjectName(
    "jupyter_client.blocking.BlockingKernelClient"
)
# The imported class object itself; derived from `client_class`.
client_factory: Type = Type(klass=KernelClient)


@default("client_factory")  # type:ignore[misc]
def _client_factory_default(self) -> Type:
    """Import and return the class named by ``client_class``."""
    return import_item(self.client_class)

132 

@observe("client_class")  # type:ignore[misc]
def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
    """Re-import ``client_factory`` whenever ``client_class`` changes."""
    dotted_name = str(change["new"])
    self.client_factory = import_item(dotted_name)

136 

# Identifier of the managed kernel; defaults to None.
kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)

# The kernel provisioner with which this KernelManager is communicating.
# This will generally be a LocalProvisioner instance unless the kernelspec
# indicates otherwise. It stays None until a provisioner has been created.
provisioner: t.Optional[KernelProvisionerBase] = None

143 

# Object used to look up kernel specs by name.
kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)


@default("kernel_spec_manager")  # type:ignore[misc]
def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
    """Build a spec manager that uses this manager's ``data_dir``."""
    return kernelspec.KernelSpecManager(data_dir=self.data_dir)

149 

@observe("kernel_spec_manager")  # type:ignore[misc]
@observe_compat  # type:ignore[misc]
def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
    """Forget the cached kernel spec when the spec manager is replaced."""
    self._kernel_spec = None

154 

# Total time budget, in seconds, for a kernel to exit during shutdown.
# Each help fragment ends with a space because adjacent string literals are
# concatenated with no separator.
shutdown_wait_time: Float = Float(
    5.0,
    config=True,
    help="Time to wait for a kernel to terminate before killing it, "
    "in seconds. When a shutdown request is initiated, the kernel "
    "will be immediately sent an interrupt (SIGINT), followed "
    "by a shutdown_request message, after 1/2 of `shutdown_wait_time` "
    "it will be sent a terminate (SIGTERM) request, and finally at "
    "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
    "and kill may be equivalent on windows. Note that this value can be "
    "overridden by the in-use kernel provisioner since shutdown times may "
    "vary by provisioned environment.",
)

168 

# Name of the kernel spec to launch; defaults to the native kernel.
kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)


@observe("kernel_name")  # type:ignore[misc]
def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
    """Drop the cached spec and map the generic "python" name to the native one."""
    self._kernel_spec = None
    requested = change["new"]
    if requested == "python":
        self.kernel_name = kernelspec.NATIVE_KERNEL_NAME

176 

# Cache behind the `kernel_spec` property; None means "not looked up yet".
_kernel_spec: t.Optional[kernelspec.KernelSpec] = None


@property
def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]:
    """The kernel spec for ``kernel_name``, looked up on demand and cached.

    No lookup is attempted while ``kernel_name`` is the empty string.
    """
    needs_lookup = self._kernel_spec is None and self.kernel_name != ""  # noqa
    if needs_lookup:
        self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
    return self._kernel_spec

184 

cache_ports: Bool = Bool(
    help="True if the MultiKernelManager should cache ports for this KernelManager instance"
)


@default("cache_ports")  # type:ignore[misc]
def _default_cache_ports(self) -> bool:
    """Default to caching ports exactly when the transport is "tcp"."""
    uses_tcp = self.transport == "tcp"
    return uses_tcp

192 

@property
def ready(self) -> t.Union[CFuture, Future]:
    """A future that resolves when the kernel process has started for the first time"""
    current = self._ready
    if not current:
        # Nothing is pending yet: create the future lazily so callers can
        # obtain it before any start has been requested.
        current = self._ready = _get_future()
    return current

199 

@property
def ipykernel(self) -> bool:
    """Whether ``kernel_name`` is one of the Python kernel names."""
    python_names = {"python", "python2", "python3"}
    return self.kernel_name in python_names

203 

# Protected traits
# Keyword arguments of the most recent launch, saved for use in a restart.
_launch_args: Any = Any()
# Socket connected to the kernel's control channel, created on demand.
_control_socket: Any = Any()

# Restarter object, if any; the restart-callback methods delegate to it.
_restarter: Any = Any()

autorestart: Bool = Bool(
    True, config=True, help="""Should we autorestart the kernel if it dies."""
)

# Set to True when a shutdown begins and back to False when a start begins.
shutting_down: bool = False

215 

216 def __del__(self) -> None: 

217 self._close_control_socket() 

218 self.cleanup_connection_file() 

219 

220 # -------------------------------------------------------------------------- 

221 # Kernel restarter 

222 # -------------------------------------------------------------------------- 

223 

def start_restarter(self) -> None:
    """Start the kernel restarter.

    This implementation does nothing.
    """

227 

def stop_restarter(self) -> None:
    """Stop the kernel restarter.

    This implementation does nothing.
    """

231 

def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
    """Register a callback to be called when a kernel is restarted

    Nothing happens when the manager has no restarter.
    """
    restarter = self._restarter
    if restarter is not None:
        restarter.add_callback(callback, event)

237 

def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
    """Unregister a callback to be called when a kernel is restarted

    Nothing happens when the manager has no restarter.
    """
    restarter = self._restarter
    if restarter is not None:
        restarter.remove_callback(callback, event)

243 

244 # -------------------------------------------------------------------------- 

245 # create a Client connected to our Kernel 

246 # -------------------------------------------------------------------------- 

247 

def client(self, **kwargs: t.Any) -> BlockingKernelClient:
    """Create a client configured to connect to our kernel"""
    # Later entries win: connection info first, then the manager's own
    # settings, then the caller's kwargs as manual overrides.
    options: dict = {
        **self.get_connection_info(session=True),
        "connection_file": self.connection_file,
        "parent": self,
        **kwargs,
    }
    return self.client_factory(**options)

262 

263 # -------------------------------------------------------------------------- 

264 # Kernel management 

265 # -------------------------------------------------------------------------- 

266 

def format_kernel_cmd(self, extra_arguments: t.Optional[t.List[str]] = None) -> t.List[str]:
    """Build the kernel command line with its ``{name}`` templates filled in.

    Parameters
    ----------
    extra_arguments : list of str, optional
        Arguments appended to the kernel spec's ``argv`` before substitution.

    Returns
    -------
    list of str
        The command. ``{connection_file}``, ``{prefix}``, ``{resource_dir}``
        and the names of any saved launch arguments are replaced by their
        values (converted to ``str``); unknown placeholders are left as-is.
    """
    extra_arguments = extra_arguments or []
    assert self.kernel_spec is not None
    cmd = self.kernel_spec.argv + extra_arguments

    if cmd and cmd[0] in {
        "python",
        "python%i" % sys.version_info[0],
        "python%i.%i" % sys.version_info[:2],
    }:
        # executable is 'python' or 'python3', use sys.executable.
        # These will typically be the same,
        # but if the current process is in an env
        # and has been launched by abspath without
        # activating the env, python on PATH may not be sys.executable,
        # but it should be.
        cmd[0] = sys.executable

    # Make sure to use the realpath for the connection_file
    # On windows, when running with the store python, the connection_file path
    # is not usable by non python kernels because the path is being rerouted when
    # inside of a store app.
    # See this bug here: https://bugs.python.org/issue41196
    ns = {
        "connection_file": os.path.realpath(self.connection_file),
        "prefix": sys.prefix,
    }

    if self.kernel_spec:
        ns["resource_dir"] = self.kernel_spec.resource_dir

    # `_launch_args` is still None when no launch has saved its arguments
    # yet; updating from None would raise TypeError.
    if self._launch_args:
        ns.update(self._launch_args)

    pat = re.compile(r"\{([A-Za-z0-9_]+)\}")

    def from_ns(match):
        """Get the key out of ns if it's there, otherwise no change."""
        key = match.group(1)
        if key in ns:
            # re.sub requires a str replacement; launch args may hold
            # arbitrary objects.
            return str(ns[key])
        return match.group()

    return [pat.sub(from_ns, arg) for arg in cmd]

308 

309 async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None: 

310 """actually launch the kernel 

311 

312 override in a subclass to launch kernel subprocesses differently 

313 Note that provisioners can now be used to customize kernel environments 

314 and 

315 """ 

316 assert self.provisioner is not None 

317 connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw) 

318 assert self.provisioner.has_process 

319 # Provisioner provides the connection information. Load into kernel manager 

320 # and write the connection file, if not already done. 

321 self._reconcile_connection_info(connection_info) 

322 

323 _launch_kernel = run_sync(_async_launch_kernel) 

324 

325 # Control socket used for polite kernel shutdown 

326 

327 def _connect_control_socket(self) -> None: 

328 if self._control_socket is None: 

329 self._control_socket = self._create_connected_socket("control") 

330 self._control_socket.linger = 100 

331 

332 def _close_control_socket(self) -> None: 

333 if self._control_socket is None: 

334 return 

335 self._control_socket.close() 

336 self._control_socket = None 

337 

338 async def _async_pre_start_kernel( 

339 self, **kw: t.Any 

340 ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]: 

341 """Prepares a kernel for startup in a separate process. 

342 

343 If random ports (port=0) are being used, this method must be called 

344 before the channels are created. 

345 

346 Parameters 

347 ---------- 

348 `**kw` : optional 

349 keyword arguments that are passed down to build the kernel_cmd 

350 and launching the kernel (e.g. Popen kwargs). 

351 """ 

352 self.shutting_down = False 

353 self.kernel_id = self.kernel_id or kw.pop('kernel_id', str(uuid.uuid4())) 

354 # save kwargs for use in restart 

355 self._launch_args = kw.copy() 

356 if self.provisioner is None: # will not be None on restarts 

357 self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance( 

358 self.kernel_id, 

359 self.kernel_spec, 

360 parent=self, 

361 ) 

362 kw = await self.provisioner.pre_launch(**kw) 

363 kernel_cmd = kw.pop('cmd') 

364 return kernel_cmd, kw 

365 

366 pre_start_kernel = run_sync(_async_pre_start_kernel) 

367 

368 async def _async_post_start_kernel(self, **kw: t.Any) -> None: 

369 """Performs any post startup tasks relative to the kernel. 

370 

371 Parameters 

372 ---------- 

373 `**kw` : optional 

374 keyword arguments that were used in the kernel process's launch. 

375 """ 

376 self.start_restarter() 

377 self._connect_control_socket() 

378 assert self.provisioner is not None 

379 await self.provisioner.post_launch(**kw) 

380 

381 post_start_kernel = run_sync(_async_post_start_kernel) 

382 

@in_pending_state
async def _async_start_kernel(self, **kw: t.Any) -> None:
    """Start a kernel: run the pre-start, launch and post-start steps in order.

    If random ports (port=0) are being used, this method must be called
    before the channels are created.

    Parameters
    ----------
    `**kw` : optional
        keyword arguments that are passed down to build the kernel_cmd
        and launching the kernel (e.g. Popen kwargs).
    """
    self._attempted_start = True
    kernel_cmd, launch_kw = await self._async_pre_start_kernel(**kw)

    # launch the kernel subprocess
    self.log.debug("Starting kernel: %s", kernel_cmd)
    await self._async_launch_kernel(kernel_cmd, **launch_kw)
    await self._async_post_start_kernel(**launch_kw)

403 

404 start_kernel = run_sync(_async_start_kernel) 

405 

async def _async_request_shutdown(self, restart: bool = False) -> None:
    """Send a shutdown request via control channel

    The provisioner is then told that a shutdown was requested, and the
    shutdown status is recorded as ``ShutdownRequest``.
    """
    shutdown_msg = self.session.msg("shutdown_request", content={"restart": restart})
    # ensure control socket is connected
    self._connect_control_socket()
    self.session.send(self._control_socket, shutdown_msg)
    assert self.provisioner is not None
    await self.provisioner.shutdown_requested(restart=restart)
    self._shutdown_status = _ShutdownStatus.ShutdownRequest

416 

417 request_shutdown = run_sync(_async_request_shutdown) 

418 

419 async def _async_finish_shutdown( 

420 self, 

421 waittime: t.Optional[float] = None, 

422 pollinterval: float = 0.1, 

423 restart: bool = False, 

424 ) -> None: 

425 """Wait for kernel shutdown, then kill process if it doesn't shutdown. 

426 

427 This does not send shutdown requests - use :meth:`request_shutdown` 

428 first. 

429 """ 

430 if waittime is None: 

431 waittime = max(self.shutdown_wait_time, 0) 

432 if self.provisioner: # Allow provisioner to override 

433 waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime) 

434 

435 try: 

436 await asyncio.wait_for( 

437 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 

438 ) 

439 except asyncio.TimeoutError: 

440 self.log.debug("Kernel is taking too long to finish, terminating") 

441 self._shutdown_status = _ShutdownStatus.SigtermRequest 

442 await self._async_send_kernel_sigterm() 

443 

444 try: 

445 await asyncio.wait_for( 

446 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 

447 ) 

448 except asyncio.TimeoutError: 

449 self.log.debug("Kernel is taking too long to finish, killing") 

450 self._shutdown_status = _ShutdownStatus.SigkillRequest 

451 await self._async_kill_kernel(restart=restart) 

452 else: 

453 # Process is no longer alive, wait and clear 

454 if self.has_kernel: 

455 assert self.provisioner is not None 

456 await self.provisioner.wait() 

457 

458 finish_shutdown = run_sync(_async_finish_shutdown) 

459 

460 async def _async_cleanup_resources(self, restart: bool = False) -> None: 

461 """Clean up resources when the kernel is shut down""" 

462 if not restart: 

463 self.cleanup_connection_file() 

464 

465 self.cleanup_ipc_files() 

466 self._close_control_socket() 

467 self.session.parent = None 

468 

469 if self._created_context and not restart: 

470 self.context.destroy(linger=100) 

471 

472 if self.provisioner: 

473 await self.provisioner.cleanup(restart=restart) 

474 

475 cleanup_resources = run_sync(_async_cleanup_resources) 

476 

@in_pending_state
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
    """Attempts to stop the kernel process cleanly.

    This attempts to shutdown the kernels cleanly by:

    1. Sending it a shutdown message over the control channel.
    2. If that fails, the kernel is shutdown forcibly by sending it
       a signal.

    Parameters
    ----------
    now : bool
        Should the kernel be forcible killed *now*. This skips the
        first, nice shutdown attempt.
    restart: bool
        Will this kernel be restarted after it is shutdown. When this
        is True, connection files will not be cleaned up.
    """
    self.shutting_down = True  # Used by restarter to prevent race condition
    # Stop monitoring for restarting while we shutdown.
    self.stop_restarter()

    if self.has_kernel:
        await self._async_interrupt_kernel()

    if now:
        # Pass `restart` along, as the graceful path below does, so the
        # provisioner is told about a pending restart here as well.
        await self._async_kill_kernel(restart=restart)
    else:
        await self._async_request_shutdown(restart=restart)
        # Don't send any additional kernel kill messages immediately, to give
        # the kernel a chance to properly execute shutdown actions. The wait
        # budget and escalation are handled by `_async_finish_shutdown`.
        await self._async_finish_shutdown(restart=restart)

    await self._async_cleanup_resources(restart=restart)

513 

514 shutdown_kernel = run_sync(_async_shutdown_kernel) 

515 

516 async def _async_restart_kernel( 

517 self, now: bool = False, newports: bool = False, **kw: t.Any 

518 ) -> None: 

519 """Restarts a kernel with the arguments that were used to launch it. 

520 

521 Parameters 

522 ---------- 

523 now : bool, optional 

524 If True, the kernel is forcefully restarted *immediately*, without 

525 having a chance to do any cleanup action. Otherwise the kernel is 

526 given 1s to clean up before a forceful restart is issued. 

527 

528 In all cases the kernel is restarted, the only difference is whether 

529 it is given a chance to perform a clean shutdown or not. 

530 

531 newports : bool, optional 

532 If the old kernel was launched with random ports, this flag decides 

533 whether the same ports and connection file will be used again. 

534 If False, the same ports and connection file are used. This is 

535 the default. If True, new random port numbers are chosen and a 

536 new connection file is written. It is still possible that the newly 

537 chosen random port numbers happen to be the same as the old ones. 

538 

539 `**kw` : optional 

540 Any options specified here will overwrite those used to launch the 

541 kernel. 

542 """ 

543 if self._launch_args is None: 

544 msg = "Cannot restart the kernel. No previous call to 'start_kernel'." 

545 raise RuntimeError(msg) 

546 

547 # Stop currently running kernel. 

548 await self._async_shutdown_kernel(now=now, restart=True) 

549 

550 if newports: 

551 self.cleanup_random_ports() 

552 

553 # Start new kernel. 

554 self._launch_args.update(kw) 

555 await self._async_start_kernel(**self._launch_args) 

556 

557 restart_kernel = run_sync(_async_restart_kernel) 

558 

@property
def has_kernel(self) -> bool:
    """Has a kernel process been started that we are actively managing."""
    provisioner = self.provisioner
    if provisioner is None:
        return False
    return provisioner.has_process

563 

564 async def _async_send_kernel_sigterm(self, restart: bool = False) -> None: 

565 """similar to _kill_kernel, but with sigterm (not sigkill), but do not block""" 

566 if self.has_kernel: 

567 assert self.provisioner is not None 

568 await self.provisioner.terminate(restart=restart) 

569 

570 _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm) 

571 

572 async def _async_kill_kernel(self, restart: bool = False) -> None: 

573 """Kill the running kernel. 

574 

575 This is a private method, callers should use shutdown_kernel(now=True). 

576 """ 

577 if self.has_kernel: 

578 assert self.provisioner is not None 

579 await self.provisioner.kill(restart=restart) 

580 

581 # Wait until the kernel terminates. 

582 try: 

583 await asyncio.wait_for(self._async_wait(), timeout=5.0) 

584 except asyncio.TimeoutError: 

585 # Wait timed out, just log warning but continue - not much more we can do. 

586 self.log.warning("Wait for final termination of kernel timed out - continuing...") 

587 pass 

588 else: 

589 # Process is no longer alive, wait and clear 

590 if self.has_kernel: 

591 await self.provisioner.wait() 

592 

593 _kill_kernel = run_sync(_async_kill_kernel) 

594 

595 async def _async_interrupt_kernel(self) -> None: 

596 """Interrupts the kernel by sending it a signal. 

597 

598 Unlike ``signal_kernel``, this operation is well supported on all 

599 platforms. 

600 """ 

601 if not self.has_kernel and self._ready is not None: 

602 if isinstance(self._ready, CFuture): 

603 ready = asyncio.ensure_future(t.cast(Future[t.Any], self._ready)) 

604 else: 

605 ready = self._ready 

606 # Wait for a shutdown if one is in progress. 

607 if self.shutting_down: 

608 await ready 

609 # Wait for a startup. 

610 await ready 

611 

612 if self.has_kernel: 

613 assert self.kernel_spec is not None 

614 interrupt_mode = self.kernel_spec.interrupt_mode 

615 if interrupt_mode == "signal": 

616 await self._async_signal_kernel(signal.SIGINT) 

617 

618 elif interrupt_mode == "message": 

619 msg = self.session.msg("interrupt_request", content={}) 

620 self._connect_control_socket() 

621 self.session.send(self._control_socket, msg) 

622 else: 

623 msg = "Cannot interrupt kernel. No kernel is running!" 

624 raise RuntimeError(msg) 

625 

626 interrupt_kernel = run_sync(_async_interrupt_kernel) 

627 

628 async def _async_signal_kernel(self, signum: int) -> None: 

629 """Sends a signal to the process group of the kernel (this 

630 usually includes the kernel and any subprocesses spawned by 

631 the kernel). 

632 

633 Note that since only SIGTERM is supported on Windows, this function is 

634 only useful on Unix systems. 

635 """ 

636 if self.has_kernel: 

637 assert self.provisioner is not None 

638 await self.provisioner.send_signal(signum) 

639 else: 

640 msg = "Cannot signal kernel. No kernel is running!" 

641 raise RuntimeError(msg) 

642 

643 signal_kernel = run_sync(_async_signal_kernel) 

644 

645 async def _async_is_alive(self) -> bool: 

646 """Is the kernel process still running?""" 

647 if self.has_kernel: 

648 assert self.provisioner is not None 

649 ret = await self.provisioner.poll() 

650 if ret is None: 

651 return True 

652 return False 

653 

654 is_alive = run_sync(_async_is_alive) 

655 

656 async def _async_wait(self, pollinterval: float = 0.1) -> None: 

657 # Use busy loop at 100ms intervals, polling until the process is 

658 # not alive. If we find the process is no longer alive, complete 

659 # its cleanup via the blocking wait(). Callers are responsible for 

660 # issuing calls to wait() using a timeout (see _kill_kernel()). 

661 while await self._async_is_alive(): 

662 await asyncio.sleep(pollinterval) 

663 

664 

class AsyncKernelManager(KernelManager):
    """A kernel manager whose public kernel-control methods are coroutines.

    Each public name below is bound directly to the matching ``_async_*``
    coroutine function of :class:`KernelManager`, replacing the synchronous
    wrapper defined on the base class.
    """

    # the class to create with our `client` method
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.asynchronous.AsyncKernelClient"
    )
    client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient")

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.asyncio.Context)

    @default("context")  # type:ignore[misc]
    def _context_default(self) -> zmq.asyncio.Context:
        """Create an asyncio ZMQ context and record that this manager made it."""
        self._created_context = True
        return zmq.asyncio.Context()

    def client(self, **kwargs: t.Any) -> AsyncKernelClient:  # type:ignore
        """Get a client for the manager."""
        return super().client(**kwargs)  # type:ignore

    # --- start-up ---------------------------------------------------------
    _launch_kernel = KernelManager._async_launch_kernel  # type:ignore[assignment]
    start_kernel: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_start_kernel  # type:ignore[assignment]
    pre_start_kernel: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_pre_start_kernel  # type:ignore[assignment]
    post_start_kernel: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_post_start_kernel  # type:ignore[assignment]

    # --- shutdown and restart ---------------------------------------------
    request_shutdown: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_request_shutdown  # type:ignore[assignment]
    finish_shutdown: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_finish_shutdown  # type:ignore[assignment]
    cleanup_resources: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_cleanup_resources  # type:ignore[assignment]
    shutdown_kernel: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_shutdown_kernel  # type:ignore[assignment]
    restart_kernel: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_restart_kernel  # type:ignore[assignment]

    # --- signals and liveness ---------------------------------------------
    _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm  # type:ignore[assignment]
    _kill_kernel = KernelManager._async_kill_kernel  # type:ignore[assignment]
    interrupt_kernel: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_interrupt_kernel  # type:ignore[assignment]
    signal_kernel: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_signal_kernel  # type:ignore[assignment]
    is_alive: t.Callable[
        ..., t.Awaitable
    ] = KernelManager._async_is_alive  # type:ignore[assignment]

722 

723 

# Register KernelManager as a virtual subclass of the abstract interface.
KernelManagerABC.register(KernelManager)

725 

726 

def start_new_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[KernelManager, BlockingKernelClient]:
    """Start a new kernel, and return its Manager and Client

    ``kwargs`` are passed to ``start_kernel``. If waiting for the client to
    become ready raises ``RuntimeError``, the channels are stopped and the
    kernel is shut down before the error is re-raised.
    """
    manager = KernelManager(kernel_name=kernel_name)
    manager.start_kernel(**kwargs)
    kernel_client = manager.client()
    kernel_client.start_channels()
    try:
        kernel_client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        kernel_client.stop_channels()
        manager.shutdown_kernel()
        raise

    return manager, kernel_client

743 

744 

async def start_new_async_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
    """Start a new kernel, and return its Manager and Client

    ``kwargs`` are passed to ``start_kernel``. If waiting for the client to
    become ready raises ``RuntimeError``, the channels are stopped and the
    kernel is shut down before the error is re-raised.
    """
    manager = AsyncKernelManager(kernel_name=kernel_name)
    await manager.start_kernel(**kwargs)
    kernel_client = manager.client()
    kernel_client.start_channels()
    try:
        await kernel_client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        kernel_client.stop_channels()
        await manager.shutdown_kernel()
        raise

    return (manager, kernel_client)

761 

762 

@contextmanager
def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
    """Context manager to create a kernel in a subprocess.

    The kernel is shut down when the context exits. ``kwargs`` are passed
    to ``start_new_kernel``.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    manager, kernel_client = start_new_kernel(**kwargs)
    try:
        yield kernel_client
    finally:
        # Runs on both normal and exceptional exit from the with-block.
        kernel_client.stop_channels()
        manager.shutdown_kernel(now=True)