Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/blinker/base.py: 31%
217 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 06:29 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-22 06:29 +0000
1"""Signals and events.
3A small implementation of signals, inspired by a snippet of Django signal
4API client code seen in a blog post. Signals are first-class objects and
5each manages its own receivers and message emission.
7The :func:`signal` function provides singleton behavior for named signals.
9"""
10from __future__ import annotations
12import typing as t
13from collections import defaultdict
14from contextlib import contextmanager
15from inspect import iscoroutinefunction
16from warnings import warn
17from weakref import WeakValueDictionary
19from blinker._utilities import annotatable_weakref
20from blinker._utilities import hashable_identity
21from blinker._utilities import IdentityType
22from blinker._utilities import lazy_property
23from blinker._utilities import reference
24from blinker._utilities import symbol
25from blinker._utilities import WeakTypes
# Typing-only names. With ``from __future__ import annotations`` these are
# referenced from annotations only, so nothing in this branch is needed at
# runtime.
if t.TYPE_CHECKING:
    import typing_extensions as te

    # A callable type that is handed back unchanged (connect / decorators).
    T_callable = t.TypeVar("T_callable", bound=t.Callable[..., t.Any])

    T = t.TypeVar("T")
    P = te.ParamSpec("P")

    # Turns a coroutine function into a plain callable, and the reverse.
    AsyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]
    SyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]

# Sentinel meaning "any sender", plus the bucket key used for it internally.
ANY = symbol("ANY")
ANY.__doc__ = 'Token for "any sender".'
ANY_ID = 0

# NOTE: We need a reference to cast for use in weakref callbacks otherwise
# t.cast may have already been set to None during finalization.
cast = t.cast
class Signal:
    """A notification emitter."""

    #: An :obj:`ANY` convenience synonym, allows ``Signal.ANY``
    #: without an additional import.
    ANY = ANY

    #: Type used as the ``defaultdict`` factory for the ``_by_sender`` and
    #: ``_by_receiver`` buckets.
    set_class: type[set] = set

    @lazy_property
    def receiver_connected(self) -> Signal:
        """Emitted after each :meth:`connect`.

        The signal sender is the signal instance, and the :meth:`connect`
        arguments are passed through: *receiver*, *sender*, and *weak*.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver connects.")

    @lazy_property
    def receiver_disconnected(self) -> Signal:
        """Emitted after :meth:`disconnect`.

        The sender is the signal instance, and the :meth:`disconnect` arguments
        are passed through: *receiver* and *sender*.

        Note, this signal is emitted **only** when :meth:`disconnect` is
        called explicitly.

        The disconnect signal can not be emitted by an automatic disconnect
        (due to a weakly referenced receiver or sender going out of scope),
        as the receiver and/or sender instances are no longer available for
        use at the time this signal would be emitted.

        An alternative approach is available by subscribing to
        :attr:`receiver_connected` and setting up a custom weakref cleanup
        callback on weak receivers and senders.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver disconnects.")

    def __init__(self, doc: str | None = None) -> None:
        """
        :param doc: optional.  If provided, will be assigned to the signal's
          __doc__ attribute.

        """
        if doc:
            self.__doc__ = doc
        #: A mapping of connected receivers.
        #:
        #: The values of this mapping are not meaningful outside of the
        #: internal :class:`Signal` implementation, however the boolean value
        #: of the mapping is useful as an extremely efficient check to see if
        #: any receivers are connected to the signal.
        self.receivers: dict[IdentityType, t.Callable | annotatable_weakref] = {}
        #: While true, :meth:`send` and :meth:`send_async` return ``[]``
        #: without calling any receiver. Toggled by :meth:`muted`.
        self.is_muted = False
        # receiver id -> ids of the senders it is connected to
        self._by_receiver: dict[IdentityType, set[IdentityType]] = defaultdict(
            self.set_class
        )
        # sender id (or ANY_ID) -> ids of the receivers connected to it
        self._by_sender: dict[IdentityType, set[IdentityType]] = defaultdict(
            self.set_class
        )
        # sender id -> weak reference whose callback is _cleanup_sender
        self._weak_senders: dict[IdentityType, annotatable_weakref] = {}

    def connect(
        self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
    ) -> T_callable:
        """Connect *receiver* to signal events sent by *sender*.

        :param receiver: A callable.  Will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to a call to :meth:`send`.

        :param sender: Any object or :obj:`ANY`, defaults to ``ANY``.
          Restricts notifications delivered to *receiver* to only those
          :meth:`send` emissions sent by *sender*.  If ``ANY``, the receiver
          will always be notified.  A *receiver* may be connected to
          multiple *sender* values on the same Signal through multiple calls
          to :meth:`connect`.

        :param weak: If true, the Signal will hold a weakref to *receiver*
          and automatically disconnect when *receiver* goes out of scope or
          is garbage collected.  Defaults to True.

        :returns: *receiver*, unchanged.

        :raises TypeError: re-raised from a ``receiver_connected`` listener;
          the connection just made is disconnected first.

        """
        receiver_id = hashable_identity(receiver)
        receiver_ref: T_callable | annotatable_weakref

        if weak:
            receiver_ref = reference(receiver, self._cleanup_receiver)
            # Stash the id on the weakref so the cleanup callback can find the
            # bookkeeping entries once the receiver itself is gone.
            receiver_ref.receiver_id = receiver_id
        else:
            receiver_ref = receiver
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)

        # setdefault: an already-registered reference for this receiver id is
        # kept as is.
        self.receivers.setdefault(receiver_id, receiver_ref)
        self._by_sender[sender_id].add(receiver_id)
        self._by_receiver[receiver_id].add(sender_id)
        del receiver_ref

        if sender is not ANY and sender_id not in self._weak_senders:
            # wire together a cleanup for weakref-able senders
            try:
                sender_ref = reference(sender, self._cleanup_sender)
                sender_ref.sender_id = sender_id
            except TypeError:
                # Deliberate best effort: no weak reference could be set up
                # for this sender, so no automatic cleanup is registered.
                pass
            else:
                self._weak_senders.setdefault(sender_id, sender_ref)
                del sender_ref

        # broadcast this connection.  if receivers raise, disconnect.
        # The ``__dict__`` check avoids creating the lazy per-signal
        # ``receiver_connected`` signal just to find out it has no listeners.
        if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
            try:
                self.receiver_connected.send(
                    self, receiver=receiver, sender=sender, weak=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        # Deprecated module-level ``receiver_connected`` signal; skipped when
        # this instance *is* that signal.
        if receiver_connected.receivers and self is not receiver_connected:
            try:
                receiver_connected.send(
                    self, receiver_arg=receiver, sender_arg=sender, weak_arg=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        return receiver

    def connect_via(
        self, sender: t.Any, weak: bool = False
    ) -> t.Callable[[T_callable], T_callable]:
        """Connect the decorated function as a receiver for *sender*.

        :param sender: Any object or :obj:`ANY`.  The decorated function
          will only receive :meth:`send` emissions sent by *sender*.  If
          ``ANY``, the receiver will always be notified.  A function may be
          decorated multiple times with differing *sender* values.

        :param weak: If true, the Signal will hold a weakref to the
          decorated function and automatically disconnect when *receiver*
          goes out of scope or is garbage collected.  Unlike
          :meth:`connect`, this defaults to False.

        The decorated function will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to the call to :meth:`send`.


        .. versionadded:: 1.1

        """

        def decorator(fn: T_callable) -> T_callable:
            self.connect(fn, sender, weak)
            return fn

        return decorator

    @contextmanager
    def connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.Generator[None, None, None]:
        """Execute a block with the signal temporarily connected to *receiver*.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        This is a context manager for use in the ``with`` statement.  It can
        be useful in unit tests.  *receiver* is connected to the signal for
        the duration of the ``with`` block, and will be disconnected
        automatically when exiting the block:

        .. code-block:: python

          with on_ready.connected_to(receiver):
             # do stuff
             on_ready.send(123)

        .. versionadded:: 1.1

        """
        self.connect(receiver, sender=sender, weak=False)
        try:
            yield None
        finally:
            # Called without a sender, i.e. ANY: on exit the receiver is
            # removed from every sender, not only from *sender*.
            self.disconnect(receiver)

    @contextmanager
    def muted(self) -> t.Generator[None, None, None]:
        """Context manager for temporarily disabling signal.
        Useful for test purposes.

        ``is_muted`` is reset to ``False`` on exit, including when the block
        raises; the exception propagates unchanged.
        """
        self.is_muted = True
        try:
            yield None
        finally:
            self.is_muted = False

    def temporarily_connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.ContextManager[None]:
        """An alias for :meth:`connected_to`.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        .. versionadded:: 0.9

        .. versionchanged:: 1.1
          Renamed to :meth:`connected_to`.  ``temporarily_connected_to`` was
          deprecated in 1.2 and will be removed in a subsequent version.

        """
        warn(
            "temporarily_connected_to is deprecated; use connected_to instead.",
            DeprecationWarning,
            # Attribute the warning to the caller rather than to this method.
            stacklevel=2,
        )
        return self.connected_to(receiver, sender)

    def send(
        self,
        *sender: t.Any,
        _async_wrapper: AsyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``.  Only accepts one positional argument.
        :param _async_wrapper: A callable that should wrap a coroutine
          receiver and run it when called synchronously.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a receiver is a coroutine function and no
          *_async_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        for receiver in self.receivers_for(sender):
            if iscoroutinefunction(receiver):
                if _async_wrapper is None:
                    raise RuntimeError("Cannot send to a coroutine function")
                receiver = _async_wrapper(receiver)
            result = receiver(sender, **kwargs)
            results.append((receiver, result))
        return results

    async def send_async(
        self,
        *sender: t.Any,
        _sync_wrapper: SyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``. Only accepts one positional argument.
        :param _sync_wrapper: A callable that should wrap a synchronous
          receiver and run it when awaited.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a receiver is not a coroutine function and
          no *_sync_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        for receiver in self.receivers_for(sender):
            if not iscoroutinefunction(receiver):
                if _sync_wrapper is None:
                    raise RuntimeError("Cannot send to a non-coroutine function")
                receiver = _sync_wrapper(receiver)
            result = await receiver(sender, **kwargs)
            results.append((receiver, result))
        return results

    def _extract_sender(self, sender: t.Any) -> t.Any:
        """Unpack the ``*sender`` tuple collected by :meth:`send`.

        :param sender: the tuple of positional arguments given to
          :meth:`send` or :meth:`send_async`.
        :returns: the single positional argument, or ``None`` when none was
          given.
        :raises TypeError: if more than one positional argument was given.
          While no receivers are connected the check is only made when
          ``__debug__`` is true, so it can be disabled with -O for the
          lowest possible cost on no-op sends.
        """
        # Using '*sender' rather than 'sender=None' allows 'sender' to be
        # used as a keyword argument- i.e. it's an invisible name in the
        # function signature.
        if len(sender) > 1 and (self.receivers or __debug__):
            raise TypeError(
                f"send() accepts only one positional argument, {len(sender)} given"
            )
        # Always hand back a real sender value (never a placeholder list), so
        # a receiver connected right after this call still sees a sane sender.
        return sender[0] if sender else None

    def has_receivers_for(self, sender: t.Any) -> bool:
        """True if there is probably a receiver for *sender*.

        Performs an optimistic check only.  Does not guarantee that all
        weakly referenced receivers are still alive.  See
        :meth:`receivers_for` for a stronger search.

        """
        if not self.receivers:
            return False
        if self._by_sender[ANY_ID]:
            return True
        if sender is ANY:
            return False
        # Only checks that a bucket exists for the sender; the bucket itself
        # may be empty.
        return hashable_identity(sender) in self._by_sender

    def receivers_for(
        self, sender: t.Any
    ) -> t.Generator[t.Callable[[t.Any], t.Any], None, None]:
        """Iterate all live receivers listening for *sender*."""
        # TODO: test receivers_for(ANY)
        if self.receivers:
            sender_id = hashable_identity(sender)
            # ``ids`` is always a fresh set (union or copy), so disconnecting
            # dead receivers below cannot disturb the iteration.
            if sender_id in self._by_sender:
                ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
            else:
                ids = self._by_sender[ANY_ID].copy()
            for receiver_id in ids:
                receiver = self.receivers.get(receiver_id)
                if receiver is None:
                    continue
                if isinstance(receiver, WeakTypes):
                    strong = receiver()
                    if strong is None:
                        # The referent is gone: drop it from every sender.
                        self._disconnect(receiver_id, ANY_ID)
                        continue
                    receiver = strong
                yield receiver  # type: ignore[misc]

    def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
        """Disconnect *receiver* from this signal's events.

        :param receiver: a previously :meth:`connected<connect>` callable

        :param sender: a specific sender to disconnect from, or :obj:`ANY`
          to disconnect from all senders.  Defaults to ``ANY``.

        """
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)
        receiver_id = hashable_identity(receiver)
        self._disconnect(receiver_id, sender_id)

        # As in connect(): only touch the lazy per-signal notification signal
        # if it has already been created.
        if (
            "receiver_disconnected" in self.__dict__
            and self.receiver_disconnected.receivers
        ):
            self.receiver_disconnected.send(self, receiver=receiver, sender=sender)

    def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None:
        """Remove the bookkeeping for a receiver/sender pair.

        With ``ANY_ID`` as *sender_id* the receiver is removed from every
        sender bucket and from :attr:`receivers`; otherwise only the one
        pairing is discarded and the :attr:`receivers` entry is kept.
        """
        if sender_id == ANY_ID:
            if self._by_receiver.pop(receiver_id, False):
                for bucket in self._by_sender.values():
                    bucket.discard(receiver_id)
            self.receivers.pop(receiver_id, None)
        else:
            self._by_sender[sender_id].discard(receiver_id)
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None:
        """Disconnect a receiver from all senders."""
        self._disconnect(cast(IdentityType, receiver_ref.receiver_id), ANY_ID)

    def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None:
        """Disconnect all receivers from a sender."""
        sender_id = cast(IdentityType, sender_ref.sender_id)
        assert sender_id != ANY_ID
        self._weak_senders.pop(sender_id, None)
        for receiver_id in self._by_sender.pop(sender_id, ()):
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_bookkeeping(self) -> None:
        """Prune unused sender/receiver bookkeeping. Not threadsafe.

        Connecting & disconnecting leave behind a small amount of bookkeeping
        for the receiver and sender values. Typical workloads using Blinker,
        for example in most web apps, Flask, CLI scripts, etc., are not
        adversely affected by this bookkeeping.

        With a long-running Python process performing dynamic signal routing
        with high volume- e.g. connecting to function closures, "senders" are
        all unique object instances, and doing all of this over and over- you
        may see memory usage will grow due to extraneous bookkeeping. (An empty
        set() for each stale sender/receiver pair.)

        This method will prune that bookkeeping away, with the caveat that such
        pruning is not threadsafe. The risk is that cleanup of a fully
        disconnected receiver/sender pair occurs while another thread is
        connecting that same pair. If you are in the highly dynamic, unique
        receiver/sender situation that has led you to this method, that
        failure mode is perhaps not a big deal for you.
        """
        for mapping in (self._by_sender, self._by_receiver):
            # Snapshot the items so entries can be popped while looping.
            for _id, bucket in list(mapping.items()):
                if not bucket:
                    mapping.pop(_id, None)

    def _clear_state(self) -> None:
        """Throw away all signal state.  Useful for unit tests."""
        self._weak_senders.clear()
        self.receivers.clear()
        self._by_sender.clear()
        self._by_receiver.clear()
# Deprecated module-level signal, still emitted by ``Signal.connect`` for
# every signal other than this one.
receiver_connected = Signal(
    doc="""\
Sent by a :class:`Signal` after a receiver connects.

:argument: the Signal that was connected to
:keyword receiver_arg: the connected receiver
:keyword sender_arg: the sender to connect to
:keyword weak_arg: true if the connection to receiver_arg is a weak reference

.. deprecated:: 1.2

As of 1.2, individual signals have their own private
:attr:`~Signal.receiver_connected` and
:attr:`~Signal.receiver_disconnected` signals with a slightly simplified
call signature.  This global signal is planned to be removed in 1.6.

"""
)
class NamedSignal(Signal):
    """A :class:`Signal` that also carries a name."""

    def __init__(self, name: str, doc: str | None = None) -> None:
        """
        :param name: stored on the instance as :attr:`name`.
        :param doc: optional, forwarded to :class:`Signal`.
        """
        Signal.__init__(self, doc)

        #: The name of this signal.
        self.name = name

    def __repr__(self) -> str:
        # Strip the last character of the inherited repr (its closing
        # bracket), then append the name and close it again.
        head = Signal.__repr__(self)[:-1]
        return f"{head}; {self.name!r}>"  # noqa: E702
class Namespace(dict):
    """A mapping of signal names to signals."""

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Look up the :class:`NamedSignal` called *name*, creating it on a miss.

        Calling this again with the same *name* yields the same object; *doc*
        is only used when a new signal has to be created.

        """
        try:
            found = self[name]
        except KeyError:
            # Not registered yet: build it and store it in a single step.
            created = self.setdefault(name, NamedSignal(name, doc))
            return created  # type: ignore[no-any-return]
        else:
            return found  # type: ignore[no-any-return]
class WeakNamespace(WeakValueDictionary):
    """A weak mapping of signal names to signals.

    Automatically cleans up unused Signals when the last reference goes out
    of scope.  This namespace implementation exists for a measure of legacy
    compatibility with Blinker <= 1.2, and may be dropped in the future.

    .. versionadded:: 1.3

    """

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Look up the :class:`NamedSignal` called *name*, creating it on a miss.

        Calling this again with the same *name* yields the same object; *doc*
        is only used when a new signal has to be created.

        """
        try:
            found = self[name]
        except KeyError:
            # Not registered yet: build it and store it in a single step.
            created = self.setdefault(name, NamedSignal(name, doc))
            return created  # type: ignore[no-any-return]
        else:
            return found  # type: ignore[no-any-return]
# Module-level factory: the bound ``signal`` method of one shared Namespace,
# so equal names resolve to the same NamedSignal.
signal = Namespace().signal