Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/manager.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

384 statements  

1"""Base class to manage a running kernel""" 

2 

3# Copyright (c) Jupyter Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5import asyncio 

6import functools 

7import os 

8import re 

9import signal 

10import sys 

11import typing as t 

12import uuid 

13import warnings 

14from asyncio.futures import Future 

15from concurrent.futures import Future as CFuture 

16from contextlib import contextmanager 

17from enum import Enum 

18 

19import zmq 

20from jupyter_core.utils import run_sync 

21from traitlets import ( 

22 Any, 

23 Bool, 

24 Dict, 

25 DottedObjectName, 

26 Float, 

27 Instance, 

28 Type, 

29 Unicode, 

30 default, 

31 observe, 

32 observe_compat, 

33) 

34from traitlets.utils.importstring import import_item 

35 

36from . import kernelspec 

37from .asynchronous import AsyncKernelClient 

38from .blocking import BlockingKernelClient 

39from .client import KernelClient 

40from .connect import ConnectionFileMixin 

41from .managerabc import KernelManagerABC 

42from .provisioning import KernelProvisionerBase 

43from .provisioning import KernelProvisionerFactory as KPF # noqa 

44 

45 

class _ShutdownStatus(Enum):
    """Record how far the shutdown sequence of a kernel has progressed.

    This is so far used only for testing in order to track the internal state of
    the shutdown logic, and verifying which path is taken for which
    misbehavior.
    """

    # Initial value: no shutdown step has been recorded yet.
    Unset = None
    # A ``shutdown_request`` message has been sent to the kernel.
    ShutdownRequest = "ShutdownRequest"
    # The kernel did not exit in time and a terminate (SIGTERM) is being sent.
    SigtermRequest = "SigtermRequest"
    # The kernel still did not exit and a kill (SIGKILL) is being sent.
    SigkillRequest = "SigkillRequest"

59 

60 

# Type variable used so that decorators keep the signature of what they wrap.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])


def _get_future() -> t.Union[Future, CFuture]:
    """Get an appropriate Future object.

    Returns an ``asyncio`` future when called while an event loop is running,
    and a ``concurrent.futures`` future otherwise.
    """
    try:
        asyncio.get_running_loop()
        return Future()
    except RuntimeError:
        # No event loop running, use concurrent future
        return CFuture()


def in_pending_state(method: F) -> F:
    """Sets the kernel to a pending state by
    creating a fresh Future for the KernelManager's `ready`
    attribute. Once the method is finished, set the Future's results.

    The wrapped coroutine's return value is passed through unchanged. If it
    raises, the exception is stored on the ``ready`` future, logged, and
    re-raised to the caller.

    A ``ready`` future that is already done when the method finishes (for
    example because a consumer cancelled it) is left untouched instead of
    raising ``InvalidStateError``.
    """

    @t.no_type_check
    @functools.wraps(method)
    async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
        """Create a future for the decorated method."""
        # Reuse a future created before the first start attempt (e.g. through
        # the ``ready`` property); otherwise start from a fresh one.
        if self._attempted_start or not self._ready:
            self._ready = _get_future()
        try:
            # call wrapped method, await, and set the result or exception.
            out = await method(self, *args, **kwargs)
            # Add a small sleep to ensure tests can capture the state before done
            await asyncio.sleep(0.01)
            if self.owns_kernel and not self._ready.done():
                self._ready.set_result(None)
            return out
        except Exception as e:
            if not self._ready.done():
                self._ready.set_exception(e)
                # Retrieve the exception from the future, as the original
                # logging call did, so it counts as consumed.
                self._ready.exception()
            self.log.exception(e)
            # Bare raise keeps the original traceback intact.
            raise

    return t.cast(F, wrapper)

100 

101 

class KernelManager(ConnectionFileMixin):
    """Manages a single kernel in a subprocess on this host.

    This version starts kernels with Popen.
    """

    # Future tracking the pending start/shutdown operation; see ``ready``.
    _ready: t.Union[Future, CFuture] | None

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Initialize a kernel manager.

        Positional arguments are ignored and trigger a ``DeprecationWarning``.
        The ``owns_kernel`` keyword (default ``True``) is consumed here; the
        remaining keyword arguments are forwarded to the parent initializer.
        """
        if args:
            warnings.warn(
                "Passing positional only arguments to "
                "`KernelManager.__init__` is deprecated since jupyter_client"
                " 8.6, and will become an error on future versions. Positional "
                " arguments have been ignored since jupyter_client 7.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._owns_kernel = kwargs.pop("owns_kernel", True)
        super().__init__(**kwargs)
        self._shutdown_status = _ShutdownStatus.Unset
        self._attempted_start = False
        self._ready = None

    # True once this manager has created its own ZMQ context (see
    # ``_context_default``); such a context is destroyed on cleanup.
    _created_context: Bool = Bool(False)

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.Context)

    @default("context")
    def _context_default(self) -> zmq.Context:
        """Create a new ZMQ context and record that this manager created it."""
        self._created_context = True
        return zmq.Context()

    # the class to create with our `client` method
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.blocking.BlockingKernelClient", config=True
    )
    client_factory: Type = Type(klass=KernelClient, config=True)

    @default("client_factory")
    def _client_factory_default(self) -> Type:
        """Import the class named by ``client_class``."""
        return import_item(self.client_class)

    @observe("client_class")
    def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
        """Keep ``client_factory`` in sync when ``client_class`` changes."""
        self.client_factory = import_item(str(change["new"]))

    kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)

    # The kernel provisioner with which this KernelManager is communicating.
    # This will generally be a LocalProvisioner instance unless the kernelspec
    # indicates otherwise.
    provisioner: KernelProvisionerBase | None = None

    kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)

    @default("kernel_spec_manager")
    def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
        """Create a ``KernelSpecManager`` that uses this manager's ``data_dir``."""
        return kernelspec.KernelSpecManager(data_dir=self.data_dir)

    @observe("kernel_spec_manager")
    @observe_compat
    def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
        """Drop the cached kernel spec when the spec manager is replaced."""
        self._kernel_spec = None

    shutdown_wait_time: Float = Float(
        5.0,
        config=True,
        help="Time to wait for a kernel to terminate before killing it, "
        "in seconds. When a shutdown request is initiated, the kernel "
        "will be immediately sent an interrupt (SIGINT), followed "
        "by a shutdown_request message, after 1/2 of `shutdown_wait_time` "
        "it will be sent a terminate (SIGTERM) request, and finally at "
        "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
        "and kill may be equivalent on windows. Note that this value can be "
        "overridden by the in-use kernel provisioner since shutdown times may "
        "vary by provisioned environment.",
    )

    kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)

    @observe("kernel_name")
    def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
        """Drop the cached kernel spec and map ``"python"`` to the native kernel name."""
        self._kernel_spec = None
        if change["new"] == "python":
            self.kernel_name = kernelspec.NATIVE_KERNEL_NAME

    # Cache for the ``kernel_spec`` property; reset whenever the kernel name
    # or the kernel spec manager changes.
    _kernel_spec: kernelspec.KernelSpec | None = None

    @property
    def kernel_spec(self) -> kernelspec.KernelSpec | None:
        """The kernel spec for ``kernel_name``, looked up on first access.

        Nothing is looked up while ``kernel_name`` is the empty string, in
        which case the cached value (possibly ``None``) is returned.
        """
        if self._kernel_spec is None and self.kernel_name != "":
            self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
        return self._kernel_spec

    cache_ports: Bool = Bool(
        False,
        config=True,
        help="True if the MultiKernelManager should cache ports for this KernelManager instance",
    )

    @default("cache_ports")
    def _default_cache_ports(self) -> bool:
        """Cache ports by default only for the ``tcp`` transport."""
        return self.transport == "tcp"

    @property
    def ready(self) -> t.Union[CFuture, Future]:
        """A future that resolves when the kernel process has started for the first time"""
        # Created on demand so callers can obtain it before any start attempt.
        if not self._ready:
            self._ready = _get_future()
        return self._ready

    @property
    def ipykernel(self) -> bool:
        """Whether ``kernel_name`` is one of ``python``, ``python2`` or ``python3``."""
        return self.kernel_name in {"python", "python2", "python3"}

    # Protected traits
    _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True)
    _control_socket: Any = Any()

    _restarter: Any = Any()

    autorestart: Bool = Bool(
        True, config=True, help="""Should we autorestart the kernel if it dies."""
    )

    # Set while a shutdown is in progress; cleared again by the next start.
    shutting_down: bool = False

    def __del__(self) -> None:
        """Close the control socket and clean up the connection file."""
        self._close_control_socket()
        self.cleanup_connection_file()

    # --------------------------------------------------------------------------
    # Kernel restarter
    # --------------------------------------------------------------------------

    def start_restarter(self) -> None:
        """Start the kernel restarter.

        This implementation does nothing.
        """
        pass

    def stop_restarter(self) -> None:
        """Stop the kernel restarter.

        This implementation does nothing.
        """
        pass

    def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Register a callback to be called when a kernel is restarted

        Does nothing when no restarter is set.
        """
        if self._restarter is None:
            return
        self._restarter.add_callback(callback, event)

    def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Unregister a callback to be called when a kernel is restarted

        Does nothing when no restarter is set.
        """
        if self._restarter is None:
            return
        self._restarter.remove_callback(callback, event)

    # --------------------------------------------------------------------------
    # create a Client connected to our Kernel
    # --------------------------------------------------------------------------

    def client(self, **kwargs: t.Any) -> BlockingKernelClient:
        """Create a client configured to connect to our kernel

        The client receives this manager's connection info, its connection
        file and the manager itself as ``parent``; ``kwargs`` override any of
        these values.
        """
        kw: dict = {}
        kw.update(self.get_connection_info(session=True))
        kw.update(
            {
                "connection_file": self.connection_file,
                "parent": self,
            }
        )

        # add kwargs last, for manual overrides
        kw.update(kwargs)
        return self.client_factory(**kw)

    # --------------------------------------------------------------------------
    # Kernel management
    # --------------------------------------------------------------------------

    def resolve_path(self, path: str) -> str | None:
        """Resolve path to given file.

        Delegates to the provisioner, which must already exist.
        """
        assert self.provisioner is not None
        return self.provisioner.resolve_path(path)

    def update_env(self, *, env: t.Dict[str, str]) -> None:
        """
        Allow to update the environment of a kernel manager.

        This will take effect only after kernel restart when the new env is
        passed to the new kernel.

        This is useful as some of the information of the current kernel reflect
        the state of the session that started it, and those session information
        (like the attach file path, or name), are mutable.

        Nothing happens unless the saved launch arguments already contain an
        ``env`` dictionary.

        .. version-added: 8.5
        """
        # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
        if (
            isinstance(self._launch_args, dict)
            and "env" in self._launch_args
            and isinstance(self._launch_args["env"], dict)  # type: ignore [unreachable]
        ):
            self._launch_args["env"].update(env)  # type: ignore [unreachable]

    def format_kernel_cmd(self, extra_arguments: t.List[str] | None = None) -> t.List[str]:
        """Replace templated args (e.g. {connection_file})

        Builds the command from the kernel spec's ``argv`` plus
        ``extra_arguments`` and substitutes ``{name}`` placeholders from the
        connection file path, ``sys.prefix``, the kernel spec's resource
        directory and the saved launch arguments. Unknown placeholders are
        left unchanged.
        """
        extra_arguments = extra_arguments or []
        assert self.kernel_spec is not None
        cmd = self.kernel_spec.argv + extra_arguments

        if cmd and cmd[0] in {
            "python",
            "python%i" % sys.version_info[0],
            "python%i.%i" % sys.version_info[:2],
        }:
            # executable is 'python' or 'python3', use sys.executable.
            # These will typically be the same,
            # but if the current process is in an env
            # and has been launched by abspath without
            # activating the env, python on PATH may not be sys.executable,
            # but it should be.
            cmd[0] = sys.executable

        # Make sure to use the realpath for the connection_file
        # On windows, when running with the store python, the connection_file path
        # is not usable by non python kernels because the path is being rerouted when
        # inside of a store app.
        # See this bug here: https://bugs.python.org/issue41196
        ns: t.Dict[str, t.Any] = {
            "connection_file": os.path.realpath(self.connection_file),
            "prefix": sys.prefix,
        }

        if self.kernel_spec:  # type:ignore[truthy-bool]
            ns["resource_dir"] = self.kernel_spec.resource_dir
        assert isinstance(self._launch_args, dict)

        ns.update(self._launch_args)

        pat = re.compile(r"\{([A-Za-z0-9_]+)\}")

        def from_ns(match: t.Any) -> t.Any:
            """Get the key out of ns if it's there, otherwise no change."""
            return ns.get(match.group(1), match.group())

        return [pat.sub(from_ns, arg) for arg in cmd]

    async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
        """actually launch the kernel

        override in a subclass to launch kernel subprocesses differently
        Note that provisioners can now be used to customize kernel environments
        and how the kernel is launched.
        """
        assert self.provisioner is not None
        connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
        assert self.provisioner.has_process
        # Provisioner provides the connection information.  Load into kernel manager
        # and write the connection file, if not already done.
        self._reconcile_connection_info(connection_info)

    _launch_kernel = run_sync(_async_launch_kernel)

    # Control socket used for polite kernel shutdown

    def _connect_control_socket(self) -> None:
        """Create the control socket if it does not exist yet."""
        if self._control_socket is None:
            self._control_socket = self._create_connected_socket("control")
            self._control_socket.linger = 100

    def _close_control_socket(self) -> None:
        """Close and forget the control socket, if there is one."""
        if self._control_socket is None:
            return
        self._control_socket.close()
        self._control_socket = None

    async def _async_pre_start_kernel(
        self, **kw: t.Any
    ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
        """Prepares a kernel for startup in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
             keyword arguments that are passed down to build the kernel_cmd
             and launching the kernel (e.g. Popen kwargs).

        Returns
        -------
        The kernel command and the keyword arguments returned by the
        provisioner's ``pre_launch`` (without the ``cmd`` entry).
        """
        self.shutting_down = False
        self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
        # save kwargs for use in restart
        # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
        self._launch_args = kw.copy()
        if self.provisioner is None:  # will not be None on restarts
            self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
                self.kernel_id,
                self.kernel_spec,
                parent=self,
            )
        kw = await self.provisioner.pre_launch(**kw)
        kernel_cmd = kw.pop("cmd")
        return kernel_cmd, kw

    pre_start_kernel = run_sync(_async_pre_start_kernel)

    async def _async_post_start_kernel(self, **kw: t.Any) -> None:
        """Performs any post startup tasks relative to the kernel.

        Parameters
        ----------
        `**kw` : optional
            keyword arguments that were used in the kernel process's launch.
        """
        self.start_restarter()
        self._connect_control_socket()
        assert self.provisioner is not None
        await self.provisioner.post_launch(**kw)

    post_start_kernel = run_sync(_async_post_start_kernel)

    @in_pending_state
    async def _async_start_kernel(self, **kw: t.Any) -> None:
        """Starts a kernel on this host in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
             keyword arguments that are passed down to build the kernel_cmd
             and launching the kernel (e.g. Popen kwargs).
        """
        self._attempted_start = True
        kernel_cmd, kw = await self._async_pre_start_kernel(**kw)

        # launch the kernel subprocess
        self.log.debug("Starting kernel: %s", kernel_cmd)
        await self._async_launch_kernel(kernel_cmd, **kw)
        await self._async_post_start_kernel(**kw)

    start_kernel = run_sync(_async_start_kernel)

    async def _async_request_shutdown(self, restart: bool = False) -> None:
        """Send a shutdown request via control channel"""
        content = {"restart": restart}
        msg = self.session.msg("shutdown_request", content=content)
        # ensure control socket is connected
        self._connect_control_socket()
        self.session.send(self._control_socket, msg)
        assert self.provisioner is not None
        await self.provisioner.shutdown_requested(restart=restart)
        self._shutdown_status = _ShutdownStatus.ShutdownRequest

    request_shutdown = run_sync(_async_request_shutdown)

    async def _async_finish_shutdown(
        self,
        waittime: float | None = None,
        pollinterval: float = 0.1,
        restart: bool = False,
    ) -> None:
        """Wait for kernel shutdown, then kill process if it doesn't shutdown.

        This does not send shutdown requests - use :meth:`request_shutdown`
        first.

        The kernel gets half of ``waittime`` to exit before a terminate
        request is sent, and the other half before it is killed. When
        ``waittime`` is ``None`` it defaults to ``shutdown_wait_time``; a
        provisioner, if present, may override the value either way.
        """
        if waittime is None:
            waittime = max(self.shutdown_wait_time, 0)
        if self.provisioner:  # Allow provisioner to override
            waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime)

        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, terminating")
            self._shutdown_status = _ShutdownStatus.SigtermRequest
            await self._async_send_kernel_sigterm()

        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, killing")
            self._shutdown_status = _ShutdownStatus.SigkillRequest
            await self._async_kill_kernel(restart=restart)
        else:
            # Process is no longer alive, wait and clear
            if self.has_kernel:
                assert self.provisioner is not None
                await self.provisioner.wait()

    finish_shutdown = run_sync(_async_finish_shutdown)

    async def _async_cleanup_resources(self, restart: bool = False) -> None:
        """Clean up resources when the kernel is shut down

        On a restart the connection file and a self-created ZMQ context are
        kept; everything else is released in both cases.
        """
        if not restart:
            self.cleanup_connection_file()

        self.cleanup_ipc_files()
        self._close_control_socket()
        self.session.parent = None

        if self._created_context and not restart:
            self.context.destroy(linger=100)

        if self.provisioner:
            await self.provisioner.cleanup(restart=restart)

    cleanup_resources = run_sync(_async_cleanup_resources)

    @in_pending_state
    async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
        """Attempts to stop the kernel process cleanly.

        This attempts to shutdown the kernels cleanly by:

        1. Sending it a shutdown message over the control channel.
        2. If that fails, the kernel is shutdown forcibly by sending it
           a signal.

        Does nothing when this manager does not own the kernel.

        Parameters
        ----------
        now : bool
            Should the kernel be forcible killed *now*. This skips the
            first, nice shutdown attempt.
        restart: bool
            Will this kernel be restarted after it is shutdown. When this
            is True, connection files will not be cleaned up.
        """
        if not self.owns_kernel:
            return

        self.shutting_down = True  # Used by restarter to prevent race condition
        # Stop monitoring for restarting while we shutdown.
        self.stop_restarter()

        if self.has_kernel:
            await self._async_interrupt_kernel()

        if now:
            # Forward ``restart`` so the provisioner is told about a pending
            # relaunch, exactly as on the escalation path of finish_shutdown.
            await self._async_kill_kernel(restart=restart)
        else:
            await self._async_request_shutdown(restart=restart)
            # Don't send any additional kernel kill messages immediately, to give
            # the kernel a chance to properly execute shutdown actions. The wait
            # is bounded by the shutdown wait time: terminate is sent after half
            # of it and kill at the end (see _async_finish_shutdown).
            await self._async_finish_shutdown(restart=restart)

        await self._async_cleanup_resources(restart=restart)

    shutdown_kernel = run_sync(_async_shutdown_kernel)

    async def _async_restart_kernel(
        self, now: bool = False, newports: bool = False, **kw: t.Any
    ) -> None:
        """Restarts a kernel with the arguments that were used to launch it.

        Parameters
        ----------
        now : bool, optional
            If True, the kernel is forcefully restarted *immediately*, without
            having a chance to do any cleanup action.  Otherwise the kernel is
            given the shutdown wait time to clean up before a forceful restart
            is issued.

            In all cases the kernel is restarted, the only difference is whether
            it is given a chance to perform a clean shutdown or not.

        newports : bool, optional
            If the old kernel was launched with random ports, this flag decides
            whether the same ports and connection file will be used again.
            If False, the same ports and connection file are used.  This is
            the default.  If True, new random port numbers are chosen and a
            new connection file is written.  It is still possible that the newly
            chosen random port numbers happen to be the same as the old ones.

        `**kw` : optional
            Any options specified here will overwrite those used to launch the
            kernel.

        Raises
        ------
        RuntimeError
            If no launch arguments were saved by a previous start.
        """
        if self._launch_args is None:
            msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
            raise RuntimeError(msg)

        # Stop currently running kernel.
        await self._async_shutdown_kernel(now=now, restart=True)

        if newports:
            self.cleanup_random_ports()

        # Start new kernel.
        self._launch_args.update(kw)
        await self._async_start_kernel(**self._launch_args)

    restart_kernel = run_sync(_async_restart_kernel)

    @property
    def owns_kernel(self) -> bool:
        """The ``owns_kernel`` value given at construction (default ``True``)."""
        return self._owns_kernel

    @property
    def has_kernel(self) -> bool:
        """Has a kernel process been started that we are actively managing."""
        return self.provisioner is not None and self.provisioner.has_process

    async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
        """similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.terminate(restart=restart)

    _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)

    async def _async_kill_kernel(self, restart: bool = False) -> None:
        """Kill the running kernel.

        This is a private method, callers should use shutdown_kernel(now=True).
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.kill(restart=restart)

            # Wait until the kernel terminates.
            try:
                await asyncio.wait_for(self._async_wait(), timeout=5.0)
            except asyncio.TimeoutError:
                # Wait timed out, just log warning but continue - not much more we can do.
                self.log.warning("Wait for final termination of kernel timed out - continuing...")
            else:
                # Process is no longer alive, wait and clear
                if self.has_kernel:
                    await self.provisioner.wait()

    _kill_kernel = run_sync(_async_kill_kernel)

    async def _async_interrupt_kernel(self) -> None:
        """Interrupts the kernel by sending it a signal.

        Unlike ``signal_kernel``, this operation is well supported on all
        platforms.

        Depending on the kernel spec's ``interrupt_mode`` this sends SIGINT
        (``"signal"``) or an ``interrupt_request`` message on the control
        channel (``"message"``).

        Raises
        ------
        RuntimeError
            If no kernel is running.
        """
        if not self.has_kernel and self._ready is not None:
            if isinstance(self._ready, CFuture):
                # A concurrent future cannot be awaited directly, and
                # ``ensure_future`` rejects it with a TypeError;
                # ``wrap_future`` bridges it into the running event loop.
                ready = asyncio.wrap_future(self._ready)
            else:
                ready = self._ready
            # Wait for a shutdown if one is in progress.
            if self.shutting_down:
                await ready
            # Wait for a startup.
            await ready

        if self.has_kernel:
            assert self.kernel_spec is not None
            interrupt_mode = self.kernel_spec.interrupt_mode
            if interrupt_mode == "signal":
                await self._async_signal_kernel(signal.SIGINT)

            elif interrupt_mode == "message":
                msg = self.session.msg("interrupt_request", content={})
                self._connect_control_socket()
                self.session.send(self._control_socket, msg)
        else:
            msg = "Cannot interrupt kernel. No kernel is running!"
            raise RuntimeError(msg)

    interrupt_kernel = run_sync(_async_interrupt_kernel)

    async def _async_signal_kernel(self, signum: int) -> None:
        """Sends a signal to the process group of the kernel (this
        usually includes the kernel and any subprocesses spawned by
        the kernel).

        Note that since only SIGTERM is supported on Windows, this function is
        only useful on Unix systems.

        Raises
        ------
        RuntimeError
            If no kernel is running.
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.send_signal(signum)
        else:
            msg = "Cannot signal kernel. No kernel is running!"
            raise RuntimeError(msg)

    signal_kernel = run_sync(_async_signal_kernel)

    async def _async_is_alive(self) -> bool:
        """Is the kernel process still running?

        Always ``True`` when this manager does not own the kernel. Otherwise
        the kernel counts as alive while the provisioner's ``poll`` returns
        ``None``.
        """
        if not self.owns_kernel:
            return True

        if self.has_kernel:
            assert self.provisioner is not None
            ret = await self.provisioner.poll()
            if ret is None:
                return True
        return False

    is_alive = run_sync(_async_is_alive)

    async def _async_wait(self, pollinterval: float = 0.1) -> None:
        """Poll every ``pollinterval`` seconds until the kernel is no longer alive."""
        # Use busy loop at 100ms intervals, polling until the process is
        # not alive.  If we find the process is no longer alive, complete
        # its cleanup via the blocking wait().  Callers are responsible for
        # issuing calls to wait() using a timeout (see _kill_kernel()).
        while await self._async_is_alive():
            await asyncio.sleep(pollinterval)

717 

718 

class AsyncKernelManager(KernelManager):
    """An async kernel manager.

    The kernel-management names below are bound directly to the ``_async_*``
    coroutine functions of :class:`KernelManager` instead of their
    ``run_sync`` wrappers.
    """

    # the class to create with our `client` method
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.asynchronous.AsyncKernelClient", config=True
    )
    client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient", config=True)

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.asyncio.Context)

    @default("context")
    def _context_default(self) -> zmq.asyncio.Context:
        """Create a ``zmq.asyncio.Context`` and record that this manager created it."""
        self._created_context = True
        return zmq.asyncio.Context()

    def client(  # type:ignore[override]
        self, **kwargs: t.Any
    ) -> AsyncKernelClient:
        """Get a client for the manager."""
        return super().client(**kwargs)  # type:ignore[return-value]

    # Replace the inherited run_sync wrappers with the underlying coroutine
    # functions, so each of these names yields an awaitable when called.
    _launch_kernel = KernelManager._async_launch_kernel  # type:ignore[assignment]
    start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel  # type:ignore[assignment]
    pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel  # type:ignore[assignment]
    post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel  # type:ignore[assignment]
    request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown  # type:ignore[assignment]
    finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown  # type:ignore[assignment]
    cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources  # type:ignore[assignment]
    shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel  # type:ignore[assignment]
    restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel  # type:ignore[assignment]
    _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm  # type:ignore[assignment]
    _kill_kernel = KernelManager._async_kill_kernel  # type:ignore[assignment]
    interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel  # type:ignore[assignment]
    signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel  # type:ignore[assignment]
    is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive  # type:ignore[assignment]

756 

757 

# Register KernelManager with KernelManagerABC.
# NOTE(review): presumably abc-style virtual-subclass registration - confirm
# against the definition of KernelManagerABC in ``managerabc``.
KernelManagerABC.register(KernelManager)

759 

760 

def start_new_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[KernelManager, BlockingKernelClient]:
    """Launch a kernel and hand back its manager together with a ready client.

    ``kwargs`` are passed to ``KernelManager.start_kernel``. The client's
    channels are started and the call blocks until the kernel reports ready
    or ``startup_timeout`` expires. If waiting raises ``RuntimeError``, the
    channels are stopped and the kernel is shut down before the error
    propagates.
    """
    manager = KernelManager(kernel_name=kernel_name)
    manager.start_kernel(**kwargs)

    client = manager.client()
    client.start_channels()
    try:
        client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # Undo what was started so far, then let the caller see the error.
        client.stop_channels()
        manager.shutdown_kernel()
        raise
    else:
        return manager, client

777 

778 

async def start_new_async_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
    """Launch a kernel and hand back its async manager together with a ready client.

    ``kwargs`` are passed to ``AsyncKernelManager.start_kernel``. The client's
    channels are started and the coroutine waits until the kernel reports
    ready or ``startup_timeout`` expires. If waiting raises ``RuntimeError``,
    the channels are stopped and the kernel is shut down before the error
    propagates.
    """
    manager = AsyncKernelManager(kernel_name=kernel_name)
    await manager.start_kernel(**kwargs)

    client = manager.client()
    client.start_channels()
    try:
        await client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # Undo what was started so far, then let the caller see the error.
        client.stop_channels()
        await manager.shutdown_kernel()
        raise
    else:
        return (manager, client)

795 

796 

@contextmanager
def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
    """Run a kernel in a subprocess for the duration of a ``with`` block.

    All keyword arguments are forwarded to ``start_new_kernel``. On exit,
    whether normal or through an exception, the client's channels are
    stopped and the kernel is shut down immediately.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    manager, client = start_new_kernel(**kwargs)
    try:
        yield client
    finally:
        # Stop the channels first, then shut the kernel down with now=True.
        client.stop_channels()
        manager.shutdown_kernel(now=True)