Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/manager.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

381 statements  

1"""Base class to manage a running kernel""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import asyncio 

6import functools 

7import os 

8import re 

9import signal 

10import sys 

11import typing as t 

12import uuid 

13import warnings 

14from asyncio.futures import Future 

15from concurrent.futures import Future as CFuture 

16from contextlib import contextmanager 

17from enum import Enum 

18 

19import zmq 

20from jupyter_core.utils import run_sync 

21from traitlets import ( 

22 Any, 

23 Bool, 

24 Dict, 

25 DottedObjectName, 

26 Float, 

27 Instance, 

28 Type, 

29 Unicode, 

30 default, 

31 observe, 

32 observe_compat, 

33) 

34from traitlets.utils.importstring import import_item 

35 

36from . import kernelspec 

37from .asynchronous import AsyncKernelClient 

38from .blocking import BlockingKernelClient 

39from .client import KernelClient 

40from .connect import ConnectionFileMixin 

41from .managerabc import KernelManagerABC 

42from .provisioning import KernelProvisionerBase 

43from .provisioning import KernelProvisionerFactory as KPF # noqa 

44 

45 

class _ShutdownStatus(Enum):
    """Record which stage of the shutdown sequence was last reached.

    So far this is used only by tests, to track the internal state of the
    shutdown logic and verify which path is taken for which misbehavior.
    """

    # No shutdown step has been recorded yet.
    Unset = None
    # A shutdown_request message was sent.
    ShutdownRequest = "ShutdownRequest"
    # The kernel was asked to terminate (SIGTERM stage).
    SigtermRequest = "SigtermRequest"
    # The kernel was killed (SIGKILL stage).
    SigkillRequest = "SigkillRequest"

59 

60 

61F = t.TypeVar("F", bound=t.Callable[..., t.Any]) 

62 

63 

def _get_future() -> t.Union[Future, CFuture]:
    """Return a new future suited to the calling context.

    An asyncio ``Future`` is returned when an event loop is running in the
    current thread; otherwise a ``concurrent.futures.Future`` is returned.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raised: no loop is running, so fall back to the
        # concurrent future.
        return CFuture()
    else:
        return Future()

72 

73 

def in_pending_state(method: F) -> F:
    """Decorate a coroutine method so it runs with a pending ``_ready`` future.

    A fresh future is stored on ``self._ready`` when a start has already been
    attempted or no future exists yet. When the wrapped method finishes, the
    future's result is set (only if the manager owns the kernel); when it
    raises, the exception is stored on the future, logged, and re-raised.
    """

    @t.no_type_check
    @functools.wraps(method)
    async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
        """Create a future for the decorated method."""
        needs_new_future = self._attempted_start or not self._ready
        if needs_new_future:
            self._ready = _get_future()
        try:
            result = await method(self, *args, **kwargs)
            # Small sleep so tests can observe the pending state before the
            # future is resolved.
            await asyncio.sleep(0.01)
            if self.owns_kernel:
                self._ready.set_result(None)
            return result
        except Exception as err:
            self._ready.set_exception(err)
            self.log.exception(self._ready.exception())
            raise err

    return t.cast(F, wrapper)

100 

101 

102class KernelManager(ConnectionFileMixin): 

103 """Manages a single kernel in a subprocess on this host. 

104 

105 This version starts kernels with Popen. 

106 """ 

107 

108 _ready: t.Union[Future, CFuture] | None 

109 

def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
    """Initialize a kernel manager.

    Positional arguments are ignored and trigger a ``DeprecationWarning``.
    The ``owns_kernel`` keyword (default ``True``) is removed from ``kwargs``
    before the remaining keywords are handed to the parent initializer.
    """
    if args:
        message = (
            "Passing positional only arguments to "
            "`KernelManager.__init__` is deprecated since jupyter_client"
            " 8.6, and will become an error on future versions. Positional "
            " arguments have been ignored since jupyter_client 7.0"
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)

    # Must be popped before super().__init__ receives the keywords.
    self._owns_kernel = kwargs.pop("owns_kernel", True)
    super().__init__(**kwargs)

    self._shutdown_status = _ShutdownStatus.Unset
    self._attempted_start = False
    self._ready = None

126 

127 _created_context: Bool = Bool(False) 

128 

129 # The PyZMQ Context to use for communication with the kernel. 

130 context: Instance = Instance(zmq.Context) 

131 

@default("context")
def _context_default(self) -> zmq.Context:
    """Create the default ZMQ context and record that this manager made it."""
    # The flag is read later by resource cleanup to decide whether the
    # context should be destroyed.
    self._created_context = True
    new_context = zmq.Context()
    return new_context

136 

137 # the class to create with our `client` method 

138 client_class: DottedObjectName = DottedObjectName( 

139 "jupyter_client.blocking.BlockingKernelClient", config=True 

140 ) 

141 client_factory: Type = Type(klass=KernelClient, config=True) 

142 

@default("client_factory")
def _client_factory_default(self) -> Type:
    """Import and return the class named by ``client_class``."""
    factory = import_item(self.client_class)
    return factory

146 

@observe("client_class")
def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
    """Re-import ``client_factory`` whenever ``client_class`` changes."""
    dotted_name = str(change["new"])
    self.client_factory = import_item(dotted_name)

150 

151 kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True) 

152 

153 # The kernel provisioner with which this KernelManager is communicating. 

154 # This will generally be a LocalProvisioner instance unless the kernelspec 

155 # indicates otherwise. 

156 provisioner: KernelProvisionerBase | None = None 

157 

158 kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager) 

159 

@default("kernel_spec_manager")
def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
    """Build the default KernelSpecManager using this manager's ``data_dir``."""
    spec_manager = kernelspec.KernelSpecManager(data_dir=self.data_dir)
    return spec_manager

163 

@observe("kernel_spec_manager")
@observe_compat
def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
    """Drop the cached kernel spec when the spec manager is replaced."""
    # The `kernel_spec` property repopulates this cache on next access.
    self._kernel_spec = None

168 

# Configurable grace period for shutdown. The original help text joined
# adjacent string literals without separating spaces ("followedby",
# "can beoverridden", "mayvary"); each fragment now ends with a space.
shutdown_wait_time: Float = Float(
    5.0,
    config=True,
    help="Time to wait for a kernel to terminate before killing it, "
    "in seconds. When a shutdown request is initiated, the kernel "
    "will be immediately sent an interrupt (SIGINT), followed "
    "by a shutdown_request message, after 1/2 of `shutdown_wait_time` "
    "it will be sent a terminate (SIGTERM) request, and finally at "
    "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
    "and kill may be equivalent on windows. Note that this value can be "
    "overridden by the in-use kernel provisioner since shutdown times may "
    "vary by provisioned environment.",
)

182 

183 kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME) 

184 

@observe("kernel_name")
def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
    """Invalidate the cached spec and map the name "python" to the native kernel."""
    self._kernel_spec = None
    requested_name = change["new"]
    if requested_name == "python":
        self.kernel_name = kernelspec.NATIVE_KERNEL_NAME

190 

191 _kernel_spec: kernelspec.KernelSpec | None = None 

192 

@property
def kernel_spec(self) -> kernelspec.KernelSpec | None:
    """Return the kernel spec for ``kernel_name``, looking it up on first use.

    The lookup is skipped while ``kernel_name`` is the empty string, in which
    case the cached value (possibly ``None``) is returned unchanged.
    """
    not_cached = self._kernel_spec is None
    if not_cached and self.kernel_name != "":
        self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
    return self._kernel_spec

198 

199 cache_ports: Bool = Bool( 

200 False, 

201 config=True, 

202 help="True if the MultiKernelManager should cache ports for this KernelManager instance", 

203 ) 

204 

@default("cache_ports")
def _default_cache_ports(self) -> bool:
    """Default ``cache_ports`` to True only when the transport is "tcp"."""
    uses_tcp = self.transport == "tcp"
    return uses_tcp

208 

@property
def ready(self) -> t.Union[CFuture, Future]:
    """A future that resolves when the kernel process has started for the first time"""
    if self._ready:
        return self._ready
    # No future exists yet: create one on demand.
    self._ready = _get_future()
    return self._ready

215 

@property
def ipykernel(self) -> bool:
    """Whether ``kernel_name`` is one of "python", "python2" or "python3"."""
    python_kernel_names = {"python", "python2", "python3"}
    return self.kernel_name in python_kernel_names

219 

220 # Protected traits 

221 _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True) 

222 _control_socket: Any = Any() 

223 

224 _restarter: Any = Any() 

225 

226 autorestart: Bool = Bool( 

227 True, config=True, help="""Should we autorestart the kernel if it dies.""" 

228 ) 

229 

230 shutting_down: bool = False 

231 

def __del__(self) -> None:
    """Close the control socket and clean up the connection file on finalization."""
    # Socket first, then the connection file.
    self._close_control_socket()
    self.cleanup_connection_file()

235 

236 # -------------------------------------------------------------------------- 

237 # Kernel restarter 

238 # -------------------------------------------------------------------------- 

239 

def start_restarter(self) -> None:
    """Start the kernel restarter.

    This implementation does nothing.
    """
    return None

243 

def stop_restarter(self) -> None:
    """Stop the kernel restarter.

    This implementation does nothing.
    """
    return None

247 

def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
    """Register a callback to be called when a kernel is restarted.

    Does nothing when no restarter is set.
    """
    if self._restarter is not None:
        self._restarter.add_callback(callback, event)

253 

def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
    """Unregister a callback to be called when a kernel is restarted.

    Does nothing when no restarter is set.
    """
    if self._restarter is not None:
        self._restarter.remove_callback(callback, event)

259 

260 # -------------------------------------------------------------------------- 

261 # create a Client connected to our Kernel 

262 # -------------------------------------------------------------------------- 

263 

def client(self, **kwargs: t.Any) -> BlockingKernelClient:
    """Create a client configured to connect to our kernel.

    The client keywords are layered in this order, later entries winning:
    the connection info (including the session), then this manager's
    ``connection_file`` and ``parent=self``, then the caller's ``kwargs``.
    """
    kw: dict = {
        **self.get_connection_info(session=True),
        "connection_file": self.connection_file,
        "parent": self,
        # caller-supplied keywords come last, for manual overrides
        **kwargs,
    }
    return self.client_factory(**kw)

278 

279 # -------------------------------------------------------------------------- 

280 # Kernel management 

281 # -------------------------------------------------------------------------- 

282 

def update_env(self, *, env: t.Dict[str, str]) -> None:
    """
    Allow to update the environment of a kernel manager.

    This will take effect only after kernel restart when the new env is
    passed to the new kernel.

    This is useful as some of the information of the current kernel reflect
    the state of the session that started it, and those session information
    (like the attach file path, or name), are mutable.

    Nothing happens unless the saved launch arguments are a dict that
    already holds a dict under the "env" key.

    .. version-added: 8.5
    """
    launch_args = self._launch_args
    if not isinstance(launch_args, dict):
        return
    if "env" not in launch_args:
        return
    saved_env = launch_args["env"]
    if isinstance(saved_env, dict):
        # Mutates the stored env in place.
        saved_env.update(env)

303 

304 def format_kernel_cmd(self, extra_arguments: t.List[str] | None = None) -> t.List[str]: 

305 """Replace templated args (e.g. {connection_file})""" 

306 extra_arguments = extra_arguments or [] 

307 assert self.kernel_spec is not None 

308 cmd = self.kernel_spec.argv + extra_arguments 

309 

310 if cmd and cmd[0] in { 

311 "python", 

312 "python%i" % sys.version_info[0], 

313 "python%i.%i" % sys.version_info[:2], 

314 }: 

315 # executable is 'python' or 'python3', use sys.executable. 

316 # These will typically be the same, 

317 # but if the current process is in an env 

318 # and has been launched by abspath without 

319 # activating the env, python on PATH may not be sys.executable, 

320 # but it should be. 

321 cmd[0] = sys.executable 

322 

323 # Make sure to use the realpath for the connection_file 

324 # On windows, when running with the store python, the connection_file path 

325 # is not usable by non python kernels because the path is being rerouted when 

326 # inside of a store app. 

327 # See this bug here: https://bugs.python.org/issue41196 

328 ns: t.Dict[str, t.Any] = { 

329 "connection_file": os.path.realpath(self.connection_file), 

330 "prefix": sys.prefix, 

331 } 

332 

333 if self.kernel_spec: # type:ignore[truthy-bool] 

334 ns["resource_dir"] = self.kernel_spec.resource_dir 

335 assert isinstance(self._launch_args, dict) 

336 

337 ns.update(self._launch_args) 

338 

339 pat = re.compile(r"\{([A-Za-z0-9_]+)\}") 

340 

341 def from_ns(match: t.Any) -> t.Any: 

342 """Get the key out of ns if it's there, otherwise no change.""" 

343 return ns.get(match.group(1), match.group()) 

344 

345 return [pat.sub(from_ns, arg) for arg in cmd] 

346 

async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
    """Actually launch the kernel through the provisioner.

    Override in a subclass to launch kernel subprocesses differently.
    Note that provisioners can now be used to customize kernel environments.
    """
    provisioner = self.provisioner
    assert provisioner is not None
    launch_result = await provisioner.launch_kernel(kernel_cmd, **kw)
    assert provisioner.has_process
    # The provisioner provides the connection information. Load it into the
    # kernel manager and write the connection file, if not already done.
    self._reconcile_connection_info(launch_result)

360 

361 _launch_kernel = run_sync(_async_launch_kernel) 

362 

363 # Control socket used for polite kernel shutdown 

364 

def _connect_control_socket(self) -> None:
    """Create the control socket if it does not exist yet (linger set to 100)."""
    if self._control_socket is not None:
        return
    self._control_socket = self._create_connected_socket("control")
    self._control_socket.linger = 100

369 

def _close_control_socket(self) -> None:
    """Close the control socket, if any, and forget it."""
    if self._control_socket is not None:
        self._control_socket.close()
        self._control_socket = None

375 

async def _async_pre_start_kernel(
    self, **kw: t.Any
) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
    """Prepares a kernel for startup in a separate process.

    If random ports (port=0) are being used, this method must be called
    before the channels are created.

    Parameters
    ----------
    `**kw` : optional
        keyword arguments that are passed down to build the kernel_cmd
        and launching the kernel (e.g. Popen kwargs).

    Returns
    -------
    The kernel command (the "cmd" entry of the provisioner's ``pre_launch``
    result) and the remaining keyword arguments.
    """
    self.shutting_down = False
    # Keep an existing id; otherwise take it from kw or generate a new one.
    self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
    # Save a copy of the kwargs for use in restart.
    self._launch_args = kw.copy()

    if self.provisioner is None:  # will not be None on restarts
        factory = KPF.instance(parent=self.parent)
        self.provisioner = factory.create_provisioner_instance(
            self.kernel_id,
            self.kernel_spec,
            parent=self,
        )

    launch_kw = await self.provisioner.pre_launch(**kw)
    kernel_cmd = launch_kw.pop("cmd")
    return kernel_cmd, launch_kw

404 

405 pre_start_kernel = run_sync(_async_pre_start_kernel) 

406 

async def _async_post_start_kernel(self, **kw: t.Any) -> None:
    """Performs any post startup tasks relative to the kernel.

    Starts the restarter, connects the control socket, then awaits the
    provisioner's ``post_launch``.

    Parameters
    ----------
    `**kw` : optional
        keyword arguments that were used in the kernel process's launch.
    """
    self.start_restarter()
    self._connect_control_socket()
    provisioner = self.provisioner
    assert provisioner is not None
    await provisioner.post_launch(**kw)

419 

420 post_start_kernel = run_sync(_async_post_start_kernel) 

421 

@in_pending_state
async def _async_start_kernel(self, **kw: t.Any) -> None:
    """Starts a kernel on this host in a separate process.

    If random ports (port=0) are being used, this method must be called
    before the channels are created.

    Parameters
    ----------
    `**kw` : optional
        keyword arguments that are passed down to build the kernel_cmd
        and launching the kernel (e.g. Popen kwargs).
    """
    # Record the attempt first; the in_pending_state wrapper reads this flag.
    self._attempted_start = True
    command, launch_kw = await self._async_pre_start_kernel(**kw)

    # launch the kernel subprocess
    self.log.debug("Starting kernel: %s", command)
    await self._async_launch_kernel(command, **launch_kw)
    await self._async_post_start_kernel(**launch_kw)

442 

443 start_kernel = run_sync(_async_start_kernel) 

444 

async def _async_request_shutdown(self, restart: bool = False) -> None:
    """Send a shutdown request via control channel.

    After sending, the provisioner is notified through
    ``shutdown_requested`` and the shutdown status is recorded.
    """
    request = self.session.msg("shutdown_request", content={"restart": restart})
    # ensure control socket is connected
    self._connect_control_socket()
    self.session.send(self._control_socket, request)

    provisioner = self.provisioner
    assert provisioner is not None
    await provisioner.shutdown_requested(restart=restart)
    self._shutdown_status = _ShutdownStatus.ShutdownRequest

455 

456 request_shutdown = run_sync(_async_request_shutdown) 

457 

458 async def _async_finish_shutdown( 

459 self, 

460 waittime: float | None = None, 

461 pollinterval: float = 0.1, 

462 restart: bool = False, 

463 ) -> None: 

464 """Wait for kernel shutdown, then kill process if it doesn't shutdown. 

465 

466 This does not send shutdown requests - use :meth:`request_shutdown` 

467 first. 

468 """ 

469 if waittime is None: 

470 waittime = max(self.shutdown_wait_time, 0) 

471 if self.provisioner: # Allow provisioner to override 

472 waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime) 

473 

474 try: 

475 await asyncio.wait_for( 

476 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 

477 ) 

478 except asyncio.TimeoutError: 

479 self.log.debug("Kernel is taking too long to finish, terminating") 

480 self._shutdown_status = _ShutdownStatus.SigtermRequest 

481 await self._async_send_kernel_sigterm() 

482 

483 try: 

484 await asyncio.wait_for( 

485 self._async_wait(pollinterval=pollinterval), timeout=waittime / 2 

486 ) 

487 except asyncio.TimeoutError: 

488 self.log.debug("Kernel is taking too long to finish, killing") 

489 self._shutdown_status = _ShutdownStatus.SigkillRequest 

490 await self._async_kill_kernel(restart=restart) 

491 else: 

492 # Process is no longer alive, wait and clear 

493 if self.has_kernel: 

494 assert self.provisioner is not None 

495 await self.provisioner.wait() 

496 

497 finish_shutdown = run_sync(_async_finish_shutdown) 

498 

async def _async_cleanup_resources(self, restart: bool = False) -> None:
    """Clean up resources when the kernel is shut down.

    When ``restart`` is true the connection file is kept and a context
    created by this manager is not destroyed.
    """
    final_shutdown = not restart

    if final_shutdown:
        self.cleanup_connection_file()

    self.cleanup_ipc_files()
    self._close_control_socket()
    self.session.parent = None

    # Only destroy a context this manager created itself.
    if self._created_context and final_shutdown:
        self.context.destroy(linger=100)

    if self.provisioner:
        await self.provisioner.cleanup(restart=restart)

513 

514 cleanup_resources = run_sync(_async_cleanup_resources) 

515 

@in_pending_state
async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
    """Attempts to stop the kernel process cleanly.

    This attempts to shutdown the kernels cleanly by:

    1. Sending it a shutdown message over the control channel.
    2. If that fails, the kernel is shutdown forcibly by sending it
       a signal.

    Nothing is done when this manager does not own the kernel.

    Parameters
    ----------
    now : bool
        Should the kernel be forcible killed *now*. This skips the
        first, nice shutdown attempt.
    restart: bool
        Will this kernel be restarted after it is shutdown. When this
        is True, connection files will not be cleaned up.
    """
    if not self.owns_kernel:
        return

    # Flag used by the restarter to prevent a race condition; monitoring
    # for restarts is stopped while the shutdown is in progress.
    self.shutting_down = True
    self.stop_restarter()

    if self.has_kernel:
        await self._async_interrupt_kernel()

    if not now:
        await self._async_request_shutdown(restart=restart)
        # Give the kernel a chance to execute its shutdown actions before
        # any terminate/kill escalation; the wait is handled by
        # _async_finish_shutdown.
        await self._async_finish_shutdown(restart=restart)
    else:
        await self._async_kill_kernel()

    await self._async_cleanup_resources(restart=restart)

555 

556 shutdown_kernel = run_sync(_async_shutdown_kernel) 

557 

async def _async_restart_kernel(
    self, now: bool = False, newports: bool = False, **kw: t.Any
) -> None:
    """Restarts a kernel with the arguments that were used to launch it.

    Parameters
    ----------
    now : bool, optional
        If True, the kernel is forcefully restarted *immediately*, without
        having a chance to do any cleanup action. Otherwise the kernel is
        asked to shut down first and a forceful restart is issued only if
        it does not exit in time.

        In all cases the kernel is restarted, the only difference is whether
        it is given a chance to perform a clean shutdown or not.

    newports : bool, optional
        If the old kernel was launched with random ports, this flag decides
        whether the same ports and connection file will be used again.
        If False, the same ports and connection file are used. This is
        the default. If True, new random port numbers are chosen and a
        new connection file is written. It is still possible that the newly
        chosen random port numbers happen to be the same as the old ones.

    `**kw` : optional
        Any options specified here will overwrite those used to launch the
        kernel.

    Raises
    ------
    RuntimeError
        If no launch arguments were saved by a previous start.
    """
    if self._launch_args is None:
        msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
        raise RuntimeError(msg)

    # Stop currently running kernel.
    await self._async_shutdown_kernel(now=now, restart=True)

    if newports:
        self.cleanup_random_ports()

    # Start new kernel; caller overrides are merged into the saved arguments.
    saved_args = self._launch_args
    saved_args.update(kw)
    await self._async_start_kernel(**saved_args)

598 

599 restart_kernel = run_sync(_async_restart_kernel) 

600 

@property
def owns_kernel(self) -> bool:
    """The ``owns_kernel`` value given at construction (default True)."""
    owned = self._owns_kernel
    return owned

604 

@property
def has_kernel(self) -> bool:
    """Has a kernel process been started that we are actively managing."""
    if self.provisioner is None:
        return False
    return self.provisioner.has_process

609 

async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
    """Ask the provisioner to terminate the kernel, without waiting for exit.

    Similar to ``_kill_kernel`` but with a terminate (not kill) request.
    Does nothing when no kernel is being managed.
    """
    if not self.has_kernel:
        return
    assert self.provisioner is not None
    await self.provisioner.terminate(restart=restart)

615 

616 _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm) 

617 

async def _async_kill_kernel(self, restart: bool = False) -> None:
    """Kill the running kernel.

    This is a private method, callers should use shutdown_kernel(now=True).
    Does nothing when no kernel is being managed.
    """
    if not self.has_kernel:
        return

    assert self.provisioner is not None
    await self.provisioner.kill(restart=restart)

    # Wait until the kernel terminates.
    try:
        await asyncio.wait_for(self._async_wait(), timeout=5.0)
    except asyncio.TimeoutError:
        # Wait timed out, just log warning but continue - not much more we can do.
        self.log.warning("Wait for final termination of kernel timed out - continuing...")
    else:
        # Process is no longer alive, wait and clear
        if self.has_kernel:
            await self.provisioner.wait()

638 

639 _kill_kernel = run_sync(_async_kill_kernel) 

640 

async def _async_interrupt_kernel(self) -> None:
    """Interrupts the kernel by sending it a signal.

    Unlike ``signal_kernel``, this operation is well supported on all
    platforms.

    If no kernel is present yet but a ``_ready`` future exists, the future
    is awaited first so that a pending startup or shutdown can finish. The
    interrupt is then delivered according to the kernel spec's
    ``interrupt_mode``: "signal" sends SIGINT, "message" sends an
    ``interrupt_request`` over the control socket. Any other mode does
    nothing.

    Raises
    ------
    RuntimeError
        If there is still no kernel after waiting.
    """
    if not self.has_kernel and self._ready is not None:
        if isinstance(self._ready, CFuture):
            # A concurrent.futures.Future is not awaitable, so
            # asyncio.ensure_future() rejects it with TypeError;
            # wrap_future() is the supported bridge into the running loop.
            ready = asyncio.wrap_future(self._ready)
        else:
            ready = self._ready
        # Wait for a shutdown if one is in progress.
        if self.shutting_down:
            await ready
        # Wait for a startup.
        await ready

    if self.has_kernel:
        assert self.kernel_spec is not None
        interrupt_mode = self.kernel_spec.interrupt_mode
        if interrupt_mode == "signal":
            await self._async_signal_kernel(signal.SIGINT)

        elif interrupt_mode == "message":
            msg = self.session.msg("interrupt_request", content={})
            self._connect_control_socket()
            self.session.send(self._control_socket, msg)
    else:
        msg = "Cannot interrupt kernel. No kernel is running!"
        raise RuntimeError(msg)

671 

672 interrupt_kernel = run_sync(_async_interrupt_kernel) 

673 

async def _async_signal_kernel(self, signum: int) -> None:
    """Sends a signal to the process group of the kernel (this
    usually includes the kernel and any subprocesses spawned by
    the kernel).

    Note that since only SIGTERM is supported on Windows, this function is
    only useful on Unix systems.

    Raises
    ------
    RuntimeError
        If no kernel is being managed.
    """
    if not self.has_kernel:
        msg = "Cannot signal kernel. No kernel is running!"
        raise RuntimeError(msg)
    assert self.provisioner is not None
    await self.provisioner.send_signal(signum)

688 

689 signal_kernel = run_sync(_async_signal_kernel) 

690 

async def _async_is_alive(self) -> bool:
    """Is the kernel process still running?

    Always True when this manager does not own the kernel. Otherwise the
    kernel is alive only if one is being managed and the provisioner's
    ``poll()`` returns ``None``.
    """
    if not self.owns_kernel:
        return True
    if not self.has_kernel:
        return False
    assert self.provisioner is not None
    poll_result = await self.provisioner.poll()
    return poll_result is None

702 

703 is_alive = run_sync(_async_is_alive) 

704 

async def _async_wait(self, pollinterval: float = 0.1) -> None:
    """Poll until the kernel is no longer alive.

    Sleeps ``pollinterval`` seconds between liveness checks. No timeout is
    applied here; callers are responsible for wrapping this call in one
    (see ``_kill_kernel``).
    """
    while True:
        still_alive = await self._async_is_alive()
        if not still_alive:
            return
        await asyncio.sleep(pollinterval)

712 

713 

class AsyncKernelManager(KernelManager):
    """An async kernel manager.

    The public kernel-lifecycle names are bound directly to the coroutine
    implementations defined on ``KernelManager`` so that callers await them.
    """

    # The class to create with our `client` method.
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.asynchronous.AsyncKernelClient", config=True
    )
    client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient", config=True)

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.asyncio.Context)

    @default("context")
    def _context_default(self) -> zmq.asyncio.Context:
        """Create the default asyncio ZMQ context and record that we made it."""
        self._created_context = True
        return zmq.asyncio.Context()

    def client(  # type:ignore[override]
        self, **kwargs: t.Any
    ) -> AsyncKernelClient:
        """Get a client for the manager."""
        return super().client(**kwargs)  # type:ignore[return-value]

    # Coroutine implementations exposed under the public (and private) names.
    _launch_kernel = KernelManager._async_launch_kernel  # type:ignore[assignment]
    start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel  # type:ignore[assignment]
    pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel  # type:ignore[assignment]
    post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel  # type:ignore[assignment]
    request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown  # type:ignore[assignment]
    finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown  # type:ignore[assignment]
    cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources  # type:ignore[assignment]
    shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel  # type:ignore[assignment]
    restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel  # type:ignore[assignment]
    _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm  # type:ignore[assignment]
    _kill_kernel = KernelManager._async_kill_kernel  # type:ignore[assignment]
    interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel  # type:ignore[assignment]
    signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel  # type:ignore[assignment]
    is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive  # type:ignore[assignment]

751 

752 

753KernelManagerABC.register(KernelManager) 

754 

755 

def start_new_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[KernelManager, BlockingKernelClient]:
    """Start a new kernel, and return its Manager and Client.

    ``kwargs`` are forwarded to ``start_kernel``. If the client raises
    ``RuntimeError`` while waiting for the kernel to become ready, its
    channels are stopped, the kernel is shut down, and the error is
    re-raised.
    """
    manager = KernelManager(kernel_name=kernel_name)
    manager.start_kernel(**kwargs)

    kernel_client = manager.client()
    kernel_client.start_channels()
    try:
        kernel_client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        kernel_client.stop_channels()
        manager.shutdown_kernel()
        raise

    return manager, kernel_client

772 

773 

async def start_new_async_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
    """Start a new kernel, and return its Manager and Client.

    ``kwargs`` are forwarded to ``start_kernel``. If the client raises
    ``RuntimeError`` while waiting for the kernel to become ready, its
    channels are stopped, the kernel is shut down, and the error is
    re-raised.
    """
    manager = AsyncKernelManager(kernel_name=kernel_name)
    await manager.start_kernel(**kwargs)

    kernel_client = manager.client()
    kernel_client.start_channels()
    try:
        await kernel_client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        kernel_client.stop_channels()
        await manager.shutdown_kernel()
        raise

    return (manager, kernel_client)

790 

791 

@contextmanager
def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
    """Context manager to create a kernel in a subprocess.

    The kernel is shut down when the context exits: the client's channels
    are stopped and the kernel is shut down with ``now=True``.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    manager, kernel_client = start_new_kernel(**kwargs)
    try:
        yield kernel_client
    finally:
        kernel_client.stop_channels()
        manager.shutdown_kernel(now=True)