Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/dispatch/dispatcher.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

193 statements  

1import asyncio 

2import logging 

3import threading 

4import weakref 

5 

6from asgiref.sync import async_to_sync, iscoroutinefunction, sync_to_async 

7 

8from django.utils.inspect import func_accepts_kwargs 

9 

10logger = logging.getLogger("django.dispatch") 

11 

12 

13def _make_id(target): 

14 if hasattr(target, "__func__"): 

15 return (id(target.__self__), id(target.__func__)) 

16 return id(target) 

17 

18 

19NONE_ID = _make_id(None) 

20 

21# A marker for caching 

22NO_RECEIVERS = object() 

23 

24 

25class Signal: 

26 """ 

27 Base class for all signals 

28 

29 Internal attributes: 

30 

31 receivers 

32 { receiverkey (id) : weakref(receiver) } 

33 """ 

34 

35 def __init__(self, use_caching=False): 

36 """ 

37 Create a new signal. 

38 """ 

39 self.receivers = [] 

40 self.lock = threading.Lock() 

41 self.use_caching = use_caching 

42 # For convenience we create empty caches even if they are not used. 

43 # A note about caching: if use_caching is defined, then for each 

44 # distinct sender we cache the receivers that sender has in 

45 # 'sender_receivers_cache'. The cache is cleaned when .connect() or 

46 # .disconnect() is called and populated on send(). 

47 self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {} 

48 self._dead_receivers = False 

49 

50 def connect(self, receiver, sender=None, weak=True, dispatch_uid=None): 

51 """ 

52 Connect receiver to sender for signal. 

53 

54 Arguments: 

55 

56 receiver 

57 A function or an instance method which is to receive signals. 

58 Receivers must be hashable objects. Receivers can be 

59 asynchronous. 

60 

61 If weak is True, then receiver must be weak referenceable. 

62 

63 Receivers must be able to accept keyword arguments. 

64 

65 If a receiver is connected with a dispatch_uid argument, it 

66 will not be added if another receiver was already connected 

67 with that dispatch_uid. 

68 

69 sender 

70 The sender to which the receiver should respond. Must either be 

71 a Python object, or None to receive events from any sender. 

72 

73 weak 

74 Whether to use weak references to the receiver. By default, the 

75 module will attempt to use weak references to the receiver 

76 objects. If this parameter is false, then strong references will 

77 be used. 

78 

79 dispatch_uid 

80 An identifier used to uniquely identify a particular instance of 

81 a receiver. This will usually be a string, though it may be 

82 anything hashable. 

83 """ 

84 from django.conf import settings 

85 

86 # If DEBUG is on, check that we got a good receiver 

87 if settings.configured and settings.DEBUG: 

88 if not callable(receiver): 

89 raise TypeError("Signal receivers must be callable.") 

90 # Check for **kwargs 

91 if not func_accepts_kwargs(receiver): 

92 raise ValueError( 

93 "Signal receivers must accept keyword arguments (**kwargs)." 

94 ) 

95 

96 if dispatch_uid: 

97 lookup_key = (dispatch_uid, _make_id(sender)) 

98 else: 

99 lookup_key = (_make_id(receiver), _make_id(sender)) 

100 

101 is_async = iscoroutinefunction(receiver) 

102 

103 if weak: 

104 ref = weakref.ref 

105 receiver_object = receiver 

106 # Check for bound methods 

107 if hasattr(receiver, "__self__") and hasattr(receiver, "__func__"): 

108 ref = weakref.WeakMethod 

109 receiver_object = receiver.__self__ 

110 receiver = ref(receiver) 

111 weakref.finalize(receiver_object, self._remove_receiver) 

112 

113 with self.lock: 

114 self._clear_dead_receivers() 

115 if not any(r_key == lookup_key for r_key, _, _ in self.receivers): 

116 self.receivers.append((lookup_key, receiver, is_async)) 

117 self.sender_receivers_cache.clear() 

118 

119 def disconnect(self, receiver=None, sender=None, dispatch_uid=None): 

120 """ 

121 Disconnect receiver from sender for signal. 

122 

123 If weak references are used, disconnect need not be called. The receiver 

124 will be removed from dispatch automatically. 

125 

126 Arguments: 

127 

128 receiver 

129 The registered receiver to disconnect. May be none if 

130 dispatch_uid is specified. 

131 

132 sender 

133 The registered sender to disconnect 

134 

135 dispatch_uid 

136 the unique identifier of the receiver to disconnect 

137 """ 

138 if dispatch_uid: 

139 lookup_key = (dispatch_uid, _make_id(sender)) 

140 else: 

141 lookup_key = (_make_id(receiver), _make_id(sender)) 

142 

143 disconnected = False 

144 with self.lock: 

145 self._clear_dead_receivers() 

146 for index in range(len(self.receivers)): 

147 r_key, *_ = self.receivers[index] 

148 if r_key == lookup_key: 

149 disconnected = True 

150 del self.receivers[index] 

151 break 

152 self.sender_receivers_cache.clear() 

153 return disconnected 

154 

155 def has_listeners(self, sender=None): 

156 sync_receivers, async_receivers = self._live_receivers(sender) 

157 return bool(sync_receivers) or bool(async_receivers) 

158 

159 def send(self, sender, **named): 

160 """ 

161 Send signal from sender to all connected receivers. 

162 

163 If any receiver raises an error, the error propagates back through send, 

164 terminating the dispatch loop. So it's possible that all receivers 

165 won't be called if an error is raised. 

166 

167 If any receivers are asynchronous, they are called after all the 

168 synchronous receivers via a single call to async_to_sync(). They are 

169 also executed concurrently with asyncio.gather(). 

170 

171 Arguments: 

172 

173 sender 

174 The sender of the signal. Either a specific object or None. 

175 

176 named 

177 Named arguments which will be passed to receivers. 

178 

179 Return a list of tuple pairs [(receiver, response), ... ]. 

180 """ 

181 if ( 

182 not self.receivers 

183 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS 

184 ): 

185 return [] 

186 responses = [] 

187 sync_receivers, async_receivers = self._live_receivers(sender) 

188 for receiver in sync_receivers: 

189 response = receiver(signal=self, sender=sender, **named) 

190 responses.append((receiver, response)) 

191 if async_receivers: 

192 

193 async def asend(): 

194 async_responses = await asyncio.gather( 

195 *( 

196 receiver(signal=self, sender=sender, **named) 

197 for receiver in async_receivers 

198 ) 

199 ) 

200 return zip(async_receivers, async_responses) 

201 

202 responses.extend(async_to_sync(asend)()) 

203 return responses 

204 

205 async def asend(self, sender, **named): 

206 """ 

207 Send signal from sender to all connected receivers in async mode. 

208 

209 All sync receivers will be wrapped by sync_to_async() 

210 If any receiver raises an error, the error propagates back through 

211 send, terminating the dispatch loop. So it's possible that all 

212 receivers won't be called if an error is raised. 

213 

214 If any receivers are synchronous, they are grouped and called behind a 

215 sync_to_async() adaption before executing any asynchronous receivers. 

216 

217 If any receivers are asynchronous, they are grouped and executed 

218 concurrently with asyncio.gather(). 

219 

220 Arguments: 

221 

222 sender 

223 The sender of the signal. Either a specific object or None. 

224 

225 named 

226 Named arguments which will be passed to receivers. 

227 

228 Return a list of tuple pairs [(receiver, response), ...]. 

229 """ 

230 if ( 

231 not self.receivers 

232 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS 

233 ): 

234 return [] 

235 sync_receivers, async_receivers = self._live_receivers(sender) 

236 if sync_receivers: 

237 

238 @sync_to_async 

239 def sync_send(): 

240 responses = [] 

241 for receiver in sync_receivers: 

242 response = receiver(signal=self, sender=sender, **named) 

243 responses.append((receiver, response)) 

244 return responses 

245 

246 else: 

247 

248 async def sync_send(): 

249 return [] 

250 

251 responses, async_responses = await asyncio.gather( 

252 sync_send(), 

253 asyncio.gather( 

254 *( 

255 receiver(signal=self, sender=sender, **named) 

256 for receiver in async_receivers 

257 ) 

258 ), 

259 ) 

260 responses.extend(zip(async_receivers, async_responses)) 

261 return responses 

262 

263 def _log_robust_failure(self, receiver, err): 

264 logger.error( 

265 "Error calling %s in Signal.send_robust() (%s)", 

266 receiver.__qualname__, 

267 err, 

268 exc_info=err, 

269 ) 

270 

271 def send_robust(self, sender, **named): 

272 """ 

273 Send signal from sender to all connected receivers catching errors. 

274 

275 If any receivers are asynchronous, they are called after all the 

276 synchronous receivers via a single call to async_to_sync(). They are 

277 also executed concurrently with asyncio.gather(). 

278 

279 Arguments: 

280 

281 sender 

282 The sender of the signal. Can be any Python object (normally one 

283 registered with a connect if you actually want something to 

284 occur). 

285 

286 named 

287 Named arguments which will be passed to receivers. 

288 

289 Return a list of tuple pairs [(receiver, response), ... ]. 

290 

291 If any receiver raises an error (specifically any subclass of 

292 Exception), return the error instance as the result for that receiver. 

293 """ 

294 if ( 

295 not self.receivers 

296 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS 

297 ): 

298 return [] 

299 

300 # Call each receiver with whatever arguments it can accept. 

301 # Return a list of tuple pairs [(receiver, response), ... ]. 

302 responses = [] 

303 sync_receivers, async_receivers = self._live_receivers(sender) 

304 for receiver in sync_receivers: 

305 try: 

306 response = receiver(signal=self, sender=sender, **named) 

307 except Exception as err: 

308 self._log_robust_failure(receiver, err) 

309 responses.append((receiver, err)) 

310 else: 

311 responses.append((receiver, response)) 

312 if async_receivers: 

313 

314 async def asend_and_wrap_exception(receiver): 

315 try: 

316 response = await receiver(signal=self, sender=sender, **named) 

317 except Exception as err: 

318 self._log_robust_failure(receiver, err) 

319 return err 

320 return response 

321 

322 async def asend(): 

323 async_responses = await asyncio.gather( 

324 *( 

325 asend_and_wrap_exception(receiver) 

326 for receiver in async_receivers 

327 ) 

328 ) 

329 return zip(async_receivers, async_responses) 

330 

331 responses.extend(async_to_sync(asend)()) 

332 return responses 

333 

334 async def asend_robust(self, sender, **named): 

335 """ 

336 Send signal from sender to all connected receivers catching errors. 

337 

338 If any receivers are synchronous, they are grouped and called behind a 

339 sync_to_async() adaption before executing any asynchronous receivers. 

340 

341 If any receivers are asynchronous, they are grouped and executed 

342 concurrently with asyncio.gather. 

343 

344 Arguments: 

345 

346 sender 

347 The sender of the signal. Can be any Python object (normally one 

348 registered with a connect if you actually want something to 

349 occur). 

350 

351 named 

352 Named arguments which will be passed to receivers. 

353 

354 Return a list of tuple pairs [(receiver, response), ... ]. 

355 

356 If any receiver raises an error (specifically any subclass of 

357 Exception), return the error instance as the result for that receiver. 

358 """ 

359 if ( 

360 not self.receivers 

361 or self.sender_receivers_cache.get(sender) is NO_RECEIVERS 

362 ): 

363 return [] 

364 

365 # Call each receiver with whatever arguments it can accept. 

366 # Return a list of tuple pairs [(receiver, response), ... ]. 

367 sync_receivers, async_receivers = self._live_receivers(sender) 

368 

369 if sync_receivers: 

370 

371 @sync_to_async 

372 def sync_send(): 

373 responses = [] 

374 for receiver in sync_receivers: 

375 try: 

376 response = receiver(signal=self, sender=sender, **named) 

377 except Exception as err: 

378 self._log_robust_failure(receiver, err) 

379 responses.append((receiver, err)) 

380 else: 

381 responses.append((receiver, response)) 

382 return responses 

383 

384 else: 

385 

386 async def sync_send(): 

387 return [] 

388 

389 async def asend_and_wrap_exception(receiver): 

390 try: 

391 response = await receiver(signal=self, sender=sender, **named) 

392 except Exception as err: 

393 self._log_robust_failure(receiver, err) 

394 return err 

395 return response 

396 

397 responses, async_responses = await asyncio.gather( 

398 sync_send(), 

399 asyncio.gather( 

400 *(asend_and_wrap_exception(receiver) for receiver in async_receivers), 

401 ), 

402 ) 

403 responses.extend(zip(async_receivers, async_responses)) 

404 return responses 

405 

406 def _clear_dead_receivers(self): 

407 # Note: caller is assumed to hold self.lock. 

408 if self._dead_receivers: 

409 self._dead_receivers = False 

410 self.receivers = [ 

411 r 

412 for r in self.receivers 

413 if not (isinstance(r[1], weakref.ReferenceType) and r[1]() is None) 

414 ] 

415 

416 def _live_receivers(self, sender): 

417 """ 

418 Filter sequence of receivers to get resolved, live receivers. 

419 

420 This checks for weak references and resolves them, then returning only 

421 live receivers. 

422 """ 

423 receivers = None 

424 if self.use_caching and not self._dead_receivers: 

425 receivers = self.sender_receivers_cache.get(sender) 

426 # We could end up here with NO_RECEIVERS even if we do check this case in 

427 # .send() prior to calling _live_receivers() due to concurrent .send() call. 

428 if receivers is NO_RECEIVERS: 

429 return [], [] 

430 if receivers is None: 

431 with self.lock: 

432 self._clear_dead_receivers() 

433 senderkey = _make_id(sender) 

434 receivers = [] 

435 for (_receiverkey, r_senderkey), receiver, is_async in self.receivers: 

436 if r_senderkey == NONE_ID or r_senderkey == senderkey: 

437 receivers.append((receiver, is_async)) 

438 if self.use_caching: 

439 if not receivers: 

440 self.sender_receivers_cache[sender] = NO_RECEIVERS 

441 else: 

442 # Note, we must cache the weakref versions. 

443 self.sender_receivers_cache[sender] = receivers 

444 non_weak_sync_receivers = [] 

445 non_weak_async_receivers = [] 

446 for receiver, is_async in receivers: 

447 if isinstance(receiver, weakref.ReferenceType): 

448 # Dereference the weak reference. 

449 receiver = receiver() 

450 if receiver is not None: 

451 if is_async: 

452 non_weak_async_receivers.append(receiver) 

453 else: 

454 non_weak_sync_receivers.append(receiver) 

455 else: 

456 if is_async: 

457 non_weak_async_receivers.append(receiver) 

458 else: 

459 non_weak_sync_receivers.append(receiver) 

460 return non_weak_sync_receivers, non_weak_async_receivers 

461 

462 def _remove_receiver(self, receiver=None): 

463 # Mark that the self.receivers list has dead weakrefs. If so, we will 

464 # clean those up in connect, disconnect and _live_receivers while 

465 # holding self.lock. Note that doing the cleanup here isn't a good 

466 # idea, _remove_receiver() will be called as side effect of garbage 

467 # collection, and so the call can happen while we are already holding 

468 # self.lock. 

469 self._dead_receivers = True 

470 

471 

472def receiver(signal, **kwargs): 

473 """ 

474 A decorator for connecting receivers to signals. Used by passing in the 

475 signal (or list of signals) and keyword arguments to connect:: 

476 

477 @receiver(post_save, sender=MyModel) 

478 def signal_receiver(sender, **kwargs): 

479 ... 

480 

481 @receiver([post_save, post_delete], sender=MyModel) 

482 def signals_receiver(sender, **kwargs): 

483 ... 

484 """ 

485 

486 def _decorator(func): 

487 if isinstance(signal, (list, tuple)): 

488 for s in signal: 

489 s.connect(func, **kwargs) 

490 else: 

491 signal.connect(func, **kwargs) 

492 return func 

493 

494 return _decorator