Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/ipykernel/kernelbase.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

730 statements  

1"""Base class for a kernel that talks to frontends over 0MQ.""" 

2 

3# Copyright (c) IPython Development Team. 

4# Distributed under the terms of the Modified BSD License. 

5from __future__ import annotations 

6 

7import asyncio 

8import inspect 

9import itertools 

10import logging 

11import os 

12import socket 

13import sys 

14import threading 

15import time 

16import typing as t 

17import uuid 

18import warnings 

19from datetime import datetime 

20from functools import partial 

21from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal 

22 

23from .thread import CONTROL_THREAD_NAME 

24 

25if sys.platform != "win32": 

26 from signal import SIGKILL 

27else: 

28 SIGKILL = "windown-SIGKILL-sentinel" 

29 

30 

31try: 

32 # jupyter_client >= 5, use tz-aware now 

33 from jupyter_client.session import utcnow as now 

34except ImportError: 

35 # jupyter_client < 5, use local now() 

36 now = datetime.now 

37 

38import psutil 

39import zmq 

40from IPython.core.error import StdinNotImplementedError 

41from jupyter_client.session import Session 

42from tornado import ioloop 

43from tornado.queues import Queue, QueueEmpty 

44from traitlets.config.configurable import SingletonConfigurable 

45from traitlets.traitlets import ( 

46 Any, 

47 Bool, 

48 Dict, 

49 Float, 

50 Instance, 

51 Integer, 

52 List, 

53 Set, 

54 Unicode, 

55 default, 

56 observe, 

57) 

58from zmq.eventloop.zmqstream import ZMQStream 

59 

60from ipykernel.jsonutil import json_clean 

61 

62from ._version import kernel_protocol_version 

63from .iostream import OutStream 

64 

65 

66def _accepts_parameters(meth, param_names): 

67 parameters = inspect.signature(meth).parameters 

68 accepts = {param: False for param in param_names} 

69 

70 for param in param_names: 

71 param_spec = parameters.get(param) 

72 accepts[param] = ( 

73 param_spec 

74 and param_spec.kind in [param_spec.KEYWORD_ONLY, param_spec.POSITIONAL_OR_KEYWORD] 

75 ) or any(p.kind == p.VAR_KEYWORD for p in parameters.values()) 

76 

77 return accepts 

78 

79 

80class Kernel(SingletonConfigurable): 

81 """The base kernel class.""" 

82 

83 # --------------------------------------------------------------------------- 

84 # Kernel interface 

85 # --------------------------------------------------------------------------- 

86 

87 # attribute to override with a GUI 

88 eventloop = Any(None) 

89 

90 processes: dict[str, psutil.Process] = {} 

91 

@observe("eventloop")
def _update_eventloop(self, change):
    """Queue :meth:`enter_eventloop` on the current IOLoop when an eventloop is set."""
    current_loop = ioloop.IOLoop.current()
    if change.new is None:
        # The eventloop was cleared; there is nothing to enter.
        return
    current_loop.add_callback(self.enter_eventloop)

98 

99 session = Instance(Session, allow_none=True) 

100 profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True) 

101 shell_stream = Instance(ZMQStream, allow_none=True) 

102 

103 shell_streams: List[t.Any] = List( 

104 help="""Deprecated shell_streams alias. Use shell_stream 

105 

106 .. versionchanged:: 6.0 

107 shell_streams is deprecated. Use shell_stream. 

108 """ 

109 ) 

110 

111 implementation: str 

112 implementation_version: str 

113 banner: str 

114 

@default("shell_streams")
def _shell_streams_default(self):  # pragma: no cover
    """Default for the deprecated ``shell_streams`` alias.

    Emits a DeprecationWarning and returns a list holding ``shell_stream``
    when it is set, or an empty list otherwise.
    """
    warnings.warn(
        "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
        DeprecationWarning,
        stacklevel=2,
    )
    stream = self.shell_stream
    return [] if stream is None else [stream]

125 

@observe("shell_streams")
def _shell_streams_changed(self, change):  # pragma: no cover
    """Mirror assignments to the deprecated ``shell_streams`` onto ``shell_stream``.

    Always warns about the deprecation; warns again when more than one
    stream is supplied, because only the first one is kept.
    """
    warnings.warn(
        "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
        DeprecationWarning,
        stacklevel=2,
    )
    streams = change.new
    if len(streams) > 1:
        warnings.warn(
            "Kernel only supports one shell stream. Additional streams will be ignored.",
            RuntimeWarning,
            stacklevel=2,
        )
    if streams:
        self.shell_stream = streams[0]

141 

142 control_stream = Instance(ZMQStream, allow_none=True) 

143 

144 debug_shell_socket = Any() 

145 

146 control_thread = Any() 

147 shell_channel_thread = Any() 

148 iopub_socket = Any() 

149 iopub_thread = Any() 

150 stdin_socket = Any() 

151 log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment] 

152 

153 # identities: 

154 int_id = Integer(-1) 

155 ident = Unicode() 

156 

@default("ident")
def _default_ident(self):
    """Return a freshly generated UUID4 string as the default ``ident``."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)

160 

161 # This should be overridden by wrapper kernels that implement any real 

162 # language. 

163 language_info: dict[str, object] = {} 

164 

165 # any links that should go in the help menu 

166 help_links: List[dict[str, str]] = List() 

167 

168 # Experimental option to break in non-user code. 

169 # The ipykernel source is in the call stack, so the user 

170 # has to manipulate the step-over and step-into in a wise way. 

171 debug_just_my_code = Bool( 

172 True, 

173 help="""Set to False if you want to debug python standard and dependent libraries. 

174 """, 

175 ).tag(config=True) 

176 

177 # track associations with current request 

178 # Private interface 

179 

180 _darwin_app_nap = Bool( 

181 True, 

182 help="""Whether to use appnope for compatibility with OS X App Nap. 

183 

184 Only affects OS X >= 10.9. 

185 """, 

186 ).tag(config=True) 

187 

188 # track associations with current request 

189 _allow_stdin = Bool(False) 

190 _parents: Dict[str, t.Any] = Dict({"shell": {}, "control": {}}) 

191 _parent_ident = Dict({"shell": b"", "control": b""}) 

192 

193 @property 

194 def _parent_header(self): 

195 warnings.warn( 

196 "Kernel._parent_header is deprecated in ipykernel 6. Use .get_parent()", 

197 DeprecationWarning, 

198 stacklevel=2, 

199 ) 

200 return self.get_parent() 

201 

202 # Time to sleep after flushing the stdout/err buffers in each execute 

203 # cycle. While this introduces a hard limit on the minimal latency of the 

204 # execute cycle, it helps prevent output synchronization problems for 

205 # clients. 

206 # Units are in seconds. The minimum zmq latency on local host is probably 

207 # ~150 microseconds, set this to 500us for now. We may need to increase it 

208 # a little if it's not enough after more interactive testing. 

209 _execute_sleep = Float(0.0005).tag(config=True) 

210 

211 # Frequency of the kernel's event loop. 

212 # Units are in seconds, kernel subclasses for GUI toolkits may need to 

213 # adapt to milliseconds. 

214 _poll_interval = Float(0.01).tag(config=True) 

215 

216 stop_on_error_timeout = Float( 

217 0.0, 

218 config=True, 

219 help="""time (in seconds) to wait for messages to arrive 

220 when aborting queued requests after an error. 

221 

222 Requests that arrive within this window after an error 

223 will be cancelled. 

224 

225 Increase in the event of unusually slow network 

226 causing significant delays, 

227 which can manifest as e.g. "Run all" in a notebook 

228 aborting some, but not all, messages after an error. 

229 """, 

230 ) 

231 

232 # If the shutdown was requested over the network, we leave here the 

233 # necessary reply message so it can be sent by our registered atexit 

234 # handler. This ensures that the reply is only sent to clients truly at 

235 # the end of our shutdown process (which happens after the underlying 

236 # IPython shell's own shutdown). 

237 _shutdown_message = None 

238 

239 # This is a dict of port number that the kernel is listening on. It is set 

240 # by record_ports and used by connect_request. 

241 _recorded_ports = Dict() 

242 

243 # set of aborted msg_ids 

244 aborted = Set() 

245 

246 # Track execution count here. For IPython, we override this to use the 

247 # execution count we store in the shell. 

248 execution_count = 0 

249 

250 msg_types = [ 

251 "execute_request", 

252 "complete_request", 

253 "inspect_request", 

254 "history_request", 

255 "comm_info_request", 

256 "kernel_info_request", 

257 "connect_request", 

258 "shutdown_request", 

259 "is_complete_request", 

260 "interrupt_request", 

261 # deprecated: 

262 "apply_request", 

263 ] 

264 # add deprecated ipyparallel control messages 

265 control_msg_types = [ 

266 *msg_types, 

267 "clear_request", 

268 "abort_request", 

269 "debug_request", 

270 "usage_request", 

271 "create_subshell_request", 

272 "delete_subshell_request", 

273 "list_subshell_request", 

274 ] 

275 

def __init__(self, **kwargs):
    """Initialize the kernel and build its message-handler tables.

    All keyword arguments are forwarded to the parent class initializer.
    """
    super().__init__(**kwargs)

    # The kernel application may already have swapped stdout and stderr for
    # OutStream objects (this is the case in `IPKernelApp.init_io`), hence
    # `sys.stdout` can already be different from TextIO at this point.
    self._stdout: OutStream | t.TextIO = sys.stdout
    self._stderr: OutStream | t.TextIO = sys.stderr

    # Map every supported message type to the bound method of the same name.
    self.shell_handlers = {name: getattr(self, name) for name in self.msg_types}
    self.control_handlers = {name: getattr(self, name) for name in self.control_msg_types}

    # Remember which optional keywords do_execute accepts; execute_request
    # consults this before passing them.
    self._do_exec_accepted_params = _accepts_parameters(
        self.do_execute, ["cell_meta", "cell_id"]
    )

299 

async def dispatch_control(self, msg):
    """Process one control message, serialized against other control messages.

    A lock that is created afresh on every call can never be contended, so
    it would not serialize anything. The lock is therefore created once,
    lazily (from inside a running coroutine), and stored on the instance
    so that every call shares it.

    Parameters
    ----------
    msg : list
        Raw message frames, passed unchanged to :meth:`process_control`.
    """
    lock = getattr(self, "_control_dispatch_lock", None)
    if lock is None:
        lock = self._control_dispatch_lock = asyncio.Lock()
    # Ensure only one control message is processed at a time
    async with lock:
        await self.process_control(msg)

304 

async def process_control(self, msg):
    """Deserialize a control-channel message and run its handler.

    Publishes ``busy`` before looking up the handler and ``idle`` (followed
    by a flush of the control stream) afterwards. Messages that fail to
    deserialize are logged and dropped before any status is published.
    Exceptions raised by the handler are logged, not propagated.
    """
    session = self.session
    if not session:
        return
    idents, msg = session.feed_identities(msg, copy=False)
    try:
        msg = session.deserialize(msg, content=True, copy=False)
    except Exception:
        self.log.error("Invalid Control Message", exc_info=True)  # noqa: G201
        return

    self.log.debug("Control received: %s", msg)

    # Record the parent so that side effects are associated with this request.
    self.set_parent(idents, msg, channel="control")
    self._publish_status("busy", "control")

    msg_type = msg["header"]["msg_type"]
    handler = self.control_handlers.get(msg_type)
    if handler is not None:
        try:
            outcome = handler(self.control_stream, idents, msg)
            if inspect.isawaitable(outcome):
                await outcome
        except Exception:
            self.log.error("Exception in control handler:", exc_info=True)  # noqa: G201
    else:
        self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)

    sys.stdout.flush()
    sys.stderr.flush()
    self._publish_status_and_flush("idle", "control", self.control_stream)

339 

def should_handle(self, stream, msg, idents):
    """Decide whether a shell-channel message should reach its handler.

    Returns ``False`` when the message id is in ``self.aborted``; in that
    case the id is removed from the set and an abort reply is sent.
    Subclasses can override this to filter out further messages.
    """
    msg_id = msg["header"]["msg_id"]
    if msg_id not in self.aborted:
        return True
    # is it safe to assume a msg_id will not be resubmitted?
    self.aborted.remove(msg_id)
    self._send_abort_reply(stream, msg, idents)
    return False

352 

async def dispatch_shell(self, msg, /, subshell_id: str | None = None):
    """Dispatch one shell-channel message to its registered handler.

    Parameters
    ----------
    msg : list
        Raw message frames. A single frame whose buffer is
        ``b"stop aborting"`` is an internal marker, not a protocol message.
    subshell_id : str, optional
        Subshell the message is addressed to; ``None`` selects the state
        kept on the kernel itself (``self._aborting``, ``self.shell_stream``).

    Notes
    -----
    After deserialization ``busy`` is published, and every exit path after
    that point publishes ``idle`` and flushes the reply stream. Exceptions
    and KeyboardInterrupt raised by the handler are logged, not propagated.
    """
    if len(msg) == 1 and msg[0].buffer == b"stop aborting":
        # Dummy "stop aborting" message to stop aborting execute requests on this subshell.
        # This dummy message implementation allows the subshell to abort messages that are
        # already queued in the zmq sockets/streams without having to know any of their
        # details in advance.
        if subshell_id is None:
            self._aborting = False
        else:
            self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, False)
        return

    if not self.session:
        return

    if self._supports_kernel_subshells:
        # With subshells enabled, handlers must not run on the control thread
        # or on the shell channel thread.
        assert threading.current_thread() not in (
            self.control_thread,
            self.shell_channel_thread,
        )

    idents, msg = self.session.feed_identities(msg, copy=False)
    try:
        msg = self.session.deserialize(msg, content=True, copy=False)
    except Exception:
        # Undecodable message: log and drop it without publishing any status.
        self.log.error("Invalid Message", exc_info=True)  # noqa: G201
        return

    # Set the parent message for side effects.
    self.set_parent(idents, msg, channel="shell")
    self._publish_status("busy", "shell")

    msg_type = msg["header"]["msg_type"]
    # The header's subshell must match the subshell this call was made for.
    assert msg["header"].get("subshell_id") == subshell_id

    # Pick the stream on which replies are sent.
    if self._supports_kernel_subshells:
        stream = self.shell_channel_thread.manager.get_subshell_to_shell_channel_socket(
            subshell_id
        )
    else:
        stream = self.shell_stream

    # Only abort execute requests
    if msg_type == "execute_request":
        if subshell_id is None:
            aborting = self._aborting  # type:ignore[unreachable]
        else:
            aborting = self.shell_channel_thread.manager.get_subshell_aborting(subshell_id)
        if aborting:
            self._send_abort_reply(stream, msg, idents)
            self._publish_status_and_flush("idle", "shell", stream)
            return

    # Print some info about this message and leave a '--->' marker, so it's
    # easier to trace visually the message chain when debugging. Each
    # handler prints its message at the end.
    self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type)
    self.log.debug(" Content: %s\n --->\n ", msg["content"])

    # should_handle sends its own abort reply when it rejects a message.
    if not self.should_handle(stream, msg, idents):
        self._publish_status_and_flush("idle", "shell", stream)
        return

    handler = self.shell_handlers.get(msg_type, None)
    if handler is None:
        self.log.warning("Unknown message type: %r", msg_type)
    else:
        self.log.debug("%s: %s", msg_type, msg)
        try:
            self.pre_handler_hook()
        except Exception:
            # A failing hook is logged at debug level and does not stop the handler.
            self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
        try:
            result = handler(stream, idents, msg)
            # Handlers may be plain functions or coroutines.
            if inspect.isawaitable(result):
                await result
        except Exception:
            self.log.error("Exception in message handler:", exc_info=True)  # noqa: G201
        except KeyboardInterrupt:
            # Ctrl-c shouldn't crash the kernel here.
            self.log.error("KeyboardInterrupt caught in kernel.")
        finally:
            try:
                self.post_handler_hook()
            except Exception:
                self.log.debug("Unable to signal in post_handler_hook:", exc_info=True)

    sys.stdout.flush()
    sys.stderr.flush()
    self._publish_status_and_flush("idle", "shell", stream)

444 

def pre_handler_hook(self):
    """Hook run before a message handler is called.

    Installs ``default_int_handler`` for SIGINT and stores the handler it
    replaced in ``self.saved_sigint_handler``.
    """
    # ensure default_int_handler during handler call
    previous_handler = signal(SIGINT, default_int_handler)
    self.saved_sigint_handler = previous_handler

449 

def post_handler_hook(self):
    """Hook run after a message handler returns.

    Re-installs the SIGINT handler stored in ``self.saved_sigint_handler``.
    """
    restored_handler = self.saved_sigint_handler
    signal(SIGINT, restored_handler)

453 

def enter_eventloop(self):
    """Start polling the configured eventloop.

    Returns immediately when no eventloop is set. Otherwise the eventloop
    callable is invoked repeatedly through the message queue until
    ``self.eventloop`` is replaced by a different object.
    """
    self.log.info("Entering eventloop %s", self.eventloop)
    # Keep a handle on the loop we entered, so a later change can be detected.
    active_loop = self.eventloop
    if active_loop is None:
        self.log.info("Exiting as there is no eventloop")
        return

    def schedule_next():
        """Schedule the next advance of the eventloop"""
        # call_later allows the io_loop to process other events if needed.
        # Going through schedule_dispatch ensures all other dispatches on msg_queue
        # are processed before we enter the eventloop, even if the previous dispatch was
        # already consumed from the queue by process_one and the queue is
        # technically empty.
        self.log.debug("Scheduling eventloop advance")
        self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))

    async def advance_eventloop():
        # Stop polling once the eventloop has been swapped out.
        if self.eventloop is not active_loop:
            self.log.info("exiting eventloop %s", active_loop)
            return
        if self.msg_queue.qsize():
            # still messages to process, make the eventloop wait
            self.log.debug("Delaying eventloop due to waiting messages")
            schedule_next()
            return
        self.log.debug("Advancing eventloop %s", active_loop)
        try:
            active_loop(self)
        except KeyboardInterrupt:
            # Ctrl-C shouldn't crash the kernel
            self.log.error("KeyboardInterrupt caught in kernel")
        if self.eventloop is active_loop:
            # schedule advance again
            schedule_next()

    # begin polling the eventloop
    schedule_next()

495 

async def do_one_iteration(self):
    """Handle at most one queued message without waiting.

    The shell stream (if any) is flushed first, then
    ``process_one(wait=False)`` is awaited.

    .. versionchanged:: 5
        This is now a coroutine
    """
    shell_stream = self.shell_stream
    if shell_stream:
        # move messages from the shell stream into the message queue
        shell_stream.flush()
    # process at most one shell message per iteration
    await self.process_one(wait=False)

509 

async def process_one(self, wait=True):
    """Take one entry from the message queue and await its dispatcher.

    Parameters
    ----------
    wait : bool
        When true, wait until an entry is available. When false, return
        immediately (without dispatching anything) if the queue is empty.
    """
    queue = self.msg_queue
    if wait:
        _idx, dispatch, args = await queue.get()
    else:
        try:
            _idx, dispatch, args = queue.get_nowait()
        except (asyncio.QueueEmpty, QueueEmpty):
            return

    if self.control_thread is None and self.control_stream is not None:
        # If there isn't a separate control thread then this main thread handles both shell
        # and control messages. Before processing a shell message we need to flush all control
        # messages and allow them all to be processed.
        await asyncio.sleep(0)
        self.control_stream.flush()

        control_socket = self.control_stream.socket
        while control_socket.poll(1):
            await asyncio.sleep(0)
            self.control_stream.flush()

    await dispatch(*args)

536 

async def dispatch_queue(self):
    """Run forever, awaiting :meth:`process_one` once per iteration.

    Each call is awaited to completion before the next one starts, so
    queued messages are handled one at a time and in order, even when the
    handlers are coroutines. An ``Exception`` raised by a call is logged
    and the loop carries on.
    """
    while True:
        try:
            await self.process_one()
        except Exception:
            self.log.exception("Error in message handler")

549 

550 _message_counter = Any( 

551 help="""Monotonic counter of messages 

552 """, 

553 ) 

554 

@default("_message_counter")
def _message_counter_default(self):
    """Return a new ``itertools.count()`` as the default message counter."""
    counter = itertools.count()
    return counter

558 

def schedule_dispatch(self, dispatch, *args):
    """Queue ``dispatch(*args)`` for later processing.

    The queue entry is ``(index, dispatch, args)`` where ``index`` is the
    next value of the message counter.
    """
    sequence_no = next(self._message_counter)
    entry = (sequence_no, dispatch, args)
    self.msg_queue.put_nowait(entry)
    # ensure the eventloop wakes up
    self.io_loop.add_callback(lambda: None)

572 

def start(self):
    """Create the message queue and register receive callbacks on the streams.

    Without a shell channel thread, shell messages are queued through
    :meth:`schedule_dispatch` and consumed by :meth:`dispatch_queue`. With
    one, they are received by :meth:`shell_channel_thread_main` and the
    manager's receive callback is set to :meth:`shell_main`.
    """
    loop = ioloop.IOLoop.current()
    self.io_loop = loop
    self.msg_queue: Queue[t.Any] = Queue()

    channel_thread = self.shell_channel_thread
    if not channel_thread:
        loop.add_callback(self.dispatch_queue)

    if self.control_stream:
        self.control_stream.on_recv(self.dispatch_control, copy=False)

    shell_stream = self.shell_stream
    if shell_stream:
        if channel_thread:
            channel_thread.manager.set_on_recv_callback(self.shell_main)
            on_shell_message = self.shell_channel_thread_main
        else:
            on_shell_message = partial(self.schedule_dispatch, self.dispatch_shell)
        shell_stream.on_recv(on_shell_message, copy=False)

    # publish the "starting" status
    self._publish_status("starting", "shell")

598 

async def shell_channel_thread_main(self, msg):
    """Forward a shell message, received on the shell channel thread, to its subshell.

    Only the header is deserialized, to read ``subshell_id``; the original
    frames are sent on unmodified. Any exception raised while routing is
    logged rather than raised. The shell stream is flushed afterwards.
    """
    assert threading.current_thread() == self.shell_channel_thread

    session = self.session
    if session is None:
        return

    _, frames = session.feed_identities(msg, copy=False)
    try:
        header_only = session.deserialize(frames, content=False, copy=False)
        subshell_id = header_only["header"].get("subshell_id")

        # Find inproc pair socket to use to send message to correct subshell.
        manager = self.shell_channel_thread.manager
        pair_socket = manager.get_shell_channel_to_subshell_socket(subshell_id)
        assert pair_socket is not None
        pair_socket.send_multipart(msg, copy=False)
    except Exception:
        self.log.error("Invalid message", exc_info=True)  # noqa: G201

    if self.shell_stream:
        self.shell_stream.flush()

623 

624 async def shell_main(self, subshell_id: str | None, msg): 

625 """Handler of shell messages for a single subshell""" 

626 if self._supports_kernel_subshells: 

627 if subshell_id is None: 

628 assert threading.current_thread() == threading.main_thread() 

629 else: 

630 assert threading.current_thread() not in ( 

631 self.shell_channel_thread, 

632 threading.main_thread(), 

633 ) 

634 socket_pair = self.shell_channel_thread.manager.get_shell_channel_to_subshell_pair( 

635 subshell_id 

636 ) 

637 else: 

638 assert subshell_id is None 

639 assert threading.current_thread() == threading.main_thread() 

640 socket_pair = None 

641 

642 try: 

643 # Whilst executing a shell message, do not accept any other shell messages on the 

644 # same subshell, so that cells are run sequentially. Without this we can run multiple 

645 # async cells at the same time which would be a nice feature to have but is an API 

646 # change. 

647 if socket_pair: 

648 socket_pair.pause_on_recv() 

649 await self.dispatch_shell(msg, subshell_id=subshell_id) 

650 finally: 

651 if socket_pair: 

652 socket_pair.resume_on_recv() 

653 

def record_ports(self, ports):
    """Remember the ports this kernel is using.

    The creator of the Kernel instance must call this method if it wants
    :meth:`connect_request` to report the port numbers.
    """
    recorded = ports
    self._recorded_ports = recorded

661 

662 # --------------------------------------------------------------------------- 

663 # Kernel request handlers 

664 # --------------------------------------------------------------------------- 

665 

666 def _publish_execute_input(self, code, parent, execution_count): 

667 """Publish the code request on the iopub stream.""" 

668 if not self.session: 

669 return 

670 self.session.send( 

671 self.iopub_socket, 

672 "execute_input", 

673 {"code": code, "execution_count": execution_count}, 

674 parent=parent, 

675 ident=self._topic("execute_input"), 

676 ) 

677 

678 def _publish_status(self, status, channel, parent=None): 

679 """send status (busy/idle) on IOPub""" 

680 if not self.session: 

681 return 

682 self.session.send( 

683 self.iopub_socket, 

684 "status", 

685 {"execution_state": status}, 

686 parent=parent or self.get_parent(channel), 

687 ident=self._topic("status"), 

688 ) 

689 

690 def _publish_status_and_flush(self, status, channel, stream, parent=None): 

691 """send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply""" 

692 self._publish_status(status, channel, parent) 

693 if stream and hasattr(stream, "flush"): 

694 stream.flush(zmq.POLLOUT) 

695 

696 def _publish_debug_event(self, event): 

697 if not self.session: 

698 return 

699 self.session.send( 

700 self.iopub_socket, 

701 "debug_event", 

702 event, 

703 parent=self.get_parent(), 

704 ident=self._topic("debug_event"), 

705 ) 

706 

def set_parent(self, ident, parent, channel="shell"):
    """Record the request currently being handled on ``channel``.

    Replies and side effects (IOPub messages) are tied to the request that
    caused them through the parent header, and the parent identity is used
    to route input_request messages on the stdin channel.

    Parameters
    ----------
    ident : list
        Routing identities of the request.
    parent : dict
        The request message.
    channel : str
        Channel name under which both values are stored.
    """
    self._parent_ident[channel] = ident
    self._parents[channel] = parent

718 

def get_parent(self, channel=None):
    """Return the parent request recorded for a channel.

    .. versionadded:: 6

    Parameters
    ----------
    channel : str, optional
        The name of the channel ('shell' or 'control'). When omitted, the
        channel is chosen from the current thread's name: 'control' on the
        control thread, 'shell' anywhere else.

    Returns
    -------
    message : dict
        The parent message for the most recent request on the channel, or
        an empty dict when none is recorded.
    """
    if channel is None:
        on_control_thread = threading.current_thread().name == CONTROL_THREAD_NAME
        channel = "control" if on_control_thread else "shell"

    return self._parents.get(channel, {})

743 

def send_response(
    self,
    stream,
    msg_or_type,
    content=None,
    ident=None,
    buffers=None,
    track=False,
    header=None,
    metadata=None,
    channel=None,
):
    """Send a response to the message currently being processed.

    Takes all the parameters of :meth:`jupyter_client.session.Session.send`
    except ``parent``, which is looked up with :meth:`get_parent` for
    ``channel``. This relies on :meth:`set_parent` having been called for
    the current message.

    Returns the result of ``session.send``, or ``None`` when the kernel has
    no session.
    """
    session = self.session
    if not session:
        return None
    parent = self.get_parent(channel)
    return session.send(
        stream,
        msg_or_type,
        content,
        parent,
        ident,
        buffers,
        track,
        header,
        metadata,
    )

777 

def init_metadata(self, parent):
    """Build the initial metadata for an execution request.

    Called at the beginning of execution requests. Returns a dict with a
    single ``"started"`` timestamp.
    """
    # FIXME: `started` is part of ipyparallel
    # Remove for ipykernel 5.0
    started_at = now()
    return {"started": started_at}

788 

789 def finish_metadata(self, parent, metadata, reply_content): 

790 """Finish populating metadata. 

791 

792 Run after completing an execution request. 

793 """ 

794 return metadata 

795 

async def execute_request(self, stream, ident, parent):
    """Handle an ``execute_request`` message.

    Parameters
    ----------
    stream
        Stream on which the ``execute_reply`` is sent.
    ident
        Routing identities of the request.
    parent : dict
        The request message; its ``content`` must contain ``code``.

    Notes
    -----
    Unless the request is silent, the execution count is incremented and
    the input is re-broadcast before :meth:`do_execute` runs. When the
    reply status is ``"error"`` and ``stop_on_error`` is set (and the
    request is not silent), queued requests for the same subshell are
    aborted.
    """
    if not self.session:
        return
    try:
        content = parent["content"]
        code = content["code"]
        silent = content.get("silent", False)
        store_history = content.get("store_history", not silent)
        user_expressions = content.get("user_expressions", {})
        allow_stdin = content.get("allow_stdin", False)
        cell_meta = parent.get("metadata", {})
        cell_id = cell_meta.get("cellId")
    except Exception:
        self.log.error("Got bad msg: ")
        self.log.error("%s", parent)
        return

    stop_on_error = content.get("stop_on_error", True)

    metadata = self.init_metadata(parent)

    # Re-broadcast our input for the benefit of listening clients, and
    # start computing output
    if not silent:
        self.execution_count += 1
        self._publish_execute_input(code, parent, self.execution_count)

    # Arguments based on the do_execute signature
    do_execute_args = {
        "code": code,
        "silent": silent,
        "store_history": store_history,
        "user_expressions": user_expressions,
        "allow_stdin": allow_stdin,
    }

    # Only pass the optional keywords that this kernel's do_execute accepts.
    if self._do_exec_accepted_params["cell_meta"]:
        do_execute_args["cell_meta"] = cell_meta
    if self._do_exec_accepted_params["cell_id"]:
        do_execute_args["cell_id"] = cell_id

    # Call do_execute with the appropriate arguments; it may return either
    # the reply content or an awaitable that produces it.
    reply_content = self.do_execute(**do_execute_args)

    if inspect.isawaitable(reply_content):
        reply_content = await reply_content

    # Flush output before sending the reply.
    sys.stdout.flush()
    sys.stderr.flush()
    # FIXME: on rare occasions, the flush doesn't seem to make it to the
    # clients... This seems to mitigate the problem, but we definitely need
    # to better understand what's going on.
    if self._execute_sleep:
        time.sleep(self._execute_sleep)

    # Send the reply.
    reply_content = json_clean(reply_content)
    metadata = self.finish_metadata(parent, metadata, reply_content)

    reply_msg: dict[str, t.Any] = self.session.send(  # type:ignore[assignment]
        stream,
        "execute_reply",
        reply_content,
        parent,
        metadata=metadata,
        ident=ident,
    )

    self.log.debug("%s", reply_msg)

    if not silent and reply_msg["content"]["status"] == "error" and stop_on_error:
        # The subshell id is only needed here, to abort that subshell's queue.
        subshell_id = parent["header"].get("subshell_id")
        self._abort_queues(subshell_id)

873 

def do_execute(
    self,
    code,
    silent,
    store_history=True,
    user_expressions=None,
    allow_stdin=False,
    *,
    cell_meta=None,
    cell_id=None,
):
    """Execute user code; subclasses must override this.

    Raises
    ------
    NotImplementedError
        Always, in this base implementation.
    """
    raise NotImplementedError

887 

async def complete_request(self, stream, ident, parent):
    """Handle a completion request.

    Calls :meth:`do_complete` with the request's ``code`` and
    ``cursor_pos`` (awaiting the result if needed) and sends the cleaned
    result back as a ``complete_reply``.
    """
    session = self.session
    if not session:
        return
    content = parent["content"]
    code, cursor_pos = content["code"], content["cursor_pos"]

    matches = self.do_complete(code, cursor_pos)
    if inspect.isawaitable(matches):
        matches = await matches

    session.send(stream, "complete_reply", json_clean(matches), parent, ident)

902 

def do_complete(self, code, cursor_pos):
    """Find completions; override in subclasses.

    This base implementation returns an ``ok`` reply with no matches and
    both cursor bounds set to ``cursor_pos``.
    """
    return dict(
        matches=[],
        cursor_end=cursor_pos,
        cursor_start=cursor_pos,
        metadata={},
        status="ok",
    )

912 

async def inspect_request(self, stream, ident, parent):
    """Handle an inspect request.

    Calls :meth:`do_inspect` with the request's ``code``, ``cursor_pos``,
    ``detail_level`` (default 0) and ``omit_sections`` (as a set, default
    empty), then sends the cleaned result as an ``inspect_reply``.
    """
    session = self.session
    if not session:
        return
    content = parent["content"]

    code = content["code"]
    cursor_pos = content["cursor_pos"]
    detail_level = content.get("detail_level", 0)
    omit_sections = set(content.get("omit_sections", []))

    reply_content = self.do_inspect(code, cursor_pos, detail_level, omit_sections)
    if inspect.isawaitable(reply_content):
        reply_content = await reply_content

    # Scrub the reply so that it can be serialized as JSON before sending it.
    reply_content = json_clean(reply_content)
    msg = session.send(stream, "inspect_reply", reply_content, parent, ident)
    self.log.debug("%s", msg)

932 

def do_inspect(self, code, cursor_pos, detail_level=0, omit_sections=()):
    """Introspect code; override in subclasses.

    This base implementation reports that nothing was found.
    """
    nothing_found = {"status": "ok", "data": {}, "metadata": {}, "found": False}
    return nothing_found

936 

async def history_request(self, stream, ident, parent):
    """Handle a history request.

    The request content is passed to :meth:`do_history` as keyword
    arguments; the cleaned result is sent as a ``history_reply``.
    """
    session = self.session
    if not session:
        return
    request = parent["content"]

    reply_content = self.do_history(**request)
    if inspect.isawaitable(reply_content):
        reply_content = await reply_content

    reply_content = json_clean(reply_content)
    msg = session.send(stream, "history_reply", reply_content, parent, ident)
    self.log.debug("%s", msg)

950 

def do_history(
    self,
    hist_access_type,
    output,
    raw,
    session=None,
    start=None,
    stop=None,
    n=None,
    pattern=None,
    unique=False,
):
    """Access history; override in subclasses.

    This base implementation returns an ``ok`` reply with an empty history.
    """
    empty_history = {"status": "ok", "history": []}
    return empty_history

965 

async def connect_request(self, stream, ident, parent):
    """Handle a connect request.

    Replies with a copy of the recorded ports (or an empty mapping when
    none are recorded) plus ``status: ok``. Does nothing without a session.
    """
    if not self.session:
        return
    if self._recorded_ports:
        # Copy so that adding "status" does not alter the recorded ports.
        reply = self._recorded_ports.copy()
    else:
        reply = {}
    reply["status"] = "ok"
    sent = self.session.send(stream, "connect_reply", reply, parent, ident)
    self.log.debug("%s", sent)

974 

@property
def kernel_info(self):
    """Kernel description used as the body of a ``kernel_info_reply``.

    ``supported_features`` lists ``"kernel subshells"`` when
    ``_supports_kernel_subshells`` is true and is empty otherwise.
    """
    info = dict(
        protocol_version=kernel_protocol_version,
        implementation=self.implementation,
        implementation_version=self.implementation_version,
        language_info=self.language_info,
        banner=self.banner,
        help_links=self.help_links,
    )
    info["supported_features"] = (
        ["kernel subshells"] if self._supports_kernel_subshells else []
    )
    return info

989 

async def kernel_info_request(self, stream, ident, parent):
    """Handle a kernel info request.

    Replies with ``status: ok`` merged with ``self.kernel_info``. Does
    nothing when there is no session.
    """
    if not self.session:
        return
    reply = {"status": "ok", **self.kernel_info}
    sent = self.session.send(stream, "kernel_info_reply", reply, parent, ident)
    self.log.debug("%s", sent)

998 

async def comm_info_request(self, stream, ident, parent):
    """Handle a comm info request.

    Replies with the open comms keyed by comm id, optionally restricted to
    the ``target_name`` given in the request. Kernels without a
    ``comm_manager`` attribute report no comms.
    """
    if not self.session:
        return
    wanted_target = parent["content"].get("target_name", None)

    comms = {}
    # Should this be moved to ipkernel?
    if hasattr(self, "comm_manager"):
        for comm_id, comm in self.comm_manager.comms.items():
            if comm.target_name == wanted_target or wanted_target is None:
                comms[comm_id] = {"target_name": comm.target_name}

    reply = {"comms": comms, "status": "ok"}
    sent = self.session.send(stream, "comm_info_reply", reply, parent, ident)
    self.log.debug("%s", sent)

1018 

def _send_interrupt_children(self):
    """Send SIGINT to the kernel's process group, or to the kernel itself.

    The process group is signalled only when the kernel is its leader and
    ``os.killpg`` exists; otherwise only the kernel process is signalled.
    If signalling the group fails, the kernel process is signalled and the
    ``OSError`` is re-raised. On Windows an error is logged instead.
    """
    if os.name == "nt":
        self.log.error("Interrupt message not supported on Windows")
        return

    pid = os.getpid()
    pgid = os.getpgid(pid)
    # Prefer process-group over process
    # but only if the kernel is the leader of the process group
    is_group_leader = pgid and pgid == pid and hasattr(os, "killpg")
    if not is_group_leader:
        os.kill(pid, SIGINT)
        return

    try:
        os.killpg(pgid, SIGINT)
    except OSError:
        os.kill(pid, SIGINT)
        raise

1035 

async def interrupt_request(self, stream, ident, parent):
    """Handle an interrupt request.

    Calls ``_send_interrupt_children`` and answers with an
    ``interrupt_reply``: ``{"status": "ok"}`` on success, or an error
    payload (``status``, ``traceback``, ``ename``, ``evalue``) when an
    ``OSError`` was raised. Does nothing when there is no session.
    """
    if not self.session:
        return
    content: dict[str, t.Any] = {"status": "ok"}
    try:
        self._send_interrupt_children()
    except OSError as err:
        import traceback

        content = {
            "status": "error",
            # Report the traceback of the failure itself; format_stack()
            # would only describe the call stack of this handler.
            "traceback": traceback.format_exception(type(err), err, err.__traceback__),
            "ename": type(err).__name__,
            "evalue": str(err),
        }

    self.session.send(stream, "interrupt_reply", content, parent, ident=ident)

1055 

async def shutdown_request(self, stream, ident, parent):
    """Handle a shutdown request.

    Runs ``do_shutdown`` with the request's ``restart`` flag, sends the
    ``shutdown_reply``, stores a second copy of the reply in
    ``_shutdown_message`` for broadcasting, awaits ``_at_shutdown`` and then
    schedules the control and shell io loops to stop.
    """
    if not self.session:
        return

    restart = parent["content"]["restart"]
    content = self.do_shutdown(restart)
    if inspect.isawaitable(content):
        content = await content
    self.session.send(stream, "shutdown_reply", content, parent, ident=ident)
    # same content, but different msg_id for broadcasting on IOPub
    self._shutdown_message = self.session.msg("shutdown_reply", content, parent)

    await self._at_shutdown()

    self.log.debug("Stopping control ioloop")
    if self.control_stream:
        control_loop = self.control_stream.io_loop
        control_loop.add_callback(control_loop.stop)

    self.log.debug("Stopping shell ioloop")
    self.io_loop.add_callback(self.io_loop.stop)
    # The shell stream may run on a loop other than the main one.
    if self.shell_stream and self.shell_stream.io_loop != self.io_loop:
        shell_loop = self.shell_stream.io_loop
        shell_loop.add_callback(shell_loop.stop)

1079 

def do_shutdown(self, restart):
    """Return the default shutdown reply, echoing the ``restart`` flag.

    Override in subclasses to do things when the frontend shuts down the
    kernel.
    """
    reply = dict(status="ok", restart=restart)
    return reply

1085 

async def is_complete_request(self, stream, ident, parent):
    """Handle an is_complete request.

    Passes the request's code to ``do_is_complete``, awaits the result when
    needed, JSON-cleans it and sends it back as an ``is_complete_reply``.
    Does nothing when there is no session.
    """
    if not self.session:
        return
    source = parent["content"]["code"]

    verdict = self.do_is_complete(source)
    if inspect.isawaitable(verdict):
        verdict = await verdict
    verdict = json_clean(verdict)
    sent = self.session.send(stream, "is_complete_reply", verdict, parent, ident)
    self.log.debug("%s", sent)

1099 

def do_is_complete(self, code):
    """Return the default ``unknown`` completeness status for ``code``.

    Override in subclasses to report whether the code is complete.
    """
    verdict = dict(status="unknown")
    return verdict

1103 

async def debug_request(self, stream, ident, parent):
    """Handle a debug request.

    Passes the request content to ``do_debug_request``, awaits the result
    when needed, JSON-cleans it and sends it back as a ``debug_reply``.
    Does nothing when there is no session.
    """
    if not self.session:
        return
    outcome = self.do_debug_request(parent["content"])
    if inspect.isawaitable(outcome):
        outcome = await outcome
    outcome = json_clean(outcome)
    sent = self.session.send(stream, "debug_reply", outcome, parent, ident)
    self.log.debug("%s", sent)

1115 

def get_process_metric_value(self, process, name, attribute=None):
    """Get the process metric value.

    Calls the method called ``name`` on ``process``. When ``attribute`` is
    given, the result is treated as a named tuple and that field is
    returned; otherwise the result itself (a number) is returned.

    Returns 0 when the lookup or the call raises an ordinary exception,
    which avoids littering logs with stack traces complaining about dead
    processes. ``KeyboardInterrupt`` and ``SystemExit`` are deliberately
    not swallowed, so interrupts and shutdown still propagate.
    """
    try:
        metric_value = getattr(process, name)()
        if attribute is not None:  # ... a named tuple
            return getattr(metric_value, attribute)
        # ... or a number
        return metric_value
    except Exception:
        return 0

1128 

async def usage_request(self, stream, ident, parent):
    """Handle a usage request.

    Replies with the hostname, the kernel pid, CPU and memory figures summed
    over the kernel process and all of its descendants, the CPU count and
    the host's virtual memory statistics. ``host_cpu_percent`` is included
    only when psutil reports a value other than 0.0.
    """
    if not self.session:
        return
    reply_content = {"hostname": socket.gethostname(), "pid": os.getpid()}

    kernel_proc = psutil.Process()
    family = [kernel_proc]
    family.extend(kernel_proc.children(recursive=True))
    # Ensure 1) self.processes is updated to only current subprocesses
    # and 2) we reuse processes when possible (needed for accurate CPU)
    tracked = {}
    for proc in family:
        tracked[proc.pid] = self.processes.get(proc.pid, proc)  # type:ignore[misc,call-overload]
    self.processes = tracked

    reply_content["kernel_cpu"] = sum(
        self.get_process_metric_value(proc, "cpu_percent", None)
        for proc in self.processes.values()
    )

    if hasattr(kernel_proc.memory_full_info(), "pss"):
        mem_info_type = "pss"
    else:
        mem_info_type = "rss"
    reply_content["kernel_memory"] = sum(
        self.get_process_metric_value(proc, "memory_full_info", mem_info_type)
        for proc in self.processes.values()
    )

    host_cpu = psutil.cpu_percent()
    # https://psutil.readthedocs.io/en/latest/index.html?highlight=cpu#psutil.cpu_percent
    # The first time cpu_percent is called it will return a meaningless 0.0 value which you are supposed to ignore.
    if host_cpu is not None and host_cpu != 0.0:  # type:ignore[redundant-expr]
        reply_content["host_cpu_percent"] = host_cpu
    reply_content["cpu_count"] = psutil.cpu_count(logical=True)
    reply_content["host_virtual_memory"] = dict(psutil.virtual_memory()._asdict())

    sent = self.session.send(stream, "usage_reply", reply_content, parent, ident)
    self.log.debug("%s", sent)

1164 

async def do_debug_request(self, msg):
    """Process a debug request; not implemented in this base class."""
    raise NotImplementedError

1167 

async def create_subshell_request(self, socket, ident, parent) -> None:
    """Handle a request to create a subshell.

    Forwards a ``create`` message to the shell channel thread and relays its
    JSON answer as a ``create_subshell_reply``. Does nothing without a
    session; logs an error and returns when subshells are unsupported.
    """
    if not self.session:
        return
    if not self._supports_kernel_subshells:
        self.log.error("Subshells are not supported by this kernel")
        return

    assert threading.current_thread().name == CONTROL_THREAD_NAME

    # This should only be called in the control thread if it exists.
    # Request is passed to shell channel thread to process.
    manager = self.shell_channel_thread.manager
    to_shell_channel = manager.control_to_shell_channel.from_socket
    to_shell_channel.send_json(dict(type="create"))
    answer = to_shell_channel.recv_json()
    self.session.send(socket, "create_subshell_reply", answer, parent, ident)

1183 

async def delete_subshell_request(self, socket, ident, parent) -> None:
    """Handle a request to delete a subshell.

    Reads ``subshell_id`` from the request content, forwards a ``delete``
    message to the shell channel thread and relays its JSON answer as a
    ``delete_subshell_reply``. Does nothing without a session; logs an
    error and returns when subshells are unsupported or when the request
    carries no usable ``subshell_id``.
    """
    if not self.session:
        return
    if not self._supports_kernel_subshells:
        # Same wording as the create/list subshell handlers.
        self.log.error("Subshells are not supported by this kernel")
        return

    assert threading.current_thread().name == CONTROL_THREAD_NAME

    try:
        content = parent["content"]
        subshell_id = content["subshell_id"]
    except Exception:
        self.log.error("Got bad msg from parent: %s", parent)
        return

    # This should only be called in the control thread if it exists.
    # Request is passed to shell channel thread to process.
    control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket
    control_socket.send_json({"type": "delete", "subshell_id": subshell_id})
    reply = control_socket.recv_json()

    self.session.send(socket, "delete_subshell_reply", reply, parent, ident)

1207 

async def list_subshell_request(self, socket, ident, parent) -> None:
    """Handle a request to list subshells.

    Forwards a ``list`` message to the shell channel thread and relays its
    JSON answer as a ``list_subshell_reply``. Does nothing without a
    session; logs an error and returns when subshells are unsupported.
    """
    if not self.session:
        return
    if not self._supports_kernel_subshells:
        self.log.error("Subshells are not supported by this kernel")
        return

    assert threading.current_thread().name == CONTROL_THREAD_NAME

    # This should only be called in the control thread if it exists.
    # Request is passed to shell channel thread to process.
    manager = self.shell_channel_thread.manager
    to_shell_channel = manager.control_to_shell_channel.from_socket
    to_shell_channel.send_json(dict(type="list"))
    answer = to_shell_channel.recv_json()

    self.session.send(socket, "list_subshell_reply", answer, parent, ident)

1224 

1225 # --------------------------------------------------------------------------- 

1226 # Engine methods (DEPRECATED) 

1227 # --------------------------------------------------------------------------- 

1228 

async def apply_request(self, stream, ident, parent):  # pragma: no cover
    """Handle an apply request (deprecated).

    Logs a deprecation warning, runs ``do_apply`` on the message content and
    buffers, flushes stdout/stderr and sends an ``apply_reply`` carrying the
    result buffers and metadata. A message missing ``content``, ``buffers``
    or ``header.msg_id`` is logged and dropped.
    """
    self.log.warning("apply_request is deprecated in kernel_base, moving to ipyparallel.")
    try:
        content, bufs, msg_id = (
            parent["content"],
            parent["buffers"],
            parent["header"]["msg_id"],
        )
    except Exception:
        self.log.error("Got bad msg: %s", parent, exc_info=True)  # noqa: G201
        return

    metadata = self.init_metadata(parent)

    reply_content, result_buf = self.do_apply(content, bufs, msg_id, metadata)

    # flush i/o
    for out in (sys.stdout, sys.stderr):
        out.flush()

    metadata = self.finish_metadata(parent, metadata, reply_content)
    if not self.session:
        return
    self.session.send(
        stream,
        "apply_reply",
        reply_content,
        parent=parent,
        ident=ident,
        buffers=result_buf,
        metadata=metadata,
    )

1260 

def do_apply(self, content, bufs, msg_id, reply_metadata):
    """DEPRECATED; not implemented in this base class."""
    raise NotImplementedError

1264 

1265 # --------------------------------------------------------------------------- 

1266 # Control messages (DEPRECATED) 

1267 # --------------------------------------------------------------------------- 

1268 

async def abort_request(self, stream, ident, parent):  # pragma: no cover
    """Abort specific messages by id, or the whole queue (deprecated).

    ``msg_ids`` in the request content may be a single id string or a list
    of ids; each one is added to ``self.aborted``. When no ids are given,
    the queue of the subshell named in the request header is aborted via
    ``_abort_queues`` instead. An ``abort_reply`` with ``status: ok`` is
    sent when a session exists.
    """
    self.log.warning(
        "abort_request is deprecated in kernel_base. It is only part of IPython parallel"
    )
    msg_ids = parent["content"].get("msg_ids", None)
    if isinstance(msg_ids, str):
        msg_ids = [msg_ids]
    if not msg_ids:
        subshell_id = parent["header"].get("subshell_id")
        self._abort_queues(subshell_id)
        # ``msg_ids`` may be None here; normalise it so the loop below does
        # not raise TypeError and the reply is still sent.
        msg_ids = []

    for mid in msg_ids:
        self.aborted.add(str(mid))

    content = dict(status="ok")
    if not self.session:
        return
    reply_msg = self.session.send(
        stream, "abort_reply", content=content, parent=parent, ident=ident
    )
    self.log.debug("%s", reply_msg)

1291 

async def clear_request(self, stream, idents, parent):  # pragma: no cover
    """Clear our namespace (deprecated).

    Logs a deprecation warning, runs ``do_clear`` and, when a session
    exists, sends its result as a ``clear_reply``.
    """
    self.log.warning(
        "clear_request is deprecated in kernel_base. It is only part of IPython parallel"
    )
    outcome = self.do_clear()
    if not self.session:
        return
    self.session.send(stream, "clear_reply", ident=idents, parent=parent, content=outcome)

1300 

def do_clear(self):
    """DEPRECATED since 4.0.3; not implemented in this base class."""
    raise NotImplementedError

1304 

1305 # --------------------------------------------------------------------------- 

1306 # Protected interface 

1307 # --------------------------------------------------------------------------- 

1308 

def _topic(self, topic):
    """Return the IOPub topic ``kernel.<ident>.<topic>`` encoded as bytes."""
    prefix = "kernel.%s" % self.ident
    qualified = prefix + f".{topic}"
    return qualified.encode()

1314 

# Abort flag for the case where no subshell id is given: _abort_queues sets it
# to True (while it is true, execute requests will be aborted) and the
# stop_aborting callback scheduled there resets it to False.
1315 _aborting = Bool(False) 

1316 

def _post_dummy_stop_aborting_message(self, subshell_id: str | None) -> None:
    """Post a dummy message to the correct subshell that, when handled, will
    unset the _aborting flag.

    The message is sent on the socket that the shell channel thread's
    manager returns for ``subshell_id``.
    """
    manager = self.shell_channel_thread.manager
    subshell_socket = manager.get_shell_channel_to_subshell_socket(subshell_id)
    assert subshell_socket is not None

    # Magic string for dummy message.
    subshell_socket.send(b"stop aborting", copy=False)

1327 

def _abort_queues(self, subshell_id: str | None = None):
    """Start aborting queued requests and arrange for the abort to end.

    Sets the aborting flag (``self._aborting``, or the per-subshell flag
    when ``subshell_id`` is given). With a shell channel thread, a dummy
    "stop aborting" message is posted through that thread's io loop.
    Otherwise a ``stop_aborting`` coroutine is placed on the dispatch queue,
    after ``stop_on_error_timeout`` seconds when that timeout is set.
    """
    # while this flag is true,
    # execute requests will be aborted
    if subshell_id is not None:
        self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, True)
    else:
        self._aborting = True
    self.log.info("Aborting queue")

    if self.shell_channel_thread:
        # Only really need to do this if there are messages already queued
        self.shell_channel_thread.io_loop.add_callback(
            self._post_dummy_stop_aborting_message, subshell_id
        )
        return

    # flush streams, so all currently waiting messages
    # are added to the queue
    if self.shell_stream:
        self.shell_stream.flush()

    # Callback to signal that we are done aborting
    # dispatch functions _must_ be async
    async def stop_aborting():
        self.log.info("Finishing abort")
        self._aborting = False

    # put the stop-aborting event on the message queue
    # so that all messages already waiting in the queue are aborted
    # before we reset the flag
    schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)

    if not self.stop_on_error_timeout:
        schedule_stop_aborting()
        return

    # if we have a delay, give messages this long to arrive on the queue
    # before we stop aborting requests
    self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting)
    # If we have an eventloop, it may interfere with the call_later above.
    # If the loop has a _schedule_exit method, we call that so the loop exits
    # after stop_on_error_timeout, returning to the main io_loop and letting
    # the call_later fire.
    if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
        self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)

1373 

def _send_abort_reply(self, stream, msg, idents):
    """Send a reply to an aborted request.

    The reply type is the request's message type with its last
    underscore-separated part replaced by ``reply``; both the content and
    the metadata carry ``status: aborted``. Does nothing without a session.
    """
    if not self.session:
        return
    header = msg["header"]
    self.log.info("Aborting %s: %s", header["msg_id"], header["msg_type"])

    stem = msg["header"]["msg_type"].rsplit("_", 1)[0]
    reply_type = stem + "_reply"
    aborted = {"status": "aborted"}
    metadata = self.finish_metadata(msg, self.init_metadata(msg), aborted)
    metadata.update(aborted)

    self.session.send(
        stream,
        reply_type,
        metadata=metadata,
        content=aborted,
        parent=msg,
        ident=idents,
    )

1393 

def _no_raw_input(self):
    """Raise StdinNotImplementedError if active frontend doesn't support
    stdin."""
    raise StdinNotImplementedError(
        "raw_input was called, but this frontend does not support stdin."
    )

1399 

def getpass(self, prompt="", stream=None):
    """Forward getpass to frontends

    The prompt is sent as a password input request for the current shell
    parent. A non-None ``stream`` only triggers a ``UserWarning``.

    Raises
    ------
    StdinNotImplementedError if active frontend doesn't support stdin.
    """
    if not self._allow_stdin:
        msg = "getpass was called, but this frontend does not support input requests."
        raise StdinNotImplementedError(msg)

    if stream is not None:
        import warnings

        warnings.warn(
            "The `stream` parameter of `getpass.getpass` will have no effect when using ipykernel",
            UserWarning,
            stacklevel=2,
        )

    request_args = (prompt, self._parent_ident["shell"], self.get_parent("shell"))
    return self._input_request(*request_args, password=True)

1424 

def raw_input(self, prompt=""):
    """Forward raw_input to frontends

    The prompt is converted to ``str`` and sent as a non-password input
    request for the current shell parent.

    Raises
    ------
    StdinNotImplementedError if active frontend doesn't support stdin.
    """
    if not self._allow_stdin:
        msg = "raw_input was called, but this frontend does not support input requests."
        raise StdinNotImplementedError(msg)

    request_args = (str(prompt), self._parent_ident["shell"], self.get_parent("shell"))
    return self._input_request(*request_args, password=False)

1441 

def _input_request(self, prompt, ident, parent, password=False):
    """Send an ``input_request`` on the stdin socket and return the reply value.

    Flushes stdout/stderr, discards stale messages waiting on the stdin
    socket, sends the request and polls until a reply arrives. A reply
    without ``content.value`` is logged and treated as an empty string.

    Raises
    ------
    EOFError if the reply value is ``"\\x04"``.
    KeyboardInterrupt if interrupted while waiting for the reply.
    """
    # Flush output before making the request.
    sys.stderr.flush()
    sys.stdout.flush()

    # flush the stdin socket, to purge stale replies
    draining = True
    while draining:
        try:
            self.stdin_socket.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError as err:
            if err.errno != zmq.EAGAIN:
                raise
            # Nothing left to read.
            draining = False

    # Send the input request.
    assert self.session is not None
    request = json_clean({"prompt": prompt, "password": password})
    self.session.send(self.stdin_socket, "input_request", request, parent, ident=ident)

    # Await a response.
    while True:
        try:
            # Use polling with select() so KeyboardInterrupts can get
            # through; doing a blocking recv() means stdin reads are
            # uninterruptible on Windows. We need a timeout because
            # zmq.select() is also uninterruptible, but at least this
            # way reads get noticed immediately and KeyboardInterrupts
            # get noticed fairly quickly by human response time standards.
            readable, _, errored = zmq.select(
                [self.stdin_socket], [], [self.stdin_socket], 0.01
            )
            if readable or errored:
                ident, reply = self.session.recv(self.stdin_socket)
                if (ident, reply) != (None, None):
                    break
        except KeyboardInterrupt:
            # re-raise KeyboardInterrupt, to truncate traceback
            msg = "Interrupted by user"
            raise KeyboardInterrupt(msg) from None
        except Exception:
            self.log.warning("Invalid Message:", exc_info=True)

    try:
        value = reply["content"]["value"]  # type:ignore[index]
    except Exception:
        self.log.error("Bad input_reply: %s", parent)
        value = ""

    if value == "\x04":
        # EOF
        raise EOFError
    return value

1491 

def _signal_children(self, signum):
    """
    Send a signal to all our children

    Like `killpg`, but does not include the current process
    (or possible parents).

    SIGTERM and SIGKILL go through the child's ``terminate()`` / ``kill()``
    methods; any other signal uses ``send_signal``. Children that no longer
    exist are ignored.
    """
    sig_rep = repr(Signals(signum))
    for child in self._process_children():
        self.log.debug("Sending %s to subprocess %s", sig_rep, child)
        try:
            if signum == SIGTERM:
                child.terminate()
            elif signum == SIGKILL:
                child.kill()
            else:
                child.send_signal(signum)
        except psutil.NoSuchProcess:
            # The child exited before it could be signalled.
            continue

1511 

def _process_children(self):
    """Retrieve child processes in the kernel's process group

    Avoids:
    - including parents and self with killpg
    - including all children that may have forked-off a new group

    On Windows (``os.name == "nt"``) every descendant is returned
    unfiltered.
    """
    descendants = psutil.Process().children(recursive=True)
    if os.name == "nt":
        return descendants

    own_pgid = os.getpgrp()
    same_group = []
    for proc in descendants:
        try:
            pgid = os.getpgid(proc.pid)
        except OSError:
            # The process group could not be looked up; leave the child out.
            continue
        if pgid == own_pgid:
            same_group.append(proc)
    return same_group

1534 

async def _progressively_terminate_all_children(self):
    """Signal child processes repeatedly until none remain.

    Children are sent SIGTERM with growing pauses between retries, then
    SIGKILL with the same pauses. Returns as soon as no children are left.
    """
    if not self._process_children():
        self.log.debug("Kernel has no children.")
        return

    sleeps = (0.01, 0.03, 0.1, 0.3, 1, 3, 10)
    attempts = [(signum, delay) for signum in (SIGTERM, SIGKILL) for delay in sleeps]
    for signum, delay in attempts:
        remaining = self._process_children()
        if not remaining:
            self.log.debug("No more children, continuing shutdown routine.")
            return
        # signals only children, not current process
        self._signal_children(signum)
        self.log.debug(
            "Will sleep %s sec before checking for children and retrying. %s",
            delay,
            remaining,
        )
        await asyncio.sleep(delay)

1555 

async def _at_shutdown(self):
    """Actions taken at shutdown by the kernel.

    Terminates child processes (logging any exception that raises). Then,
    in all cases, sends the stored shutdown message on the IOPub socket
    when there is one and a session exists, and flushes the control stream.
    """
    try:
        await self._progressively_terminate_all_children()
    except Exception as exc:
        self.log.exception("Exception during subprocesses termination %s", exc)
    finally:
        have_message = self._shutdown_message is not None
        if have_message and self.session:
            self.session.send(
                self.iopub_socket,
                self._shutdown_message,
                ident=self._topic("shutdown"),
            )
            self.log.debug("%s", self._shutdown_message)
        if self.control_stream:
            self.control_stream.flush(zmq.POLLOUT)

1573 

@property
def _supports_kernel_subshells(self):
    """True when this kernel has a shell channel thread."""
    has_shell_channel_thread = self.shell_channel_thread is not None
    return has_shell_channel_thread