Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy_utils/observer.py: 27%

92 statements  

"""
This module provides a decorator function for observing changes in a given
property. Internally the decorator is implemented using SQLAlchemy event
listeners. Both column properties and relationship properties can be observed.

Property observers can be used for pre-calculating aggregates and automatic
real-time data denormalization.

Simple observers
----------------

At the heart of the observer extension is the :func:`observes` decorator. You
mark a property path as observed, and the marked method is notified whenever
changes are made to that path.

Consider the following model structure:

::

    class Director(Base):
        __tablename__ = 'director'
        id = sa.Column(sa.Integer, primary_key=True)
        name = sa.Column(sa.String)
        date_of_birth = sa.Column(sa.Date)

    class Movie(Base):
        __tablename__ = 'movie'
        id = sa.Column(sa.Integer, primary_key=True)
        name = sa.Column(sa.String)
        director_id = sa.Column(sa.Integer, sa.ForeignKey(Director.id))
        director = sa.orm.relationship(Director, backref='movies')


Now suppose we want to list movies ordered by director name first and movie
id second. If we have many movies, joining and ordering by Director.name can
be very slow. This is where denormalization and :func:`observes` come to the
rescue. Let's add a new column called director_name to Movie, which is copied
automatically from the associated Director.


::

    from sqlalchemy_utils import observes


    class Movie(Base):
        # same columns and relationships as before...
        director_name = sa.Column(sa.String)

        @observes('director')
        def director_observer(self, director):
            # director is None when the relationship has been cleared
            self.director_name = director.name if director else None

.. note::

    This example could be implemented much more efficiently with a composite
    foreign key from (director_name, director_id) to (Director.name,
    Director.id), but for the sake of simplicity we use :func:`observes` here.

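The mechanism can be illustrated without a database. The sketch below is a stdlib-only toy model, not this module's implementation. ``toy_observes``, ``Tracked`` and ``flush`` are hypothetical names. Assignments are recorded in ``__setattr__`` rather than read from SQLAlchemy attribute history, and ``flush`` stands in for the ``before_flush`` listener that the real observer registers.

```python
# Stdlib-only toy model of the observer pattern; it does not use SQLAlchemy.
# toy_observes, Tracked and flush are hypothetical names for illustration.


def toy_observes(*paths):
    # Mark the decorated method with the attribute names it observes.
    def wraps(func):
        func.__observes__ = paths
        return func

    return wraps


class Tracked:
    # Records the names of public attributes assigned since the last flush.
    def __setattr__(self, name, value):
        if not name.startswith('_'):
            self.__dict__.setdefault('_changed', set()).add(name)
        object.__setattr__(self, name, value)


def flush(obj):
    # Stand-in for a before-flush hook: call each marked method whose
    # observed attributes changed, passing the current values in path order.
    changed = obj.__dict__.pop('_changed', set())
    for attr in dir(type(obj)):
        method = getattr(type(obj), attr)
        paths = getattr(method, '__observes__', None)
        if paths and changed.intersection(paths):
            method(obj, *[getattr(obj, p, None) for p in paths])


class Director:
    def __init__(self, name):
        self.name = name


class Movie(Tracked):
    @toy_observes('director')
    def director_observer(self, director):
        # Guard against a movie whose director has been cleared.
        self.director_name = director.name if director else None


movie = Movie()
movie.director = Director('Kurosawa')
flush(movie)
print(movie.director_name)  # Kurosawa
```

Marking the method with an ``__observes__`` attribute mirrors what :func:`observes` does. The real observer instead discovers marked methods when mappers are configured and runs them from the session's ``before_flush`` event.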

Observes vs aggregated
----------------------

:func:`observes` and :func:`.aggregates.aggregated` can be used for similar
things. However, performance-wise you should take the following into
consideration:

* :func:`observes` always works inside a transaction and deals with objects.
  If the relationship being observed has a large number of objects, it's
  better to use :func:`.aggregates.aggregated`.
* :func:`.aggregates.aggregated` always executes one additional query per
  aggregate, so in scenarios where the observed relationship has only a
  handful of objects it's better to use :func:`observes` instead.


Example 1. Movie with many ratings

Let's say we have a Movie object with potentially thousands of ratings. In this
case we should always use :func:`.aggregates.aggregated`, since iterating
through thousands of objects is slow and memory-intensive.

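To make the trade-off concrete, here is a stdlib-only sketch using ``sqlite3`` and a hypothetical ``rating`` table. It does not use :func:`observes` or :func:`.aggregates.aggregated` themselves. The first half loads every related row and averages in Python, roughly what an observer over a large collection has to do. The second half issues one aggregate query and gets back a single value, in the spirit of the extra query the aggregated approach executes.

```python
import sqlite3

# Hypothetical schema: one movie (movie_id 1) with many ratings.
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE rating (id INTEGER PRIMARY KEY, movie_id INTEGER, stars INTEGER)'
)
conn.executemany(
    'INSERT INTO rating (movie_id, stars) VALUES (?, ?)',
    [(1, i % 5 + 1) for i in range(1000)],
)

# Object-style: every related row is materialized in Python first.
rows = conn.execute('SELECT stars FROM rating WHERE movie_id = ?', (1,)).fetchall()
avg_in_python = sum(stars for (stars,) in rows) / len(rows)

# Aggregate-style: one extra query; the database returns a single row.
(avg_in_sql,) = conn.execute(
    'SELECT AVG(stars) FROM rating WHERE movie_id = ?', (1,)
).fetchone()

print(len(rows), avg_in_python, avg_in_sql)  # 1000 3.0 3.0
```

Both values agree. The difference is that the first approach pulls 1000 rows into memory while the second returns one.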

Example 2. Product with denormalized catalog name

Each product belongs to one catalog. Here it is natural to use :func:`observes`
for data denormalization.


Deeply nested observing
-----------------------

Consider the following model structure where Catalog has many Categories and
Category has many Products.

::

    class Catalog(Base):
        __tablename__ = 'catalog'
        id = sa.Column(sa.Integer, primary_key=True)
        product_count = sa.Column(sa.Integer, default=0)

        @observes('categories.products')
        def product_observer(self, products):
            self.product_count = len(products)

        categories = sa.orm.relationship('Category', backref='catalog')

    class Category(Base):
        __tablename__ = 'category'
        id = sa.Column(sa.Integer, primary_key=True)
        catalog_id = sa.Column(sa.Integer, sa.ForeignKey('catalog.id'))

        products = sa.orm.relationship('Product', backref='category')

    class Product(Base):
        __tablename__ = 'product'
        id = sa.Column(sa.Integer, primary_key=True)
        price = sa.Column(sa.Numeric)

        category_id = sa.Column(sa.Integer, sa.ForeignKey('category.id'))


:func:`observes` is smart enough to:

* Notify catalog objects of any changes in associated Product objects
* Notify catalog objects of any changes in Category objects that affect
  products (for example if Category gets deleted, or a new Category is added to
  Catalog with any number of Products)

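Behind this, each class along the observed path gets its own callback, together with the path that leads back to the observing object. :meth:`PropertyObserver.gather_paths` builds that map from SQLAlchemy mapper metadata by inverting ``AttrPath`` prefixes. The stdlib-only sketch below reproduces only the idea. ``callback_map`` and the ``RELATIONSHIPS`` table are hypothetical stand-ins for the mappers, and the sketch assumes every step of the path is a relationship (the module skips column steps).

```python
# Stdlib-only sketch of how an observed path fans out into per-class
# callbacks. RELATIONSHIPS is a hypothetical stand-in for mapper metadata:
# (owner class, attribute) -> (target class, backref name).
RELATIONSHIPS = {
    ('Catalog', 'categories'): ('Category', 'catalog'),
    ('Category', 'products'): ('Product', 'category'),
}


def callback_map(root_class, path):
    # The observing class is notified directly, so it needs no backref.
    result = {root_class: None}
    current = root_class
    inverse = []
    for attr in path.split('.'):
        target, backref = RELATIONSHIPS[(current, attr)]
        # Walking back from target to the root reverses the path so far.
        inverse.insert(0, backref)
        result[target] = '.'.join(inverse)
        current = target
    return result


print(callback_map('Catalog', 'categories.products'))
# {'Catalog': None, 'Category': 'catalog', 'Product': 'category.catalog'}
```

When a Product changes, following ``category.catalog`` from it reaches the Catalog whose observer must run. This is how a change two relationships away is propagated.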

::

    category = Category(
        products=[Product(), Product()]
    )
    category2 = Category(
        products=[Product()]
    )

    catalog = Catalog(
        categories=[category, category2]
    )
    session.add(catalog)
    session.commit()
    catalog.product_count  # 3

    session.delete(category)
    session.commit()
    catalog.product_count  # 1

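Deleting a Category changes the count because a dotted path is resolved level by level. Collections are flattened and objects pending deletion are skipped, which is what the ``obj not in session.deleted`` condition in ``get_callback_args`` does. Below is a stdlib-only sketch. ``resolve`` and ``Node`` are hypothetical stand-ins for the ``getdotattr`` helper and mapped instances, and as a simplification ``resolve`` always returns a list.

```python
# Stdlib-only sketch: resolve a dotted path such as 'categories.products'
# into one flat list, skipping objects pending deletion. resolve and Node
# are hypothetical stand-ins for getdotattr and mapped instances.


class Node:
    def __init__(self, **attrs):
        self.__dict__.update(attrs)


def resolve(obj, path, keep=lambda o: True):
    values = [obj]
    for attr in path.split('.'):
        next_values = []
        for value in values:
            attr_value = getattr(value, attr)
            # Collections are flattened; single objects are wrapped.
            items = attr_value if isinstance(attr_value, list) else [attr_value]
            next_values.extend(i for i in items if i is not None and keep(i))
        values = next_values
    return values


p1, p2, p3 = Node(name='p1'), Node(name='p2'), Node(name='p3')
category = Node(products=[p1, p2])
category2 = Node(products=[p3])
catalog = Node(categories=[category, category2])

deleted = set()  # plays the role of session.deleted


def not_deleted(o):
    return o not in deleted


before = len(resolve(catalog, 'categories.products', not_deleted))
deleted.add(category)
after = len(resolve(catalog, 'categories.products', not_deleted))
print(before, after)  # 3 1
```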

Observing multiple columns
--------------------------

You can also observe multiple columns by specifying all the observed columns
in the decorator. The method receives one argument per path, in the same order
as the paths are listed.


::

    class Order(Base):
        __tablename__ = 'order'
        id = sa.Column(sa.Integer, primary_key=True)
        unit_price = sa.Column(sa.Integer)
        amount = sa.Column(sa.Integer)
        total_price = sa.Column(sa.Integer)

        @observes('amount', 'unit_price')
        def total_price_observer(self, amount, unit_price):
            self.total_price = amount * unit_price

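Within one flush, each observer runs once per root object, however many changed objects lead back to that root. ``invoke_callbacks`` groups the collected arguments by root object and observer function, unions collection values per argument slot, and passes the slots in path order. Below is a simplified, stdlib-only sketch of that bookkeeping. ``merge_callback_args`` and ``run`` are hypothetical names, and plain lists stand in for the sequences the module detects with ``is_sequence``.

```python
from collections import defaultdict


def merge_callback_args(triggers):
    # triggers: (root, func, objects) tuples, one per changed object that
    # leads back to root; objects holds one value per observed path.
    merged = defaultdict(dict)
    for root, func, objects in triggers:
        slots = merged[(root, func)]
        for i, value in enumerate(objects):
            if isinstance(value, list):
                # Collection values from different triggers are unioned.
                slots[i] = slots.get(i, set()) | set(value)
            else:
                slots[i] = value
    return merged


def run(merged):
    # Each (root, func) pair is called once, with arguments in path order.
    for (root, func), slots in merged.items():
        func(root, *[slots[i] for i in range(len(slots))])


calls = []


def product_observer(root, products):
    calls.append((root, len(products)))


def total_price_observer(root, amount, unit_price):
    calls.append((root, amount * unit_price))


# Three products changed in one flush, all reachable from 'catalog'.
products = ['p1', 'p2', 'p3']
triggers = [('catalog', product_observer, [products]) for _ in products]
# Two observed columns: values arrive in the order given to the decorator.
triggers.append(('order', total_price_observer, [3, 10]))

run(merge_callback_args(triggers))
print(calls)  # [('catalog', 3), ('order', 30)]
```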

"""

import itertools
from collections import defaultdict, namedtuple
from collections.abc import Iterable

import sqlalchemy as sa

from .functions import getdotattr, has_changes
from .path import AttrPath
from .utils import is_sequence

Callback = namedtuple('Callback', ['func', 'backref', 'fullpath'])


class PropertyObserver:
    def __init__(self):
        self.listener_args = [
            (sa.orm.Mapper, 'mapper_configured', self.update_generator_registry),
            (sa.orm.Mapper, 'after_configured', self.gather_paths),
            (sa.orm.session.Session, 'before_flush', self.invoke_callbacks),
        ]
        self.callback_map = defaultdict(list)
        # TODO: make the registry a WeakKey dict
        self.generator_registry = defaultdict(list)

    def remove_listeners(self):
        for args in self.listener_args:
            sa.event.remove(*args)

    def register_listeners(self):
        for args in self.listener_args:
            if not sa.event.contains(*args):
                sa.event.listen(*args)

    def __repr__(self):
        return '<PropertyObserver>'

    def update_generator_registry(self, mapper, class_):
        """
        Adds generator functions to generator_registry.
        """

        for generator in class_.__dict__.values():
            if hasattr(generator, '__observes__'):
                self.generator_registry[class_].append(generator)

    def gather_paths(self):
        for class_, generators in self.generator_registry.items():
            for callback in generators:
                full_paths = []
                for call_path in callback.__observes__:
                    full_paths.append(AttrPath(class_, call_path))

                for path in full_paths:
                    self.callback_map[class_].append(
                        Callback(func=callback, backref=None, fullpath=full_paths)
                    )

                    for index in range(len(path)):
                        i = index + 1
                        prop = path[index].property
                        if isinstance(prop, sa.orm.RelationshipProperty):
                            prop_class = path[index].property.mapper.class_
                            self.callback_map[prop_class].append(
                                Callback(
                                    func=callback,
                                    backref=~(path[:i]),
                                    fullpath=full_paths,
                                )
                            )

    def gather_callback_args(self, obj, callbacks):
        session = sa.orm.object_session(obj)
        for callback in callbacks:
            backref = callback.backref

            root_objs = getdotattr(obj, backref) if backref else obj
            if root_objs:
                if not isinstance(root_objs, Iterable):
                    root_objs = [root_objs]

                with session.no_autoflush:
                    for root_obj in root_objs:
                        if root_obj:
                            args = self.get_callback_args(root_obj, callback)
                            if args:
                                yield args

    def get_callback_args(self, root_obj, callback):
        session = sa.orm.object_session(root_obj)
        objects = [
            getdotattr(root_obj, path, lambda obj: obj not in session.deleted)
            for path in callback.fullpath
        ]
        paths = [str(path) for path in callback.fullpath]
        for path in paths:
            if '.' in path or has_changes(root_obj, path):
                return (root_obj, callback.func, objects)

    def iterate_objects_and_callbacks(self, session):
        objs = itertools.chain(session.new, session.dirty, session.deleted)
        for obj in objs:
            for class_, callbacks in self.callback_map.items():
                if isinstance(obj, class_):
                    yield obj, callbacks

    def invoke_callbacks(self, session, ctx, instances):
        callback_args = defaultdict(lambda: defaultdict(set))
        for obj, callbacks in self.iterate_objects_and_callbacks(session):
            args = self.gather_callback_args(obj, callbacks)
            for root_obj, func, objects in args:
                if not callback_args[root_obj][func]:
                    callback_args[root_obj][func] = {}
                for i, object_ in enumerate(objects):
                    if is_sequence(object_):
                        callback_args[root_obj][func][i] = callback_args[root_obj][
                            func
                        ].get(i, set()) | set(object_)
                    else:
                        callback_args[root_obj][func][i] = object_

        for root_obj, callback_objs in callback_args.items():
            for callback, objs in callback_objs.items():
                callback(root_obj, *[objs[i] for i in range(len(objs))])


observer = PropertyObserver()


def observes(*paths, **observer_kw):
    """
    Mark a method as a property observer for the given property paths.
    Within a transaction, the observer collects changes made along the given
    property paths and, during the before-flush phase, calls the marked
    method with the current values of those paths.

    ::

        from sqlalchemy_utils import observes


        class Catalog(Base):
            __tablename__ = 'catalog'
            id = sa.Column(sa.Integer, primary_key=True)
            category_count = sa.Column(sa.Integer, default=0)

            @observes('categories')
            def category_observer(self, categories):
                self.category_count = len(categories)

            categories = sa.orm.relationship('Category', backref='catalog')

        class Category(Base):
            __tablename__ = 'category'
            id = sa.Column(sa.Integer, primary_key=True)
            catalog_id = sa.Column(sa.Integer, sa.ForeignKey('catalog.id'))


        catalog = Catalog(categories=[Category(), Category()])
        session.add(catalog)
        session.commit()

        catalog.category_count  # 2


    .. versionadded:: 0.28.0

    :param paths: One or more dot-notated property paths, e.g.
        'categories.products.price'
    :param observer_kw: Optional keyword arguments. The ``observer`` key may
        hold a :class:`PropertyObserver` instance to use instead of the
        module-level default observer.
    """
    observer_ = observer_kw.pop('observer', observer)
    observer_.register_listeners()

    def wraps(func):
        def wrapper(self, *args, **kwargs):
            return func(self, *args, **kwargs)

        wrapper.__observes__ = paths
        return wrapper

    return wraps