Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/blinker/base.py: 31%

217 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-22 06:29 +0000

1"""Signals and events. 

2 

3A small implementation of signals, inspired by a snippet of Django signal 

4API client code seen in a blog post. Signals are first-class objects and 

5each manages its own receivers and message emission. 

6 

7The :func:`signal` function provides singleton behavior for named signals. 

8 

9""" 

10from __future__ import annotations 

11 

12import typing as t 

13from collections import defaultdict 

14from contextlib import contextmanager 

15from inspect import iscoroutinefunction 

16from warnings import warn 

17from weakref import WeakValueDictionary 

18 

19from blinker._utilities import annotatable_weakref 

20from blinker._utilities import hashable_identity 

21from blinker._utilities import IdentityType 

22from blinker._utilities import lazy_property 

23from blinker._utilities import reference 

24from blinker._utilities import symbol 

25from blinker._utilities import WeakTypes 

26 

27if t.TYPE_CHECKING: 

28 import typing_extensions as te 

29 

30 T_callable = t.TypeVar("T_callable", bound=t.Callable[..., t.Any]) 

31 

32 T = t.TypeVar("T") 

33 P = te.ParamSpec("P") 

34 

35 AsyncWrapperType = t.Callable[[t.Callable[P, t.Awaitable[T]]], t.Callable[P, T]] 

36 SyncWrapperType = t.Callable[[t.Callable[P, T]], t.Callable[P, t.Awaitable[T]]] 

37 

#: Sentinel passed as ``sender`` to mean "any sender". It is compared by
#: identity (``sender is ANY``) throughout this module.
ANY = symbol("ANY")
ANY.__doc__ = 'Token for "any sender".'

#: Key under which receivers connected with ``sender=ANY`` are recorded in
#: the per-sender bookkeeping of a :class:`Signal`.
ANY_ID = 0

# NOTE: Keep a module-level reference to ``t.cast`` for use in weakref
# callbacks; during interpreter finalization ``t.cast`` itself may already
# have been set to None.
cast = t.cast

45 

46 

class Signal:
    """A notification emitter.

    A signal keeps a registry of receivers (callables), optionally filtered
    by sender, and invokes them from :meth:`send` / :meth:`send_async`.
    """

    #: An :obj:`ANY` convenience synonym, allows ``Signal.ANY``
    #: without an additional import.
    ANY = ANY

    #: Factory for the identity buckets stored in the per-sender and
    #: per-receiver bookkeeping mappings.
    set_class: type[set] = set

    @lazy_property
    def receiver_connected(self) -> Signal:
        """Emitted after each :meth:`connect`.

        The signal sender is the signal instance, and the :meth:`connect`
        arguments are passed through: *receiver*, *sender*, and *weak*.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver connects.")

    @lazy_property
    def receiver_disconnected(self) -> Signal:
        """Emitted after :meth:`disconnect`.

        The sender is the signal instance, and the :meth:`disconnect` arguments
        are passed through: *receiver* and *sender*.

        Note, this signal is emitted **only** when :meth:`disconnect` is
        called explicitly.

        The disconnect signal can not be emitted by an automatic disconnect
        (due to a weakly referenced receiver or sender going out of scope),
        as the receiver and/or sender instances are no longer available for
        use at the time this signal would be emitted.

        An alternative approach is available by subscribing to
        :attr:`receiver_connected` and setting up a custom weakref cleanup
        callback on weak receivers and senders.

        .. versionadded:: 1.2

        """
        return Signal(doc="Emitted after a receiver disconnects.")

    def __init__(self, doc: str | None = None) -> None:
        """
        :param doc: optional.  If provided (and non-empty), will be assigned
          to the signal's ``__doc__`` attribute.

        """
        if doc:
            self.__doc__ = doc
        #: A mapping of connected receivers.
        #:
        #: The values of this mapping are not meaningful outside of the
        #: internal :class:`Signal` implementation, however the boolean value
        #: of the mapping is useful as an extremely efficient check to see if
        #: any receivers are connected to the signal.
        self.receivers: dict[IdentityType, t.Callable | annotatable_weakref] = {}
        #: When true, :meth:`send` and :meth:`send_async` return immediately.
        self.is_muted = False
        # receiver id -> ids of the senders that receiver is connected to.
        self._by_receiver: dict[IdentityType, set[IdentityType]] = defaultdict(
            self.set_class
        )
        # sender id -> ids of the receivers connected to that sender.
        self._by_sender: dict[IdentityType, set[IdentityType]] = defaultdict(
            self.set_class
        )
        # sender id -> weak reference used to clean up after dead senders.
        self._weak_senders: dict[IdentityType, annotatable_weakref] = {}

    def connect(
        self, receiver: T_callable, sender: t.Any = ANY, weak: bool = True
    ) -> T_callable:
        """Connect *receiver* to signal events sent by *sender*.

        :param receiver: A callable.  Will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to a call to :meth:`send`.

        :param sender: Any object or :obj:`ANY`, defaults to ``ANY``.
          Restricts notifications delivered to *receiver* to only those
          :meth:`send` emissions sent by *sender*.  If ``ANY``, the receiver
          will always be notified.  A *receiver* may be connected to
          multiple *sender* values on the same Signal through multiple calls
          to :meth:`connect`.

        :param weak: If true, the Signal will hold a weakref to *receiver*
          and automatically disconnect when *receiver* goes out of scope or
          is garbage collected.  Defaults to True.

        :returns: *receiver*, unchanged.

        :raises TypeError: re-raised from a ``receiver_connected`` broadcast;
          in that case *receiver* is disconnected again before the exception
          propagates.

        """
        receiver_id = hashable_identity(receiver)
        receiver_ref: T_callable | annotatable_weakref

        if weak:
            receiver_ref = reference(receiver, self._cleanup_receiver)
            # Stash the id on the reference so the cleanup callback can find
            # the bookkeeping entry once the receiver itself is gone.
            receiver_ref.receiver_id = receiver_id
        else:
            receiver_ref = receiver
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)

        # setdefault: the first reference registered for a receiver wins.
        self.receivers.setdefault(receiver_id, receiver_ref)
        self._by_sender[sender_id].add(receiver_id)
        self._by_receiver[receiver_id].add(sender_id)
        del receiver_ref

        if sender is not ANY and sender_id not in self._weak_senders:
            # wire together a cleanup for weakref-able senders
            try:
                sender_ref = reference(sender, self._cleanup_sender)
                sender_ref.sender_id = sender_id
            except TypeError:
                # Sender is not weakref-able; no automatic cleanup for it.
                pass
            else:
                self._weak_senders.setdefault(sender_id, sender_ref)
                del sender_ref

        # broadcast this connection.  if receivers raise, disconnect.
        # The ``__dict__`` check avoids creating the per-instance signal just
        # to discover that nobody is listening to it.
        if "receiver_connected" in self.__dict__ and self.receiver_connected.receivers:
            try:
                self.receiver_connected.send(
                    self, receiver=receiver, sender=sender, weak=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        # Deprecated module-level signal; skipped when connecting to itself.
        if receiver_connected.receivers and self is not receiver_connected:
            try:
                receiver_connected.send(
                    self, receiver_arg=receiver, sender_arg=sender, weak_arg=weak
                )
            except TypeError:
                self.disconnect(receiver, sender)
                raise
        return receiver

    def connect_via(
        self, sender: t.Any, weak: bool = False
    ) -> t.Callable[[T_callable], T_callable]:
        """Connect the decorated function as a receiver for *sender*.

        :param sender: Any object or :obj:`ANY`.  The decorated function
          will only receive :meth:`send` emissions sent by *sender*.  If
          ``ANY``, the receiver will always be notified.  A function may be
          decorated multiple times with differing *sender* values.

        :param weak: If true, the Signal will hold a weakref to the
          decorated function and automatically disconnect when *receiver*
          goes out of scope or is garbage collected.  Unlike
          :meth:`connect`, this defaults to False.

        The decorated function will be invoked by :meth:`send` with
          `sender=` as a single positional argument and any ``kwargs`` that
          were provided to the call to :meth:`send`.


        .. versionadded:: 1.1

        """

        def decorator(fn: T_callable) -> T_callable:
            self.connect(fn, sender, weak)
            return fn

        return decorator

    @contextmanager
    def connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.Generator[None, None, None]:
        """Execute a block with the signal temporarily connected to *receiver*.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        This is a context manager for use in the ``with`` statement.  It can
        be useful in unit tests.  *receiver* is connected to the signal for
        the duration of the ``with`` block, and will be disconnected
        automatically when exiting the block:

        .. code-block:: python

          with on_ready.connected_to(receiver):
             # do stuff
             on_ready.send(123)

        Note that on exit *receiver* is disconnected from **all** senders of
        this signal, not only from *sender*.

        .. versionadded:: 1.1

        """
        # A strong reference is held for the duration of the block.
        self.connect(receiver, sender=sender, weak=False)
        try:
            yield None
        finally:
            self.disconnect(receiver)

    @contextmanager
    def muted(self) -> t.Generator[None, None, None]:
        """Context manager for temporarily disabling signal.
        Useful for test purposes.

        ``is_muted`` is reset to ``False`` on exit, whether or not the block
        raised.
        """
        self.is_muted = True
        try:
            yield None
        finally:
            self.is_muted = False

    def temporarily_connected_to(
        self, receiver: t.Callable, sender: t.Any = ANY
    ) -> t.ContextManager[None]:
        """An alias for :meth:`connected_to`.

        :param receiver: a receiver callable
        :param sender: optional, a sender to filter on

        .. versionadded:: 0.9

        .. versionchanged:: 1.1
          Renamed to :meth:`connected_to`.  ``temporarily_connected_to`` was
          deprecated in 1.2 and will be removed in a subsequent version.

        """
        warn(
            "temporarily_connected_to is deprecated; use connected_to instead.",
            DeprecationWarning,
            # Attribute the warning to the caller rather than to this module.
            stacklevel=2,
        )
        return self.connected_to(receiver, sender)

    def send(
        self,
        *sender: t.Any,
        _async_wrapper: AsyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined. An empty
        list is returned while the signal is muted.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``. Only accepts one positional argument.
        :param _async_wrapper: A callable that should wrap a coroutine
          receiver and run it when called synchronously.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a receiver is a coroutine function and no
          *_async_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        for receiver in self.receivers_for(sender):
            if iscoroutinefunction(receiver):
                if _async_wrapper is None:
                    raise RuntimeError("Cannot send to a coroutine function")
                receiver = _async_wrapper(receiver)
            result = receiver(sender, **kwargs)
            results.append((receiver, result))
        return results

    async def send_async(
        self,
        *sender: t.Any,
        _sync_wrapper: SyncWrapperType | None = None,
        **kwargs: t.Any,
    ) -> list[tuple[t.Callable, t.Any]]:
        """Emit this signal on behalf of *sender*, passing on ``kwargs``.

        Returns a list of 2-tuples, pairing receivers with their return
        value. The ordering of receiver notification is undefined. An empty
        list is returned while the signal is muted. Receivers are awaited
        one after another, not concurrently.

        :param sender: Any object or ``None``.  If omitted, synonymous
          with ``None``. Only accepts one positional argument.
        :param _sync_wrapper: A callable that should wrap a synchronous
          receiver and run it when awaited.

        :param kwargs: Data to be sent to receivers.

        :raises TypeError: if more than one positional argument is given.
        :raises RuntimeError: if a receiver is not a coroutine function and
          no *_sync_wrapper* was supplied.
        """
        if self.is_muted:
            return []

        sender = self._extract_sender(sender)
        results = []
        for receiver in self.receivers_for(sender):
            if not iscoroutinefunction(receiver):
                if _sync_wrapper is None:
                    raise RuntimeError("Cannot send to a non-coroutine function")
                receiver = _sync_wrapper(receiver)
            result = await receiver(sender, **kwargs)
            results.append((receiver, result))
        return results

    def _extract_sender(self, sender: t.Any) -> t.Any:
        """Reduce the ``*sender`` tuple captured by a send method to one value.

        :param sender: the tuple of positional arguments given to
          :meth:`send` or :meth:`send_async`.
        :returns: the single sender, or ``None`` when none was given or when
          no receivers are connected.
        :raises TypeError: if more than one positional argument was given.
          With no receivers connected this check only runs when
          ``__debug__`` is true.
        """
        if not self.receivers:
            # Ensure correct signature even on no-op sends, disable with -O
            # for lowest possible cost.
            if __debug__ and sender and len(sender) > 1:
                raise TypeError(
                    f"send() accepts only one positional argument, {len(sender)} given"
                )
            # Nothing will be notified; report the "omitted sender" value.
            return None

        # Using '*sender' rather than 'sender=None' allows 'sender' to be
        # used as a keyword argument- i.e. it's an invisible name in the
        # function signature.
        if len(sender) > 1:
            raise TypeError(
                f"send() accepts only one positional argument, {len(sender)} given"
            )
        return sender[0] if sender else None

    def has_receivers_for(self, sender: t.Any) -> bool:
        """True if there is probably a receiver for *sender*.

        Performs an optimistic check only.  Does not guarantee that all
        weakly referenced receivers are still alive.  See
        :meth:`receivers_for` for a stronger search.

        """
        if not self.receivers:
            return False
        # Receivers connected to ANY listen to every sender.
        if self._by_sender[ANY_ID]:
            return True
        if sender is ANY:
            return False
        return hashable_identity(sender) in self._by_sender

    def receivers_for(
        self, sender: t.Any
    ) -> t.Generator[t.Callable[[t.Any], t.Any], None, None]:
        """Iterate all live receivers listening for *sender*.

        Yields the receivers connected to :obj:`ANY` together with those
        connected to *sender*.  Weakly referenced receivers whose referent
        is gone are disconnected and skipped.
        """
        # TODO: test receivers_for(ANY)
        if self.receivers:
            sender_id = hashable_identity(sender)
            # Work on a fresh set so that disconnects performed while
            # iterating do not mutate the set being iterated.
            if sender_id in self._by_sender:
                ids = self._by_sender[ANY_ID] | self._by_sender[sender_id]
            else:
                ids = self._by_sender[ANY_ID].copy()
            for receiver_id in ids:
                receiver = self.receivers.get(receiver_id)
                if receiver is None:
                    continue
                if isinstance(receiver, WeakTypes):
                    strong = receiver()
                    if strong is None:
                        # Referent is gone; drop the stale registration.
                        self._disconnect(receiver_id, ANY_ID)
                        continue
                    receiver = strong
                yield receiver  # type: ignore[misc]

    def disconnect(self, receiver: t.Callable, sender: t.Any = ANY) -> None:
        """Disconnect *receiver* from this signal's events.

        :param receiver: a previously :meth:`connected<connect>` callable

        :param sender: a specific sender to disconnect from, or :obj:`ANY`
          to disconnect from all senders.  Defaults to ``ANY``.

        If :attr:`receiver_disconnected` has been created for this signal
        and has receivers, it is sent afterwards.

        """
        sender_id: IdentityType
        if sender is ANY:
            sender_id = ANY_ID
        else:
            sender_id = hashable_identity(sender)
        receiver_id = hashable_identity(receiver)
        self._disconnect(receiver_id, sender_id)

        if (
            "receiver_disconnected" in self.__dict__
            and self.receiver_disconnected.receivers
        ):
            self.receiver_disconnected.send(self, receiver=receiver, sender=sender)

    def _disconnect(self, receiver_id: IdentityType, sender_id: IdentityType) -> None:
        """Remove the bookkeeping that links *receiver_id* and *sender_id*.

        With ``ANY_ID`` as *sender_id* the receiver is removed from every
        sender bucket and from :attr:`receivers`.  Otherwise only the link
        between that receiver and that sender is removed.
        """
        if sender_id == ANY_ID:
            if self._by_receiver.pop(receiver_id, False):
                for bucket in self._by_sender.values():
                    bucket.discard(receiver_id)
            self.receivers.pop(receiver_id, None)
        else:
            self._by_sender[sender_id].discard(receiver_id)
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_receiver(self, receiver_ref: annotatable_weakref) -> None:
        """Disconnect a receiver from all senders."""
        self._disconnect(cast(IdentityType, receiver_ref.receiver_id), ANY_ID)

    def _cleanup_sender(self, sender_ref: annotatable_weakref) -> None:
        """Disconnect all receivers from a sender."""
        sender_id = cast(IdentityType, sender_ref.sender_id)
        assert sender_id != ANY_ID
        self._weak_senders.pop(sender_id, None)
        for receiver_id in self._by_sender.pop(sender_id, ()):
            self._by_receiver[receiver_id].discard(sender_id)

    def _cleanup_bookkeeping(self) -> None:
        """Prune unused sender/receiver bookkeeping. Not threadsafe.

        Connecting & disconnecting leave behind a small amount of bookkeeping
        for the receiver and sender values. Typical workloads using Blinker,
        for example in most web apps, Flask, CLI scripts, etc., are not
        adversely affected by this bookkeeping.

        With a long-running Python process performing dynamic signal routing
        with high volume- e.g. connecting to function closures, "senders" are
        all unique object instances, and doing all of this over and over- you
        may see memory usage will grow due to extraneous bookkeeping. (An empty
        set() for each stale sender/receiver pair.)

        This method will prune that bookkeeping away, with the caveat that such
        pruning is not threadsafe. The risk is that cleanup of a fully
        disconnected receiver/sender pair occurs while another thread is
        connecting that same pair. If you are in the highly dynamic, unique
        receiver/sender situation that has lead you to this method, that
        failure mode is perhaps not a big deal for you.
        """
        for mapping in (self._by_sender, self._by_receiver):
            # Snapshot the items so entries can be popped while looping.
            for _id, bucket in list(mapping.items()):
                if not bucket:
                    mapping.pop(_id, None)

    def _clear_state(self) -> None:
        """Throw away all signal state.  Useful for unit tests."""
        self._weak_senders.clear()
        self.receivers.clear()
        self._by_sender.clear()
        self._by_receiver.clear()

482 

483 

# Deprecated module-level signal.  Signal.connect() sends it after every
# connection made on any other signal, while it has receivers of its own.
receiver_connected = Signal(
    doc="""\
Sent by a :class:`Signal` after a receiver connects.

:argument: the Signal that was connected to
:keyword receiver_arg: the connected receiver
:keyword sender_arg: the sender to connect to
:keyword weak_arg: true if the connection to receiver_arg is a weak reference

.. deprecated:: 1.2

As of 1.2, individual signals have their own private
:attr:`~Signal.receiver_connected` and
:attr:`~Signal.receiver_disconnected` signals with a slightly simplified
call signature. This global signal is planned to be removed in 1.6.

"""
)

502 

503 

class NamedSignal(Signal):
    """A named generic notification emitter."""

    def __init__(self, name: str, doc: str | None = None) -> None:
        """
        :param name: the name of this signal, stored as :attr:`name`.
        :param doc: optional documentation, handled by :class:`Signal`.
        """
        Signal.__init__(self, doc)

        #: The name of this signal.
        self.name = name

    def __repr__(self) -> str:
        """Return the base repr with ``; <name>`` inserted before the final character."""
        # Drop the last character of the base repr so the name can be
        # spliced in ahead of a fresh closing ">".
        prefix = Signal.__repr__(self)[:-1]
        return f"{prefix}; {self.name!r}>"  # noqa: E702

516 

517 

class Namespace(dict):
    """A mapping of signal names to signals."""

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the :class:`NamedSignal` *name*, creating it if required.

        Repeated calls to this function will return the same signal object.
        *doc* is only used when the signal has to be created.

        """
        try:
            found = self[name]
        except KeyError:
            # Not registered yet: create it.  setdefault returns whichever
            # value ends up stored under *name*.
            found = self.setdefault(name, NamedSignal(name, doc))
        return found  # type: ignore[no-any-return]

532 

533 

class WeakNamespace(WeakValueDictionary):
    """A weak mapping of signal names to signals.

    Automatically cleans up unused Signals when the last reference goes out
    of scope.  This namespace implementation exists for a measure of legacy
    compatibility with Blinker <= 1.2, and may be dropped in the future.

    .. versionadded:: 1.3

    """

    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the :class:`NamedSignal` *name*, creating it if required.

        Repeated calls to this function will return the same signal object.
        *doc* is only used when the signal has to be created.

        """
        try:
            found = self[name]
        except KeyError:
            # Not registered (or already collected): create it.  setdefault
            # returns whichever value ends up stored under *name*.
            found = self.setdefault(name, NamedSignal(name, doc))
        return found  # type: ignore[no-any-return]

556 

557 

# Module-level convenience: the bound ``signal`` method of a Namespace
# created here, so every ``signal(name)`` call shares that one namespace.
signal = Namespace().signal