1import asyncio
2import logging
3import threading
4import weakref
5
6from asgiref.sync import async_to_sync, iscoroutinefunction, sync_to_async
7
8from django.utils.inspect import func_accepts_kwargs
9
10logger = logging.getLogger("django.dispatch")
11
12
13def _make_id(target):
14 if hasattr(target, "__func__"):
15 return (id(target.__self__), id(target.__func__))
16 return id(target)
17
18
19NONE_ID = _make_id(None)
20
21# A marker for caching
22NO_RECEIVERS = object()
23
24
25class Signal:
26 """
27 Base class for all signals
28
29 Internal attributes:
30
31 receivers
32 { receiverkey (id) : weakref(receiver) }
33 """
34
35 def __init__(self, use_caching=False):
36 """
37 Create a new signal.
38 """
39 self.receivers = []
40 self.lock = threading.Lock()
41 self.use_caching = use_caching
42 # For convenience we create empty caches even if they are not used.
43 # A note about caching: if use_caching is defined, then for each
44 # distinct sender we cache the receivers that sender has in
45 # 'sender_receivers_cache'. The cache is cleaned when .connect() or
46 # .disconnect() is called and populated on send().
47 self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
48 self._dead_receivers = False
49
50 def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
51 """
52 Connect receiver to sender for signal.
53
54 Arguments:
55
56 receiver
57 A function or an instance method which is to receive signals.
58 Receivers must be hashable objects. Receivers can be
59 asynchronous.
60
61 If weak is True, then receiver must be weak referenceable.
62
63 Receivers must be able to accept keyword arguments.
64
65 If a receiver is connected with a dispatch_uid argument, it
66 will not be added if another receiver was already connected
67 with that dispatch_uid.
68
69 sender
70 The sender to which the receiver should respond. Must either be
71 a Python object, or None to receive events from any sender.
72
73 weak
74 Whether to use weak references to the receiver. By default, the
75 module will attempt to use weak references to the receiver
76 objects. If this parameter is false, then strong references will
77 be used.
78
79 dispatch_uid
80 An identifier used to uniquely identify a particular instance of
81 a receiver. This will usually be a string, though it may be
82 anything hashable.
83 """
84 from django.conf import settings
85
86 # If DEBUG is on, check that we got a good receiver
87 if settings.configured and settings.DEBUG:
88 if not callable(receiver):
89 raise TypeError("Signal receivers must be callable.")
90 # Check for **kwargs
91 if not func_accepts_kwargs(receiver):
92 raise ValueError(
93 "Signal receivers must accept keyword arguments (**kwargs)."
94 )
95
96 if dispatch_uid:
97 lookup_key = (dispatch_uid, _make_id(sender))
98 else:
99 lookup_key = (_make_id(receiver), _make_id(sender))
100
101 is_async = iscoroutinefunction(receiver)
102
103 if weak:
104 ref = weakref.ref
105 receiver_object = receiver
106 # Check for bound methods
107 if hasattr(receiver, "__self__") and hasattr(receiver, "__func__"):
108 ref = weakref.WeakMethod
109 receiver_object = receiver.__self__
110 receiver = ref(receiver)
111 weakref.finalize(receiver_object, self._remove_receiver)
112
113 with self.lock:
114 self._clear_dead_receivers()
115 if not any(r_key == lookup_key for r_key, _, _ in self.receivers):
116 self.receivers.append((lookup_key, receiver, is_async))
117 self.sender_receivers_cache.clear()
118
119 def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
120 """
121 Disconnect receiver from sender for signal.
122
123 If weak references are used, disconnect need not be called. The receiver
124 will be removed from dispatch automatically.
125
126 Arguments:
127
128 receiver
129 The registered receiver to disconnect. May be none if
130 dispatch_uid is specified.
131
132 sender
133 The registered sender to disconnect
134
135 dispatch_uid
136 the unique identifier of the receiver to disconnect
137 """
138 if dispatch_uid:
139 lookup_key = (dispatch_uid, _make_id(sender))
140 else:
141 lookup_key = (_make_id(receiver), _make_id(sender))
142
143 disconnected = False
144 with self.lock:
145 self._clear_dead_receivers()
146 for index in range(len(self.receivers)):
147 r_key, *_ = self.receivers[index]
148 if r_key == lookup_key:
149 disconnected = True
150 del self.receivers[index]
151 break
152 self.sender_receivers_cache.clear()
153 return disconnected
154
155 def has_listeners(self, sender=None):
156 sync_receivers, async_receivers = self._live_receivers(sender)
157 return bool(sync_receivers) or bool(async_receivers)
158
159 def send(self, sender, **named):
160 """
161 Send signal from sender to all connected receivers.
162
163 If any receiver raises an error, the error propagates back through send,
164 terminating the dispatch loop. So it's possible that all receivers
165 won't be called if an error is raised.
166
167 If any receivers are asynchronous, they are called after all the
168 synchronous receivers via a single call to async_to_sync(). They are
169 also executed concurrently with asyncio.gather().
170
171 Arguments:
172
173 sender
174 The sender of the signal. Either a specific object or None.
175
176 named
177 Named arguments which will be passed to receivers.
178
179 Return a list of tuple pairs [(receiver, response), ... ].
180 """
181 if (
182 not self.receivers
183 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
184 ):
185 return []
186 responses = []
187 sync_receivers, async_receivers = self._live_receivers(sender)
188 for receiver in sync_receivers:
189 response = receiver(signal=self, sender=sender, **named)
190 responses.append((receiver, response))
191 if async_receivers:
192
193 async def asend():
194 async_responses = await asyncio.gather(
195 *(
196 receiver(signal=self, sender=sender, **named)
197 for receiver in async_receivers
198 )
199 )
200 return zip(async_receivers, async_responses)
201
202 responses.extend(async_to_sync(asend)())
203 return responses
204
205 async def asend(self, sender, **named):
206 """
207 Send signal from sender to all connected receivers in async mode.
208
209 All sync receivers will be wrapped by sync_to_async()
210 If any receiver raises an error, the error propagates back through
211 send, terminating the dispatch loop. So it's possible that all
212 receivers won't be called if an error is raised.
213
214 If any receivers are synchronous, they are grouped and called behind a
215 sync_to_async() adaption before executing any asynchronous receivers.
216
217 If any receivers are asynchronous, they are grouped and executed
218 concurrently with asyncio.gather().
219
220 Arguments:
221
222 sender
223 The sender of the signal. Either a specific object or None.
224
225 named
226 Named arguments which will be passed to receivers.
227
228 Return a list of tuple pairs [(receiver, response), ...].
229 """
230 if (
231 not self.receivers
232 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
233 ):
234 return []
235 sync_receivers, async_receivers = self._live_receivers(sender)
236 if sync_receivers:
237
238 @sync_to_async
239 def sync_send():
240 responses = []
241 for receiver in sync_receivers:
242 response = receiver(signal=self, sender=sender, **named)
243 responses.append((receiver, response))
244 return responses
245
246 else:
247
248 async def sync_send():
249 return []
250
251 responses, async_responses = await asyncio.gather(
252 sync_send(),
253 asyncio.gather(
254 *(
255 receiver(signal=self, sender=sender, **named)
256 for receiver in async_receivers
257 )
258 ),
259 )
260 responses.extend(zip(async_receivers, async_responses))
261 return responses
262
263 def _log_robust_failure(self, receiver, err):
264 logger.error(
265 "Error calling %s in Signal.send_robust() (%s)",
266 receiver.__qualname__,
267 err,
268 exc_info=err,
269 )
270
271 def send_robust(self, sender, **named):
272 """
273 Send signal from sender to all connected receivers catching errors.
274
275 If any receivers are asynchronous, they are called after all the
276 synchronous receivers via a single call to async_to_sync(). They are
277 also executed concurrently with asyncio.gather().
278
279 Arguments:
280
281 sender
282 The sender of the signal. Can be any Python object (normally one
283 registered with a connect if you actually want something to
284 occur).
285
286 named
287 Named arguments which will be passed to receivers.
288
289 Return a list of tuple pairs [(receiver, response), ... ].
290
291 If any receiver raises an error (specifically any subclass of
292 Exception), return the error instance as the result for that receiver.
293 """
294 if (
295 not self.receivers
296 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
297 ):
298 return []
299
300 # Call each receiver with whatever arguments it can accept.
301 # Return a list of tuple pairs [(receiver, response), ... ].
302 responses = []
303 sync_receivers, async_receivers = self._live_receivers(sender)
304 for receiver in sync_receivers:
305 try:
306 response = receiver(signal=self, sender=sender, **named)
307 except Exception as err:
308 self._log_robust_failure(receiver, err)
309 responses.append((receiver, err))
310 else:
311 responses.append((receiver, response))
312 if async_receivers:
313
314 async def asend_and_wrap_exception(receiver):
315 try:
316 response = await receiver(signal=self, sender=sender, **named)
317 except Exception as err:
318 self._log_robust_failure(receiver, err)
319 return err
320 return response
321
322 async def asend():
323 async_responses = await asyncio.gather(
324 *(
325 asend_and_wrap_exception(receiver)
326 for receiver in async_receivers
327 )
328 )
329 return zip(async_receivers, async_responses)
330
331 responses.extend(async_to_sync(asend)())
332 return responses
333
334 async def asend_robust(self, sender, **named):
335 """
336 Send signal from sender to all connected receivers catching errors.
337
338 If any receivers are synchronous, they are grouped and called behind a
339 sync_to_async() adaption before executing any asynchronous receivers.
340
341 If any receivers are asynchronous, they are grouped and executed
342 concurrently with asyncio.gather.
343
344 Arguments:
345
346 sender
347 The sender of the signal. Can be any Python object (normally one
348 registered with a connect if you actually want something to
349 occur).
350
351 named
352 Named arguments which will be passed to receivers.
353
354 Return a list of tuple pairs [(receiver, response), ... ].
355
356 If any receiver raises an error (specifically any subclass of
357 Exception), return the error instance as the result for that receiver.
358 """
359 if (
360 not self.receivers
361 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
362 ):
363 return []
364
365 # Call each receiver with whatever arguments it can accept.
366 # Return a list of tuple pairs [(receiver, response), ... ].
367 sync_receivers, async_receivers = self._live_receivers(sender)
368
369 if sync_receivers:
370
371 @sync_to_async
372 def sync_send():
373 responses = []
374 for receiver in sync_receivers:
375 try:
376 response = receiver(signal=self, sender=sender, **named)
377 except Exception as err:
378 self._log_robust_failure(receiver, err)
379 responses.append((receiver, err))
380 else:
381 responses.append((receiver, response))
382 return responses
383
384 else:
385
386 async def sync_send():
387 return []
388
389 async def asend_and_wrap_exception(receiver):
390 try:
391 response = await receiver(signal=self, sender=sender, **named)
392 except Exception as err:
393 self._log_robust_failure(receiver, err)
394 return err
395 return response
396
397 responses, async_responses = await asyncio.gather(
398 sync_send(),
399 asyncio.gather(
400 *(asend_and_wrap_exception(receiver) for receiver in async_receivers),
401 ),
402 )
403 responses.extend(zip(async_receivers, async_responses))
404 return responses
405
406 def _clear_dead_receivers(self):
407 # Note: caller is assumed to hold self.lock.
408 if self._dead_receivers:
409 self._dead_receivers = False
410 self.receivers = [
411 r
412 for r in self.receivers
413 if not (isinstance(r[1], weakref.ReferenceType) and r[1]() is None)
414 ]
415
416 def _live_receivers(self, sender):
417 """
418 Filter sequence of receivers to get resolved, live receivers.
419
420 This checks for weak references and resolves them, then returning only
421 live receivers.
422 """
423 receivers = None
424 if self.use_caching and not self._dead_receivers:
425 receivers = self.sender_receivers_cache.get(sender)
426 # We could end up here with NO_RECEIVERS even if we do check this case in
427 # .send() prior to calling _live_receivers() due to concurrent .send() call.
428 if receivers is NO_RECEIVERS:
429 return [], []
430 if receivers is None:
431 with self.lock:
432 self._clear_dead_receivers()
433 senderkey = _make_id(sender)
434 receivers = []
435 for (_receiverkey, r_senderkey), receiver, is_async in self.receivers:
436 if r_senderkey == NONE_ID or r_senderkey == senderkey:
437 receivers.append((receiver, is_async))
438 if self.use_caching:
439 if not receivers:
440 self.sender_receivers_cache[sender] = NO_RECEIVERS
441 else:
442 # Note, we must cache the weakref versions.
443 self.sender_receivers_cache[sender] = receivers
444 non_weak_sync_receivers = []
445 non_weak_async_receivers = []
446 for receiver, is_async in receivers:
447 if isinstance(receiver, weakref.ReferenceType):
448 # Dereference the weak reference.
449 receiver = receiver()
450 if receiver is not None:
451 if is_async:
452 non_weak_async_receivers.append(receiver)
453 else:
454 non_weak_sync_receivers.append(receiver)
455 else:
456 if is_async:
457 non_weak_async_receivers.append(receiver)
458 else:
459 non_weak_sync_receivers.append(receiver)
460 return non_weak_sync_receivers, non_weak_async_receivers
461
462 def _remove_receiver(self, receiver=None):
463 # Mark that the self.receivers list has dead weakrefs. If so, we will
464 # clean those up in connect, disconnect and _live_receivers while
465 # holding self.lock. Note that doing the cleanup here isn't a good
466 # idea, _remove_receiver() will be called as side effect of garbage
467 # collection, and so the call can happen while we are already holding
468 # self.lock.
469 self._dead_receivers = True
470
471
472def receiver(signal, **kwargs):
473 """
474 A decorator for connecting receivers to signals. Used by passing in the
475 signal (or list of signals) and keyword arguments to connect::
476
477 @receiver(post_save, sender=MyModel)
478 def signal_receiver(sender, **kwargs):
479 ...
480
481 @receiver([post_save, post_delete], sender=MyModel)
482 def signals_receiver(sender, **kwargs):
483 ...
484 """
485
486 def _decorator(func):
487 if isinstance(signal, (list, tuple)):
488 for s in signal:
489 s.connect(func, **kwargs)
490 else:
491 signal.connect(func, **kwargs)
492 return func
493
494 return _decorator