Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/comm/base_comm.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Default classes for Comm and CommManager, for usage in IPython.
2"""
4# Copyright (c) IPython Development Team.
5# Distributed under the terms of the Modified BSD License.
6from __future__ import annotations
8import contextlib
9import logging
10import typing as t
11import uuid
13from traitlets.utils.importstring import import_item
15import comm
17if t.TYPE_CHECKING:
18 from zmq.eventloop.zmqstream import ZMQStream
# Module-wide logger; the name "Comm" is what logging configuration keys on.
logger = logging.getLogger("Comm")

# A full comm message, as handed to the comm_open/comm_msg/comm_close handlers.
MessageType = t.Dict[str, t.Any]
# Optional `data` / `metadata` payload of an outgoing comm message.
MaybeDict = t.Optional[t.Dict[str, t.Any]]
# Optional list of binary buffers attached to an outgoing comm message.
BuffersType = t.Optional[t.List[bytes]]
# Callback registered through `on_msg` / `on_close`; receives the message.
CommCallback = t.Callable[[MessageType], None]
# Callback registered for a target name; receives the new comm and the
# `comm_open` message.  "BaseComm" is a forward reference to the class below.
CommTargetCallback = t.Callable[["BaseComm", MessageType], None]
class BaseComm:
    """Class for communicating between a Frontend and a Kernel

    Must be subclassed with a publish_msg method implementation which
    sends comm messages through the iopub channel.
    """

    def __init__(
        self,
        target_name: str = "comm",
        data: MaybeDict = None,
        metadata: MaybeDict = None,
        buffers: BuffersType = None,
        comm_id: str | None = None,
        primary: bool = True,
        target_module: str | None = None,
        topic: bytes | None = None,
        _open_data: MaybeDict = None,
        _close_data: MaybeDict = None,
        **kwargs: t.Any,
    ) -> None:
        """Create a comm and, when it is the primary side, open its peer.

        Parameters
        ----------
        target_name
            Sent as ``target_name`` in the ``comm_open`` message.
        data, metadata, buffers
            Forwarded to :meth:`open` when ``primary`` is true.
        comm_id
            Identifier of this comm; a random ``uuid4`` hex string is
            generated when it is missing or empty.
        primary
            When true, :meth:`open` is called during construction.
            Otherwise the comm is only marked as not closed.
        target_module
            Sent as ``target_module`` in the ``comm_open`` message.
        topic
            Stored on the instance; defaults to ``b"comm-<comm_id>"``
            when missing or empty.
        _open_data, _close_data
            Default ``data`` used by :meth:`open` / :meth:`close` when the
            caller passes ``data=None``.
        **kwargs
            Forwarded to the next ``__init__`` in the MRO.
        """
        super().__init__(**kwargs)

        self.comm_id = comm_id if comm_id else uuid.uuid4().hex
        self.primary = primary
        self.target_name = target_name
        self.target_module = target_module
        self.topic = topic if topic else f"comm-{self.comm_id}".encode("ascii")

        self._open_data = _open_data if _open_data else {}
        self._close_data = _close_data if _close_data else {}

        self._msg_callback: CommCallback | None = None
        self._close_callback: CommCallback | None = None

        # Start out closed so that a failing open() (or a __del__ triggered by
        # it) never tries to publish a comm_close for a comm that never opened.
        self._closed = True

        if self.primary:
            # I am primary, open my peer.
            self.open(data=data, metadata=metadata, buffers=buffers)
        else:
            self._closed = False

    def publish_msg(
        self,
        msg_type: str,  # noqa: ARG002
        data: MaybeDict = None,  # noqa: ARG002
        metadata: MaybeDict = None,  # noqa: ARG002
        buffers: BuffersType = None,  # noqa: ARG002
        **keys: t.Any,  # noqa: ARG002
    ) -> None:
        """Send a comm message; must be overridden by subclasses.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        msg = "publish_msg Comm method is not implemented"
        raise NotImplementedError(msg)

    def __del__(self) -> None:
        """trigger close on gc"""
        with contextlib.suppress(Exception):
            # any number of things can have gone horribly wrong
            # when called during interpreter teardown
            self.close(deleting=True)

    # publishing messages

    def open(
        self, data: MaybeDict = None, metadata: MaybeDict = None, buffers: BuffersType = None
    ) -> None:
        """Open the frontend-side version of this comm

        Registers this comm with the comm manager and publishes a
        ``comm_open`` message.  ``data`` falls back to the ``_open_data``
        given at construction when it is ``None``.

        Raises
        ------
        RuntimeError
            If ``comm.get_comm_manager()`` returns ``None``.
        Exception
            Anything raised by ``publish_msg``; the comm is unregistered
            again before the exception propagates.
        """
        if data is None:
            data = self._open_data
        comm_manager = comm.get_comm_manager()
        if comm_manager is None:
            msg = "Comms cannot be opened without a comm_manager."  # type:ignore[unreachable]
            raise RuntimeError(msg)

        comm_manager.register_comm(self)
        try:
            self.publish_msg(
                "comm_open",
                data=data,
                metadata=metadata,
                buffers=buffers,
                target_name=self.target_name,
                target_module=self.target_module,
            )
            self._closed = False
        except Exception:
            # Do not leave a comm registered whose peer was never opened.
            comm_manager.unregister_comm(self)
            raise

    def close(
        self,
        data: MaybeDict = None,
        metadata: MaybeDict = None,
        buffers: BuffersType = None,
        deleting: bool = False,
    ) -> None:
        """Close the frontend-side version of this comm

        Does nothing if the comm is already closed.  Otherwise publishes a
        ``comm_close`` message (``data`` falls back to the ``_close_data``
        given at construction) and, unless ``deleting`` is true, unregisters
        the comm from the comm manager.
        """
        if self._closed:
            # only close once
            return
        # Flag first, so a re-entrant close() during publishing is a no-op.
        self._closed = True
        if data is None:
            data = self._close_data
        self.publish_msg(
            "comm_close",
            data=data,
            metadata=metadata,
            buffers=buffers,
        )
        if not deleting:
            # If deleting, the comm can't be registered
            comm.get_comm_manager().unregister_comm(self)

    def send(
        self, data: MaybeDict = None, metadata: MaybeDict = None, buffers: BuffersType = None
    ) -> None:
        """Send a message to the frontend-side version of this comm"""
        self.publish_msg(
            "comm_msg",
            data=data,
            metadata=metadata,
            buffers=buffers,
        )

    # registering callbacks

    def on_close(self, callback: CommCallback | None) -> None:
        """Register a callback for comm_close

        The callback is invoked by `handle_close` with the close message.

        Call `on_close(None)` to disable an existing callback.
        """
        self._close_callback = callback

    def on_msg(self, callback: CommCallback | None) -> None:
        """Register a callback for comm_msg

        The callback is invoked by `handle_msg` with each comm_msg message.

        Call `on_msg(None)` to disable an existing callback.
        """
        self._msg_callback = callback

    # handling of incoming messages

    def handle_close(self, msg: MessageType) -> None:
        """Handle a comm_close message"""
        logger.debug("handle_close[%s](%s)", self.comm_id, msg)
        if self._close_callback:
            self._close_callback(msg)

    def handle_msg(self, msg: MessageType) -> None:
        """Handle a comm_msg message

        Invokes the callback registered with `on_msg`, if any.  When IPython
        is importable and `get_ipython()` returns a shell, the callback is
        bracketed by the shell's ``pre_execute`` / ``post_execute`` events.
        """
        logger.debug("handle_msg[%s](%s)", self.comm_id, msg)
        callback = self._msg_callback
        if not callback:
            return

        # IPython is optional here: without it the callback still runs,
        # just without the execution hooks.
        try:
            from IPython import get_ipython
        except ImportError:
            shell = None
        else:
            shell = get_ipython()

        if shell:
            shell.events.trigger("pre_execute")
        try:
            callback(msg)
        finally:
            # Always pair pre_execute with post_execute, even if the
            # callback raised.
            if shell:
                shell.events.trigger("post_execute")
class CommManager:
    """Default CommManager singleton implementation for Comms in the Kernel"""

    # Public APIs

    def __init__(self) -> None:
        # Open comms, keyed by comm_id.
        self.comms: dict[str, BaseComm] = {}
        # Registered target callbacks, keyed by target name.
        self.targets: dict[str, CommTargetCallback] = {}

    def register_target(self, target_name: str, f: CommTargetCallback | str) -> None:
        """Register a callable f for a given target name

        f will be called with two arguments when a comm_open message is received with `target`:

        - the Comm instance
        - the `comm_open` message itself.

        f can be a Python callable or an import string for one.
        """
        if isinstance(f, str):
            f = import_item(f)

        # The string form of the alias keeps the cast a pure type-checker hint.
        self.targets[target_name] = t.cast("CommTargetCallback", f)

    def unregister_target(self, target_name: str, f: CommTargetCallback) -> CommTargetCallback:  # noqa: ARG002
        """Unregister a callable registered with register_target

        Returns the callable that was registered under ``target_name``;
        ``f`` itself is not inspected.  Raises ``KeyError`` if no target is
        registered under that name.
        """
        return self.targets.pop(target_name)

    def register_comm(self, comm: BaseComm) -> str:
        """Register a new comm and return its comm_id"""
        comm_id = comm.comm_id
        self.comms[comm_id] = comm
        return comm_id

    def unregister_comm(self, comm: BaseComm) -> None:
        """Unregister a comm

        Only removes the comm from ``self.comms``; it does not close it.

        Raises
        ------
        KeyError
            If the comm is not registered.
        """
        # unlike get_comm, this should raise a KeyError
        self.comms.pop(comm.comm_id)

    def get_comm(self, comm_id: str) -> BaseComm | None:
        """Get a comm with a particular id

        Returns the comm if found, otherwise None.

        This will not raise an error,
        it will log messages if the comm cannot be found.
        """
        try:
            return self.comms[comm_id]
        except KeyError:
            logger.warning("No such comm: %s", comm_id)
            if logger.isEnabledFor(logging.DEBUG):
                # don't create the list of keys if debug messages aren't enabled
                logger.debug("Current comms: %s", list(self.comms.keys()))
            return None

    # Message handlers

    def comm_open(self, stream: ZMQStream, ident: str, msg: MessageType) -> None:  # noqa: ARG002
        """Handler for comm_open messages

        Creates and registers a non-primary comm for the message's
        ``comm_id`` and hands it to the callback registered for the
        message's ``target_name``.  If no callback is registered, or the
        callback raises, the failure is logged and the new comm is closed.
        """
        # Imported here rather than at module level; presumably to avoid a
        # circular import with the `comm` package — TODO confirm.
        from comm import create_comm

        content = msg["content"]
        comm_id = content["comm_id"]
        target_name = content["target_name"]
        target = self.targets.get(target_name)
        new_comm = create_comm(
            comm_id=comm_id,
            primary=False,
            target_name=target_name,
        )
        self.register_comm(new_comm)
        if target is None:
            logger.error("No such comm target registered: %s", target_name)
        else:
            try:
                target(new_comm, msg)
            except Exception:
                logger.exception("Exception opening comm with target: %s", target_name)
            else:
                return

        # Failure.
        try:
            new_comm.close()
        except Exception:
            logger.exception(
                "Could not close comm during `comm_open` failure clean-up. "
                "The comm may not have been opened yet."
            )

    def comm_msg(self, stream: ZMQStream, ident: str, msg: MessageType) -> None:  # noqa: ARG002
        """Handler for comm_msg messages

        Dispatches the message to the comm named by its ``comm_id``.
        Unknown comms are ignored (``get_comm`` logs them); exceptions from
        the comm's handler are logged, not propagated.
        """
        content = msg["content"]
        comm_id = content["comm_id"]
        target_comm = self.get_comm(comm_id)
        if target_comm is None:
            return

        try:
            target_comm.handle_msg(msg)
        except Exception:
            logger.exception("Exception in comm_msg for %s", comm_id)

    def comm_close(self, stream: ZMQStream, ident: str, msg: MessageType) -> None:  # noqa: ARG002
        """Handler for comm_close messages

        Marks the comm named by the message's ``comm_id`` as closed, removes
        it from ``self.comms`` and then calls its ``handle_close``.
        Exceptions from the handler are logged, not propagated.
        """
        content = msg["content"]
        comm_id = content["comm_id"]
        target_comm = self.get_comm(comm_id)
        if target_comm is None:
            return

        # Flag the comm as closed before dropping it, so that a later
        # close() on it does not publish a comm_close back to the peer.
        target_comm._closed = True
        del self.comms[comm_id]

        try:
            target_comm.handle_close(msg)
        except Exception:
            logger.exception("Exception in comm_close for %s", comm_id)
# Public API of this module.
__all__ = ["CommManager", "BaseComm"]