Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jupyter_client/manager.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

381 statements  

1"""Base class to manage a running kernel""" 

2# Copyright (c) Jupyter Development Team. 

3# Distributed under the terms of the Modified BSD License. 

4import asyncio 

5import functools 

6import os 

7import re 

8import signal 

9import sys 

10import typing as t 

11import uuid 

12import warnings 

13from asyncio.futures import Future 

14from concurrent.futures import Future as CFuture 

15from contextlib import contextmanager 

16from enum import Enum 

17 

18import zmq 

19from jupyter_core.utils import run_sync 

20from traitlets import ( 

21 Any, 

22 Bool, 

23 Dict, 

24 DottedObjectName, 

25 Float, 

26 Instance, 

27 Type, 

28 Unicode, 

29 default, 

30 observe, 

31 observe_compat, 

32) 

33from traitlets.utils.importstring import import_item 

34 

35from . import kernelspec 

36from .asynchronous import AsyncKernelClient 

37from .blocking import BlockingKernelClient 

38from .client import KernelClient 

39from .connect import ConnectionFileMixin 

40from .managerabc import KernelManagerABC 

41from .provisioning import KernelProvisionerBase 

42from .provisioning import KernelProvisionerFactory as KPF # noqa 

43 

44 

45class _ShutdownStatus(Enum): 

46 """ 

47 

48 This is so far used only for testing in order to track the internal state of 

49 the shutdown logic, and verifying which path is taken for which 

50 missbehavior. 

51 

52 """ 

53 

54 Unset = None 

55 ShutdownRequest = "ShutdownRequest" 

56 SigtermRequest = "SigtermRequest" 

57 SigkillRequest = "SigkillRequest" 

58 

59 

# Type variable bound to any callable; used to annotate decorators that hand
# back a callable of the same type as the one they received.
F = t.TypeVar("F", bound=t.Callable[..., t.Any])

61 

62 

63def _get_future() -> t.Union[Future, CFuture]: 

64 """Get an appropriate Future object""" 

65 try: 

66 asyncio.get_running_loop() 

67 return Future() 

68 except RuntimeError: 

69 # No event loop running, use concurrent future 

70 return CFuture() 

71 

72 

def in_pending_state(method: "F") -> "F":
    """Put the kernel manager into a pending state around ``method``.

    The returned coroutine function installs a fresh future as the
    manager's ``_ready`` attribute (when a start has already been attempted
    or no future exists yet), awaits ``method`` and then resolves the
    future: with ``None`` on success when the manager owns the kernel, or
    with the raised exception on failure.

    A future that is already resolved and was not replaced is left
    untouched, so that resolving it a second time can never raise
    ``InvalidStateError`` and hide the real outcome of ``method``.

    Parameters
    ----------
    method : coroutine function
        The ``async def`` method of a kernel manager to wrap.

    Returns
    -------
    The wrapping coroutine function, typed like ``method``.
    """

    @t.no_type_check
    @functools.wraps(method)
    async def wrapper(self: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
        """Create a future for the decorated method."""
        if self._attempted_start or not self._ready:
            self._ready = _get_future()
        try:
            # call wrapped method, await, and set the result or exception.
            out = await method(self, *args, **kwargs)
            # Add a small sleep to ensure tests can capture the state before done
            await asyncio.sleep(0.01)
            if self.owns_kernel and not self._ready.done():
                self._ready.set_result(None)
            return out
        except Exception as e:
            if self._ready.done():
                # The future cannot be resolved twice; just report the error.
                self.log.exception(e)
            else:
                self._ready.set_exception(e)
                self.log.exception(self._ready.exception())
            # Bare raise re-raises the active exception unchanged.
            raise

    # "F" is given as a forward reference so the decorator does not need the
    # type variable at runtime.
    return t.cast("F", wrapper)

99 

100 

class KernelManager(ConnectionFileMixin):
    """Manages a single kernel in a subprocess on this host.

    This version starts kernels with Popen.

    The kernel process itself is driven through a kernel provisioner
    (``self.provisioner``). Every lifecycle operation is implemented as an
    ``_async_*`` coroutine and also exposed under its public name through
    ``run_sync``.
    """

    # Future tracking the pending start/shutdown operation; see ``ready``.
    _ready: t.Optional[t.Union[Future, CFuture]]

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        """Initialize a kernel manager.

        Positional arguments are ignored and trigger a DeprecationWarning.
        The ``owns_kernel`` keyword (default True) is consumed here; all
        other keywords are forwarded to the parent initializer.
        """
        if args:
            warnings.warn(
                "Passing positional only arguments to "
                "`KernelManager.__init__` is deprecated since jupyter_client"
                " 8.6, and will become an error on future versions. Positional "
                " arguments have been ignored since jupyter_client 7.0",
                DeprecationWarning,
                stacklevel=2,
            )
        self._owns_kernel = kwargs.pop("owns_kernel", True)
        super().__init__(**kwargs)
        self._shutdown_status = _ShutdownStatus.Unset
        self._attempted_start = False
        self._ready = None

    # Set to True when this manager created its own zmq context.
    _created_context: Bool = Bool(False)

    # The PyZMQ Context to use for communication with the kernel.
    context: Instance = Instance(zmq.Context)

    @default("context")
    def _context_default(self) -> zmq.Context:
        # Record that the context is ours, so cleanup may destroy it.
        self._created_context = True
        return zmq.Context()

    # the class to create with our `client` method
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.blocking.BlockingKernelClient"
    )
    client_factory: Type = Type(klass=KernelClient)

    @default("client_factory")
    def _client_factory_default(self) -> Type:
        return import_item(self.client_class)

    @observe("client_class")
    def _client_class_changed(self, change: t.Dict[str, DottedObjectName]) -> None:
        # Keep the factory in sync with the dotted class name.
        self.client_factory = import_item(str(change["new"]))

    kernel_id: t.Union[str, Unicode] = Unicode(None, allow_none=True)

    # The kernel provisioner with which this KernelManager is communicating.
    # This will generally be a LocalProvisioner instance unless the kernelspec
    # indicates otherwise.
    provisioner: t.Optional[KernelProvisionerBase] = None

    kernel_spec_manager: Instance = Instance(kernelspec.KernelSpecManager)

    @default("kernel_spec_manager")
    def _kernel_spec_manager_default(self) -> kernelspec.KernelSpecManager:
        return kernelspec.KernelSpecManager(data_dir=self.data_dir)

    @observe("kernel_spec_manager")
    @observe_compat
    def _kernel_spec_manager_changed(self, change: t.Dict[str, Instance]) -> None:
        # Drop the cached spec; it is looked up again on next access.
        self._kernel_spec = None

    shutdown_wait_time: Float = Float(
        5.0,
        config=True,
        help="Time to wait for a kernel to terminate before killing it, "
        "in seconds. When a shutdown request is initiated, the kernel "
        "will be immediately sent an interrupt (SIGINT), followed "
        "by a shutdown_request message, after 1/2 of `shutdown_wait_time` "
        "it will be sent a terminate (SIGTERM) request, and finally at "
        "the end of `shutdown_wait_time` will be killed (SIGKILL). terminate "
        "and kill may be equivalent on windows. Note that this value can be "
        "overridden by the in-use kernel provisioner since shutdown times may "
        "vary by provisioned environment.",
    )

    kernel_name: t.Union[str, Unicode] = Unicode(kernelspec.NATIVE_KERNEL_NAME)

    @observe("kernel_name")
    def _kernel_name_changed(self, change: t.Dict[str, str]) -> None:
        # Drop the cached spec and map the generic "python" name onto the
        # native kernel name.
        self._kernel_spec = None
        if change["new"] == "python":
            self.kernel_name = kernelspec.NATIVE_KERNEL_NAME

    # Cache for the ``kernel_spec`` property.
    _kernel_spec: t.Optional[kernelspec.KernelSpec] = None

    @property
    def kernel_spec(self) -> t.Optional[kernelspec.KernelSpec]:
        """The kernel spec for ``kernel_name``, looked up lazily and cached.

        No lookup is attempted while ``kernel_name`` is the empty string.
        """
        if self._kernel_spec is None and self.kernel_name != "":
            self._kernel_spec = self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
        return self._kernel_spec

    cache_ports: Bool = Bool(
        False,
        config=True,
        help="True if the MultiKernelManager should cache ports for this KernelManager instance",
    )

    @default("cache_ports")
    def _default_cache_ports(self) -> bool:
        return self.transport == "tcp"

    @property
    def ready(self) -> t.Union[CFuture, Future]:
        """A future that resolves when the kernel process has started for the first time"""
        if not self._ready:
            self._ready = _get_future()
        return self._ready

    @property
    def ipykernel(self) -> bool:
        """Whether ``kernel_name`` is one of the Python kernel names."""
        return self.kernel_name in {"python", "python2", "python3"}

    # Protected traits
    _launch_args: t.Optional["Dict[str, Any]"] = Dict(allow_none=True)
    _control_socket: Any = Any()

    _restarter: Any = Any()

    autorestart: Bool = Bool(
        True, config=True, help="""Should we autorestart the kernel if it dies."""
    )

    # True between the start of a shutdown and the next start preparation.
    shutting_down: bool = False

    def __del__(self) -> None:
        # Release the control socket and connection file with the manager.
        self._close_control_socket()
        self.cleanup_connection_file()

    # --------------------------------------------------------------------------
    # Kernel restarter
    # --------------------------------------------------------------------------

    def start_restarter(self) -> None:
        """Start the kernel restarter."""
        pass

    def stop_restarter(self) -> None:
        """Stop the kernel restarter."""
        pass

    def add_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Register a callback to be called when a kernel is restarted"""
        if self._restarter is None:
            return
        self._restarter.add_callback(callback, event)

    def remove_restart_callback(self, callback: t.Callable, event: str = "restart") -> None:
        """Unregister a callback to be called when a kernel is restarted"""
        if self._restarter is None:
            return
        self._restarter.remove_callback(callback, event)

    # --------------------------------------------------------------------------
    # create a Client connected to our Kernel
    # --------------------------------------------------------------------------

    def client(self, **kwargs: t.Any) -> BlockingKernelClient:
        """Create a client configured to connect to our kernel

        The client receives this manager's connection info, connection file
        and ``parent=self``; ``kwargs`` are applied last and override them.
        """
        kw: dict = {}
        kw.update(self.get_connection_info(session=True))
        kw.update(
            {
                "connection_file": self.connection_file,
                "parent": self,
            }
        )

        # add kwargs last, for manual overrides
        kw.update(kwargs)
        return self.client_factory(**kw)

    # --------------------------------------------------------------------------
    # Kernel management
    # --------------------------------------------------------------------------

    def update_env(self, *, env: t.Dict[str, str]) -> None:
        """
        Allow to update the environment of a kernel manager.

        This will take effect only after kernel restart when the new env is
        passed to the new kernel.

        This is useful as some of the information of the current kernel reflect
        the state of the session that started it, and those session information
        (like the attach file path, or name), are mutable.

        Nothing happens unless the saved launch arguments already contain an
        ``env`` dict.

        .. version-added: 8.5
        """
        # Mypy think this is unreachable as it see _launch_args as Dict, not t.Dict
        if (
            isinstance(self._launch_args, dict)
            and "env" in self._launch_args
            and isinstance(self._launch_args["env"], dict)  # type: ignore [unreachable]
        ):
            self._launch_args["env"].update(env)  # type: ignore [unreachable]

    def format_kernel_cmd(self, extra_arguments: t.Optional[t.List[str]] = None) -> t.List[str]:
        """Replace templated args (e.g. {connection_file})

        Parameters
        ----------
        extra_arguments : list of str, optional
            Arguments appended to the kernel spec's ``argv``.

        Returns
        -------
        The command line with every ``{name}`` placeholder that is known
        (``connection_file``, ``prefix``, ``resource_dir`` or a key of the
        saved launch arguments) substituted; unknown placeholders are left
        unchanged.
        """
        extra_arguments = extra_arguments or []
        assert self.kernel_spec is not None
        # Concatenation builds a new list, so the spec's argv is not mutated.
        cmd = self.kernel_spec.argv + extra_arguments

        major, minor = sys.version_info[:2]
        if cmd and cmd[0] in {"python", f"python{major}", f"python{major}.{minor}"}:
            # executable is 'python' or 'python3', use sys.executable.
            # These will typically be the same,
            # but if the current process is in an env
            # and has been launched by abspath without
            # activating the env, python on PATH may not be sys.executable,
            # but it should be.
            cmd[0] = sys.executable

        # Make sure to use the realpath for the connection_file
        # On windows, when running with the store python, the connection_file path
        # is not usable by non python kernels because the path is being rerouted when
        # inside of a store app.
        # See this bug here: https://bugs.python.org/issue41196
        ns: t.Dict[str, t.Any] = {
            "connection_file": os.path.realpath(self.connection_file),
            "prefix": sys.prefix,
        }

        if self.kernel_spec:  # type:ignore[truthy-bool]
            ns["resource_dir"] = self.kernel_spec.resource_dir
        assert isinstance(self._launch_args, dict)

        # Launch arguments take precedence over the built-in names.
        ns.update(self._launch_args)

        pat = re.compile(r"\{([A-Za-z0-9_]+)\}")

        def from_ns(match: t.Any) -> t.Any:
            """Get the key out of ns if it's there, otherwise no change."""
            return ns.get(match.group(1), match.group())

        return [pat.sub(from_ns, arg) for arg in cmd]

    async def _async_launch_kernel(self, kernel_cmd: t.List[str], **kw: t.Any) -> None:
        """actually launch the kernel

        override in a subclass to launch kernel subprocesses differently.
        Note that provisioners can now be used to customize kernel
        environments and how kernels are launched.
        """
        assert self.provisioner is not None
        connection_info = await self.provisioner.launch_kernel(kernel_cmd, **kw)
        assert self.provisioner.has_process
        # Provisioner provides the connection information. Load into kernel manager
        # and write the connection file, if not already done.
        self._reconcile_connection_info(connection_info)

    _launch_kernel = run_sync(_async_launch_kernel)

    # Control socket used for polite kernel shutdown

    def _connect_control_socket(self) -> None:
        """Create the control socket unless one already exists."""
        if self._control_socket is None:
            self._control_socket = self._create_connected_socket("control")
            self._control_socket.linger = 100

    def _close_control_socket(self) -> None:
        """Close and forget the control socket, if there is one."""
        if self._control_socket is None:
            return
        self._control_socket.close()
        self._control_socket = None

    async def _async_pre_start_kernel(
        self, **kw: t.Any
    ) -> t.Tuple[t.List[str], t.Dict[str, t.Any]]:
        """Prepares a kernel for startup in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
             keyword arguments that are passed down to build the kernel_cmd
             and launching the kernel (e.g. Popen kwargs).

        Returns
        -------
        A ``(kernel_cmd, kw)`` tuple: the ``cmd`` entry produced by the
        provisioner's ``pre_launch`` and the remaining keyword arguments.
        """
        self.shutting_down = False
        self.kernel_id = self.kernel_id or kw.pop("kernel_id", str(uuid.uuid4()))
        # save kwargs for use in restart
        # assigning Traitlets Dicts to Dict make mypy unhappy but is ok
        self._launch_args = kw.copy()  # type:ignore [assignment]
        if self.provisioner is None:  # will not be None on restarts
            self.provisioner = KPF.instance(parent=self.parent).create_provisioner_instance(
                self.kernel_id,
                self.kernel_spec,
                parent=self,
            )
        kw = await self.provisioner.pre_launch(**kw)
        kernel_cmd = kw.pop("cmd")
        return kernel_cmd, kw

    pre_start_kernel = run_sync(_async_pre_start_kernel)

    async def _async_post_start_kernel(self, **kw: t.Any) -> None:
        """Performs any post startup tasks relative to the kernel.

        Parameters
        ----------
        `**kw` : optional
            keyword arguments that were used in the kernel process's launch.
        """
        self.start_restarter()
        self._connect_control_socket()
        assert self.provisioner is not None
        await self.provisioner.post_launch(**kw)

    post_start_kernel = run_sync(_async_post_start_kernel)

    @in_pending_state
    async def _async_start_kernel(self, **kw: t.Any) -> None:
        """Starts a kernel on this host in a separate process.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Parameters
        ----------
        `**kw` : optional
             keyword arguments that are passed down to build the kernel_cmd
             and launching the kernel (e.g. Popen kwargs).
        """
        self._attempted_start = True
        kernel_cmd, kw = await self._async_pre_start_kernel(**kw)

        # launch the kernel subprocess
        self.log.debug("Starting kernel: %s", kernel_cmd)
        await self._async_launch_kernel(kernel_cmd, **kw)
        await self._async_post_start_kernel(**kw)

    start_kernel = run_sync(_async_start_kernel)

    async def _async_request_shutdown(self, restart: bool = False) -> None:
        """Send a shutdown request via control channel"""
        content = {"restart": restart}
        msg = self.session.msg("shutdown_request", content=content)
        # ensure control socket is connected
        self._connect_control_socket()
        self.session.send(self._control_socket, msg)
        assert self.provisioner is not None
        await self.provisioner.shutdown_requested(restart=restart)
        self._shutdown_status = _ShutdownStatus.ShutdownRequest

    request_shutdown = run_sync(_async_request_shutdown)

    async def _async_finish_shutdown(
        self,
        waittime: t.Optional[float] = None,
        pollinterval: float = 0.1,
        restart: bool = False,
    ) -> None:
        """Wait for kernel shutdown, then kill process if it doesn't shutdown.

        This does not send shutdown requests - use :meth:`request_shutdown`
        first.

        Parameters
        ----------
        waittime : float, optional
            Total time budget in seconds. Defaults to ``shutdown_wait_time``
            (floored at 0), which the provisioner may override. Half of it
            is spent waiting before terminating the kernel, the other half
            before killing it.
        pollinterval : float
            Seconds between liveness checks while waiting.
        restart : bool
            Forwarded to the provisioner when terminating or killing.
        """
        if waittime is None:
            waittime = max(self.shutdown_wait_time, 0)
            if self.provisioner:  # Allow provisioner to override
                waittime = self.provisioner.get_shutdown_wait_time(recommended=waittime)

        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, terminating")
            self._shutdown_status = _ShutdownStatus.SigtermRequest
            # Forward restart here as well, like the kill step below does.
            await self._async_send_kernel_sigterm(restart=restart)

        try:
            await asyncio.wait_for(
                self._async_wait(pollinterval=pollinterval), timeout=waittime / 2
            )
        except asyncio.TimeoutError:
            self.log.debug("Kernel is taking too long to finish, killing")
            self._shutdown_status = _ShutdownStatus.SigkillRequest
            await self._async_kill_kernel(restart=restart)
        else:
            # Process is no longer alive, wait and clear
            if self.has_kernel:
                assert self.provisioner is not None
                await self.provisioner.wait()

    finish_shutdown = run_sync(_async_finish_shutdown)

    async def _async_cleanup_resources(self, restart: bool = False) -> None:
        """Clean up resources when the kernel is shut down

        The connection file and a self-created zmq context are kept when
        ``restart`` is True.
        """
        if not restart:
            self.cleanup_connection_file()

        self.cleanup_ipc_files()
        self._close_control_socket()
        self.session.parent = None

        if self._created_context and not restart:
            self.context.destroy(linger=100)

        if self.provisioner:
            await self.provisioner.cleanup(restart=restart)

    cleanup_resources = run_sync(_async_cleanup_resources)

    @in_pending_state
    async def _async_shutdown_kernel(self, now: bool = False, restart: bool = False) -> None:
        """Attempts to stop the kernel process cleanly.

        This attempts to shutdown the kernels cleanly by:

        1. Sending it a shutdown message over the control channel.
        2. If that fails, the kernel is shutdown forcibly by sending it
           a signal.

        Nothing is done when this manager does not own the kernel.

        Parameters
        ----------
        now : bool
            Should the kernel be forcible killed *now*. This skips the
            first, nice shutdown attempt.
        restart: bool
            Will this kernel be restarted after it is shutdown.  When this
            is True, connection files will not be cleaned up.
        """
        if not self.owns_kernel:
            return

        self.shutting_down = True  # Used by restarter to prevent race condition
        # Stop monitoring for restarting while we shutdown.
        self.stop_restarter()

        if self.has_kernel:
            await self._async_interrupt_kernel()

        if now:
            # Forward restart so the provisioner sees the same flag as on
            # the graceful path.
            await self._async_kill_kernel(restart=restart)
        else:
            await self._async_request_shutdown(restart=restart)
            # Don't send any additional kernel kill messages immediately, to give
            # the kernel a chance to properly execute shutdown actions. The
            # wait is bounded by `shutdown_wait_time` (see
            # _async_finish_shutdown), checking every 0.1s.
            await self._async_finish_shutdown(restart=restart)

        await self._async_cleanup_resources(restart=restart)

    shutdown_kernel = run_sync(_async_shutdown_kernel)

    async def _async_restart_kernel(
        self, now: bool = False, newports: bool = False, **kw: t.Any
    ) -> None:
        """Restarts a kernel with the arguments that were used to launch it.

        Parameters
        ----------
        now : bool, optional
            If True, the kernel is forcefully restarted *immediately*, without
            having a chance to do any cleanup action.  Otherwise the kernel is
            given up to `shutdown_wait_time` seconds to clean up before a
            forceful restart is issued.

            In all cases the kernel is restarted, the only difference is whether
            it is given a chance to perform a clean shutdown or not.

        newports : bool, optional
            If the old kernel was launched with random ports, this flag decides
            whether the same ports and connection file will be used again.
            If False, the same ports and connection file are used. This is
            the default. If True, new random port numbers are chosen and a
            new connection file is written. It is still possible that the newly
            chosen random port numbers happen to be the same as the old ones.

        `**kw` : optional
            Any options specified here will overwrite those used to launch the
            kernel.

        Raises
        ------
        RuntimeError
            If there are no saved launch arguments to restart with.
        """
        if self._launch_args is None:
            msg = "Cannot restart the kernel. No previous call to 'start_kernel'."
            raise RuntimeError(msg)

        # Stop currently running kernel.
        await self._async_shutdown_kernel(now=now, restart=True)

        if newports:
            self.cleanup_random_ports()

        # Start new kernel.
        self._launch_args.update(kw)
        await self._async_start_kernel(**self._launch_args)

    restart_kernel = run_sync(_async_restart_kernel)

    @property
    def owns_kernel(self) -> bool:
        """The ``owns_kernel`` flag given at construction (default True)."""
        return self._owns_kernel

    @property
    def has_kernel(self) -> bool:
        """Has a kernel process been started that we are actively managing."""
        return self.provisioner is not None and self.provisioner.has_process

    async def _async_send_kernel_sigterm(self, restart: bool = False) -> None:
        """similar to _kill_kernel, but with sigterm (not sigkill), but do not block"""
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.terminate(restart=restart)

    _send_kernel_sigterm = run_sync(_async_send_kernel_sigterm)

    async def _async_kill_kernel(self, restart: bool = False) -> None:
        """Kill the running kernel.

        This is a private method, callers should use shutdown_kernel(now=True).
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.kill(restart=restart)

            # Wait until the kernel terminates.
            try:
                await asyncio.wait_for(self._async_wait(), timeout=5.0)
            except asyncio.TimeoutError:
                # Wait timed out, just log warning but continue - not much more we can do.
                self.log.warning("Wait for final termination of kernel timed out - continuing...")
            else:
                # Process is no longer alive, wait and clear
                if self.has_kernel:
                    assert self.provisioner is not None
                    await self.provisioner.wait()

    _kill_kernel = run_sync(_async_kill_kernel)

    async def _async_interrupt_kernel(self) -> None:
        """Interrupts the kernel by sending it a signal.

        Unlike ``signal_kernel``, this operation is well supported on all
        platforms.

        Raises
        ------
        RuntimeError
            If no kernel is running once any pending operation has finished.
        """
        if not self.has_kernel and self._ready is not None:
            if isinstance(self._ready, CFuture):
                # A concurrent future cannot be awaited directly; bridge it
                # into the running event loop first.
                ready = asyncio.wrap_future(self._ready)
            else:
                ready = self._ready
            # Wait for a shutdown if one is in progress.
            if self.shutting_down:
                await ready
            # Wait for a startup.
            await ready

        if self.has_kernel:
            assert self.kernel_spec is not None
            interrupt_mode = self.kernel_spec.interrupt_mode
            if interrupt_mode == "signal":
                await self._async_signal_kernel(signal.SIGINT)

            elif interrupt_mode == "message":
                msg = self.session.msg("interrupt_request", content={})
                self._connect_control_socket()
                self.session.send(self._control_socket, msg)
        else:
            msg = "Cannot interrupt kernel. No kernel is running!"
            raise RuntimeError(msg)

    interrupt_kernel = run_sync(_async_interrupt_kernel)

    async def _async_signal_kernel(self, signum: int) -> None:
        """Sends a signal to the process group of the kernel (this
        usually includes the kernel and any subprocesses spawned by
        the kernel).

        Note that since only SIGTERM is supported on Windows, this function is
        only useful on Unix systems.

        Raises
        ------
        RuntimeError
            If no kernel is running.
        """
        if self.has_kernel:
            assert self.provisioner is not None
            await self.provisioner.send_signal(signum)
        else:
            msg = "Cannot signal kernel. No kernel is running!"
            raise RuntimeError(msg)

    signal_kernel = run_sync(_async_signal_kernel)

    async def _async_is_alive(self) -> bool:
        """Is the kernel process still running?

        Always True when this manager does not own the kernel. Otherwise
        True only while a kernel exists and the provisioner's ``poll()``
        returns None.
        """
        if not self.owns_kernel:
            return True

        if self.has_kernel:
            assert self.provisioner is not None
            ret = await self.provisioner.poll()
            if ret is None:
                return True
        return False

    is_alive = run_sync(_async_is_alive)

    async def _async_wait(self, pollinterval: float = 0.1) -> None:
        """Poll every ``pollinterval`` seconds until the kernel is not alive.

        There is no timeout here; callers are responsible for bounding the
        wait (see _kill_kernel()).
        """
        # Use busy loop at 100ms intervals, polling until the process is
        # not alive.  If we find the process is no longer alive, complete
        # its cleanup via the blocking wait().  Callers are responsible for
        # issuing calls to wait() using a timeout (see _kill_kernel()).
        while await self._async_is_alive():
            await asyncio.sleep(pollinterval)

711 

712 

class AsyncKernelManager(KernelManager):
    """Kernel manager whose public lifecycle methods are coroutines.

    The public method names are bound directly to the ``_async_*``
    implementations of :class:`KernelManager`.
    """

    # Client class instantiated by `client`.
    client_class: DottedObjectName = DottedObjectName(
        "jupyter_client.asynchronous.AsyncKernelClient"
    )
    client_factory: Type = Type(klass="jupyter_client.asynchronous.AsyncKernelClient")

    # The asyncio-flavoured PyZMQ Context used to talk to the kernel.
    context: Instance = Instance(zmq.asyncio.Context)

    @default("context")
    def _context_default(self) -> zmq.asyncio.Context:
        # Flag the context as created by this manager before building it.
        self._created_context = True
        return zmq.asyncio.Context()

    def client(  # type:ignore[override]
        self, **kwargs: t.Any
    ) -> AsyncKernelClient:
        """Get a client for the manager."""
        async_client = super().client(**kwargs)
        return async_client  # type:ignore[return-value]

    # Expose the coroutine implementations under the public names.
    _launch_kernel = KernelManager._async_launch_kernel  # type:ignore[assignment]
    start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_start_kernel  # type:ignore[assignment]
    pre_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_pre_start_kernel  # type:ignore[assignment]
    post_start_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_post_start_kernel  # type:ignore[assignment]
    request_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_request_shutdown  # type:ignore[assignment]
    finish_shutdown: t.Callable[..., t.Awaitable] = KernelManager._async_finish_shutdown  # type:ignore[assignment]
    cleanup_resources: t.Callable[..., t.Awaitable] = KernelManager._async_cleanup_resources  # type:ignore[assignment]
    shutdown_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_shutdown_kernel  # type:ignore[assignment]
    restart_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_restart_kernel  # type:ignore[assignment]
    _send_kernel_sigterm = KernelManager._async_send_kernel_sigterm  # type:ignore[assignment]
    _kill_kernel = KernelManager._async_kill_kernel  # type:ignore[assignment]
    interrupt_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_interrupt_kernel  # type:ignore[assignment]
    signal_kernel: t.Callable[..., t.Awaitable] = KernelManager._async_signal_kernel  # type:ignore[assignment]
    is_alive: t.Callable[..., t.Awaitable] = KernelManager._async_is_alive  # type:ignore[assignment]

750 

751 

# Register KernelManager as a virtual subclass of the abstract interface.
KernelManagerABC.register(KernelManager)

753 

754 

def start_new_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[KernelManager, BlockingKernelClient]:
    """Start a new kernel, and return its Manager and Client

    Parameters
    ----------
    startup_timeout : float
        Passed as ``timeout`` to the client's ``wait_for_ready``.
    kernel_name : str
        Name of the kernel to start.
    `**kwargs` : optional
        Forwarded to ``KernelManager.start_kernel``.

    If waiting for the kernel raises ``RuntimeError``, the client's channels
    are stopped and the kernel is shut down before the error is re-raised.
    """
    manager = KernelManager(kernel_name=kernel_name)
    manager.start_kernel(**kwargs)

    client = manager.client()
    client.start_channels()
    try:
        client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # Tear down what was started so far, then propagate the error.
        client.stop_channels()
        manager.shutdown_kernel()
        raise

    return manager, client

771 

772 

async def start_new_async_kernel(
    startup_timeout: float = 60, kernel_name: str = "python", **kwargs: t.Any
) -> t.Tuple[AsyncKernelManager, AsyncKernelClient]:
    """Start a new kernel, and return its Manager and Client

    Coroutine counterpart of ``start_new_kernel``, built on
    ``AsyncKernelManager``. ``kwargs`` are forwarded to ``start_kernel`` and
    ``startup_timeout`` to the client's ``wait_for_ready``.

    If waiting for the kernel raises ``RuntimeError``, the client's channels
    are stopped and the kernel is shut down before the error is re-raised.
    """
    manager = AsyncKernelManager(kernel_name=kernel_name)
    await manager.start_kernel(**kwargs)

    client = manager.client()
    client.start_channels()
    try:
        await client.wait_for_ready(timeout=startup_timeout)
    except RuntimeError:
        # Tear down what was started so far, then propagate the error.
        client.stop_channels()
        await manager.shutdown_kernel()
        raise

    return (manager, client)

789 

790 

@contextmanager
def run_kernel(**kwargs: t.Any) -> t.Iterator[KernelClient]:
    """Context manager to create a kernel in a subprocess.

    The kernel is shut down when the context exits.

    All keyword arguments are forwarded to ``start_new_kernel``.

    Returns
    -------
    kernel_client: connected KernelClient instance
    """
    manager, client = start_new_kernel(**kwargs)
    try:
        yield client
    finally:
        # Runs on normal exit and on error: stop the channels, then kill the
        # kernel immediately.
        client.stop_channels()
        manager.shutdown_kernel(now=True)