Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/event/registry.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

178 statements  

1# event/registry.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Provides managed registration services on behalf of :func:`.listen` 

9arguments. 

10 

11By "managed registration", we mean that event listening functions and 

12other objects can be added to various collections in such a way that their 

13membership in all those collections can be revoked at once, based on 

14an equivalent :class:`._EventKey`. 

15 

16""" 

17from __future__ import annotations 

18 

19import collections 

20import types 

21import typing 

22from typing import Any 

23from typing import Callable 

24from typing import cast 

25from typing import Deque 

26from typing import Dict 

27from typing import Generic 

28from typing import Iterable 

29from typing import Optional 

30from typing import Tuple 

31from typing import TypeVar 

32from typing import Union 

33import weakref 

34 

35from .. import exc 

36from .. import util 

37 

38if typing.TYPE_CHECKING: 

39 from .attr import RefCollection 

40 from .base import dispatcher 

41 

# Any callable can serve as an event listener.
_ListenerFnType = Callable[..., Any]

# Identity of a listener: id(fn) for a plain callable, or the pair
# (id(fn.__func__), id(fn.__self__)) for a bound method.
_ListenerFnKeyType = Union[int, Tuple[int, int]]

# Registry key: (id(target), event identifier, listener identity).
_EventKeyTupleType = Tuple[int, str, _ListenerFnKeyType]


# Type variable constrained to event targets.
_ET = TypeVar("_ET", bound="EventTarget")

48 

49 

class EventTarget:
    """Base for anything that event listeners can be attached to.

    A listener may be associated with such a target either at the class
    level or on one particular instance.

    Connection, Mapper, Table, Session, InstrumentedAttribute, Engine,
    Pool and Dialect are all examples of event targets.

    """

    # no per-instance storage is contributed by this base class
    __slots__ = ()

    # declared for type checkers only; no value is assigned here
    dispatch: dispatcher[Any]

62 

63 

_RefCollectionToListenerType = Dict[
    "weakref.ref[RefCollection[Any]]",
    "weakref.ref[_ListenerFnType]",
]

_key_to_collection: Dict[
    _EventKeyTupleType, _RefCollectionToListenerType
] = collections.defaultdict(dict)
"""
Starting from the arguments of an original listen() call, finds every
listener collection that holds the listener, together with the
listener fn stored in each of them

(target, identifier, fn) -> {
                            ref(listenercollection) -> ref(listener_fn)
                            ref(listenercollection) -> ref(listener_fn)
                            ref(listenercollection) -> ref(listener_fn)
                        }
"""

82 

_ListenerToEventKeyType = Dict[
    "weakref.ref[_ListenerFnType]",
    _EventKeyTupleType,
]

_collection_to_key: Dict[
    weakref.ref[RefCollection[Any]], _ListenerToEventKeyType
] = collections.defaultdict(dict)
"""
Starting from a _ListenerCollection or _ClsLevelListener, finds the
original listen() arguments of everything it holds, keyed by the
listener fn stored in the collection

ref(listenercollection) -> {
                            ref(listener_fn) -> (target, identifier, fn),
                            ref(listener_fn) -> (target, identifier, fn),
                            ref(listener_fn) -> (target, identifier, fn),
                        }
"""

101 

102 

def _collection_gced(ref: weakref.ref[Any]) -> None:
    """Drop every registry entry that refers to the collection ``ref``.

    NOTE(review): the name suggests this is installed as the weakref
    callback of a listener collection; the registration site is not in
    this module - confirm against the caller.

    :param ref: weak reference identifying the collection to purge.
    """
    # The registry is checked for truthiness before the membership test,
    # so nothing further is evaluated when it is empty (or otherwise
    # falsy).
    if not (_collection_to_key and ref in _collection_to_key):
        return

    ref = cast("weakref.ref[RefCollection[EventTarget]]", ref)

    for key in _collection_to_key.pop(ref).values():
        # .get() never triggers the defaultdict factory, so a missing
        # key is skipped rather than created
        collections_for_key = _key_to_collection.get(key)
        if collections_for_key is None:
            continue

        collections_for_key.pop(ref)
        if not collections_for_key:
            # no collection holds this listener any more
            _key_to_collection.pop(key)

118 

119 

def _stored_in_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> bool:
    """Record in both registries that ``owner`` holds a listener.

    :param event_key: key whose ``_key`` tuple and ``_listen_fn`` are
     registered.
    :param owner: collection whose ``ref`` identifies it in the
     registries.
    :return: ``False`` if ``owner`` is already recorded for this key, in
     which case no entry is added; ``True`` once the association has
     been stored.
    """
    registry_key = event_key._key

    # defaultdict: the first lookup of a key creates its empty mapping
    collections_for_key = _key_to_collection[registry_key]

    collection_ref = owner.ref
    fn_ref = weakref.ref(event_key._listen_fn)

    if collection_ref in collections_for_key:
        return False

    collections_for_key[collection_ref] = fn_ref
    _collection_to_key[collection_ref][fn_ref] = registry_key
    return True

139 

140 

def _removed_from_collection(
    event_key: _EventKey[_ET], owner: RefCollection[_ET]
) -> None:
    """Erase from both registries the record of ``owner`` holding a
    listener.

    :param event_key: key whose ``_key`` tuple and ``_listen_fn`` are
     unregistered.
    :param owner: collection whose ``ref`` identifies it in the
     registries.
    :raises KeyError: if ``owner`` has an entry in the reverse registry
     that does not contain this listener.
    """
    registry_key = event_key._key

    # defaultdict: looking up an unknown key creates an empty mapping,
    # which is deleted again just below
    collections_for_key = _key_to_collection[registry_key]

    fn_ref = weakref.ref(event_key._listen_fn)
    collection_ref = owner.ref

    collections_for_key.pop(collection_ref, None)
    if not collections_for_key:
        del _key_to_collection[registry_key]

    # .get() avoids creating a reverse entry for an unknown collection
    keys_for_collection = _collection_to_key.get(collection_ref)
    if keys_for_collection is not None:
        keys_for_collection.pop(fn_ref)

158 

159 

def _stored_in_collection_multi(
    newowner: RefCollection[_ET],
    oldowner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Register for ``newowner`` the listeners already known for
    ``oldowner``.

    For each function in ``elements``, the registry key recorded under
    ``oldowner`` is looked up and also associated with ``newowner``.
    Functions with no key recorded under ``oldowner`` are skipped.

    :param newowner: collection receiving the registrations.
    :param oldowner: collection whose registrations are consulted.
    :param elements: listener functions to carry over; nothing happens
     when this is falsy.
    """
    if not elements:
        return

    old_ref = oldowner.ref
    new_ref = newowner.ref

    keys_under_old = _collection_to_key[old_ref]
    keys_under_new = _collection_to_key[new_ref]

    for fn in elements:
        fn_ref = weakref.ref(fn)
        try:
            # a missing entry can occur during interpreter shutdown.
            # see #6740
            registry_key = keys_under_old[fn_ref]
            collections_for_key = _key_to_collection[registry_key]
        except KeyError:
            continue

        if newowner_is_new := new_ref not in collections_for_key:
            collections_for_key[new_ref] = fn_ref
        if not newowner_is_new:
            assert collections_for_key[new_ref] == fn_ref

        keys_under_new[fn_ref] = registry_key

194 

195 

def _clear(
    owner: RefCollection[_ET],
    elements: Iterable[_ListenerFnType],
) -> None:
    """Remove ``owner`` from the key registry for each given listener.

    :param owner: collection whose ``ref`` identifies it in the
     registries.
    :param elements: listener functions held by ``owner``; nothing
     happens when this is falsy.
    :raises KeyError: if a function in ``elements`` has no key recorded
     under ``owner``.
    """
    if not elements:
        return

    collection_ref = owner.ref
    keys_for_collection = _collection_to_key[collection_ref]

    for fn in elements:
        registry_key = keys_for_collection[weakref.ref(fn)]
        collections_for_key = _key_to_collection[registry_key]
        collections_for_key.pop(collection_ref, None)

        # drop the key once no collection holds the listener any more
        if not collections_for_key:
            del _key_to_collection[registry_key]

213 

214 

class _EventKey(Generic[_ET]):
    """Represent :func:`.listen` arguments.

    An instance bundles the target, the event identifier and the
    listener function, plus an optional wrapper function and the target
    whose ``dispatch`` is actually used.  Its :attr:`_key` tuple is what
    the module-level registries are keyed on.
    """

    __slots__ = (
        "target",
        "identifier",
        "fn",
        "fn_key",
        "fn_wrap",
        "dispatch_target",
    )

    target: _ET
    identifier: str
    fn: _ListenerFnType
    # id(fn), or (id(fn.__func__), id(fn.__self__)) for a bound method
    fn_key: _ListenerFnKeyType
    dispatch_target: Any
    # optional wrapper that is invoked in place of ``fn``; annotated under
    # the same name as its slot and as the attribute set in __init__
    fn_wrap: Optional[_ListenerFnType]

    def __init__(
        self,
        target: _ET,
        identifier: str,
        fn: _ListenerFnType,
        dispatch_target: Any,
        _fn_wrap: Optional[_ListenerFnType] = None,
    ):
        """Store the listen() arguments and derive the listener identity.

        :param target: object the listener was requested for.
        :param identifier: name of the event.
        :param fn: the listener function as originally given.
        :param dispatch_target: object whose ``dispatch`` attribute is
         used when listening.
        :param _fn_wrap: optional wrapper to use instead of ``fn`` when
         the listener is stored and invoked.
        """
        self.target = target
        self.identifier = identifier
        self.fn = fn
        if isinstance(fn, types.MethodType):
            # identify a bound method by its function and its instance
            # rather than by the id of the method object itself
            self.fn_key = id(fn.__func__), id(fn.__self__)
        else:
            self.fn_key = id(fn)
        self.fn_wrap = _fn_wrap
        self.dispatch_target = dispatch_target

    @property
    def _key(self) -> _EventKeyTupleType:
        """Registry key: ``(id(target), identifier, fn_key)``."""
        return (id(self.target), self.identifier, self.fn_key)

    def with_wrapper(self, fn_wrap: _ListenerFnType) -> _EventKey[_ET]:
        """Return a key that uses ``fn_wrap`` as its wrapper function.

        ``self`` is returned unchanged when ``fn_wrap`` already is the
        function this key listens with; otherwise a new key is built with
        the same target, identifier, fn and dispatch target.
        """
        if fn_wrap is self._listen_fn:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                self.dispatch_target,
                _fn_wrap=fn_wrap,
            )

    def with_dispatch_target(self, dispatch_target: Any) -> _EventKey[_ET]:
        """Return a key that uses ``dispatch_target``.

        ``self`` is returned unchanged when ``dispatch_target`` is already
        this key's dispatch target; otherwise a new key is built that
        keeps the current wrapper.
        """
        if dispatch_target is self.dispatch_target:
            return self
        else:
            return _EventKey(
                self.target,
                self.identifier,
                self.fn,
                dispatch_target,
                _fn_wrap=self.fn_wrap,
            )

    def listen(self, *args: Any, **kw: Any) -> None:
        """Establish this key's listener on its dispatch target.

        The keywords ``once``, ``_once_unless_exception`` and ``named``
        are consumed here; all remaining positional and keyword arguments
        are passed on to the dispatch target's ``dispatch._listen``.

        :param once: wrap the listener with ``util.only_once`` before
         listening.
        :param _once_unless_exception: as ``once``, additionally passing
         ``retry_on_exception=True`` to ``util.only_once``.
        :param named: forwarded to the dispatch collection's
         ``_adjust_fn_spec``.
        """
        once = kw.pop("once", False)
        once_unless_exception = kw.pop("_once_unless_exception", False)
        named = kw.pop("named", False)

        target, identifier, fn = (
            self.dispatch_target,
            self.identifier,
            self._listen_fn,
        )

        dispatch_collection = getattr(target.dispatch, identifier)

        adjusted_fn = dispatch_collection._adjust_fn_spec(fn, named)

        # from here on ``self`` is the key carrying the adjusted function
        self = self.with_wrapper(adjusted_fn)

        stub_function = getattr(
            self.dispatch_target.dispatch._events, self.identifier
        )
        if hasattr(stub_function, "_sa_warn"):
            stub_function._sa_warn()

        if once or once_unless_exception:
            # the once-flags were popped from ``kw`` above, so the nested
            # call takes the plain branch below
            self.with_wrapper(
                util.only_once(
                    self._listen_fn, retry_on_exception=once_unless_exception
                )
            ).listen(*args, **kw)
        else:
            self.dispatch_target.dispatch._listen(self, *args, **kw)

    def remove(self) -> None:
        """Remove this key's listener from every collection holding it.

        :raises sqlalchemy.exc.InvalidRequestError: if the key is not
         present in the registry.
        """
        key = self._key

        if key not in _key_to_collection:
            raise exc.InvalidRequestError(
                "No listeners found for event %s / %r / %s "
                % (self.target, self.identifier, self.fn)
            )

        dispatch_reg = _key_to_collection.pop(key)

        for collection_ref, listener_ref in dispatch_reg.items():
            collection = collection_ref()
            listener_fn = listener_ref()
            # skip entries whose collection or listener no longer exists
            if collection is not None and listener_fn is not None:
                collection.remove(self.with_wrapper(listener_fn))

    def contains(self) -> bool:
        """Return True if this event key is registered to listen."""
        return self._key in _key_to_collection

    def base_listen(
        self,
        propagate: bool = False,
        insert: bool = False,
        named: bool = False,
        retval: Optional[bool] = None,
        asyncio: bool = False,
    ) -> None:
        """Add this key to the dispatch collection for its identifier.

        :param propagate: passed through to the collection's ``insert``
         or ``append``.
        :param insert: use the collection's ``insert`` rather than its
         ``append``.
        :param named: accepted but not used by this method.
        :param retval: accepted but not used by this method.
        :param asyncio: when true, ``_set_asyncio()`` is called on the
         collection before the key is added.
        """
        target, identifier = self.dispatch_target, self.identifier

        dispatch_collection = getattr(target.dispatch, identifier)

        for_modify = dispatch_collection.for_modify(target.dispatch)
        if asyncio:
            for_modify._set_asyncio()

        if insert:
            for_modify.insert(self, propagate)
        else:
            for_modify.append(self, propagate)

    @property
    def _listen_fn(self) -> _ListenerFnType:
        """The wrapper function if one is set, otherwise ``fn``."""
        return self.fn_wrap or self.fn

    def append_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and append the listener to ``list_``.

        :return: ``True`` if the listener was appended, ``False`` if
         ``owner`` was already registered for this key, in which case
         ``list_`` is left untouched.
        """
        if _stored_in_collection(self, owner):
            list_.append(self._listen_fn)
            return True
        else:
            return False

    def remove_from_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> None:
        """Unregister from ``owner`` and remove the listener from
        ``list_``.
        """
        _removed_from_collection(self, owner)
        list_.remove(self._listen_fn)

    def prepend_to_list(
        self,
        owner: RefCollection[_ET],
        list_: Deque[_ListenerFnType],
    ) -> bool:
        """Register with ``owner`` and put the listener at the front of
        ``list_``.

        :return: ``True`` if the listener was added, ``False`` if
         ``owner`` was already registered for this key, in which case
         ``list_`` is left untouched.
        """
        if _stored_in_collection(self, owner):
            list_.appendleft(self._listen_fn)
            return True
        else:
            return False