Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/comm/base_comm.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

1"""Default classes for Comm and CommManager, for usage in IPython. 

2""" 

3 

4# Copyright (c) IPython Development Team. 

5# Distributed under the terms of the Modified BSD License. 

6from __future__ import annotations 

7 

8import contextlib 

9import logging 

10import typing as t 

11import uuid 

12 

13from traitlets.utils.importstring import import_item 

14 

15import comm 

16 

17if t.TYPE_CHECKING: 

18 from zmq.eventloop.zmqstream import ZMQStream 

19 

20logger = logging.getLogger("Comm") 

21 

22MessageType = t.Dict[str, t.Any] 

23MaybeDict = t.Optional[t.Dict[str, t.Any]] 

24BuffersType = t.Optional[t.List[bytes]] 

25CommCallback = t.Callable[[MessageType], None] 

26CommTargetCallback = t.Callable[["BaseComm", MessageType], None] 

27 

28 

29class BaseComm: 

30 """Class for communicating between a Frontend and a Kernel 

31 

32 Must be subclassed with a publish_msg method implementation which 

33 sends comm messages through the iopub channel. 

34 """ 

35 

36 def __init__( 

37 self, 

38 target_name: str = "comm", 

39 data: MaybeDict = None, 

40 metadata: MaybeDict = None, 

41 buffers: BuffersType = None, 

42 comm_id: str | None = None, 

43 primary: bool = True, 

44 target_module: str | None = None, 

45 topic: bytes | None = None, 

46 _open_data: MaybeDict = None, 

47 _close_data: MaybeDict = None, 

48 **kwargs: t.Any, 

49 ) -> None: 

50 super().__init__(**kwargs) 

51 

52 self.comm_id = comm_id if comm_id else uuid.uuid4().hex 

53 self.primary = primary 

54 self.target_name = target_name 

55 self.target_module = target_module 

56 self.topic = topic if topic else ("comm-%s" % self.comm_id).encode("ascii") 

57 

58 self._open_data = _open_data if _open_data else {} 

59 self._close_data = _close_data if _close_data else {} 

60 

61 self._msg_callback: CommCallback | None = None 

62 self._close_callback: CommCallback | None = None 

63 

64 self._closed = True 

65 

66 if self.primary: 

67 # I am primary, open my peer. 

68 self.open(data=data, metadata=metadata, buffers=buffers) 

69 else: 

70 self._closed = False 

71 

72 def publish_msg( 

73 self, 

74 msg_type: str, # noqa: ARG002 

75 data: MaybeDict = None, # noqa: ARG002 

76 metadata: MaybeDict = None, # noqa: ARG002 

77 buffers: BuffersType = None, # noqa: ARG002 

78 **keys: t.Any, # noqa: ARG002 

79 ) -> None: 

80 msg = "publish_msg Comm method is not implemented" 

81 raise NotImplementedError(msg) 

82 

83 def __del__(self) -> None: 

84 """trigger close on gc""" 

85 with contextlib.suppress(Exception): 

86 # any number of things can have gone horribly wrong 

87 # when called during interpreter teardown 

88 self.close(deleting=True) 

89 

90 # publishing messages 

91 

92 def open( 

93 self, data: MaybeDict = None, metadata: MaybeDict = None, buffers: BuffersType = None 

94 ) -> None: 

95 """Open the frontend-side version of this comm""" 

96 

97 if data is None: 

98 data = self._open_data 

99 comm_manager = comm.get_comm_manager() 

100 if comm_manager is None: 

101 msg = "Comms cannot be opened without a comm_manager." # type:ignore[unreachable] 

102 raise RuntimeError(msg) 

103 

104 comm_manager.register_comm(self) 

105 try: 

106 self.publish_msg( 

107 "comm_open", 

108 data=data, 

109 metadata=metadata, 

110 buffers=buffers, 

111 target_name=self.target_name, 

112 target_module=self.target_module, 

113 ) 

114 self._closed = False 

115 except Exception: 

116 comm_manager.unregister_comm(self) 

117 raise 

118 

119 def close( 

120 self, 

121 data: MaybeDict = None, 

122 metadata: MaybeDict = None, 

123 buffers: BuffersType = None, 

124 deleting: bool = False, 

125 ) -> None: 

126 """Close the frontend-side version of this comm""" 

127 if self._closed: 

128 # only close once 

129 return 

130 self._closed = True 

131 if data is None: 

132 data = self._close_data 

133 self.publish_msg( 

134 "comm_close", 

135 data=data, 

136 metadata=metadata, 

137 buffers=buffers, 

138 ) 

139 if not deleting: 

140 # If deleting, the comm can't be registered 

141 comm.get_comm_manager().unregister_comm(self) 

142 

143 def send( 

144 self, data: MaybeDict = None, metadata: MaybeDict = None, buffers: BuffersType = None 

145 ) -> None: 

146 """Send a message to the frontend-side version of this comm""" 

147 self.publish_msg( 

148 "comm_msg", 

149 data=data, 

150 metadata=metadata, 

151 buffers=buffers, 

152 ) 

153 

154 # registering callbacks 

155 

156 def on_close(self, callback: CommCallback | None) -> None: 

157 """Register a callback for comm_close 

158 

159 Will be called with the `data` of the close message. 

160 

161 Call `on_close(None)` to disable an existing callback. 

162 """ 

163 self._close_callback = callback 

164 

165 def on_msg(self, callback: CommCallback | None) -> None: 

166 """Register a callback for comm_msg 

167 

168 Will be called with the `data` of any comm_msg messages. 

169 

170 Call `on_msg(None)` to disable an existing callback. 

171 """ 

172 self._msg_callback = callback 

173 

174 # handling of incoming messages 

175 

176 def handle_close(self, msg: MessageType) -> None: 

177 """Handle a comm_close message""" 

178 logger.debug("handle_close[%s](%s)", self.comm_id, msg) 

179 if self._close_callback: 

180 self._close_callback(msg) 

181 

182 def handle_msg(self, msg: MessageType) -> None: 

183 """Handle a comm_msg message""" 

184 logger.debug("handle_msg[%s](%s)", self.comm_id, msg) 

185 if self._msg_callback: 

186 from IPython import get_ipython 

187 

188 shell = get_ipython() 

189 if shell: 

190 shell.events.trigger("pre_execute") 

191 self._msg_callback(msg) 

192 if shell: 

193 shell.events.trigger("post_execute") 

194 

195 

196class CommManager: 

197 """Default CommManager singleton implementation for Comms in the Kernel""" 

198 

199 # Public APIs 

200 

201 def __init__(self) -> None: 

202 self.comms: dict[str, BaseComm] = {} 

203 self.targets: dict[str, CommTargetCallback] = {} 

204 

205 def register_target(self, target_name: str, f: CommTargetCallback | str) -> None: 

206 """Register a callable f for a given target name 

207 

208 f will be called with two arguments when a comm_open message is received with `target`: 

209 

210 - the Comm instance 

211 - the `comm_open` message itself. 

212 

213 f can be a Python callable or an import string for one. 

214 """ 

215 if isinstance(f, str): 

216 f = import_item(f) 

217 

218 self.targets[target_name] = t.cast(CommTargetCallback, f) 

219 

220 def unregister_target(self, target_name: str, f: CommTargetCallback) -> CommTargetCallback: # noqa: ARG002 

221 """Unregister a callable registered with register_target""" 

222 return self.targets.pop(target_name) 

223 

224 def register_comm(self, comm: BaseComm) -> str: 

225 """Register a new comm""" 

226 comm_id = comm.comm_id 

227 self.comms[comm_id] = comm 

228 return comm_id 

229 

230 def unregister_comm(self, comm: BaseComm) -> None: 

231 """Unregister a comm, and close its counterpart""" 

232 # unlike get_comm, this should raise a KeyError 

233 comm = self.comms.pop(comm.comm_id) 

234 

235 def get_comm(self, comm_id: str) -> BaseComm | None: 

236 """Get a comm with a particular id 

237 

238 Returns the comm if found, otherwise None. 

239 

240 This will not raise an error, 

241 it will log messages if the comm cannot be found. 

242 """ 

243 try: 

244 return self.comms[comm_id] 

245 except KeyError: 

246 logger.warning("No such comm: %s", comm_id) 

247 if logger.isEnabledFor(logging.DEBUG): 

248 # don't create the list of keys if debug messages aren't enabled 

249 logger.debug("Current comms: %s", list(self.comms.keys())) 

250 return None 

251 

252 # Message handlers 

253 

254 def comm_open(self, stream: ZMQStream, ident: str, msg: MessageType) -> None: # noqa: ARG002 

255 """Handler for comm_open messages""" 

256 from comm import create_comm 

257 

258 content = msg["content"] 

259 comm_id = content["comm_id"] 

260 target_name = content["target_name"] 

261 f = self.targets.get(target_name, None) 

262 comm = create_comm( 

263 comm_id=comm_id, 

264 primary=False, 

265 target_name=target_name, 

266 ) 

267 self.register_comm(comm) 

268 if f is None: 

269 logger.error("No such comm target registered: %s", target_name) 

270 else: 

271 try: 

272 f(comm, msg) 

273 return 

274 except Exception: 

275 logger.error("Exception opening comm with target: %s", target_name, exc_info=True) 

276 

277 # Failure. 

278 try: 

279 comm.close() 

280 except Exception: 

281 logger.error( 

282 """Could not close comm during `comm_open` failure 

283 clean-up. The comm may not have been opened yet.""", 

284 exc_info=True, 

285 ) 

286 

287 def comm_msg(self, stream: ZMQStream, ident: str, msg: MessageType) -> None: # noqa: ARG002 

288 """Handler for comm_msg messages""" 

289 content = msg["content"] 

290 comm_id = content["comm_id"] 

291 comm = self.get_comm(comm_id) 

292 if comm is None: 

293 return 

294 

295 try: 

296 comm.handle_msg(msg) 

297 except Exception: 

298 logger.error("Exception in comm_msg for %s", comm_id, exc_info=True) 

299 

300 def comm_close(self, stream: ZMQStream, ident: str, msg: MessageType) -> None: # noqa: ARG002 

301 """Handler for comm_close messages""" 

302 content = msg["content"] 

303 comm_id = content["comm_id"] 

304 comm = self.get_comm(comm_id) 

305 if comm is None: 

306 return 

307 

308 self.comms[comm_id]._closed = True 

309 del self.comms[comm_id] 

310 

311 try: 

312 comm.handle_close(msg) 

313 except Exception: 

314 logger.error("Exception in comm_close for %s", comm_id, exc_info=True) 

315 

316 

317__all__ = ["CommManager", "BaseComm"]