Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/jupyter_client/multikernelmanager.py: 43%

222 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-01 06:54 +0000

1"""A kernel manager for multiple kernels""" 

2# Copyright (c) Jupyter Development Team. 

3# Distributed under the terms of the Modified BSD License. 

4import asyncio 

5import os 

6import socket 

7import typing as t 

8import uuid 

9from functools import wraps 

10 

11import zmq 

12from traitlets import Any, Bool, Dict, DottedObjectName, Instance, Unicode, default, observe 

13from traitlets.config.configurable import LoggingConfigurable 

14from traitlets.utils.importstring import import_item 

15 

16from .kernelspec import NATIVE_KERNEL_NAME, KernelSpecManager 

17from .manager import KernelManager 

18from .utils import ensure_async, run_sync 

19 

20 

21class DuplicateKernelError(Exception): 

22 pass 

23 

24 

25def kernel_method(f: t.Callable) -> t.Callable: 

26 """decorator for proxying MKM.method(kernel_id) to individual KMs by ID""" 

27 

28 @wraps(f) 

29 def wrapped( 

30 self: t.Any, kernel_id: str, *args: t.Any, **kwargs: t.Any 

31 ) -> t.Union[t.Callable, t.Awaitable]: 

32 # get the kernel 

33 km = self.get_kernel(kernel_id) 

34 method = getattr(km, f.__name__) 

35 # call the kernel's method 

36 r = method(*args, **kwargs) 

37 # last thing, call anything defined in the actual class method 

38 # such as logging messages 

39 f(self, kernel_id, *args, **kwargs) 

40 # return the method result 

41 return r 

42 

43 return wrapped 

44 

45 

46class MultiKernelManager(LoggingConfigurable): 

47 """A class for managing multiple kernels.""" 

48 

49 default_kernel_name = Unicode( 

50 NATIVE_KERNEL_NAME, help="The name of the default kernel to start" 

51 ).tag(config=True) 

52 

53 kernel_spec_manager = Instance(KernelSpecManager, allow_none=True) 

54 

55 kernel_manager_class = DottedObjectName( 

56 "jupyter_client.ioloop.IOLoopKernelManager", 

57 help="""The kernel manager class. This is configurable to allow 

58 subclassing of the KernelManager for customized behavior. 

59 """, 

60 ).tag(config=True) 

61 

62 @observe("kernel_manager_class") 

63 def _kernel_manager_class_changed(self, change): 

64 self.kernel_manager_factory = self._create_kernel_manager_factory() 

65 

66 kernel_manager_factory = Any(help="this is kernel_manager_class after import") 

67 

68 @default("kernel_manager_factory") 

69 def _kernel_manager_factory_default(self): 

70 return self._create_kernel_manager_factory() 

71 

72 def _create_kernel_manager_factory(self) -> t.Callable: 

73 kernel_manager_ctor = import_item(self.kernel_manager_class) 

74 

75 def create_kernel_manager(*args: t.Any, **kwargs: t.Any) -> KernelManager: 

76 if self.shared_context: 

77 if self.context.closed: 

78 # recreate context if closed 

79 self.context = self._context_default() 

80 kwargs.setdefault("context", self.context) 

81 km = kernel_manager_ctor(*args, **kwargs) 

82 return km 

83 

84 return create_kernel_manager 

85 

86 shared_context = Bool( 

87 True, 

88 help="Share a single zmq.Context to talk to all my kernels", 

89 ).tag(config=True) 

90 

91 context = Instance("zmq.Context") 

92 

93 _created_context = Bool(False) 

94 

95 _pending_kernels = Dict() 

96 

97 @property 

98 def _starting_kernels(self): 

99 """A shim for backwards compatibility.""" 

100 return self._pending_kernels 

101 

102 @default("context") # type:ignore[misc] 

103 def _context_default(self) -> zmq.Context: 

104 self._created_context = True 

105 return zmq.Context() 

106 

107 connection_dir = Unicode("") 

108 

109 _kernels = Dict() 

110 

111 def __del__(self): 

112 """Handle garbage collection. Destroy context if applicable.""" 

113 if self._created_context and self.context and not self.context.closed: 

114 if self.log: 

115 self.log.debug("Destroying zmq context for %s", self) 

116 self.context.destroy() 

117 try: 

118 super_del = super().__del__ # type:ignore[misc] 

119 except AttributeError: 

120 pass 

121 else: 

122 super_del() 

123 

124 def list_kernel_ids(self) -> t.List[str]: 

125 """Return a list of the kernel ids of the active kernels.""" 

126 # Create a copy so we can iterate over kernels in operations 

127 # that delete keys. 

128 return list(self._kernels.keys()) 

129 

130 def __len__(self) -> int: 

131 """Return the number of running kernels.""" 

132 return len(self.list_kernel_ids()) 

133 

134 def __contains__(self, kernel_id: str) -> bool: 

135 return kernel_id in self._kernels 

136 

137 def pre_start_kernel( 

138 self, kernel_name: t.Optional[str], kwargs: t.Any 

139 ) -> t.Tuple[KernelManager, str, str]: 

140 # kwargs should be mutable, passing it as a dict argument. 

141 kernel_id = kwargs.pop("kernel_id", self.new_kernel_id(**kwargs)) 

142 if kernel_id in self: 

143 raise DuplicateKernelError("Kernel already exists: %s" % kernel_id) 

144 

145 if kernel_name is None: 

146 kernel_name = self.default_kernel_name 

147 # kernel_manager_factory is the constructor for the KernelManager 

148 # subclass we are using. It can be configured as any Configurable, 

149 # including things like its transport and ip. 

150 constructor_kwargs = {} 

151 if self.kernel_spec_manager: 

152 constructor_kwargs["kernel_spec_manager"] = self.kernel_spec_manager 

153 km = self.kernel_manager_factory( 

154 connection_file=os.path.join(self.connection_dir, "kernel-%s.json" % kernel_id), 

155 parent=self, 

156 log=self.log, 

157 kernel_name=kernel_name, 

158 **constructor_kwargs, 

159 ) 

160 return km, kernel_name, kernel_id 

161 

162 async def _add_kernel_when_ready( 

163 self, kernel_id: str, km: KernelManager, kernel_awaitable: t.Awaitable 

164 ) -> None: 

165 try: 

166 await kernel_awaitable 

167 self._kernels[kernel_id] = km 

168 self._pending_kernels.pop(kernel_id, None) 

169 except Exception as e: 

170 self.log.exception(e) 

171 

172 async def _remove_kernel_when_ready( 

173 self, kernel_id: str, kernel_awaitable: t.Awaitable 

174 ) -> None: 

175 try: 

176 await kernel_awaitable 

177 self.remove_kernel(kernel_id) 

178 self._pending_kernels.pop(kernel_id, None) 

179 except Exception as e: 

180 self.log.exception(e) 

181 

182 def _using_pending_kernels(self): 

183 """Returns a boolean; a clearer method for determining if 

184 this multikernelmanager is using pending kernels or not 

185 """ 

186 return getattr(self, 'use_pending_kernels', False) 

187 

188 async def _async_start_kernel( 

189 self, *, kernel_name: t.Optional[str] = None, **kwargs: t.Any 

190 ) -> str: 

191 """Start a new kernel. 

192 

193 The caller can pick a kernel_id by passing one in as a keyword arg, 

194 otherwise one will be generated using new_kernel_id(). 

195 

196 The kernel ID for the newly started kernel is returned. 

197 """ 

198 km, kernel_name, kernel_id = self.pre_start_kernel(kernel_name, kwargs) 

199 if not isinstance(km, KernelManager): 

200 self.log.warning( 

201 "Kernel manager class ({km_class}) is not an instance of 'KernelManager'!".format( 

202 km_class=self.kernel_manager_class.__class__ 

203 ) 

204 ) 

205 kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner 

206 

207 starter = ensure_async(km.start_kernel(**kwargs)) 

208 task = asyncio.create_task(self._add_kernel_when_ready(kernel_id, km, starter)) 

209 self._pending_kernels[kernel_id] = task 

210 # Handling a Pending Kernel 

211 if self._using_pending_kernels(): 

212 # If using pending kernels, do not block 

213 # on the kernel start. 

214 self._kernels[kernel_id] = km 

215 else: 

216 await task 

217 # raise an exception if one occurred during kernel startup. 

218 if km.ready.exception(): 

219 raise km.ready.exception() # type: ignore 

220 

221 return kernel_id 

222 

223 start_kernel = run_sync(_async_start_kernel) 

224 

225 async def _async_shutdown_kernel( 

226 self, 

227 kernel_id: str, 

228 now: t.Optional[bool] = False, 

229 restart: t.Optional[bool] = False, 

230 ) -> None: 

231 """Shutdown a kernel by its kernel uuid. 

232 

233 Parameters 

234 ========== 

235 kernel_id : uuid 

236 The id of the kernel to shutdown. 

237 now : bool 

238 Should the kernel be shutdown forcibly using a signal. 

239 restart : bool 

240 Will the kernel be restarted? 

241 """ 

242 self.log.info("Kernel shutdown: %s", kernel_id) 

243 # If the kernel is still starting, wait for it to be ready. 

244 if kernel_id in self._pending_kernels: 

245 task = self._pending_kernels[kernel_id] 

246 try: 

247 await task 

248 km = self.get_kernel(kernel_id) 

249 await t.cast(asyncio.Future, km.ready) 

250 except asyncio.CancelledError: 

251 pass 

252 except Exception: 

253 self.remove_kernel(kernel_id) 

254 return 

255 km = self.get_kernel(kernel_id) 

256 # If a pending kernel raised an exception, remove it. 

257 if not km.ready.cancelled() and km.ready.exception(): 

258 self.remove_kernel(kernel_id) 

259 return 

260 stopper = ensure_async(km.shutdown_kernel(now, restart)) 

261 fut = asyncio.ensure_future(self._remove_kernel_when_ready(kernel_id, stopper)) 

262 self._pending_kernels[kernel_id] = fut 

263 # Await the kernel if not using pending kernels. 

264 if not self._using_pending_kernels(): 

265 await fut 

266 # raise an exception if one occurred during kernel shutdown. 

267 if km.ready.exception(): 

268 raise km.ready.exception() # type: ignore 

269 

270 shutdown_kernel = run_sync(_async_shutdown_kernel) 

271 

272 @kernel_method 

273 def request_shutdown(self, kernel_id: str, restart: t.Optional[bool] = False) -> None: 

274 """Ask a kernel to shut down by its kernel uuid""" 

275 

276 @kernel_method 

277 def finish_shutdown( 

278 self, 

279 kernel_id: str, 

280 waittime: t.Optional[float] = None, 

281 pollinterval: t.Optional[float] = 0.1, 

282 ) -> None: 

283 """Wait for a kernel to finish shutting down, and kill it if it doesn't""" 

284 self.log.info("Kernel shutdown: %s", kernel_id) 

285 

286 @kernel_method 

287 def cleanup_resources(self, kernel_id: str, restart: bool = False) -> None: 

288 """Clean up a kernel's resources""" 

289 

290 def remove_kernel(self, kernel_id: str) -> KernelManager: 

291 """remove a kernel from our mapping. 

292 

293 Mainly so that a kernel can be removed if it is already dead, 

294 without having to call shutdown_kernel. 

295 

296 The kernel object is returned, or `None` if not found. 

297 """ 

298 return self._kernels.pop(kernel_id, None) 

299 

300 async def _async_shutdown_all(self, now: bool = False) -> None: 

301 """Shutdown all kernels.""" 

302 kids = self.list_kernel_ids() 

303 kids += list(self._pending_kernels) 

304 kms = list(self._kernels.values()) 

305 futs = [self._async_shutdown_kernel(kid, now=now) for kid in set(kids)] 

306 await asyncio.gather(*futs) 

307 # If using pending kernels, the kernels will not have been fully shut down. 

308 if self._using_pending_kernels(): 

309 for km in kms: 

310 try: 

311 await km.ready 

312 except asyncio.CancelledError: 

313 self._pending_kernels[km.kernel_id].cancel() 

314 except Exception: 

315 # Will have been logged in _add_kernel_when_ready 

316 pass 

317 

318 shutdown_all = run_sync(_async_shutdown_all) 

319 

320 def interrupt_kernel(self, kernel_id: str) -> None: 

321 """Interrupt (SIGINT) the kernel by its uuid. 

322 

323 Parameters 

324 ========== 

325 kernel_id : uuid 

326 The id of the kernel to interrupt. 

327 """ 

328 kernel = self.get_kernel(kernel_id) 

329 if not kernel.ready.done(): 

330 msg = "Kernel is in a pending state. Cannot interrupt." 

331 raise RuntimeError(msg) 

332 out = kernel.interrupt_kernel() 

333 self.log.info("Kernel interrupted: %s", kernel_id) 

334 return out 

335 

336 @kernel_method 

337 def signal_kernel(self, kernel_id: str, signum: int) -> None: 

338 """Sends a signal to the kernel by its uuid. 

339 

340 Note that since only SIGTERM is supported on Windows, this function 

341 is only useful on Unix systems. 

342 

343 Parameters 

344 ========== 

345 kernel_id : uuid 

346 The id of the kernel to signal. 

347 signum : int 

348 Signal number to send kernel. 

349 """ 

350 self.log.info("Signaled Kernel %s with %s", kernel_id, signum) 

351 

352 async def _async_restart_kernel(self, kernel_id: str, now: bool = False) -> None: 

353 """Restart a kernel by its uuid, keeping the same ports. 

354 

355 Parameters 

356 ========== 

357 kernel_id : uuid 

358 The id of the kernel to interrupt. 

359 now : bool, optional 

360 If True, the kernel is forcefully restarted *immediately*, without 

361 having a chance to do any cleanup action. Otherwise the kernel is 

362 given 1s to clean up before a forceful restart is issued. 

363 

364 In all cases the kernel is restarted, the only difference is whether 

365 it is given a chance to perform a clean shutdown or not. 

366 """ 

367 kernel = self.get_kernel(kernel_id) 

368 if self._using_pending_kernels() and not kernel.ready.done(): 

369 msg = "Kernel is in a pending state. Cannot restart." 

370 raise RuntimeError(msg) 

371 await ensure_async(kernel.restart_kernel(now=now)) 

372 self.log.info("Kernel restarted: %s", kernel_id) 

373 

374 restart_kernel = run_sync(_async_restart_kernel) 

375 

376 @kernel_method 

377 def is_alive(self, kernel_id: str) -> bool: # type:ignore[empty-body] 

378 """Is the kernel alive. 

379 

380 This calls KernelManager.is_alive() which calls Popen.poll on the 

381 actual kernel subprocess. 

382 

383 Parameters 

384 ========== 

385 kernel_id : uuid 

386 The id of the kernel. 

387 """ 

388 

389 def _check_kernel_id(self, kernel_id: str) -> None: 

390 """check that a kernel id is valid""" 

391 if kernel_id not in self: 

392 raise KeyError("Kernel with id not found: %s" % kernel_id) 

393 

394 def get_kernel(self, kernel_id: str) -> KernelManager: 

395 """Get the single KernelManager object for a kernel by its uuid. 

396 

397 Parameters 

398 ========== 

399 kernel_id : uuid 

400 The id of the kernel. 

401 """ 

402 self._check_kernel_id(kernel_id) 

403 return self._kernels[kernel_id] 

404 

405 @kernel_method 

406 def add_restart_callback( 

407 self, kernel_id: str, callback: t.Callable, event: str = "restart" 

408 ) -> None: 

409 """add a callback for the KernelRestarter""" 

410 

411 @kernel_method 

412 def remove_restart_callback( 

413 self, kernel_id: str, callback: t.Callable, event: str = "restart" 

414 ) -> None: 

415 """remove a callback for the KernelRestarter""" 

416 

417 @kernel_method 

418 def get_connection_info(self, kernel_id: str) -> t.Dict[str, t.Any]: # type:ignore[empty-body] 

419 """Return a dictionary of connection data for a kernel. 

420 

421 Parameters 

422 ========== 

423 kernel_id : uuid 

424 The id of the kernel. 

425 

426 Returns 

427 ======= 

428 connection_dict : dict 

429 A dict of the information needed to connect to a kernel. 

430 This includes the ip address and the integer port 

431 numbers of the different channels (stdin_port, iopub_port, 

432 shell_port, hb_port). 

433 """ 

434 

435 @kernel_method 

436 def connect_iopub( # type:ignore[empty-body] 

437 self, kernel_id: str, identity: t.Optional[bytes] = None 

438 ) -> socket.socket: 

439 """Return a zmq Socket connected to the iopub channel. 

440 

441 Parameters 

442 ========== 

443 kernel_id : uuid 

444 The id of the kernel 

445 identity : bytes (optional) 

446 The zmq identity of the socket 

447 

448 Returns 

449 ======= 

450 stream : zmq Socket or ZMQStream 

451 """ 

452 

453 @kernel_method 

454 def connect_shell( # type:ignore[empty-body] 

455 self, kernel_id: str, identity: t.Optional[bytes] = None 

456 ) -> socket.socket: 

457 """Return a zmq Socket connected to the shell channel. 

458 

459 Parameters 

460 ========== 

461 kernel_id : uuid 

462 The id of the kernel 

463 identity : bytes (optional) 

464 The zmq identity of the socket 

465 

466 Returns 

467 ======= 

468 stream : zmq Socket or ZMQStream 

469 """ 

470 

471 @kernel_method 

472 def connect_control( # type:ignore[empty-body] 

473 self, kernel_id: str, identity: t.Optional[bytes] = None 

474 ) -> socket.socket: 

475 """Return a zmq Socket connected to the control channel. 

476 

477 Parameters 

478 ========== 

479 kernel_id : uuid 

480 The id of the kernel 

481 identity : bytes (optional) 

482 The zmq identity of the socket 

483 

484 Returns 

485 ======= 

486 stream : zmq Socket or ZMQStream 

487 """ 

488 

489 @kernel_method 

490 def connect_stdin( # type:ignore[empty-body] 

491 self, kernel_id: str, identity: t.Optional[bytes] = None 

492 ) -> socket.socket: 

493 """Return a zmq Socket connected to the stdin channel. 

494 

495 Parameters 

496 ========== 

497 kernel_id : uuid 

498 The id of the kernel 

499 identity : bytes (optional) 

500 The zmq identity of the socket 

501 

502 Returns 

503 ======= 

504 stream : zmq Socket or ZMQStream 

505 """ 

506 

507 @kernel_method 

508 def connect_hb( # type:ignore[empty-body] 

509 self, kernel_id: str, identity: t.Optional[bytes] = None 

510 ) -> socket.socket: 

511 """Return a zmq Socket connected to the hb channel. 

512 

513 Parameters 

514 ========== 

515 kernel_id : uuid 

516 The id of the kernel 

517 identity : bytes (optional) 

518 The zmq identity of the socket 

519 

520 Returns 

521 ======= 

522 stream : zmq Socket or ZMQStream 

523 """ 

524 

525 def new_kernel_id(self, **kwargs: t.Any) -> str: 

526 """ 

527 Returns the id to associate with the kernel for this request. Subclasses may override 

528 this method to substitute other sources of kernel ids. 

529 :param kwargs: 

530 :return: string-ized version 4 uuid 

531 """ 

532 return str(uuid.uuid4()) 

533 

534 

535class AsyncMultiKernelManager(MultiKernelManager): 

536 kernel_manager_class = DottedObjectName( 

537 "jupyter_client.ioloop.AsyncIOLoopKernelManager", 

538 config=True, 

539 help="""The kernel manager class. This is configurable to allow 

540 subclassing of the AsyncKernelManager for customized behavior. 

541 """, 

542 ) 

543 

544 use_pending_kernels = Bool( 

545 False, 

546 help="""Whether to make kernels available before the process has started. The 

547 kernel has a `.ready` future which can be awaited before connecting""", 

548 ).tag(config=True) 

549 

550 context = Instance("zmq.asyncio.Context") 

551 

552 @default("context") # type:ignore[misc] 

553 def _context_default(self) -> zmq.asyncio.Context: 

554 self._created_context = True 

555 return zmq.asyncio.Context() 

556 

557 start_kernel: t.Callable[ 

558 ..., t.Awaitable 

559 ] = MultiKernelManager._async_start_kernel # type:ignore[assignment] 

560 restart_kernel: t.Callable[ 

561 ..., t.Awaitable 

562 ] = MultiKernelManager._async_restart_kernel # type:ignore[assignment] 

563 shutdown_kernel: t.Callable[ 

564 ..., t.Awaitable 

565 ] = MultiKernelManager._async_shutdown_kernel # type:ignore[assignment] 

566 shutdown_all: t.Callable[ 

567 ..., t.Awaitable 

568 ] = MultiKernelManager._async_shutdown_all # type:ignore[assignment]