Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/blinker/base.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

179 statements  

1from __future__ import annotations 

2 

3import collections.abc as c 

4import sys 

5import typing as t 

6import weakref 

7from collections import defaultdict 

8from contextlib import contextmanager 

9from functools import cached_property 

10from inspect import iscoroutinefunction 

11 

12from ._utilities import make_id 

13from ._utilities import make_ref 

14from ._utilities import Symbol 

15 

# Type variable bound to callables; ``Signal.connect`` uses it so the receiver
# it returns has the same type as the receiver it was given.
F = t.TypeVar("F", bound=c.Callable[..., t.Any])

ANY = Symbol("ANY")
"""Symbol for "any sender"."""

# Key used in the sender bookkeeping in place of ``make_id(sender)`` when a
# receiver is connected to ``ANY``.
ANY_ID = 0

22 

23 

class Signal:
    """A notification emitter.

    :param doc: The docstring for the signal.
    """

    ANY = ANY
    """An alias for the :data:`~blinker.ANY` sender symbol."""

    set_class: type[set[t.Any]] = set
    """The set class to use for tracking connected receivers and senders.
    Python's ``set`` is unordered. If receivers must be dispatched in the order
    they were connected, an ordered set implementation can be used.

    .. versionadded:: 1.7
    """

    @cached_property
    def receiver_connected(self) -> Signal:
        """Emitted at the end of each :meth:`connect` call.

        The signal sender is the signal instance, and the :meth:`connect`
        arguments are passed through: ``receiver``, ``sender``, and ``weak``.

        .. versionadded:: 1.2
        """
        return Signal(doc="Emitted after a receiver connects.")

    @cached_property
    def receiver_disconnected(self) -> Signal:
        """Emitted at the end of each :meth:`disconnect` call.

        The sender is the signal instance, and the :meth:`disconnect` arguments
        are passed through: ``receiver`` and ``sender``.

        This signal is emitted **only** when :meth:`disconnect` is called
        explicitly. This signal cannot be emitted by an automatic disconnect
        when a weakly referenced receiver or sender goes out of scope, as the
        instance is no longer available to be used as the sender for this
        signal.

        An alternative approach is available by subscribing to
        :attr:`receiver_connected` and setting up a custom weakref cleanup
        callback on weak receivers and senders.

        .. versionadded:: 1.2
        """
        return Signal(doc="Emitted after a receiver disconnects.")

    def __init__(self, doc: str | None = None) -> None:
        if doc:
            self.__doc__ = doc

        self.receivers: dict[
            t.Any, weakref.ref[c.Callable[..., t.Any]] | c.Callable[..., t.Any]
        ] = {}
        """The map of connected receivers. Useful to quickly check if any
        receivers are connected to the signal: ``if s.receivers:``. The
        structure and data is not part of the public API, but checking its
        boolean value is.
        """

        self.is_muted: bool = False
        # receiver id -> ids of the senders it is connected to
        self._by_receiver: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
        # sender id -> ids of the receivers connected to it
        self._by_sender: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
        # sender id -> weak reference to the sender, used for cleanup
        self._weak_senders: dict[t.Any, weakref.ref[t.Any]] = {}

    def connect(self, receiver: F, sender: t.Any = ANY, weak: bool = True) -> F:
        """Connect ``receiver`` to be called when the signal is sent by
        ``sender``.

        :param receiver: The callable to call when :meth:`send` is called with
            the given ``sender``, passing ``sender`` as a positional argument
            along with any extra keyword arguments.
        :param sender: Any object or :data:`ANY`. ``receiver`` will only be
            called when :meth:`send` is called with this sender. If ``ANY``, the
            receiver will be called for any sender. A receiver may be connected
            to multiple senders by calling :meth:`connect` multiple times.
        :param weak: Track the receiver with a :mod:`weakref`. The receiver will
            be automatically disconnected when it is garbage collected. When
            connecting a receiver defined within a function, set to ``False``,
            otherwise it will be disconnected when the function scope ends.
        :return: ``receiver``, unchanged.
        """
        receiver_id = make_id(receiver)
        sender_id = ANY_ID if sender is ANY else make_id(sender)

        if weak:
            self.receivers[receiver_id] = make_ref(
                receiver, self._make_cleanup_receiver(receiver_id)
            )
        else:
            self.receivers[receiver_id] = receiver

        self._by_sender[sender_id].add(receiver_id)
        self._by_receiver[receiver_id].add(sender_id)

        if sender is not ANY and sender_id not in self._weak_senders:
            # store a cleanup for weakref-able senders
            try:
                self._weak_senders[sender_id] = make_ref(
                    sender, self._make_cleanup_sender(sender_id)
                )
            except TypeError:
                # The sender cannot be weakly referenced; skip the cleanup.
                pass

        # Check ``__dict__`` first so the cached property is not created just
        # to discover that nobody is listening.
        if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
            try:
                self.receiver_connected.send(
                    self, receiver=receiver, sender=sender, weak=weak
                )
            except TypeError:
                # TODO no explanation or test for this
                self.disconnect(receiver, sender)
                raise

        return receiver

    def connect_via(self, sender: t.Any, weak: bool = False) -> c.Callable[[F], F]:
        """Connect the decorated function to be called when the signal is sent
        by ``sender``.

        The decorated function will be called when :meth:`send` is called with
        the given ``sender``, passing ``sender`` as a positional argument along
        with any extra keyword arguments.

        :param sender: Any object or :data:`ANY`. ``receiver`` will only be
            called when :meth:`send` is called with this sender. If ``ANY``, the
            receiver will be called for any sender. A receiver may be connected
            to multiple senders by calling :meth:`connect` multiple times.
        :param weak: Track the receiver with a :mod:`weakref`. The receiver will
            be automatically disconnected when it is garbage collected. When
            connecting a receiver defined within a function, set to ``False``,
            otherwise it will be disconnected when the function scope ends.

        .. versionadded:: 1.1
        """

        def decorator(fn: F) -> F:
            self.connect(fn, sender, weak)
            return fn

        return decorator

    @contextmanager
    def connected_to(
        self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY
    ) -> c.Generator[None, None, None]:
        """A context manager that temporarily connects ``receiver`` to the
        signal while a ``with`` block executes. When the block exits, the
        receiver is disconnected. Useful for tests.

        :param receiver: The callable to call when :meth:`send` is called with
            the given ``sender``, passing ``sender`` as a positional argument
            along with any extra keyword arguments.
        :param sender: Any object or :data:`ANY`. ``receiver`` will only be
            called when :meth:`send` is called with this sender. If ``ANY``, the
            receiver will be called for any sender.

        .. versionadded:: 1.1
        """
        # A strong reference keeps the receiver alive for the whole block.
        self.connect(receiver, sender=sender, weak=False)

        try:
            yield None
        finally:
            # Disconnects from all senders, which also drops the strong
            # reference stored in ``self.receivers``.
            self.disconnect(receiver)

    @contextmanager
    def muted(self) -> c.Generator[None, None, None]:
        """A context manager that temporarily disables the signal. No receivers
        will be called if the signal is sent, until the ``with`` block exits.
        Useful for tests.

        The previous value of :attr:`is_muted` is restored on exit, so nested
        ``muted()`` blocks keep the signal muted until the outermost one exits.
        """
        previous = self.is_muted
        self.is_muted = True

        try:
            yield None
        finally:
            self.is_muted = previous

    def send(
        self,
        sender: t.Any | None = None,
        /,
        *,
        _async_wrapper: c.Callable[
            [c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]], c.Callable[..., t.Any]
        ]
        | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
        """Call all receivers that are connected to the given ``sender``
        or :data:`ANY`. Each receiver is called with ``sender`` as a positional
        argument along with any extra keyword arguments. Return a list of
        ``(receiver, return value)`` tuples.

        The order receivers are called is undefined, but can be influenced by
        setting :attr:`set_class`.

        If a receiver raises an exception, that exception will propagate up.
        This makes debugging straightforward, with an assumption that correctly
        implemented receivers will not raise.

        :param sender: Call receivers connected to this sender, in addition to
            those connected to :data:`ANY`.
        :param _async_wrapper: Will be called on any receivers that are async
            coroutines to turn them into sync callables. For example, could run
            the receiver with an event loop.
        :param kwargs: Extra keyword arguments to pass to each receiver.
        :raises RuntimeError: If a receiver is a coroutine function and no
            ``_async_wrapper`` was given.

        .. versionchanged:: 1.7
            Added the ``_async_wrapper`` argument.
        """
        if self.is_muted:
            return []

        results = []

        for receiver in self.receivers_for(sender):
            if iscoroutinefunction(receiver):
                if _async_wrapper is None:
                    raise RuntimeError("Cannot send to a coroutine function.")

                result = _async_wrapper(receiver)(sender, **kwargs)
            else:
                result = receiver(sender, **kwargs)

            results.append((receiver, result))

        return results

    async def send_async(
        self,
        sender: t.Any | None = None,
        /,
        *,
        _sync_wrapper: c.Callable[
            [c.Callable[..., t.Any]], c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]
        ]
        | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
        """Await all receivers that are connected to the given ``sender``
        or :data:`ANY`. Each receiver is called with ``sender`` as a positional
        argument along with any extra keyword arguments. Return a list of
        ``(receiver, return value)`` tuples.

        The order receivers are called is undefined, but can be influenced by
        setting :attr:`set_class`.

        If a receiver raises an exception, that exception will propagate up.
        This makes debugging straightforward, with an assumption that correctly
        implemented receivers will not raise.

        :param sender: Call receivers connected to this sender, in addition to
            those connected to :data:`ANY`.
        :param _sync_wrapper: Will be called on any receivers that are sync
            callables to turn them into async coroutines. For example,
            could call the receiver in a thread.
        :param kwargs: Extra keyword arguments to pass to each receiver.
        :raises RuntimeError: If a receiver is not a coroutine function and no
            ``_sync_wrapper`` was given.

        .. versionadded:: 1.7
        """
        if self.is_muted:
            return []

        results = []

        for receiver in self.receivers_for(sender):
            if not iscoroutinefunction(receiver):
                if _sync_wrapper is None:
                    raise RuntimeError("Cannot send to a non-coroutine function.")

                result = await _sync_wrapper(receiver)(sender, **kwargs)
            else:
                result = await receiver(sender, **kwargs)

            results.append((receiver, result))

        return results

    def has_receivers_for(self, sender: t.Any) -> bool:
        """Check if there is at least one receiver that will be called with the
        given ``sender``. A receiver connected to :data:`ANY` will always be
        called, regardless of sender. Does not check if weakly referenced
        receivers are still live. See :meth:`receivers_for` for a stronger
        search.

        :param sender: Check for receivers connected to this sender, in addition
            to those connected to :data:`ANY`.
        """
        if not self.receivers:
            return False

        # Use ``.get`` so that a read-only query never inserts an empty bucket
        # into the ``defaultdict``.
        if self._by_sender.get(ANY_ID):
            return True

        if sender is ANY:
            return False

        # Disconnecting from a specific sender leaves an empty bucket behind,
        # so the key being present is not enough; the bucket must be non-empty.
        return bool(self._by_sender.get(make_id(sender)))

    def receivers_for(
        self, sender: t.Any
    ) -> c.Generator[c.Callable[..., t.Any], None, None]:
        """Yield each receiver to be called for ``sender``, in addition to those
        to be called for :data:`ANY`. Weakly referenced receivers that are not
        live will be disconnected and skipped.

        :param sender: Yield receivers connected to this sender, in addition
            to those connected to :data:`ANY`.
        """
        # TODO: test receivers_for(ANY)
        if not self.receivers:
            return

        sender_id = make_id(sender)

        # Both branches build a new set, so disconnecting dead receivers below
        # cannot change the collection being iterated.
        if sender_id in self._by_sender:
            ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
        else:
            ids = self._by_sender[ANY_ID].copy()

        for receiver_id in ids:
            receiver = self.receivers.get(receiver_id)

            if receiver is None:
                continue

            if isinstance(receiver, weakref.ref):
                strong = receiver()

                if strong is None:
                    # The referent is gone; drop all bookkeeping for it.
                    self._disconnect(receiver_id, ANY_ID)
                    continue

                yield strong
            else:
                yield receiver

    def disconnect(self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY) -> None:
        """Disconnect ``receiver`` from being called when the signal is sent by
        ``sender``.

        :param receiver: A connected receiver callable.
        :param sender: Disconnect from only this sender. By default, disconnect
            from all senders.
        """
        sender_id: c.Hashable

        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = make_id(sender)

        receiver_id = make_id(receiver)
        self._disconnect(receiver_id, sender_id)

        # Check ``__dict__`` first so the cached property is not created just
        # to discover that nobody is listening.
        if (
            "receiver_disconnected" in self.__dict__
            and self.receiver_disconnected.receivers
        ):
            self.receiver_disconnected.send(self, receiver=receiver, sender=sender)

    def _disconnect(self, receiver_id: c.Hashable, sender_id: c.Hashable) -> None:
        """Remove bookkeeping for a receiver id.

        With ``ANY_ID`` as the sender, the receiver is removed from every
        sender bucket and from :attr:`receivers`. Otherwise only the link
        between that receiver and that sender is removed.
        """
        if sender_id == ANY_ID:
            if self._by_receiver.pop(receiver_id, None) is not None:
                for bucket in self._by_sender.values():
                    bucket.discard(receiver_id)

            self.receivers.pop(receiver_id, None)
        else:
            self._by_sender[sender_id].discard(receiver_id)
            self._by_receiver[receiver_id].discard(sender_id)

    def _make_cleanup_receiver(
        self, receiver_id: c.Hashable
    ) -> c.Callable[[weakref.ref[c.Callable[..., t.Any]]], None]:
        """Create a callback function to disconnect a weakly referenced
        receiver when it is garbage collected.
        """

        def cleanup(ref: weakref.ref[c.Callable[..., t.Any]]) -> None:
            # If the interpreter is shutting down, disconnecting can result in a
            # weird ignored exception. Don't call it in that case.
            if not sys.is_finalizing():
                self._disconnect(receiver_id, ANY_ID)

        return cleanup

    def _make_cleanup_sender(
        self, sender_id: c.Hashable
    ) -> c.Callable[[weakref.ref[t.Any]], None]:
        """Create a callback function to disconnect all receivers for a weakly
        referenced sender when it is garbage collected.
        """
        assert sender_id != ANY_ID

        def cleanup(ref: weakref.ref[t.Any]) -> None:
            self._weak_senders.pop(sender_id, None)

            for receiver_id in self._by_sender.pop(sender_id, ()):
                self._by_receiver[receiver_id].discard(sender_id)

        return cleanup

    def _cleanup_bookkeeping(self) -> None:
        """Prune unused sender/receiver bookkeeping. Not threadsafe.

        Connecting & disconnecting leaves behind a small amount of bookkeeping
        data. Typical workloads using Blinker, for example in most web apps,
        Flask, CLI scripts, etc., are not adversely affected by this
        bookkeeping.

        With a long-running process performing dynamic signal routing with high
        volume, e.g. connecting to function closures, senders are all unique
        object instances. Doing all of this over and over may cause memory usage
        to grow due to extraneous bookkeeping. (An empty ``set`` for each stale
        sender/receiver pair.)

        This method will prune that bookkeeping away, with the caveat that such
        pruning is not threadsafe. The risk is that cleanup of a fully
        disconnected receiver/sender pair occurs while another thread is
        connecting that same pair. If you are in the highly dynamic, unique
        receiver/sender situation that has led you to this method, that failure
        mode is perhaps not a big deal for you.
        """
        for mapping in (self._by_sender, self._by_receiver):
            # Snapshot the items so entries can be popped while looping.
            for ident, bucket in list(mapping.items()):
                if not bucket:
                    mapping.pop(ident, None)

    def _clear_state(self) -> None:
        """Disconnect all receivers and senders. Useful for tests."""
        self._weak_senders.clear()
        self.receivers.clear()
        self._by_sender.clear()
        self._by_receiver.clear()

462 

463 

class NamedSignal(Signal):
    """A generic notification emitter that carries a name. The signal itself
    does not use the name; it matches the key under which the signal is stored
    in its :class:`Namespace`.

    :param name: The name of the signal within the namespace.
    :param doc: The docstring for the signal.
    """

    def __init__(self, name: str, doc: str | None = None) -> None:
        super().__init__(doc)

        #: The name of this signal.
        self.name: str = name

    def __repr__(self) -> str:
        # Take the inherited repr without its closing ">" so the name can be
        # appended inside the angle brackets.
        head = super().__repr__()[:-1]
        return f"{head}; {self.name!r}>"  # noqa: E702

481 

482 

class Namespace(dict[str, NamedSignal]):
    """A dict mapping names to signals."""

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Look up the :class:`NamedSignal` stored under ``name``, creating and
        storing a new one if there is none yet. Calling this again with the
        same name returns the same signal object.

        :param name: The name of the signal.
        :param doc: The docstring of the signal. Only used when the signal is
            created by this call.
        """
        try:
            return self[name]
        except KeyError:
            pass

        created = NamedSignal(name, doc)
        self[name] = created
        return created

497 

498 

class _PNamespaceSignal(t.Protocol):
    """Callable protocol with the same ``(name, doc=None) -> NamedSignal``
    signature as :meth:`Namespace.signal`.
    """

    def __call__(self, name: str, doc: str | None = None) -> NamedSignal:
        ...

501 

502 

default_namespace: Namespace = Namespace()
"""A default :class:`Namespace` for creating named signals. :func:`signal`
creates a :class:`NamedSignal` in this namespace.
"""

# Bound ``signal`` method of ``default_namespace``, annotated with the
# ``_PNamespaceSignal`` protocol to declare its ``(name, doc=None)`` call
# signature.
signal: _PNamespaceSignal = default_namespace.signal
"""Return a :class:`NamedSignal` in :data:`default_namespace` with the given
``name``, creating it if required. Repeated calls with the same name return the
same signal.
"""

512"""