1"""
2This module provides a decorator function for observing changes in a given
3property. Internally the decorator is implemented using SQLAlchemy event
4listeners. Both column properties and relationship properties can be observed.
5
6Property observers can be used for pre-calculating aggregates and automatic
7real-time data denormalization.
8
9Simple observers
10----------------
11
12At the heart of the observer extension is the :func:`observes` decorator. You
13mark some property path as being observed and the marked method will get
14notified when any changes are made to given path.
15
16Consider the following model structure:
17
18::
19
20 class Director(Base):
21 __tablename__ = 'director'
22 id = sa.Column(sa.Integer, primary_key=True)
23 name = sa.Column(sa.String)
24 date_of_birth = sa.Column(sa.Date)
25
26 class Movie(Base):
27 __tablename__ = 'movie'
28 id = sa.Column(sa.Integer, primary_key=True)
29 name = sa.Column(sa.String)
30 director_id = sa.Column(sa.Integer, sa.ForeignKey(Director.id))
31 director = sa.orm.relationship(Director, backref='movies')
32
33
Now consider we want to show movies in some listing ordered by director name
first and movie id secondly. If we have many movies then using joins and
ordering by Director.name will be very slow. Here is where denormalization
and :func:`observes` come to the rescue. Let's add a new column called
director_name to Movie which will get automatically copied from the associated
Director.
40
41
42::
43
44 from sqlalchemy_utils import observes
45
46
47 class Movie(Base):
48 # same as before..
49 director_name = sa.Column(sa.String)
50
51 @observes('director')
52 def director_observer(self, director):
53 self.director_name = director.name
54
55.. note::
56
57 This example could be done much more efficiently using a compound foreign
58 key from director_name, director_id to Director.name, Director.id but for
59 the sake of simplicity we added this as an example.
60
61
62Observes vs aggregated
63----------------------
64
65:func:`observes` and :func:`.aggregates.aggregated` can be used for similar
66things. However performance wise you should take the following things into
67consideration:
68
* :func:`observes` always works inside a transaction and deals with objects.
  If the relationship the observer is observing has a large number of objects
  it's better to use :func:`.aggregates.aggregated`.
72* :func:`.aggregates.aggregated` always executes one additional query per
73 aggregate so in scenarios where the observed relationship has only a handful
74 of objects it's better to use :func:`observes` instead.
75
76
77Example 1. Movie with many ratings
78
79Let's say we have a Movie object with potentially thousands of ratings. In this
80case we should always use :func:`.aggregates.aggregated` since iterating
81through thousands of objects is slow and very memory consuming.
82
83Example 2. Product with denormalized catalog name
84
85Each product belongs to one catalog. Here it is natural to use :func:`observes`
86for data denormalization.
87
88
89Deeply nested observing
90-----------------------
91
92Consider the following model structure where Catalog has many Categories and
93Category has many Products.
94
95::
96
97 class Catalog(Base):
98 __tablename__ = 'catalog'
99 id = sa.Column(sa.Integer, primary_key=True)
100 product_count = sa.Column(sa.Integer, default=0)
101
102 @observes('categories.products')
103 def product_observer(self, products):
104 self.product_count = len(products)
105
106 categories = sa.orm.relationship('Category', backref='catalog')
107
108 class Category(Base):
109 __tablename__ = 'category'
110 id = sa.Column(sa.Integer, primary_key=True)
111 catalog_id = sa.Column(sa.Integer, sa.ForeignKey('catalog.id'))
112
113 products = sa.orm.relationship('Product', backref='category')
114
115 class Product(Base):
116 __tablename__ = 'product'
117 id = sa.Column(sa.Integer, primary_key=True)
118 price = sa.Column(sa.Numeric)
119
120 category_id = sa.Column(sa.Integer, sa.ForeignKey('category.id'))
121
122
123:func:`observes` is smart enough to:
124
125* Notify catalog objects of any changes in associated Product objects
126* Notify catalog objects of any changes in Category objects that affect
127 products (for example if Category gets deleted, or a new Category is added to
128 Catalog with any number of Products)
129
130
131::
132
    category = Category(
        products=[Product(), Product()]
    )
    category2 = Category(
        products=[Product()]
    )

    catalog = Catalog(
        categories=[category, category2]
    )
    session.add(catalog)
    session.commit()
    catalog.product_count  # 3

    session.delete(category)
    session.commit()
    catalog.product_count  # 1
150
151
152Observing multiple columns
153--------------------------
154
155You can also observe multiple columns by specifying all the observable columns
156in the decorator.
157
158
159::
160
161 class Order(Base):
162 __tablename__ = 'order'
163 id = sa.Column(sa.Integer, primary_key=True)
164 unit_price = sa.Column(sa.Integer)
165 amount = sa.Column(sa.Integer)
166 total_price = sa.Column(sa.Integer)
167
168 @observes('amount', 'unit_price')
169 def total_price_observer(self, amount, unit_price):
170 self.total_price = amount * unit_price
171
172
173
174"""
175
import functools
import itertools
from collections import defaultdict, namedtuple
from collections.abc import Iterable

import sqlalchemy as sa

from .functions import getdotattr, has_changes
from .path import AttrPath
from .utils import is_sequence
185
# One registered observer hook:
#   func     -- the ``__observes__``-marked method to call
#   backref  -- path leading from the class the hook is registered on back to
#               the root object, or None when registered on the root class
#   fullpath -- list of every path the method observes, from the root class
Callback = namedtuple(
    'Callback',
    ('func', 'backref', 'fullpath'),
)
187
188
class PropertyObserver:
    """
    Collects ``__observes__``-marked methods and runs them before a flush.

    The observer hooks into three SQLAlchemy events (see ``listener_args``):

    * ``mapper_configured`` -- remember the marked methods of each mapped
      class (:meth:`update_generator_registry`)
    * ``after_configured`` -- expand the observed paths into ``callback_map``
      (:meth:`gather_paths`)
    * ``before_flush`` -- call the marked methods for the objects that are
      new, dirty or deleted in the session (:meth:`invoke_callbacks`)
    """

    def __init__(self):
        """Create the listener definitions and the two empty registries."""
        # (target, event identifier, listener) triples; each one is unpacked
        # into sa.event.listen / sa.event.contains / sa.event.remove.
        self.listener_args = [
            (sa.orm.Mapper, 'mapper_configured', self.update_generator_registry),
            (sa.orm.Mapper, 'after_configured', self.gather_paths),
            (sa.orm.session.Session, 'before_flush', self.invoke_callbacks),
        ]
        # class -> list of Callback tuples to evaluate when an instance of
        # that class takes part in a flush.
        self.callback_map = defaultdict(list)
        # TODO: make the registry a WeakKey dict
        # class -> list of methods carrying an ``__observes__`` attribute.
        self.generator_registry = defaultdict(list)

    def remove_listeners(self):
        """
        Detach this observer's event listeners.

        Listeners that are not currently registered are skipped, mirroring
        the guard in :meth:`register_listeners`, so the call is safe to
        repeat or to make before anything was registered.
        """
        for args in self.listener_args:
            if sa.event.contains(*args):
                sa.event.remove(*args)

    def register_listeners(self):
        """Attach every listener in ``listener_args`` not yet registered."""
        for args in self.listener_args:
            if not sa.event.contains(*args):
                sa.event.listen(*args)

    def __repr__(self):
        return '<PropertyObserver>'

    def update_generator_registry(self, mapper, class_):
        """
        Adds generator functions to generator_registry.

        ``mapper_configured`` listener. Only attributes defined directly in
        ``class_.__dict__`` that carry an ``__observes__`` attribute are
        recorded.

        :param mapper: unused; part of the listener signature
        :param class_: the class whose own attributes are scanned
        """
        marked = [
            attribute
            for attribute in class_.__dict__.values()
            if hasattr(attribute, '__observes__')
        ]
        if marked:
            # Only touch the defaultdict when there is something to add so
            # that classes without observers do not get an empty entry.
            self.generator_registry[class_].extend(marked)

    def gather_paths(self):
        """
        Expand every registered observer method into ``callback_map`` entries.

        ``after_configured`` listener. For each observed path a root entry
        (``backref=None``) is stored under the declaring class. For each step
        of the path whose property is a relationship, an entry is stored
        under the related class with ``backref=~path[:i]``.
        """
        for class_, generators in self.generator_registry.items():
            for callback in generators:
                full_paths = [
                    AttrPath(class_, call_path)
                    for call_path in callback.__observes__
                ]

                for path in full_paths:
                    # NOTE: one root entry is appended per observed path.
                    # Repeated entries are harmless because invoke_callbacks
                    # keys its results by (root object, function).
                    self.callback_map[class_].append(
                        Callback(func=callback, backref=None, fullpath=full_paths)
                    )

                    for index in range(len(path)):
                        prop = path[index].property
                        if not isinstance(prop, sa.orm.RelationshipProperty):
                            continue
                        # ~ inverts the path prefix; presumably this yields
                        # the route from the related class back to the root
                        # (it is later fed to getdotattr in
                        # gather_callback_args) -- confirm against AttrPath.
                        self.callback_map[prop.mapper.class_].append(
                            Callback(
                                func=callback,
                                backref=~(path[: index + 1]),
                                fullpath=full_paths,
                            )
                        )

    def gather_callback_args(self, obj, callbacks):
        """
        Yield ``(root_obj, func, objects)`` for each callback that applies.

        :param obj: an object taking part in the flush
        :param callbacks: the Callback tuples registered for ``obj``'s class
        """
        session = sa.orm.object_session(obj)
        for callback in callbacks:
            backref = callback.backref

            # Without a backref ``obj`` itself is the root object; otherwise
            # the root object(s) are looked up by following the backref.
            root_objs = getdotattr(obj, backref) if backref else obj
            if not root_objs:
                continue
            if not isinstance(root_objs, Iterable):
                root_objs = [root_objs]

            # Autoflush stays disabled while the values are collected (and
            # while the consumer handles each yielded item), since this runs
            # from the before_flush listener.
            with session.no_autoflush:
                for root_obj in root_objs:
                    if not root_obj:
                        continue
                    args = self.get_callback_args(root_obj, callback)
                    if args:
                        yield args

    def get_callback_args(self, root_obj, callback):
        """
        Resolve the observed values for ``root_obj``.

        :returns: ``(root_obj, callback.func, objects)`` when at least one
            observed path is nested (contains a dot) or has changes on
            ``root_obj``; ``None`` otherwise.
        """
        session = sa.orm.object_session(root_obj)
        # The third argument is a condition handed to getdotattr; here it
        # rejects objects that are marked for deletion in the session.
        objects = [
            getdotattr(root_obj, path, lambda obj: obj not in session.deleted)
            for path in callback.fullpath
        ]
        paths = [str(path) for path in callback.fullpath]
        if any('.' in path or has_changes(root_obj, path) for path in paths):
            return (root_obj, callback.func, objects)
        return None

    def iterate_objects_and_callbacks(self, session):
        """
        Yield ``(obj, callbacks)`` for every new, dirty or deleted object.

        An object is paired with the callback list of every registered class
        it is an instance of, so subclasses also match their bases' entries.
        """
        objs = itertools.chain(session.new, session.dirty, session.deleted)
        for obj in objs:
            for class_, callbacks in self.callback_map.items():
                if isinstance(obj, class_):
                    yield obj, callbacks

    def invoke_callbacks(self, session, ctx, instances):
        """
        ``before_flush`` listener: call each affected observer method once.

        Values are gathered per root object and observer function. Sequence
        values for the same argument position are merged into one set;
        other values overwrite the previous one.

        :param session: the session being flushed
        :param ctx: unused; part of the listener signature
        :param instances: unused; part of the listener signature
        """
        # root object -> observer function -> {argument position: value}
        callback_args = defaultdict(lambda: defaultdict(dict))
        for obj, callbacks in self.iterate_objects_and_callbacks(session):
            args = self.gather_callback_args(obj, callbacks)
            for root_obj, func, objects in args:
                collected = callback_args[root_obj][func]
                for position, object_ in enumerate(objects):
                    if is_sequence(object_):
                        previous = collected.get(position, set())
                        collected[position] = previous | set(object_)
                    else:
                        collected[position] = object_

        for root_obj, callback_objs in callback_args.items():
            for callback, objs in callback_objs.items():
                # Pass the values in positional order 0..n-1.
                ordered = [objs[position] for position in range(len(objs))]
                callback(root_obj, *ordered)
300
# Default module-level observer instance; :func:`observes` falls back to it
# when the caller does not supply its own ``observer`` keyword argument.
observer = PropertyObserver()
302
303
def observes(*paths, **observer_kw):
    """
    Mark method as property observer for the given property path. Inside
    transaction observer gathers all changes made in given property path and
    feeds the changed objects to observer-marked method at the before flush
    phase.

    ::

        from sqlalchemy_utils import observes


        class Catalog(Base):
            __tablename__ = 'catalog'
            id = sa.Column(sa.Integer, primary_key=True)
            category_count = sa.Column(sa.Integer, default=0)

            @observes('categories')
            def category_observer(self, categories):
                self.category_count = len(categories)

        class Category(Base):
            __tablename__ = 'category'
            id = sa.Column(sa.Integer, primary_key=True)
            catalog_id = sa.Column(sa.Integer, sa.ForeignKey('catalog.id'))


        catalog = Catalog(categories=[Category(), Category()])
        session.add(catalog)
        session.commit()

        catalog.category_count  # 2


    .. versionadded:: 0.28.0

    :param paths: One or more dot-notated property paths, eg.
       'categories.products.price'
    :param observer_kw: A dictionary where value for key 'observer' contains
       a :class:`PropertyObserver` object. When the key is missing or its
       value is ``None`` the module-level default observer is used.
    :returns: A decorator. The decorated method keeps its name and docstring
        and gains an ``__observes__`` attribute holding ``paths``.
    """
    # Resolve the default lazily so an explicitly supplied observer never
    # requires the module-level instance.
    observer_ = observer_kw.pop('observer', None)
    if observer_ is None:
        observer_ = observer
    observer_.register_listeners()

    def wraps(func):
        # functools.wraps keeps the wrapped method's __name__, __doc__ and
        # other metadata instead of exposing a generic "wrapper".
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            return func(self, *args, **kwargs)

        # Marker that PropertyObserver looks for when scanning a class.
        wrapper.__observes__ = paths
        return wrapper

    return wraps