Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/blinker/base.py: 30%

218 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1"""Signals and events. 

2 

3A small implementation of signals, inspired by a snippet of Django signal 

4API client code seen in a blog post. Signals are first-class objects and 

5each manages its own receivers and message emission. 

6 

7The :func:`signal` function provides singleton behavior for named signals. 

8 

9""" 

10from __future__ import annotations 

11 

12import typing as t 

13from collections import defaultdict 

14from contextlib import contextmanager 

15from warnings import warn 

16from weakref import WeakValueDictionary 

17 

18from blinker._utilities import annotatable_weakref 

19from blinker._utilities import hashable_identity 

20from blinker._utilities import IdentityType 

21from blinker._utilities import is_coroutine_function 

22from blinker._utilities import lazy_property 

23from blinker._utilities import reference 

24from blinker._utilities import symbol 

25from blinker._utilities import WeakTypes 

26 

# Everything in this branch exists only for static type checkers:
# ``t.TYPE_CHECKING`` is false at runtime, so ``typing_extensions`` is not
# imported when the module is actually executed.
if t.TYPE_CHECKING:
    import typing_extensions as te

    # Type variable for "some callable"; lets connect() return the exact type
    # of the receiver it was given.
    T_callable = t.TypeVar("T_callable", bound=t.Callable[..., t.Any])

    T = t.TypeVar("T")
    P = te.ParamSpec("P")

    # Annotation used for send()'s ``_async_wrapper`` argument: takes a
    # callable returning ``T`` and yields one returning ``Awaitable[T]``.
    AsyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]]
    # Annotation used for send_async()'s ``_sync_wrapper`` argument: the
    # reverse mapping of ``AsyncWrapperType``.
    SyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]]

37 

# Sentinel meaning "any sender"; Signal compares against it by identity
# (``sender is ANY``).
ANY = symbol("ANY")
ANY.__doc__ = 'Token for "any sender".'
# Key under which receivers connected with ``sender=ANY`` are filed in
# ``Signal._by_sender``.
ANY_ID = 0

41 

42 

class Signal:
    """A notification emitter.

    Receivers are registered with :meth:`connect` (optionally restricted to
    one sender) and invoked by :meth:`send` / :meth:`send_async`.
    """

    #: An :obj:`ANY` convenience synonym, allows ``Signal.ANY``
    #: without an additional import.
    ANY = ANY

    @lazy_property
    def receiver_connected(self) -> Signal:
        """Emitted after each :meth:`connect`.

        The signal sender is the signal instance, and the :meth:`connect`
        arguments are passed through: *receiver*, *sender*, and *weak*.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver connects.")

    @lazy_property
    def receiver_disconnected(self) -> Signal:
        """Emitted after :meth:`disconnect`.

        The sender is the signal instance, and the :meth:`disconnect` arguments
        are passed through: *receiver* and *sender*.

        Note, this signal is emitted **only** when :meth:`disconnect` is
        called explicitly.

        The disconnect signal can not be emitted by an automatic disconnect
        (due to a weakly referenced receiver or sender going out of scope),
        as the receiver and/or sender instances are no longer available for
        use at the time this signal would be emitted.

        An alternative approach is available by subscribing to
        :attr:`receiver_connected` and setting up a custom weakref cleanup
        callback on weak receivers and senders.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver disconnects.")

    def __init__(self, doc: str | None = None) -> None:
        """
        :param doc: optional.  If provided, will be assigned to the signal's
          __doc__ attribute.

        """
        if doc:
            self.__doc__ = doc
        #: A mapping of connected receivers.
        #:
        #: The values of this mapping are not meaningful outside of the
        #: internal :class:`Signal` implementation, however the boolean value
        #: of the mapping is useful as an extremely efficient check to see if
        #: any receivers are connected to the signal.
        self.receivers: dict[IdentityType, t.Callable | annotatable_weakref] = {}
        #: When true, :meth:`send` and :meth:`send_async` return ``[]``
        #: without calling any receiver.  Toggled by :meth:`muted`.
        self.is_muted = False
        # receiver id -> ids of the senders it is connected to
        self._by_receiver: dict[IdentityType, set[IdentityType]] = defaultdict(set)
        # sender id (or ANY_ID) -> ids of the receivers connected to it
        self._by_sender: dict[IdentityType, set[IdentityType]] = defaultdict(set)
        # sender id -> weak reference used to clean up when the sender dies
        self._weak_senders: dict[IdentityType, annotatable_weakref] = {}

    def connect(
        self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
    ) -> T_callable:
        """Connect *receiver* to signal events sent by *sender*.

        :param receiver: A callable.  Will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to a call to :meth:`send`.

        :param sender: Any object or :obj:`ANY`, defaults to ``ANY``.
          Restricts notifications delivered to *receiver* to only those
          :meth:`send` emissions sent by *sender*.  If ``ANY``, the receiver
          will always be notified.  A *receiver* may be connected to
          multiple *sender* values on the same Signal through multiple calls
          to :meth:`connect`.

        :param weak: If true, the Signal will hold a weakref to *receiver*
          and automatically disconnect when *receiver* goes out of scope or
          is garbage collected.  Defaults to True.

        :returns: *receiver*, unchanged.

        :raises TypeError: re-raised from a ``receiver_connected`` listener;
          the new connection is undone first.

        """
        receiver_id = hashable_identity(receiver)
        receiver_ref: T_callable | annotatable_weakref

        if weak:
            receiver_ref = reference(receiver, self._cleanup_receiver)
            # Stash the id on the weakref so _cleanup_receiver can find the
            # bookkeeping entry once the referent itself is gone.
            receiver_ref.receiver_id = receiver_id
        else:
            receiver_ref = receiver
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)

        # setdefault: an already-registered receiver keeps its first reference.
        self.receivers.setdefault(receiver_id, receiver_ref)
        self._by_sender[sender_id].add(receiver_id)
        self._by_receiver[receiver_id].add(sender_id)
        del receiver_ref

        if sender is not ANY and sender_id not in self._weak_senders:
            # wire together a cleanup for weakref-able senders
            try:
                sender_ref = reference(sender, self._cleanup_sender)
                sender_ref.sender_id = sender_id
            except TypeError:
                # Deliberate best effort: a TypeError here is taken to mean
                # the sender cannot be tracked this way, so no automatic
                # cleanup is registered for it.
                pass
            else:
                self._weak_senders.setdefault(sender_id, sender_ref)
                del sender_ref

        # broadcast this connection.  if receivers raise, disconnect.
        # The ``__dict__`` test avoids touching the lazy property, which
        # presumably would create the per-instance signal on first access.
        if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
            try:
                self.receiver_connected.send(
                    self, receiver=receiver, sender=sender, weak=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        # Module-level ``receiver_connected`` signal, with its ``*_arg``
        # keyword names.
        if receiver_connected.receivers and self is not receiver_connected:
            try:
                receiver_connected.send(
                    self, receiver_arg=receiver, sender_arg=sender, weak_arg=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        return receiver

    def connect_via(
        self, sender: t.Any, weak: bool = False
    ) -> t.Callable[[T_callable], T_callable]:
        """Connect the decorated function as a receiver for *sender*.

        :param sender: Any object or :obj:`ANY`.  The decorated function
          will only receive :meth:`send` emissions sent by *sender*.  If
          ``ANY``, the receiver will always be notified.  A function may be
          decorated multiple times with differing *sender* values.

        :param weak: If true, the Signal will hold a weakref to the
          decorated function and automatically disconnect when *receiver*
          goes out of scope or is garbage collected.  Unlike
          :meth:`connect`, this defaults to False.

        The decorated function will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to the call to :meth:`send`.


        .. versionadded:: 1.1

        """

        def decorator(fn: T_callable) -> T_callable:
            self.connect(fn, sender, weak)
            return fn

        return decorator

    @contextmanager
    def connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.Generator[None, None, None]:
        """Execute a block with the signal temporarily connected to *receiver*.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        This is a context manager for use in the ``with`` statement.  It can
        be useful in unit tests.  *receiver* is connected to the signal for
        the duration of the ``with`` block, and will be disconnected
        automatically when exiting the block:

        .. code-block:: python

          with on_ready.connected_to(receiver):
             # do stuff
             on_ready.send(123)

        .. versionadded:: 1.1

        """
        self.connect(receiver, sender=sender, weak=False)
        try:
            yield None
        finally:
            # ``finally`` so the receiver is also removed when the block exits
            # via a BaseException (KeyboardInterrupt, SystemExit,
            # GeneratorExit), not only on Exception or normal exit.
            self.disconnect(receiver)

    @contextmanager
    def muted(self) -> t.Generator[None, None, None]:
        """Context manager for temporarily disabling signal.
        Useful for test purposes.

        :attr:`is_muted` is set for the duration of the block and reset to
        ``False`` on exit, whether or not the block raised.
        """
        self.is_muted = True
        try:
            yield None
        finally:
            self.is_muted = False

    def temporarily_connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.ContextManager[None]:
        """An alias for :meth:`connected_to`.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        .. versionadded:: 0.9

        .. versionchanged:: 1.1
          Renamed to :meth:`connected_to`.  ``temporarily_connected_to`` was
          deprecated in 1.2 and will be removed in a subsequent version.

        """
        warn(
            "temporarily_connected_to is deprecated; use connected_to instead.",
            DeprecationWarning,
            # Attribute the warning to the caller rather than to this method.
            stacklevel=2,
        )
        return self.connected_to(receiver, sender)

    def send(
        self,
        *sender: t.Any,
        _async_wrapper: AsyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``. Only accepts one positional argument.
        :param _async_wrapper: A callable that should wrap a coroutine
          receiver and run it when called synchronously.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a receiver is a coroutine function and no
          *_async_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        for receiver in self.receivers_for(sender):
            if is_coroutine_function(receiver):
                if _async_wrapper is None:
                    raise RuntimeError("Cannot send to a coroutine function")
                receiver = _async_wrapper(receiver)
            result = receiver(sender, **kwargs)  # type: ignore[call-arg]
            results.append((receiver, result))
        return results

    async def send_async(
        self,
        *sender: t.Any,
        _sync_wrapper: SyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``. Only accepts one positional argument.
        :param _sync_wrapper: A callable that should wrap a synchronous
          receiver and run it when awaited.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a receiver is not a coroutine function and
          no *_sync_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        for receiver in self.receivers_for(sender):
            if not is_coroutine_function(receiver):
                if _sync_wrapper is None:
                    raise RuntimeError("Cannot send to a non-coroutine function")
                receiver = _sync_wrapper(receiver)  # type: ignore[arg-type]
            result = await receiver(sender, **kwargs)  # type: ignore[call-arg, misc]
            results.append((receiver, result))
        return results

    def _extract_sender(self, sender: t.Any) -> t.Any:
        """Reduce the ``*sender`` tuple captured by a send method to one value.

        :param sender: the tuple of positional arguments given to
          :meth:`send` or :meth:`send_async`.
        :returns: ``None`` for an empty tuple, otherwise its single element.
          When no receivers are connected, ``[]`` is returned instead; the
          send methods then iterate :meth:`receivers_for`, which yields
          nothing in that state.
        :raises TypeError: if more than one positional argument was given.
        """
        if not self.receivers:
            # Ensure correct signature even on no-op sends, disable with -O
            # for lowest possible cost.
            if __debug__ and sender and len(sender) > 1:
                raise TypeError(
                    f"send() accepts only one positional argument, {len(sender)} given"
                )
            return []

        # Using '*sender' rather than 'sender=None' allows 'sender' to be
        # used as a keyword argument- i.e. it's an invisible name in the
        # function signature.
        if len(sender) == 0:
            sender = None
        elif len(sender) > 1:
            raise TypeError(
                f"send() accepts only one positional argument, {len(sender)} given"
            )
        else:
            sender = sender[0]
        return sender

    def has_receivers_for(self, sender: t.Any) -> bool:
        """True if there is probably a receiver for *sender*.

        Performs an optimistic check only.  Does not guarantee that all
        weakly referenced receivers are still alive.  See
        :meth:`receivers_for` for a stronger search.

        """
        if not self.receivers:
            return False
        if self._by_sender[ANY_ID]:
            return True
        if sender is ANY:
            return False
        return hashable_identity(sender) in self._by_sender

    def receivers_for(
        self, sender: t.Any
    ) -> t.Generator[t.Callable | annotatable_weakref, None, None]:
        """Iterate all live receivers listening for *sender*.

        Yields the receivers connected to :obj:`ANY` plus those connected to
        *sender*.  Weakly referenced receivers are dereferenced first; any
        whose referent is gone are disconnected and skipped.
        """
        # TODO: test receivers_for(ANY)
        if self.receivers:
            sender_id = hashable_identity(sender)
            # Both branches build a new set, so the loop below is not
            # iterating a bucket that _disconnect mutates.
            if sender_id in self._by_sender:
                ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
            else:
                ids = self._by_sender[ANY_ID].copy()
            for receiver_id in ids:
                receiver = self.receivers.get(receiver_id)
                if receiver is None:
                    continue
                if isinstance(receiver, WeakTypes):
                    strong = receiver()
                    if strong is None:
                        # Referent is gone: drop it from every sender.
                        self._disconnect(receiver_id, ANY_ID)
                        continue
                    receiver = strong
                yield receiver  # type: ignore[misc]

    def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
        """Disconnect *receiver* from this signal's events.

        :param receiver: a previously :meth:`connected<connect>` callable

        :param sender: a specific sender to disconnect from, or :obj:`ANY`
          to disconnect from all senders.  Defaults to ``ANY``.

        """
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)
        receiver_id = hashable_identity(receiver)
        self._disconnect(receiver_id, sender_id)

        # Same ``__dict__`` guard as in connect(): only notify if the
        # per-instance signal has already been accessed and has listeners.
        if (
            "receiver_disconnected" in self.__dict__
            and self.receiver_disconnected.receivers
        ):
            self.receiver_disconnected.send(self, receiver=receiver, sender=sender)

    def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None:
        """Remove bookkeeping for a receiver, by id.

        With ``ANY_ID`` as *sender_id* the receiver is removed from every
        sender bucket and from :attr:`receivers`.  Otherwise only the single
        receiver/sender pairing is removed and the receiver stays registered.
        """
        if sender_id == ANY_ID:
            if self._by_receiver.pop(receiver_id, False):
                for bucket in self._by_sender.values():
                    bucket.discard(receiver_id)
            self.receivers.pop(receiver_id, None)
        else:
            self._by_sender[sender_id].discard(receiver_id)
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None:
        """Disconnect a receiver from all senders.

        Passed to ``reference()`` by :meth:`connect` for weak receivers;
        *receiver_ref* carries the ``receiver_id`` attached there.
        """
        self._disconnect(t.cast(IdentityType, receiver_ref.receiver_id), ANY_ID)

    def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None:
        """Disconnect all receivers from a sender.

        Passed to ``reference()`` by :meth:`connect` for senders;
        *sender_ref* carries the ``sender_id`` attached there.
        """
        sender_id = t.cast(IdentityType, sender_ref.sender_id)
        assert sender_id != ANY_ID
        self._weak_senders.pop(sender_id, None)
        for receiver_id in self._by_sender.pop(sender_id, ()):
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_bookkeeping(self) -> None:
        """Prune unused sender/receiver bookkeeping. Not threadsafe.

        Connecting & disconnecting leave behind a small amount of bookkeeping
        for the receiver and sender values. Typical workloads using Blinker,
        for example in most web apps, Flask, CLI scripts, etc., are not
        adversely affected by this bookkeeping.

        With a long-running Python process performing dynamic signal routing
        with high volume- e.g. connecting to function closures, "senders" are
        all unique object instances, and doing all of this over and over- you
        may see memory usage grow due to extraneous bookkeeping. (An empty
        set() for each stale sender/receiver pair.)

        This method will prune that bookkeeping away, with the caveat that such
        pruning is not threadsafe. The risk is that cleanup of a fully
        disconnected receiver/sender pair occurs while another thread is
        connecting that same pair. If you are in the highly dynamic, unique
        receiver/sender situation that has led you to this method, that
        failure mode is perhaps not a big deal for you.
        """
        for mapping in (self._by_sender, self._by_receiver):
            # Iterate over a snapshot because entries are popped in the loop.
            for _id, bucket in list(mapping.items()):
                if not bucket:
                    mapping.pop(_id, None)

    def _clear_state(self) -> None:
        """Throw away all signal state.  Useful for unit tests."""
        self._weak_senders.clear()
        self.receivers.clear()
        self._by_sender.clear()
        self._by_receiver.clear()

475 

476 

# Module-level signal shared by every Signal instance: Signal.connect() sends
# on it (using the ``*_arg`` keyword names below) whenever it has receivers.
receiver_connected = Signal(
    """\
Sent by a :class:`Signal` after a receiver connects.

:argument: the Signal that was connected to
:keyword receiver_arg: the connected receiver
:keyword sender_arg: the sender to connect to
:keyword weak_arg: true if the connection to receiver_arg is a weak reference

.. deprecated:: 1.2

As of 1.2, individual signals have their own private
:attr:`~Signal.receiver_connected` and
:attr:`~Signal.receiver_disconnected` signals with a slightly simplified
call signature. This global signal is planned to be removed in 1.6.

"""
)

495 

496 

class NamedSignal(Signal):
    """A named generic notification emitter."""

    def __init__(self, name: str, doc: str | None = None) -> None:
        """
        :param name: the name of this signal, stored on :attr:`name`.
        :param doc: optional docstring, handed to :class:`Signal`.
        """
        #: The name of this signal.
        self.name = name

        Signal.__init__(self, doc)

    def __repr__(self) -> str:
        # Take the base repr without its final character and append the
        # signal name before closing it with ">" again.
        head = Signal.__repr__(self)[:-1]
        return f"{head}; {self.name!r}>"

509 

510 

class Namespace(dict):
    """A mapping of signal names to signals."""

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the :class:`NamedSignal` *name*, creating it if required.

        Repeated calls to this function will return the same signal object.

        :param name: key to look up in this namespace.
        :param doc: docstring for the signal; only used when a new signal
          has to be created.
        """
        try:
            existing = self[name]
        except KeyError:
            # Not registered yet: create the signal and store it.
            return self.setdefault(  # type: ignore[no-any-return]
                name, NamedSignal(name, doc)
            )
        return existing  # type: ignore[no-any-return]

525 

526 

class WeakNamespace(WeakValueDictionary):
    """A weak mapping of signal names to signals.

    Automatically cleans up unused Signals when the last reference goes out
    of scope.  This namespace implementation exists for a measure of legacy
    compatibility with Blinker <= 1.2, and may be dropped in the future.

    .. versionadded:: 1.3

    """

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the :class:`NamedSignal` *name*, creating it if required.

        Repeated calls to this function will return the same signal object.

        :param name: key to look up in this namespace.
        :param doc: docstring for the signal; only used when a new signal
          has to be created.
        """
        try:
            existing = self[name]
        except KeyError:
            # Not registered (or no longer alive): create and store a new one.
            return self.setdefault(  # type: ignore[no-any-return]
                name, NamedSignal(name, doc)
            )
        return existing  # type: ignore[no-any-return]

549 

550 

# Module-level convenience: the bound ``signal`` method of a default Namespace
# created at import time, so every ``signal(name)`` call shares one namespace.
signal = Namespace().signal