1# orm/dependency.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""Relationship dependencies."""
11
12from __future__ import annotations
13
14from . import attributes
15from . import exc
16from . import sync
17from . import unitofwork
18from . import util as mapperutil
19from .interfaces import MANYTOMANY
20from .interfaces import MANYTOONE
21from .interfaces import ONETOMANY
22from .. import exc as sa_exc
23from .. import sql
24from .. import util
25
26
class DependencyProcessor:
    """Base class for objects that handle one relationship during a flush.

    An instance copies the configuration of a single relationship property
    and registers unit-of-work records, plus the ordering dependencies
    between them, on the ``uow`` object handed to its methods.

    The direction-specific pieces -- ``per_property_dependencies()``,
    ``per_state_dependencies()``, ``_synchronize()`` and ``_pks_changed()``
    -- are called here but must be supplied by subclasses.

    """

    def __init__(self, prop):
        """Copy the relationship configuration from ``prop``.

        :param prop: the relationship property this processor works for.
         Its ``cascade``, ``mapper``, ``parent``, ``secondary``,
         ``direction``, ``post_update``, ``passive_deletes``,
         ``passive_updates``, ``enable_typechecks`` and ``key`` attributes
         are stored on the processor.
        :raises sa_exc.ArgumentError: if ``prop.synchronize_pairs`` is empty.

        """
        self.prop = prop
        self.cascade = prop.cascade
        self.mapper = prop.mapper
        self.parent = prop.parent
        self.secondary = prop.secondary
        self.direction = prop.direction
        self.post_update = prop.post_update
        self.passive_deletes = prop.passive_deletes
        self.passive_updates = prop.passive_updates
        self.enable_typechecks = prop.enable_typechecks

        # a "passive" relationship setting selects the non-initializing
        # attribute access flag; otherwise PASSIVE_OFF is used.
        if self.passive_deletes:
            self._passive_delete_flag = attributes.PASSIVE_NO_INITIALIZE
        else:
            self._passive_delete_flag = attributes.PASSIVE_OFF
        if self.passive_updates:
            self._passive_update_flag = attributes.PASSIVE_NO_INITIALIZE
        else:
            self._passive_update_flag = attributes.PASSIVE_OFF

        self.sort_key = "%s_%s" % (self.parent._sort_key, prop.key)
        self.key = prop.key
        if not self.prop.synchronize_pairs:
            raise sa_exc.ArgumentError(
                "Can't build a DependencyProcessor for relationship %s. "
                "No target attributes to populate between parent and "
                "child are present" % self.prop
            )

    @classmethod
    def from_relationship(cls, prop):
        """Return a new processor for ``prop``.

        The class is looked up in the module-level
        ``_direction_to_processor`` mapping using ``prop.direction`` and
        instantiated with ``prop``.

        """
        return _direction_to_processor[prop.direction](prop)

    def hasparent(self, state):
        """return True if the given object instance has a parent,
        according to the ``InstrumentedAttribute`` handled by this
        ``DependencyProcessor``.

        """
        return self.parent.class_manager.get_impl(self.key).hasparent(state)

    def per_property_preprocessors(self, uow):
        """establish actions and dependencies related to a flush.

        These actions will operate on all relevant states in
        the aggregate.

        """
        uow.register_preprocessor(self, True)

    def per_property_flush_actions(self, uow):
        """Create the aggregate records for this relationship and link them.

        Builds the "process all" records for this processor along with the
        aggregate save/delete records for the parent and child base mappers,
        then passes them to ``per_property_dependencies()`` (provided by
        the subclass) which adds the ordering to ``uow.dependencies``.

        """
        after_save = unitofwork.ProcessAll(uow, self, False, True)
        before_delete = unitofwork.ProcessAll(uow, self, True, True)

        parent_saves = unitofwork.SaveUpdateAll(
            uow, self.parent.primary_base_mapper
        )
        child_saves = unitofwork.SaveUpdateAll(
            uow, self.mapper.primary_base_mapper
        )

        parent_deletes = unitofwork.DeleteAll(
            uow, self.parent.primary_base_mapper
        )
        child_deletes = unitofwork.DeleteAll(
            uow, self.mapper.primary_base_mapper
        )

        self.per_property_dependencies(
            uow,
            parent_saves,
            child_saves,
            parent_deletes,
            child_deletes,
            after_save,
            before_delete,
        )

    def per_state_flush_actions(self, uow, states, isdelete):
        """establish actions and dependencies related to a flush.

        These actions will operate on all relevant states
        individually.    This occurs only if there are cycles
        in the 'aggregated' version of events.

        :param uow: the unit of work object; its ``cycles``, ``states``
         and (via ``per_state_dependencies()``) ``dependencies``
         collections are consulted.
        :param states: iterable of parent states to build actions for.
        :param isdelete: True when the states are being deleted, False
         when they are being saved.

        """

        child_base_mapper = self.mapper.primary_base_mapper
        child_saves = unitofwork.SaveUpdateAll(uow, child_base_mapper)
        child_deletes = unitofwork.DeleteAll(uow, child_base_mapper)

        # locate and disable the aggregate processors
        # for this dependency

        if isdelete:
            before_delete = unitofwork.ProcessAll(uow, self, True, True)
            before_delete.disabled = True
        else:
            after_save = unitofwork.ProcessAll(uow, self, False, True)
            after_save.disabled = True

        # check if the "child" side is part of the cycle

        if child_saves not in uow.cycles:
            # based on the current dependencies we use, the saves/
            # deletes should always be in the 'cycles' collection
            # together.   if this changes, we will have to break up
            # this method a bit more.
            assert child_deletes not in uow.cycles

            # child side is not part of the cycle, so we will link per-state
            # actions to the aggregate "saves", "deletes" actions
            child_actions = [(child_saves, False), (child_deletes, True)]
            child_in_cycles = False
        else:
            child_in_cycles = True

        # check if the "parent" side is part of the cycle.
        # parent_in_cycles is bound on every path; previously it was left
        # unassigned when the parent action was not in uow.cycles, which
        # raised UnboundLocalError in the loop below.  When it is False
        # the aggregate parent action is used, as on the child side.
        if not isdelete:
            parent_saves = unitofwork.SaveUpdateAll(
                uow, self.parent.base_mapper
            )
            parent_deletes = before_delete = None
            parent_in_cycles = parent_saves in uow.cycles
        else:
            parent_deletes = unitofwork.DeleteAll(uow, self.parent.base_mapper)
            parent_saves = after_save = None
            parent_in_cycles = parent_deletes in uow.cycles

        # now create actions /dependencies for each state.

        for state in states:
            # detect if there's anything changed or loaded
            # by a preprocessor on this state/attribute.   In the
            # case of deletes we may try to load missing items here as well.
            sum_ = state.manager[self.key].impl.get_all_pending(
                state,
                state.dict,
                (
                    self._passive_delete_flag
                    if isdelete
                    else attributes.PASSIVE_NO_INITIALIZE
                ),
            )

            if not sum_:
                continue

            if isdelete:
                before_delete = unitofwork.ProcessState(uow, self, True, state)
                if parent_in_cycles:
                    parent_deletes = unitofwork.DeleteState(uow, state)
            else:
                after_save = unitofwork.ProcessState(uow, self, False, state)
                if parent_in_cycles:
                    parent_saves = unitofwork.SaveUpdateState(uow, state)

            if child_in_cycles:
                # one (action, is-delete) pair per related child; children
                # not present in uow.states contribute (None, None)
                child_actions = []
                for child_state, child in sum_:
                    if child_state not in uow.states:
                        child_action = (None, None)
                    else:
                        (deleted, listonly) = uow.states[child_state]
                        if deleted:
                            child_action = (
                                unitofwork.DeleteState(uow, child_state),
                                True,
                            )
                        else:
                            child_action = (
                                unitofwork.SaveUpdateState(uow, child_state),
                                False,
                            )
                    child_actions.append(child_action)

            # establish dependencies between our possibly per-state
            # parent action and our possibly per-state child action.
            for child_action, childisdelete in child_actions:
                self.per_state_dependencies(
                    uow,
                    parent_saves,
                    parent_deletes,
                    child_action,
                    after_save,
                    before_delete,
                    isdelete,
                    childisdelete,
                )

    def presort_deletes(self, uowcommit, states):
        """Hook run for states being deleted; this base version does
        nothing and returns False."""
        return False

    def presort_saves(self, uowcommit, states):
        """Hook run for states being saved; this base version does
        nothing and returns False."""
        return False

    def process_deletes(self, uowcommit, states):
        """Hook for processing deleted states; no-op in the base class."""
        pass

    def process_saves(self, uowcommit, states):
        """Hook for processing saved states; no-op in the base class."""
        pass

    def prop_has_changes(self, uowcommit, states, isdelete):
        """Report whether this relationship has work to do for ``states``.

        Returns True as soon as one state has a non-empty attribute history
        for ``self.key``.  Otherwise returns the value of
        ``states and not self.prop._is_self_referential and
        self.mapper in uowcommit.mappers``, which is truthy/falsy but not
        necessarily a ``bool`` (an empty ``states`` is returned as-is).

        """
        if not isdelete or self.passive_deletes:
            passive = (
                attributes.PASSIVE_NO_INITIALIZE
                | attributes.INCLUDE_PENDING_MUTATIONS
            )
        elif self.direction is MANYTOONE:
            # here, we were hoping to optimize having to fetch many-to-one
            # for history and ignore it, if there's no further cascades
            # to take place.  however there are too many less common
            # conditions that still take place and tests in
            # test_relationships / test_cascade etc. will still fail.
            passive = attributes.PASSIVE_NO_FETCH_RELATED
        else:
            passive = (
                attributes.PASSIVE_OFF | attributes.INCLUDE_PENDING_MUTATIONS
            )

        for s in states:
            # TODO: add a high speed method
            # to InstanceState which returns:  attribute
            # has a non-None value, or had one
            history = uowcommit.get_attribute_history(s, self.key, passive)
            if history and not history.empty():
                return True

        # no state had history (the loop has no ``break``, so this always
        # runs when the loop finishes without returning)
        return (
            states
            and not self.prop._is_self_referential
            and self.mapper in uowcommit.mappers
        )

    def _verify_canload(self, state):
        """Raise if ``state`` is not acceptable as a member of this
        relationship.

        :raises exc.FlushError: if ``state`` is None while
         ``self.prop.uselist`` is set, or if ``self.mapper._canload()``
         rejects the state.  The longer message is used when the state
         would be accepted with ``allow_subtypes=True``.

        """
        if self.prop.uselist and state is None:
            raise exc.FlushError(
                "Can't flush None value found in "
                "collection %s" % (self.prop,)
            )
        elif state is not None and not self.mapper._canload(
            state, allow_subtypes=not self.enable_typechecks
        ):
            if self.mapper._canload(state, allow_subtypes=True):
                raise exc.FlushError(
                    "Attempting to flush an item of type "
                    "%(x)s as a member of collection "
                    '"%(y)s". Expected an object of type '
                    "%(z)s or a polymorphic subclass of "
                    "this type. If %(x)s is a subclass of "
                    '%(z)s, configure mapper "%(zm)s" to '
                    "load this subtype polymorphically, or "
                    "set enable_typechecks=False to allow "
                    "any subtype to be accepted for flush. "
                    % {
                        "x": state.class_,
                        "y": self.prop,
                        "z": self.mapper.class_,
                        "zm": self.mapper,
                    }
                )
            else:
                raise exc.FlushError(
                    "Attempting to flush an item of type "
                    "%(x)s as a member of collection "
                    '"%(y)s". Expected an object of type '
                    "%(z)s or a polymorphic subclass of "
                    "this type."
                    % {
                        "x": state.class_,
                        "y": self.prop,
                        "z": self.mapper.class_,
                    }
                )

    def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
        """Synchronize key values between ``state`` and ``child``;
        must be implemented by subclasses."""
        raise NotImplementedError()

    def _get_reversed_processed_set(self, uow):
        """Return the set shared between this relationship and its reverse
        properties, or None if ``self.prop._reverse_property`` is empty.

        The set is obtained from ``uow.memo()`` under a key built from the
        sorted keys of this property and its reverse properties, so each
        side of the pairing computes the same memo key.

        """
        if not self.prop._reverse_property:
            return None

        process_key = tuple(
            sorted([self.key] + [p.key for p in self.prop._reverse_property])
        )
        return uow.memo(("reverse_key", process_key), set)

    def _post_update(self, state, uowcommit, related, is_m2o_delete=False):
        """Register ``state`` for a post-update, at most once.

        ``uowcommit.register_post_update()`` is called with the right-hand
        members of ``self.prop.synchronize_pairs`` for the first element of
        ``related`` that qualifies: any element normally, or the first
        non-None element when ``is_m2o_delete`` is True.  Nothing is
        registered if no element qualifies.

        """
        for x in related:
            if not is_m2o_delete or x is not None:
                uowcommit.register_post_update(
                    state, [r for l, r in self.prop.synchronize_pairs]
                )
                break

    def _pks_changed(self, uowcommit, state):
        """Report whether the source key values of ``state`` changed;
        must be implemented by subclasses."""
        raise NotImplementedError()

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name__, self.prop)
330
331
class OneToManyDP(DependencyProcessor):
    """Dependency processor for the one-to-many direction.

    The parent state is the source of the key values; they are copied
    onto (or cleared from) each child state via ``sync.populate()`` /
    ``sync.clear()`` using ``self.prop.synchronize_pairs``.

    """

    def per_property_dependencies(
        self,
        uow,
        parent_saves,
        child_saves,
        parent_deletes,
        child_deletes,
        after_save,
        before_delete,
    ):
        """Add the aggregate ordering pairs for this relationship to
        ``uow.dependencies``."""
        if not self.post_update:
            uow.dependencies.update(
                [
                    (parent_saves, after_save),
                    (after_save, child_saves),
                    (after_save, child_deletes),
                    (child_saves, parent_deletes),
                    (child_deletes, parent_deletes),
                    (before_delete, child_saves),
                    (before_delete, child_deletes),
                ]
            )
            return

        # post_update: the child rows get separate post/pre update records
        post_updates = unitofwork.PostUpdateAll(
            uow, self.mapper.primary_base_mapper, False
        )
        pre_updates = unitofwork.PostUpdateAll(
            uow, self.mapper.primary_base_mapper, True
        )
        uow.dependencies.update(
            [
                (child_saves, after_save),
                (parent_saves, after_save),
                (after_save, post_updates),
                (before_delete, pre_updates),
                (pre_updates, parent_deletes),
                (pre_updates, child_deletes),
            ]
        )

    def per_state_dependencies(
        self,
        uow,
        save_parent,
        delete_parent,
        child_action,
        after_save,
        before_delete,
        isdelete,
        childisdelete,
    ):
        """Add the ordering pairs for one parent action / child action
        combination to ``uow.dependencies``."""
        if self.post_update:
            # both records are created regardless of which one the
            # selected branch ends up referencing
            post_updates = unitofwork.PostUpdateAll(
                uow, self.mapper.primary_base_mapper, False
            )
            pre_updates = unitofwork.PostUpdateAll(
                uow, self.mapper.primary_base_mapper, True
            )

            # TODO: this whole block is not covered
            # by any tests
            if isdelete:
                # the same pairs apply whether or not the child
                # itself is being deleted
                pairs = [
                    (before_delete, pre_updates),
                    (pre_updates, delete_parent),
                ]
            elif childisdelete:
                pairs = [
                    (child_action, after_save),
                    (after_save, post_updates),
                ]
            else:
                pairs = [
                    (save_parent, after_save),
                    (child_action, after_save),
                    (after_save, post_updates),
                ]
        elif isdelete:
            pairs = [
                (before_delete, child_action),
                (child_action, delete_parent),
            ]
        else:
            pairs = [
                (save_parent, after_save),
                (after_save, child_action),
                (save_parent, child_action),
            ]

        uow.dependencies.update(pairs)

    def presort_deletes(self, uowcommit, states):
        """Register the children of parents that are being deleted.

        The head object is going away and this processor manages its
        list of children; unless delete cascade or
        ``passive_deletes="all"`` is in effect, the unchanged children
        are registered so their foreign key to the parent can be set
        to NULL.

        """
        null_child_fks = not (
            self.cascade.delete or self.passive_deletes == "all"
        )

        for state in states:
            history = uowcommit.get_attribute_history(
                state, self.key, self._passive_delete_flag
            )
            if not history:
                continue

            for removed in history.deleted:
                if removed is None or self.hasparent(removed) is not False:
                    continue
                if self.cascade.delete_orphan:
                    uowcommit.register_object(removed, isdelete=True)
                else:
                    uowcommit.register_object(removed)

            if not null_child_fks:
                continue
            for kept in history.unchanged:
                if kept is not None:
                    uowcommit.register_object(
                        kept, operation="delete", prop=self.prop
                    )

    def presort_saves(self, uowcommit, states):
        """Register added, removed and (on a key change) unchanged
        children of parents that are being saved."""
        children_added = uowcommit.memo(("children_added", self), set)

        null_child_fks = not (
            self.cascade.delete_orphan or self.passive_deletes == "all"
        )

        for state in states:
            pks_changed = self._pks_changed(uowcommit, state)

            # only load the collection when a key change has to be
            # cascaded by the ORM rather than by the database
            if pks_changed and not self.passive_updates:
                load_flag = attributes.PASSIVE_OFF
            else:
                load_flag = attributes.PASSIVE_NO_INITIALIZE
            passive = load_flag | attributes.INCLUDE_PENDING_MUTATIONS

            history = uowcommit.get_attribute_history(state, self.key, passive)
            if history:
                for added in history.added:
                    if added is not None:
                        uowcommit.register_object(
                            added,
                            cancel_delete=True,
                            operation="add",
                            prop=self.prop,
                        )

                children_added.update(history.added)

                for removed in history.deleted:
                    if self.cascade.delete_orphan:
                        if self.hasparent(removed) is False:
                            uowcommit.register_object(
                                removed,
                                isdelete=True,
                                operation="delete",
                                prop=self.prop,
                            )
                            cascaded = self.mapper.cascade_iterator(
                                "delete", removed
                            )
                            for _c, _m, cascaded_state, _d in cascaded:
                                uowcommit.register_object(
                                    cascaded_state, isdelete=True
                                )
                    elif null_child_fks:
                        uowcommit.register_object(
                            removed,
                            isdelete=False,
                            operation="delete",
                            prop=self.prop,
                        )

            if pks_changed and history:
                for kept in history.unchanged:
                    if kept is not None:
                        uowcommit.register_object(
                            kept,
                            False,
                            self.passive_updates,
                            operation="pk change",
                            prop=self.prop,
                        )

    def process_deletes(self, uowcommit, states):
        """Clear the key columns on children of deleted parents.

        The head object is being deleted and the children it manages
        need their foreign key to the parent set to NULL.  This phase
        can be called safely for any cascade but is unnecessary if
        delete cascade is on.

        """
        if not self.post_update and self.passive_deletes == "all":
            return

        children_added = uowcommit.memo(("children_added", self), set)

        for state in states:
            history = uowcommit.get_attribute_history(
                state, self.key, self._passive_delete_flag
            )
            if not history:
                continue

            for removed in history.deleted:
                if removed is not None and self.hasparent(removed) is False:
                    self._synchronize(
                        state, removed, None, True, uowcommit, False
                    )
                    if self.post_update and removed:
                        self._post_update(removed, uowcommit, [state])

            if self.post_update or not self.cascade.delete:
                remaining = set(history.unchanged).difference(children_added)
                for kept in remaining:
                    if kept is not None:
                        self._synchronize(
                            state, kept, None, True, uowcommit, False
                        )
                        if self.post_update and kept:
                            self._post_update(kept, uowcommit, [state])

            # technically, we can even remove each child from the
            # collection here too.  but this would be a somewhat
            # inconsistent behavior since it wouldn't happen
            # if the old parent wasn't deleted but child was moved.

    def process_saves(self, uowcommit, states):
        """Copy the parent key onto added children, clear it on removed
        children, and re-copy it to unchanged children when the parent
        key has changed."""
        null_child_fks = not (
            self.cascade.delete_orphan or self.passive_deletes == "all"
        )

        for state in states:
            history = uowcommit.get_attribute_history(
                state, self.key, attributes.PASSIVE_NO_INITIALIZE
            )
            if not history:
                continue

            for added in history.added:
                self._synchronize(state, added, None, False, uowcommit, False)
                if added is not None and self.post_update:
                    self._post_update(added, uowcommit, [state])

            for removed in history.deleted:
                if not null_child_fks or self.cascade.delete_orphan:
                    continue
                if not self.hasparent(removed):
                    self._synchronize(
                        state, removed, None, True, uowcommit, False
                    )

            if self._pks_changed(uowcommit, state):
                for kept in history.unchanged:
                    self._synchronize(
                        state, kept, None, False, uowcommit, True
                    )

    def _synchronize(
        self, state, child, associationrow, clearkeys, uowcommit, pks_changed
    ):
        """Populate or clear the synchronized columns of ``child``.

        ``child`` is first checked with ``_verify_canload()``.  Nothing
        further happens if it is None, or if it is marked deleted in
        ``uowcommit`` while ``post_update`` is off.  ``associationrow``
        is not used by this implementation.

        """
        self._verify_canload(child)

        if child is None:
            return
        if not self.post_update and uowcommit.is_deleted(child):
            return

        if clearkeys:
            sync.clear(child, self.mapper, self.prop.synchronize_pairs)
            return

        sync.populate(
            state,
            self.parent,
            child,
            self.mapper,
            self.prop.synchronize_pairs,
            uowcommit,
            self.passive_updates and pks_changed,
        )

    def _pks_changed(self, uowcommit, state):
        """Return ``sync.source_modified()`` for ``state`` against the
        parent mapper and this relationship's synchronize pairs."""
        return sync.source_modified(
            uowcommit, state, self.parent, self.prop.synchronize_pairs
        )
639
640
class ManyToOneDP(DependencyProcessor):
    """Dependency processor for the many-to-one direction.

    The related ("child") state is the source of the key values; they
    are copied onto, or cleared from, the state that owns the
    relationship.

    """

    def __init__(self, prop):
        """Set up the processor and attach a ``DetectKeySwitch`` for
        ``prop`` to the target mapper and each of its descendants."""
        super().__init__(prop)
        for target_mapper in self.mapper.self_and_descendants:
            target_mapper._dependency_processors.append(DetectKeySwitch(prop))

    def per_property_dependencies(
        self,
        uow,
        parent_saves,
        child_saves,
        parent_deletes,
        child_deletes,
        after_save,
        before_delete,
    ):
        """Add the aggregate ordering pairs for this relationship to
        ``uow.dependencies``."""
        if not self.post_update:
            uow.dependencies.update(
                [
                    (child_saves, after_save),
                    (after_save, parent_saves),
                    (parent_saves, child_deletes),
                    (parent_deletes, child_deletes),
                ]
            )
            return

        # post_update: the parent rows get separate post/pre update records
        post_updates = unitofwork.PostUpdateAll(
            uow, self.parent.primary_base_mapper, False
        )
        pre_updates = unitofwork.PostUpdateAll(
            uow, self.parent.primary_base_mapper, True
        )
        uow.dependencies.update(
            [
                (child_saves, after_save),
                (parent_saves, after_save),
                (after_save, post_updates),
                (after_save, pre_updates),
                (before_delete, pre_updates),
                (pre_updates, child_deletes),
                (pre_updates, parent_deletes),
            ]
        )

    def per_state_dependencies(
        self,
        uow,
        save_parent,
        delete_parent,
        child_action,
        after_save,
        before_delete,
        isdelete,
        childisdelete,
    ):
        """Add the ordering pairs for one parent action / child action
        combination to ``uow.dependencies``."""
        if self.post_update:
            if isdelete:
                pre_updates = unitofwork.PostUpdateAll(
                    uow, self.parent.primary_base_mapper, True
                )
                pairs = [
                    (before_delete, pre_updates),
                    (pre_updates, delete_parent),
                    (pre_updates, child_action),
                ]
            else:
                post_updates = unitofwork.PostUpdateAll(
                    uow, self.parent.primary_base_mapper, False
                )
                if childisdelete:
                    pairs = [
                        (after_save, post_updates),
                        (post_updates, child_action),
                    ]
                else:
                    pairs = [
                        (save_parent, after_save),
                        (child_action, after_save),
                        (after_save, post_updates),
                    ]
            uow.dependencies.update(pairs)
        elif isdelete:
            # a non-deleted child needs no ordering against a
            # deleted parent
            if childisdelete:
                uow.dependencies.update([(delete_parent, child_action)])
        elif childisdelete:
            uow.dependencies.update([(after_save, save_parent)])
        else:
            uow.dependencies.update(
                [(child_action, after_save), (after_save, save_parent)]
            )

    def presort_deletes(self, uowcommit, states):
        """When delete or delete-orphan cascade is set, register the
        related objects of deleted states (and whatever they cascade
        to) for deletion."""
        if not (self.cascade.delete or self.cascade.delete_orphan):
            return

        for state in states:
            history = uowcommit.get_attribute_history(
                state, self.key, self._passive_delete_flag
            )
            if not history:
                continue

            if self.cascade.delete_orphan:
                doomed = history.sum()
            else:
                doomed = history.non_deleted()

            for related in doomed:
                if related is None:
                    continue
                uowcommit.register_object(
                    related,
                    isdelete=True,
                    operation="delete",
                    prop=self.prop,
                )
                cascaded = self.mapper.cascade_iterator("delete", related)
                for _c, _m, cascaded_state, _d in cascaded:
                    uowcommit.register_object(cascaded_state, isdelete=True)

    def presort_saves(self, uowcommit, states):
        """Register each saved state; with delete-orphan cascade, also
        register de-associated objects that have no parent (and whatever
        they cascade to) for deletion."""
        for state in states:
            uowcommit.register_object(state, operation="add", prop=self.prop)
            if not self.cascade.delete_orphan:
                continue

            history = uowcommit.get_attribute_history(
                state, self.key, self._passive_delete_flag
            )
            if not history:
                continue

            for removed in history.deleted:
                if self.hasparent(removed) is not False:
                    continue
                uowcommit.register_object(
                    removed,
                    isdelete=True,
                    operation="delete",
                    prop=self.prop,
                )

                cascaded = self.mapper.cascade_iterator("delete", removed)
                for _c, _m, cascaded_state, _d in cascaded:
                    uowcommit.register_object(cascaded_state, isdelete=True)

    def process_deletes(self, uowcommit, states):
        """For post_update relationships, clear the referencing columns
        of each deleted state and register it for a post-update."""
        applies = (
            self.post_update
            and not self.cascade.delete_orphan
            and not self.passive_deletes == "all"
        )
        if not applies:
            return

        # post_update means we have to update our
        # row to not reference the child object
        # before we can DELETE the row
        for state in states:
            self._synchronize(state, None, None, True, uowcommit)
            if state and self.post_update:
                history = uowcommit.get_attribute_history(
                    state, self.key, self._passive_delete_flag
                )
                if history:
                    self._post_update(
                        state, uowcommit, history.sum(), is_m2o_delete=True
                    )

    def process_saves(self, uowcommit, states):
        """Copy key values from newly related objects onto each saved
        state, or clear them when the related object was only removed."""
        for state in states:
            history = uowcommit.get_attribute_history(
                state, self.key, attributes.PASSIVE_NO_INITIALIZE
            )
            if not history:
                continue

            if history.added:
                for added in history.added:
                    self._synchronize(
                        state, added, None, False, uowcommit, "add"
                    )
            elif history.deleted:
                self._synchronize(state, None, None, True, uowcommit, "delete")

            if self.post_update:
                self._post_update(state, uowcommit, history.sum())

    def _synchronize(
        self,
        state,
        child,
        associationrow,
        clearkeys,
        uowcommit,
        operation=None,
    ):
        """Populate or clear the synchronized columns of ``state``.

        Returns without doing anything if ``state`` is None, or if it is
        marked deleted in ``uowcommit`` while ``post_update`` is off.
        When ``operation`` is given and ``child`` is not contained in
        the session, a warning is emitted and nothing is synchronized.
        ``associationrow`` is not used by this implementation.

        """
        if state is None:
            return
        if not self.post_update and uowcommit.is_deleted(state):
            return

        child_outside_session = (
            operation is not None
            and child is not None
            and not uowcommit.session._contains_state(child)
        )
        if child_outside_session:
            util.warn(
                "Object of type %s not in session, %s "
                "operation along '%s' won't proceed"
                % (mapperutil.state_class_str(child), operation, self.prop)
            )
            return

        if clearkeys or child is None:
            sync.clear(state, self.parent, self.prop.synchronize_pairs)
            return

        self._verify_canload(child)
        sync.populate(
            child,
            self.mapper,
            state,
            self.parent,
            self.prop.synchronize_pairs,
            uowcommit,
            False,
        )
864
865
class DetectKeySwitch(DependencyProcessor):
    """Propagate primary key changes to many-to-one parents.

    Used for many-to-one relationships that have no one-to-many
    backref: when a primary key has changed, the unit of work is
    searched for objects that refer to the changed object and they
    are updated.

    In theory the same approach could support transparent deletion of
    objects referenced via many-to-one, but the current attribute
    system doesn't do enough bookkeeping for this to be efficient.

    """

    def per_property_preprocessors(self, uow):
        """Register this processor as a preprocessor unless a reverse
        property makes that unnecessary."""
        reverse_props = self.prop._reverse_property
        if reverse_props:
            if self.passive_updates:
                return
            if False in (rev.passive_updates for rev in reverse_props):
                return

        uow.register_preprocessor(self, False)

    def per_property_flush_actions(self, uow):
        """Order this processor's "process all" record after the
        aggregate save of the parent base mapper."""
        saves = unitofwork.SaveUpdateAll(uow, self.parent.base_mapper)
        process = unitofwork.ProcessAll(uow, self, False, False)
        uow.dependencies.update([(saves, process)])

    def per_state_flush_actions(self, uow, states, isdelete):
        """No per-state actions are established by this processor."""
        pass

    def presort_deletes(self, uowcommit, states):
        """Nothing is done for deleted states."""
        pass

    def presort_saves(self, uow, states):
        """Handle key switches in the presort stage when
        ``passive_updates`` is off."""
        if self.passive_updates:
            return

        # for non-passive updates, register in the preprocess stage
        # so that mapper save_obj() gets a hold of changes
        self._process_key_switches(states, uow)

    def prop_has_changes(self, uow, states, isdelete):
        """Return True only for saves with ``passive_updates`` on where
        at least one state has switched keys."""
        if isdelete or not self.passive_updates:
            return False

        return bool(self._key_switchers(uow, states))

    def process_deletes(self, uowcommit, states):
        """Not expected to be called; fails an assertion."""
        assert False

    def process_saves(self, uowcommit, states):
        """Handle key switches in the process stage; asserts that
        ``passive_updates`` is on."""
        # for passive updates, register objects in the process stage
        # so that we avoid ManyToOneDP's registering the object without
        # the listonly flag in its own preprocess stage (results in UPDATE)
        # statements being emitted
        assert self.passive_updates
        self._process_key_switches(states, uowcommit)

    def _key_switchers(self, uow, states):
        """Return the memoized set of states whose keys have changed,
        after classifying any of ``states`` not seen before."""
        switched, notswitched = uow.memo(
            ("pk_switchers", self), lambda: (set(), set())
        )

        # snapshot of everything classified before this call
        already_seen = switched.union(notswitched)
        for candidate in states:
            if candidate in already_seen:
                continue
            if self._pks_changed(uow, candidate):
                switched.add(candidate)
            else:
                notswitched.add(candidate)
        return switched

    def _process_key_switches(self, deplist, uowcommit):
        """Find states in the session's identity map that refer to a
        key-switched state and copy the new key values onto them."""
        switchers = self._key_switchers(uowcommit, deplist)
        if not switchers:
            return

        # if primary key values have actually changed somewhere, perform
        # a linear search through the UOW in search of a parent.
        for state in uowcommit.session.identity_map.all_states():
            if not issubclass(state.class_, self.parent.class_):
                continue

            state_dict = state.dict
            related = state.get_impl(self.key).get(
                state, state_dict, passive=self._passive_update_flag
            )
            if related is attributes.PASSIVE_NO_RESULT or related is None:
                continue

            if self.prop.uselist:
                if not related:
                    continue
                target = related[0]
            else:
                target = related

            target_state = attributes.instance_state(target)
            if target_state not in switchers:
                continue

            uowcommit.register_object(state, False, self.passive_updates)
            sync.populate(
                target_state,
                self.mapper,
                state,
                self.parent,
                self.prop.synchronize_pairs,
                uowcommit,
                self.passive_updates,
            )

    def _pks_changed(self, uowcommit, state):
        """Return whether ``state`` has a key and
        ``sync.source_modified()`` reports a change for it."""
        if not state.key:
            return False
        return sync.source_modified(
            uowcommit, state, self.mapper, self.prop.synchronize_pairs
        )
981
982
class ManyToManyDP(DependencyProcessor):
    """Dependency processor for the many-to-many direction.

    Instead of copying key values between states, rows for the
    ``secondary`` table are assembled as dictionaries and emitted as
    INSERT / UPDATE / DELETE statements by ``_run_crud()``.

    """

    def per_property_dependencies(
        self,
        uow,
        parent_saves,
        child_saves,
        parent_deletes,
        child_deletes,
        after_save,
        before_delete,
    ):
        """Add the aggregate ordering pairs for this relationship to
        ``uow.dependencies``."""
        pairs = [
            (parent_saves, after_save),
            (child_saves, after_save),
            (after_save, child_deletes),
            # a rowswitch on the parent from  deleted to saved
            # can make this one occur, as the "save" may remove
            # an element from the
            # "deleted" list before we have a chance to
            # process its child rows
            (before_delete, parent_saves),
            (before_delete, parent_deletes),
            (before_delete, child_deletes),
            (before_delete, child_saves),
        ]
        uow.dependencies.update(pairs)

    def per_state_dependencies(
        self,
        uow,
        save_parent,
        delete_parent,
        child_action,
        after_save,
        before_delete,
        isdelete,
        childisdelete,
    ):
        """Add the ordering pairs for one parent action / child action
        combination to ``uow.dependencies``."""
        if isdelete:
            pairs = [
                (before_delete, child_action),
                (before_delete, delete_parent),
            ]
        elif childisdelete:
            pairs = [(save_parent, after_save), (after_save, child_action)]
        else:
            pairs = [(save_parent, after_save), (child_action, after_save)]

        uow.dependencies.update(pairs)

    def presort_deletes(self, uowcommit, states):
        """Load collection history for deleted states when
        ``passive_deletes`` is off."""
        # TODO: no tests fail if this whole
        # thing is removed !!!!
        if self.passive_deletes:
            return

        # if no passive deletes, load history on
        # the collection, so that prop_has_changes()
        # returns True
        for state in states:
            uowcommit.get_attribute_history(
                state, self.key, self._passive_delete_flag
            )

    def presort_saves(self, uowcommit, states):
        """Load history where a parent key changed, and register
        orphaned children for deletion when delete-orphan is set."""
        if not self.passive_updates:
            # if no passive updates, load history on
            # each collection where parent has changed PK,
            # so that prop_has_changes() returns True
            for state in states:
                if self._pks_changed(uowcommit, state):
                    uowcommit.get_attribute_history(
                        state, self.key, attributes.PASSIVE_OFF
                    )

        if not self.cascade.delete_orphan:
            return

        # check for child items removed from the collection
        # if delete_orphan check is turned on.
        for state in states:
            history = uowcommit.get_attribute_history(
                state, self.key, attributes.PASSIVE_NO_INITIALIZE
            )
            if not history:
                continue

            for removed in history.deleted:
                if self.hasparent(removed) is not False:
                    continue
                uowcommit.register_object(
                    removed,
                    isdelete=True,
                    operation="delete",
                    prop=self.prop,
                )
                cascaded = self.mapper.cascade_iterator("delete", removed)
                for _c, _m, cascaded_state, _d in cascaded:
                    uowcommit.register_object(cascaded_state, isdelete=True)

    def process_deletes(self, uowcommit, states):
        """Collect the association rows to delete for deleted parent
        states and emit them via ``_run_crud()``."""
        rows_to_delete = []
        rows_to_insert = []
        rows_to_update = []

        processed = self._get_reversed_processed_set(uowcommit)
        newly_processed = set()

        for state in states:
            # this history should be cached already, as
            # we loaded it in preprocess_deletes
            history = uowcommit.get_attribute_history(
                state, self.key, self._passive_delete_flag
            )
            if not history:
                continue

            for child in history.non_added():
                if child is None:
                    continue
                if processed is not None and (state, child) in processed:
                    continue

                row = {}
                if self._synchronize(
                    state, child, row, False, uowcommit, "delete"
                ):
                    rows_to_delete.append(row)

            newly_processed.update((c, state) for c in history.non_added())

        if processed is not None:
            processed.update(newly_processed)

        self._run_crud(
            uowcommit, rows_to_insert, rows_to_update, rows_to_delete
        )

    def process_saves(self, uowcommit, states):
        """Collect the association rows to insert, delete and (on a
        cascaded key change) update for saved parent states, then emit
        them via ``_run_crud()``."""
        rows_to_delete = []
        rows_to_insert = []
        rows_to_update = []

        processed = self._get_reversed_processed_set(uowcommit)
        newly_processed = set()

        for state in states:
            need_cascade_pks = not self.passive_updates and self._pks_changed(
                uowcommit, state
            )
            if need_cascade_pks:
                load_flag = attributes.PASSIVE_OFF
            else:
                load_flag = attributes.PASSIVE_NO_INITIALIZE
            passive = load_flag | attributes.INCLUDE_PENDING_MUTATIONS

            history = uowcommit.get_attribute_history(state, self.key, passive)
            if not history:
                continue

            for child in history.added:
                if processed is not None and (state, child) in processed:
                    continue
                row = {}
                if self._synchronize(
                    state, child, row, False, uowcommit, "add"
                ):
                    rows_to_insert.append(row)

            for child in history.deleted:
                if processed is not None and (state, child) in processed:
                    continue
                row = {}
                if self._synchronize(
                    state, child, row, False, uowcommit, "delete"
                ):
                    rows_to_delete.append(row)

            newly_processed.update(
                (c, state) for c in history.added + history.deleted
            )

            if not need_cascade_pks:
                continue

            for child in history.unchanged:
                row = {}
                sync.update(
                    state,
                    self.parent,
                    row,
                    "old_",
                    self.prop.synchronize_pairs,
                )
                sync.update(
                    child,
                    self.mapper,
                    row,
                    "old_",
                    self.prop.secondary_synchronize_pairs,
                )

                rows_to_update.append(row)

        if processed is not None:
            processed.update(newly_processed)

        self._run_crud(
            uowcommit, rows_to_insert, rows_to_update, rows_to_delete
        )

    def _run_crud(
        self, uowcommit, secondary_insert, secondary_update, secondary_delete
    ):
        """Execute the collected rows against the ``secondary`` table.

        Deletes run first, then updates, then inserts.  The WHERE clause
        of the DELETE / UPDATE is built from the secondary-table columns
        whose key appears in the first row of the respective list.

        :raises exc.StaleDataError: if the result reports
         ``supports_sane_multi_rowcount()`` and its rowcount differs from
         the number of rows submitted for a DELETE or UPDATE.

        """
        connection = uowcommit.transaction.connection(self.mapper)

        if secondary_delete:
            first_row = secondary_delete[0]
            statement = self.secondary.delete().where(
                sql.and_(
                    *[
                        col == sql.bindparam(col.key, type_=col.type)
                        for col in self.secondary.c
                        if col.key in first_row
                    ]
                )
            )
            result = connection.execute(statement, secondary_delete)

            if result.supports_sane_multi_rowcount():
                if result.rowcount != len(secondary_delete):
                    raise exc.StaleDataError(
                        "DELETE statement on table '%s' expected to delete "
                        "%d row(s); Only %d were matched."
                        % (
                            self.secondary.description,
                            len(secondary_delete),
                            result.rowcount,
                        )
                    )

        if secondary_update:
            first_row = secondary_update[0]
            statement = self.secondary.update().where(
                sql.and_(
                    *[
                        col == sql.bindparam("old_" + col.key, type_=col.type)
                        for col in self.secondary.c
                        if col.key in first_row
                    ]
                )
            )
            result = connection.execute(statement, secondary_update)

            if result.supports_sane_multi_rowcount():
                if result.rowcount != len(secondary_update):
                    raise exc.StaleDataError(
                        "UPDATE statement on table '%s' expected to update "
                        "%d row(s); Only %d were matched."
                        % (
                            self.secondary.description,
                            len(secondary_update),
                            result.rowcount,
                        )
                    )

        if secondary_insert:
            connection.execute(self.secondary.insert(), secondary_insert)

    def _synchronize(
        self, state, child, associationrow, clearkeys, uowcommit, operation
    ):
        """Fill ``associationrow`` with the key values of ``state`` and
        ``child``.

        Returns True when the row was populated.  Returns False when
        ``child`` is None or is not contained in the session; in the
        latter case a warning is emitted unless ``child.deleted`` is
        set.  ``clearkeys`` is not used by this implementation.

        """
        # this checks for None if uselist=True
        self._verify_canload(child)

        # but if uselist=False we get here.   If child is None,
        # no association row can be generated, so return.
        if child is None:
            return False

        if not uowcommit.session._contains_state(child):
            if not child.deleted:
                util.warn(
                    "Object of type %s not in session, %s "
                    "operation along '%s' won't proceed"
                    % (mapperutil.state_class_str(child), operation, self.prop)
                )
            return False

        sync.populate_dict(
            state, self.parent, associationrow, self.prop.synchronize_pairs
        )
        sync.populate_dict(
            child,
            self.mapper,
            associationrow,
            self.prop.secondary_synchronize_pairs,
        )

        return True

    def _pks_changed(self, uowcommit, state):
        """Return ``sync.source_modified()`` for ``state`` against the
        parent mapper and this relationship's synchronize pairs."""
        return sync.source_modified(
            uowcommit, state, self.parent, self.prop.synchronize_pairs
        )
1296
1297
# Maps a relationship direction symbol to the DependencyProcessor subclass
# that handles it; consulted by DependencyProcessor.from_relationship().
_direction_to_processor = {
    ONETOMANY: OneToManyDP,
    MANYTOONE: ManyToOneDP,
    MANYTOMANY: ManyToManyDP,
}