Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/blinker/base.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import collections.abc as c
4import sys
5import typing as t
6import weakref
7from collections import defaultdict
8from contextlib import contextmanager
9from functools import cached_property
10from inspect import iscoroutinefunction
12from ._utilities import make_id
13from ._utilities import make_ref
14from ._utilities import Symbol
16F = t.TypeVar("F", bound=c.Callable[..., t.Any])
18ANY = Symbol("ANY")
19"""Symbol for "any sender"."""
21ANY_ID = 0
24class Signal:
25 """A notification emitter.
27 :param doc: The docstring for the signal.
28 """
30 ANY = ANY
31 """An alias for the :data:`~blinker.ANY` sender symbol."""
33 set_class: type[set[t.Any]] = set
34 """The set class to use for tracking connected receivers and senders.
35 Python's ``set`` is unordered. If receivers must be dispatched in the order
36 they were connected, an ordered set implementation can be used.
38 .. versionadded:: 1.7
39 """
41 @cached_property
42 def receiver_connected(self) -> Signal:
43 """Emitted at the end of each :meth:`connect` call.
45 The signal sender is the signal instance, and the :meth:`connect`
46 arguments are passed through: ``receiver``, ``sender``, and ``weak``.
48 .. versionadded:: 1.2
49 """
50 return Signal(doc="Emitted after a receiver connects.")
52 @cached_property
53 def receiver_disconnected(self) -> Signal:
54 """Emitted at the end of each :meth:`disconnect` call.
56 The sender is the signal instance, and the :meth:`disconnect` arguments
57 are passed through: ``receiver`` and ``sender``.
59 This signal is emitted **only** when :meth:`disconnect` is called
60 explicitly. This signal cannot be emitted by an automatic disconnect
61 when a weakly referenced receiver or sender goes out of scope, as the
62 instance is no longer be available to be used as the sender for this
63 signal.
65 An alternative approach is available by subscribing to
66 :attr:`receiver_connected` and setting up a custom weakref cleanup
67 callback on weak receivers and senders.
69 .. versionadded:: 1.2
70 """
71 return Signal(doc="Emitted after a receiver disconnects.")
73 def __init__(self, doc: str | None = None) -> None:
74 if doc:
75 self.__doc__ = doc
77 self.receivers: dict[
78 t.Any, weakref.ref[c.Callable[..., t.Any]] | c.Callable[..., t.Any]
79 ] = {}
80 """The map of connected receivers. Useful to quickly check if any
81 receivers are connected to the signal: ``if s.receivers:``. The
82 structure and data is not part of the public API, but checking its
83 boolean value is.
84 """
86 self.is_muted: bool = False
87 self._by_receiver: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
88 self._by_sender: dict[t.Any, set[t.Any]] = defaultdict(self.set_class)
89 self._weak_senders: dict[t.Any, weakref.ref[t.Any]] = {}
91 def connect(self, receiver: F, sender: t.Any = ANY, weak: bool = True) -> F:
92 """Connect ``receiver`` to be called when the signal is sent by
93 ``sender``.
95 :param receiver: The callable to call when :meth:`send` is called with
96 the given ``sender``, passing ``sender`` as a positional argument
97 along with any extra keyword arguments.
98 :param sender: Any object or :data:`ANY`. ``receiver`` will only be
99 called when :meth:`send` is called with this sender. If ``ANY``, the
100 receiver will be called for any sender. A receiver may be connected
101 to multiple senders by calling :meth:`connect` multiple times.
102 :param weak: Track the receiver with a :mod:`weakref`. The receiver will
103 be automatically disconnected when it is garbage collected. When
104 connecting a receiver defined within a function, set to ``False``,
105 otherwise it will be disconnected when the function scope ends.
106 """
107 receiver_id = make_id(receiver)
108 sender_id = ANY_ID if sender is ANY else make_id(sender)
110 if weak:
111 self.receivers[receiver_id] = make_ref(
112 receiver, self._make_cleanup_receiver(receiver_id)
113 )
114 else:
115 self.receivers[receiver_id] = receiver
117 self._by_sender[sender_id].add(receiver_id)
118 self._by_receiver[receiver_id].add(sender_id)
120 if sender is not ANY and sender_id not in self._weak_senders:
121 # store a cleanup for weakref-able senders
122 try:
123 self._weak_senders[sender_id] = make_ref(
124 sender, self._make_cleanup_sender(sender_id)
125 )
126 except TypeError:
127 pass
129 if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
130 try:
131 self.receiver_connected.send(
132 self, receiver=receiver, sender=sender, weak=weak
133 )
134 except TypeError:
135 # TODO no explanation or test for this
136 self.disconnect(receiver, sender)
137 raise
139 return receiver
141 def connect_via(self, sender: t.Any, weak: bool = False) -> c.Callable[[F], F]:
142 """Connect the decorated function to be called when the signal is sent
143 by ``sender``.
145 The decorated function will be called when :meth:`send` is called with
146 the given ``sender``, passing ``sender`` as a positional argument along
147 with any extra keyword arguments.
149 :param sender: Any object or :data:`ANY`. ``receiver`` will only be
150 called when :meth:`send` is called with this sender. If ``ANY``, the
151 receiver will be called for any sender. A receiver may be connected
152 to multiple senders by calling :meth:`connect` multiple times.
153 :param weak: Track the receiver with a :mod:`weakref`. The receiver will
154 be automatically disconnected when it is garbage collected. When
155 connecting a receiver defined within a function, set to ``False``,
156 otherwise it will be disconnected when the function scope ends.=
158 .. versionadded:: 1.1
159 """
161 def decorator(fn: F) -> F:
162 self.connect(fn, sender, weak)
163 return fn
165 return decorator
167 @contextmanager
168 def connected_to(
169 self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY
170 ) -> c.Generator[None, None, None]:
171 """A context manager that temporarily connects ``receiver`` to the
172 signal while a ``with`` block executes. When the block exits, the
173 receiver is disconnected. Useful for tests.
175 :param receiver: The callable to call when :meth:`send` is called with
176 the given ``sender``, passing ``sender`` as a positional argument
177 along with any extra keyword arguments.
178 :param sender: Any object or :data:`ANY`. ``receiver`` will only be
179 called when :meth:`send` is called with this sender. If ``ANY``, the
180 receiver will be called for any sender.
182 .. versionadded:: 1.1
183 """
184 self.connect(receiver, sender=sender, weak=False)
186 try:
187 yield None
188 finally:
189 self.disconnect(receiver)
191 @contextmanager
192 def muted(self) -> c.Generator[None, None, None]:
193 """A context manager that temporarily disables the signal. No receivers
194 will be called if the signal is sent, until the ``with`` block exits.
195 Useful for tests.
196 """
197 self.is_muted = True
199 try:
200 yield None
201 finally:
202 self.is_muted = False
204 def send(
205 self,
206 sender: t.Any | None = None,
207 /,
208 *,
209 _async_wrapper: c.Callable[
210 [c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]], c.Callable[..., t.Any]
211 ]
212 | None = None,
213 **kwargs: t.Any,
214 ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
215 """Call all receivers that are connected to the given ``sender``
216 or :data:`ANY`. Each receiver is called with ``sender`` as a positional
217 argument along with any extra keyword arguments. Return a list of
218 ``(receiver, return value)`` tuples.
220 The order receivers are called is undefined, but can be influenced by
221 setting :attr:`set_class`.
223 If a receiver raises an exception, that exception will propagate up.
224 This makes debugging straightforward, with an assumption that correctly
225 implemented receivers will not raise.
227 :param sender: Call receivers connected to this sender, in addition to
228 those connected to :data:`ANY`.
229 :param _async_wrapper: Will be called on any receivers that are async
230 coroutines to turn them into sync callables. For example, could run
231 the receiver with an event loop.
232 :param kwargs: Extra keyword arguments to pass to each receiver.
234 .. versionchanged:: 1.7
235 Added the ``_async_wrapper`` argument.
236 """
237 if self.is_muted:
238 return []
240 results = []
242 for receiver in self.receivers_for(sender):
243 if iscoroutinefunction(receiver):
244 if _async_wrapper is None:
245 raise RuntimeError("Cannot send to a coroutine function.")
247 result = _async_wrapper(receiver)(sender, **kwargs)
248 else:
249 result = receiver(sender, **kwargs)
251 results.append((receiver, result))
253 return results
255 async def send_async(
256 self,
257 sender: t.Any | None = None,
258 /,
259 *,
260 _sync_wrapper: c.Callable[
261 [c.Callable[..., t.Any]], c.Callable[..., c.Coroutine[t.Any, t.Any, t.Any]]
262 ]
263 | None = None,
264 **kwargs: t.Any,
265 ) -> list[tuple[c.Callable[..., t.Any], t.Any]]:
266 """Await all receivers that are connected to the given ``sender``
267 or :data:`ANY`. Each receiver is called with ``sender`` as a positional
268 argument along with any extra keyword arguments. Return a list of
269 ``(receiver, return value)`` tuples.
271 The order receivers are called is undefined, but can be influenced by
272 setting :attr:`set_class`.
274 If a receiver raises an exception, that exception will propagate up.
275 This makes debugging straightforward, with an assumption that correctly
276 implemented receivers will not raise.
278 :param sender: Call receivers connected to this sender, in addition to
279 those connected to :data:`ANY`.
280 :param _sync_wrapper: Will be called on any receivers that are sync
281 callables to turn them into async coroutines. For example,
282 could call the receiver in a thread.
283 :param kwargs: Extra keyword arguments to pass to each receiver.
285 .. versionadded:: 1.7
286 """
287 if self.is_muted:
288 return []
290 results = []
292 for receiver in self.receivers_for(sender):
293 if not iscoroutinefunction(receiver):
294 if _sync_wrapper is None:
295 raise RuntimeError("Cannot send to a non-coroutine function.")
297 result = await _sync_wrapper(receiver)(sender, **kwargs)
298 else:
299 result = await receiver(sender, **kwargs)
301 results.append((receiver, result))
303 return results
305 def has_receivers_for(self, sender: t.Any) -> bool:
306 """Check if there is at least one receiver that will be called with the
307 given ``sender``. A receiver connected to :data:`ANY` will always be
308 called, regardless of sender. Does not check if weakly referenced
309 receivers are still live. See :meth:`receivers_for` for a stronger
310 search.
312 :param sender: Check for receivers connected to this sender, in addition
313 to those connected to :data:`ANY`.
314 """
315 if not self.receivers:
316 return False
318 if self._by_sender[ANY_ID]:
319 return True
321 if sender is ANY:
322 return False
324 return make_id(sender) in self._by_sender
326 def receivers_for(
327 self, sender: t.Any
328 ) -> c.Generator[c.Callable[..., t.Any], None, None]:
329 """Yield each receiver to be called for ``sender``, in addition to those
330 to be called for :data:`ANY`. Weakly referenced receivers that are not
331 live will be disconnected and skipped.
333 :param sender: Yield receivers connected to this sender, in addition
334 to those connected to :data:`ANY`.
335 """
336 # TODO: test receivers_for(ANY)
337 if not self.receivers:
338 return
340 sender_id = make_id(sender)
342 if sender_id in self._by_sender:
343 ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
344 else:
345 ids = self._by_sender[ANY_ID].copy()
347 for receiver_id in ids:
348 receiver = self.receivers.get(receiver_id)
350 if receiver is None:
351 continue
353 if isinstance(receiver, weakref.ref):
354 strong = receiver()
356 if strong is None:
357 self._disconnect(receiver_id, ANY_ID)
358 continue
360 yield strong
361 else:
362 yield receiver
364 def disconnect(self, receiver: c.Callable[..., t.Any], sender: t.Any = ANY) -> None:
365 """Disconnect ``receiver`` from being called when the signal is sent by
366 ``sender``.
368 :param receiver: A connected receiver callable.
369 :param sender: Disconnect from only this sender. By default, disconnect
370 from all senders.
371 """
372 sender_id: c.Hashable
374 if sender is ANY:
375 sender_id = ANY_ID
376 else:
377 sender_id = make_id(sender)
379 receiver_id = make_id(receiver)
380 self._disconnect(receiver_id, sender_id)
382 if (
383 "receiver_disconnected" in self.__dict__
384 and self.receiver_disconnected.receivers
385 ):
386 self.receiver_disconnected.send(self, receiver=receiver, sender=sender)
388 def _disconnect(self, receiver_id: c.Hashable, sender_id: c.Hashable) -> None:
389 if sender_id == ANY_ID:
390 if self._by_receiver.pop(receiver_id, None) is not None:
391 for bucket in self._by_sender.values():
392 bucket.discard(receiver_id)
394 self.receivers.pop(receiver_id, None)
395 else:
396 self._by_sender[sender_id].discard(receiver_id)
397 self._by_receiver[receiver_id].discard(sender_id)
399 def _make_cleanup_receiver(
400 self, receiver_id: c.Hashable
401 ) -> c.Callable[[weakref.ref[c.Callable[..., t.Any]]], None]:
402 """Create a callback function to disconnect a weakly referenced
403 receiver when it is garbage collected.
404 """
406 def cleanup(ref: weakref.ref[c.Callable[..., t.Any]]) -> None:
407 # If the interpreter is shutting down, disconnecting can result in a
408 # weird ignored exception. Don't call it in that case.
409 if not sys.is_finalizing():
410 self._disconnect(receiver_id, ANY_ID)
412 return cleanup
414 def _make_cleanup_sender(
415 self, sender_id: c.Hashable
416 ) -> c.Callable[[weakref.ref[t.Any]], None]:
417 """Create a callback function to disconnect all receivers for a weakly
418 referenced sender when it is garbage collected.
419 """
420 assert sender_id != ANY_ID
422 def cleanup(ref: weakref.ref[t.Any]) -> None:
423 self._weak_senders.pop(sender_id, None)
425 for receiver_id in self._by_sender.pop(sender_id, ()):
426 self._by_receiver[receiver_id].discard(sender_id)
428 return cleanup
430 def _cleanup_bookkeeping(self) -> None:
431 """Prune unused sender/receiver bookkeeping. Not threadsafe.
433 Connecting & disconnecting leaves behind a small amount of bookkeeping
434 data. Typical workloads using Blinker, for example in most web apps,
435 Flask, CLI scripts, etc., are not adversely affected by this
436 bookkeeping.
438 With a long-running process performing dynamic signal routing with high
439 volume, e.g. connecting to function closures, senders are all unique
440 object instances. Doing all of this over and over may cause memory usage
441 to grow due to extraneous bookkeeping. (An empty ``set`` for each stale
442 sender/receiver pair.)
444 This method will prune that bookkeeping away, with the caveat that such
445 pruning is not threadsafe. The risk is that cleanup of a fully
446 disconnected receiver/sender pair occurs while another thread is
447 connecting that same pair. If you are in the highly dynamic, unique
448 receiver/sender situation that has lead you to this method, that failure
449 mode is perhaps not a big deal for you.
450 """
451 for mapping in (self._by_sender, self._by_receiver):
452 for ident, bucket in list(mapping.items()):
453 if not bucket:
454 mapping.pop(ident, None)
456 def _clear_state(self) -> None:
457 """Disconnect all receivers and senders. Useful for tests."""
458 self._weak_senders.clear()
459 self.receivers.clear()
460 self._by_sender.clear()
461 self._by_receiver.clear()
464class NamedSignal(Signal):
465 """A named generic notification emitter. The name is not used by the signal
466 itself, but matches the key in the :class:`Namespace` that it belongs to.
468 :param name: The name of the signal within the namespace.
469 :param doc: The docstring for the signal.
470 """
472 def __init__(self, name: str, doc: str | None = None) -> None:
473 super().__init__(doc)
475 #: The name of this signal.
476 self.name: str = name
478 def __repr__(self) -> str:
479 base = super().__repr__()
480 return f"{base[:-1]}; {self.name!r}>" # noqa: E702
483class Namespace(dict[str, NamedSignal]):
484 """A dict mapping names to signals."""
486 def signal(self, name: str, doc: str | None = None) -> NamedSignal:
487 """Return the :class:`NamedSignal` for the given ``name``, creating it
488 if required. Repeated calls with the same name return the same signal.
490 :param name: The name of the signal.
491 :param doc: The docstring of the signal.
492 """
493 if name not in self:
494 self[name] = NamedSignal(name, doc)
496 return self[name]
499class _PNamespaceSignal(t.Protocol):
500 def __call__(self, name: str, doc: str | None = None) -> NamedSignal: ...
503default_namespace: Namespace = Namespace()
504"""A default :class:`Namespace` for creating named signals. :func:`signal`
505creates a :class:`NamedSignal` in this namespace.
506"""
508signal: _PNamespaceSignal = default_namespace.signal
509"""Return a :class:`NamedSignal` in :data:`default_namespace` with the given
510``name``, creating it if required. Repeated calls with the same name return the
511same signal.
512"""