1# orm/dependency.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""Relationship dependencies."""
11
12from __future__ import annotations
13
14from . import attributes
15from . import exc
16from . import sync
17from . import unitofwork
18from . import util as mapperutil
19from .interfaces import MANYTOMANY
20from .interfaces import MANYTOONE
21from .interfaces import ONETOMANY
22from .. import exc as sa_exc
23from .. import sql
24from .. import util
25
26
27class _DependencyProcessor:
28 def __init__(self, prop):
29 self.prop = prop
30 self.cascade = prop.cascade
31 self.mapper = prop.mapper
32 self.parent = prop.parent
33 self.secondary = prop.secondary
34 self.direction = prop.direction
35 self.post_update = prop.post_update
36 self.passive_deletes = prop.passive_deletes
37 self.passive_updates = prop.passive_updates
38 self.enable_typechecks = prop.enable_typechecks
39 if self.passive_deletes:
40 self._passive_delete_flag = attributes.PASSIVE_NO_INITIALIZE
41 else:
42 self._passive_delete_flag = attributes.PASSIVE_OFF
43 if self.passive_updates:
44 self._passive_update_flag = attributes.PASSIVE_NO_INITIALIZE
45 else:
46 self._passive_update_flag = attributes.PASSIVE_OFF
47
48 self.sort_key = "%s_%s" % (self.parent._sort_key, prop.key)
49 self.key = prop.key
50 if not self.prop.synchronize_pairs:
51 raise sa_exc.ArgumentError(
52 "Can't build a DependencyProcessor for relationship %s. "
53 "No target attributes to populate between parent and "
54 "child are present" % self.prop
55 )
56
57 @classmethod
58 def from_relationship(cls, prop):
59 return _direction_to_processor[prop.direction](prop)
60
61 def hasparent(self, state):
62 """return True if the given object instance has a parent,
63 according to the ``InstrumentedAttribute`` handled by this
64 ``DependencyProcessor``.
65
66 """
67 return self.parent.class_manager.get_impl(self.key).hasparent(state)
68
69 def per_property_preprocessors(self, uow):
70 """establish actions and dependencies related to a flush.
71
72 These actions will operate on all relevant states in
73 the aggregate.
74
75 """
76 uow.register_preprocessor(self, True)
77
78 def per_property_flush_actions(self, uow):
79 after_save = unitofwork._ProcessAll(uow, self, False, True)
80 before_delete = unitofwork._ProcessAll(uow, self, True, True)
81
82 parent_saves = unitofwork._SaveUpdateAll(
83 uow, self.parent.primary_base_mapper
84 )
85 child_saves = unitofwork._SaveUpdateAll(
86 uow, self.mapper.primary_base_mapper
87 )
88
89 parent_deletes = unitofwork._DeleteAll(
90 uow, self.parent.primary_base_mapper
91 )
92 child_deletes = unitofwork._DeleteAll(
93 uow, self.mapper.primary_base_mapper
94 )
95
96 self.per_property_dependencies(
97 uow,
98 parent_saves,
99 child_saves,
100 parent_deletes,
101 child_deletes,
102 after_save,
103 before_delete,
104 )
105
106 def per_state_flush_actions(self, uow, states, isdelete):
107 """establish actions and dependencies related to a flush.
108
109 These actions will operate on all relevant states
110 individually. This occurs only if there are cycles
111 in the 'aggregated' version of events.
112
113 """
114
115 child_base_mapper = self.mapper.primary_base_mapper
116 child_saves = unitofwork._SaveUpdateAll(uow, child_base_mapper)
117 child_deletes = unitofwork._DeleteAll(uow, child_base_mapper)
118
119 # locate and disable the aggregate processors
120 # for this dependency
121
122 if isdelete:
123 before_delete = unitofwork._ProcessAll(uow, self, True, True)
124 before_delete.disabled = True
125 else:
126 after_save = unitofwork._ProcessAll(uow, self, False, True)
127 after_save.disabled = True
128
129 # check if the "child" side is part of the cycle
130
131 if child_saves not in uow.cycles:
132 # based on the current dependencies we use, the saves/
133 # deletes should always be in the 'cycles' collection
134 # together. if this changes, we will have to break up
135 # this method a bit more.
136 assert child_deletes not in uow.cycles
137
138 # child side is not part of the cycle, so we will link per-state
139 # actions to the aggregate "saves", "deletes" actions
140 child_actions = [(child_saves, False), (child_deletes, True)]
141 child_in_cycles = False
142 else:
143 child_in_cycles = True
144
145 # check if the "parent" side is part of the cycle
146 if not isdelete:
147 parent_saves = unitofwork._SaveUpdateAll(
148 uow, self.parent.base_mapper
149 )
150 parent_deletes = before_delete = None
151 if parent_saves in uow.cycles:
152 parent_in_cycles = True
153 else:
154 parent_deletes = unitofwork._DeleteAll(
155 uow, self.parent.base_mapper
156 )
157 parent_saves = after_save = None
158 if parent_deletes in uow.cycles:
159 parent_in_cycles = True
160
161 # now create actions /dependencies for each state.
162
163 for state in states:
164 # detect if there's anything changed or loaded
165 # by a preprocessor on this state/attribute. In the
166 # case of deletes we may try to load missing items here as well.
167 sum_ = state.manager[self.key].impl.get_all_pending(
168 state,
169 state.dict,
170 (
171 self._passive_delete_flag
172 if isdelete
173 else attributes.PASSIVE_NO_INITIALIZE
174 ),
175 )
176
177 if not sum_:
178 continue
179
180 if isdelete:
181 before_delete = unitofwork._ProcessState(
182 uow, self, True, state
183 )
184 if parent_in_cycles:
185 parent_deletes = unitofwork._DeleteState(uow, state)
186 else:
187 after_save = unitofwork._ProcessState(uow, self, False, state)
188 if parent_in_cycles:
189 parent_saves = unitofwork._SaveUpdateState(uow, state)
190
191 if child_in_cycles:
192 child_actions = []
193 for child_state, child in sum_:
194 if child_state not in uow.states:
195 child_action = (None, None)
196 else:
197 (deleted, listonly) = uow.states[child_state]
198 if deleted:
199 child_action = (
200 unitofwork._DeleteState(uow, child_state),
201 True,
202 )
203 else:
204 child_action = (
205 unitofwork._SaveUpdateState(uow, child_state),
206 False,
207 )
208 child_actions.append(child_action)
209
210 # establish dependencies between our possibly per-state
211 # parent action and our possibly per-state child action.
212 for child_action, childisdelete in child_actions:
213 self.per_state_dependencies(
214 uow,
215 parent_saves,
216 parent_deletes,
217 child_action,
218 after_save,
219 before_delete,
220 isdelete,
221 childisdelete,
222 )
223
224 def presort_deletes(self, uowcommit, states):
225 return False
226
227 def presort_saves(self, uowcommit, states):
228 return False
229
230 def process_deletes(self, uowcommit, states):
231 pass
232
233 def process_saves(self, uowcommit, states):
234 pass
235
236 def prop_has_changes(self, uowcommit, states, isdelete):
237 if not isdelete or self.passive_deletes:
238 passive = (
239 attributes.PASSIVE_NO_INITIALIZE
240 | attributes.INCLUDE_PENDING_MUTATIONS
241 )
242 elif self.direction is MANYTOONE:
243 # here, we were hoping to optimize having to fetch many-to-one
244 # for history and ignore it, if there's no further cascades
245 # to take place. however there are too many less common conditions
246 # that still take place and tests in test_relationships /
247 # test_cascade etc. will still fail.
248 passive = attributes.PASSIVE_NO_FETCH_RELATED
249 else:
250 passive = (
251 attributes.PASSIVE_OFF | attributes.INCLUDE_PENDING_MUTATIONS
252 )
253
254 for s in states:
255 # TODO: add a high speed method
256 # to InstanceState which returns: attribute
257 # has a non-None value, or had one
258 history = uowcommit.get_attribute_history(s, self.key, passive)
259 if history and not history.empty():
260 return True
261 else:
262 return (
263 states
264 and not self.prop._is_self_referential
265 and self.mapper in uowcommit.mappers
266 )
267
268 def _verify_canload(self, state):
269 if self.prop.uselist and state is None:
270 raise exc.FlushError(
271 "Can't flush None value found in "
272 "collection %s" % (self.prop,)
273 )
274 elif state is not None and not self.mapper._canload(
275 state, allow_subtypes=not self.enable_typechecks
276 ):
277 if self.mapper._canload(state, allow_subtypes=True):
278 raise exc.FlushError(
279 "Attempting to flush an item of type "
280 "%(x)s as a member of collection "
281 '"%(y)s". Expected an object of type '
282 "%(z)s or a polymorphic subclass of "
283 "this type. If %(x)s is a subclass of "
284 '%(z)s, configure mapper "%(zm)s" to '
285 "load this subtype polymorphically, or "
286 "set enable_typechecks=False to allow "
287 "any subtype to be accepted for flush. "
288 % {
289 "x": state.class_,
290 "y": self.prop,
291 "z": self.mapper.class_,
292 "zm": self.mapper,
293 }
294 )
295 else:
296 raise exc.FlushError(
297 "Attempting to flush an item of type "
298 "%(x)s as a member of collection "
299 '"%(y)s". Expected an object of type '
300 "%(z)s or a polymorphic subclass of "
301 "this type."
302 % {
303 "x": state.class_,
304 "y": self.prop,
305 "z": self.mapper.class_,
306 }
307 )
308
309 def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
310 raise NotImplementedError()
311
312 def _get_reversed_processed_set(self, uow):
313 if not self.prop._reverse_property:
314 return None
315
316 process_key = tuple(
317 sorted([self.key] + [p.key for p in self.prop._reverse_property])
318 )
319 return uow.memo(("reverse_key", process_key), set)
320
321 def _post_update(self, state, uowcommit, related, is_m2o_delete=False):
322 for x in related:
323 if not is_m2o_delete or x is not None:
324 uowcommit.register_post_update(
325 state, [r for l, r in self.prop.synchronize_pairs]
326 )
327 break
328
329 def _pks_changed(self, uowcommit, state):
330 raise NotImplementedError()
331
332 def __repr__(self):
333 return "%s(%s)" % (self.__class__.__name__, self.prop)
334
335
336class _OneToManyDP(_DependencyProcessor):
337 def per_property_dependencies(
338 self,
339 uow,
340 parent_saves,
341 child_saves,
342 parent_deletes,
343 child_deletes,
344 after_save,
345 before_delete,
346 ):
347 if self.post_update:
348 child_post_updates = unitofwork._PostUpdateAll(
349 uow, self.mapper.primary_base_mapper, False
350 )
351 child_pre_updates = unitofwork._PostUpdateAll(
352 uow, self.mapper.primary_base_mapper, True
353 )
354
355 uow.dependencies.update(
356 [
357 (child_saves, after_save),
358 (parent_saves, after_save),
359 (after_save, child_post_updates),
360 (before_delete, child_pre_updates),
361 (child_pre_updates, parent_deletes),
362 (child_pre_updates, child_deletes),
363 ]
364 )
365 else:
366 uow.dependencies.update(
367 [
368 (parent_saves, after_save),
369 (after_save, child_saves),
370 (after_save, child_deletes),
371 (child_saves, parent_deletes),
372 (child_deletes, parent_deletes),
373 (before_delete, child_saves),
374 (before_delete, child_deletes),
375 ]
376 )
377
378 def per_state_dependencies(
379 self,
380 uow,
381 save_parent,
382 delete_parent,
383 child_action,
384 after_save,
385 before_delete,
386 isdelete,
387 childisdelete,
388 ):
389 if self.post_update:
390 child_post_updates = unitofwork._PostUpdateAll(
391 uow, self.mapper.primary_base_mapper, False
392 )
393 child_pre_updates = unitofwork._PostUpdateAll(
394 uow, self.mapper.primary_base_mapper, True
395 )
396
397 # TODO: this whole block is not covered
398 # by any tests
399 if not isdelete:
400 if childisdelete:
401 uow.dependencies.update(
402 [
403 (child_action, after_save),
404 (after_save, child_post_updates),
405 ]
406 )
407 else:
408 uow.dependencies.update(
409 [
410 (save_parent, after_save),
411 (child_action, after_save),
412 (after_save, child_post_updates),
413 ]
414 )
415 else:
416 if childisdelete:
417 uow.dependencies.update(
418 [
419 (before_delete, child_pre_updates),
420 (child_pre_updates, delete_parent),
421 ]
422 )
423 else:
424 uow.dependencies.update(
425 [
426 (before_delete, child_pre_updates),
427 (child_pre_updates, delete_parent),
428 ]
429 )
430 elif not isdelete:
431 uow.dependencies.update(
432 [
433 (save_parent, after_save),
434 (after_save, child_action),
435 (save_parent, child_action),
436 ]
437 )
438 else:
439 uow.dependencies.update(
440 [(before_delete, child_action), (child_action, delete_parent)]
441 )
442
443 def presort_deletes(self, uowcommit, states):
444 # head object is being deleted, and we manage its list of
445 # child objects the child objects have to have their
446 # foreign key to the parent set to NULL
447 should_null_fks = (
448 not self.cascade.delete and not self.passive_deletes == "all"
449 )
450
451 for state in states:
452 history = uowcommit.get_attribute_history(
453 state, self.key, self._passive_delete_flag
454 )
455 if history:
456 for child in history.deleted:
457 if child is not None and self.hasparent(child) is False:
458 if self.cascade.delete_orphan:
459 uowcommit.register_object(child, isdelete=True)
460 else:
461 uowcommit.register_object(child)
462
463 if should_null_fks:
464 for child in history.unchanged:
465 if child is not None:
466 uowcommit.register_object(
467 child, operation="delete", prop=self.prop
468 )
469
470 def presort_saves(self, uowcommit, states):
471 children_added = uowcommit.memo(("children_added", self), set)
472
473 should_null_fks = (
474 not self.cascade.delete_orphan
475 and not self.passive_deletes == "all"
476 )
477
478 for state in states:
479 pks_changed = self._pks_changed(uowcommit, state)
480
481 if not pks_changed or self.passive_updates:
482 passive = (
483 attributes.PASSIVE_NO_INITIALIZE
484 | attributes.INCLUDE_PENDING_MUTATIONS
485 )
486 else:
487 passive = (
488 attributes.PASSIVE_OFF
489 | attributes.INCLUDE_PENDING_MUTATIONS
490 )
491
492 history = uowcommit.get_attribute_history(state, self.key, passive)
493 if history:
494 for child in history.added:
495 if child is not None:
496 uowcommit.register_object(
497 child,
498 cancel_delete=True,
499 operation="add",
500 prop=self.prop,
501 )
502
503 children_added.update(history.added)
504
505 for child in history.deleted:
506 if not self.cascade.delete_orphan:
507 if should_null_fks:
508 uowcommit.register_object(
509 child,
510 isdelete=False,
511 operation="delete",
512 prop=self.prop,
513 )
514 elif self.hasparent(child) is False:
515 uowcommit.register_object(
516 child,
517 isdelete=True,
518 operation="delete",
519 prop=self.prop,
520 )
521 for c, m, st_, dct_ in self.mapper.cascade_iterator(
522 "delete", child
523 ):
524 uowcommit.register_object(st_, isdelete=True)
525
526 if pks_changed:
527 if history:
528 for child in history.unchanged:
529 if child is not None:
530 uowcommit.register_object(
531 child,
532 False,
533 self.passive_updates,
534 operation="pk change",
535 prop=self.prop,
536 )
537
538 def process_deletes(self, uowcommit, states):
539 # head object is being deleted, and we manage its list of
540 # child objects the child objects have to have their foreign
541 # key to the parent set to NULL this phase can be called
542 # safely for any cascade but is unnecessary if delete cascade
543 # is on.
544
545 if self.post_update or not self.passive_deletes == "all":
546 children_added = uowcommit.memo(("children_added", self), set)
547
548 for state in states:
549 history = uowcommit.get_attribute_history(
550 state, self.key, self._passive_delete_flag
551 )
552 if history:
553 for child in history.deleted:
554 if (
555 child is not None
556 and self.hasparent(child) is False
557 ):
558 self._synchronize(
559 state, child, None, True, uowcommit, False
560 )
561 if self.post_update and child:
562 self._post_update(child, uowcommit, [state])
563
564 if self.post_update or not self.cascade.delete:
565 for child in set(history.unchanged).difference(
566 children_added
567 ):
568 if child is not None:
569 self._synchronize(
570 state, child, None, True, uowcommit, False
571 )
572 if self.post_update and child:
573 self._post_update(
574 child, uowcommit, [state]
575 )
576
577 # technically, we can even remove each child from the
578 # collection here too. but this would be a somewhat
579 # inconsistent behavior since it wouldn't happen
580 # if the old parent wasn't deleted but child was moved.
581
582 def process_saves(self, uowcommit, states):
583 should_null_fks = (
584 not self.cascade.delete_orphan
585 and not self.passive_deletes == "all"
586 )
587
588 for state in states:
589 history = uowcommit.get_attribute_history(
590 state, self.key, attributes.PASSIVE_NO_INITIALIZE
591 )
592 if history:
593 for child in history.added:
594 self._synchronize(
595 state, child, None, False, uowcommit, False
596 )
597 if child is not None and self.post_update:
598 self._post_update(child, uowcommit, [state])
599
600 for child in history.deleted:
601 if (
602 should_null_fks
603 and not self.cascade.delete_orphan
604 and not self.hasparent(child)
605 ):
606 self._synchronize(
607 state, child, None, True, uowcommit, False
608 )
609
610 if self._pks_changed(uowcommit, state):
611 for child in history.unchanged:
612 self._synchronize(
613 state, child, None, False, uowcommit, True
614 )
615
616 def _synchronize(
617 self, state, child, associationrow, clearkeys, uowcommit, pks_changed
618 ):
619 source = state
620 dest = child
621 self._verify_canload(child)
622 if dest is None or (
623 not self.post_update and uowcommit.is_deleted(dest)
624 ):
625 return
626 if clearkeys:
627 sync._clear(dest, self.mapper, self.prop.synchronize_pairs)
628 else:
629 sync._populate(
630 source,
631 self.parent,
632 dest,
633 self.mapper,
634 self.prop.synchronize_pairs,
635 uowcommit,
636 self.passive_updates and pks_changed,
637 )
638
639 def _pks_changed(self, uowcommit, state):
640 return sync._source_modified(
641 uowcommit, state, self.parent, self.prop.synchronize_pairs
642 )
643
644
645class _ManyToOneDP(_DependencyProcessor):
646 def __init__(self, prop):
647 _DependencyProcessor.__init__(self, prop)
648 for mapper in self.mapper.self_and_descendants:
649 mapper._dependency_processors.append(_DetectKeySwitch(prop))
650
651 def per_property_dependencies(
652 self,
653 uow,
654 parent_saves,
655 child_saves,
656 parent_deletes,
657 child_deletes,
658 after_save,
659 before_delete,
660 ):
661 if self.post_update:
662 parent_post_updates = unitofwork._PostUpdateAll(
663 uow, self.parent.primary_base_mapper, False
664 )
665 parent_pre_updates = unitofwork._PostUpdateAll(
666 uow, self.parent.primary_base_mapper, True
667 )
668
669 uow.dependencies.update(
670 [
671 (child_saves, after_save),
672 (parent_saves, after_save),
673 (after_save, parent_post_updates),
674 (after_save, parent_pre_updates),
675 (before_delete, parent_pre_updates),
676 (parent_pre_updates, child_deletes),
677 (parent_pre_updates, parent_deletes),
678 ]
679 )
680 else:
681 uow.dependencies.update(
682 [
683 (child_saves, after_save),
684 (after_save, parent_saves),
685 (parent_saves, child_deletes),
686 (parent_deletes, child_deletes),
687 ]
688 )
689
690 def per_state_dependencies(
691 self,
692 uow,
693 save_parent,
694 delete_parent,
695 child_action,
696 after_save,
697 before_delete,
698 isdelete,
699 childisdelete,
700 ):
701 if self.post_update:
702 if not isdelete:
703 parent_post_updates = unitofwork._PostUpdateAll(
704 uow, self.parent.primary_base_mapper, False
705 )
706 if childisdelete:
707 uow.dependencies.update(
708 [
709 (after_save, parent_post_updates),
710 (parent_post_updates, child_action),
711 ]
712 )
713 else:
714 uow.dependencies.update(
715 [
716 (save_parent, after_save),
717 (child_action, after_save),
718 (after_save, parent_post_updates),
719 ]
720 )
721 else:
722 parent_pre_updates = unitofwork._PostUpdateAll(
723 uow, self.parent.primary_base_mapper, True
724 )
725
726 uow.dependencies.update(
727 [
728 (before_delete, parent_pre_updates),
729 (parent_pre_updates, delete_parent),
730 (parent_pre_updates, child_action),
731 ]
732 )
733
734 elif not isdelete:
735 if not childisdelete:
736 uow.dependencies.update(
737 [(child_action, after_save), (after_save, save_parent)]
738 )
739 else:
740 uow.dependencies.update([(after_save, save_parent)])
741
742 else:
743 if childisdelete:
744 uow.dependencies.update([(delete_parent, child_action)])
745
746 def presort_deletes(self, uowcommit, states):
747 if self.cascade.delete or self.cascade.delete_orphan:
748 for state in states:
749 history = uowcommit.get_attribute_history(
750 state, self.key, self._passive_delete_flag
751 )
752 if history:
753 if self.cascade.delete_orphan:
754 todelete = history.sum()
755 else:
756 todelete = history.non_deleted()
757 for child in todelete:
758 if child is None:
759 continue
760 uowcommit.register_object(
761 child,
762 isdelete=True,
763 operation="delete",
764 prop=self.prop,
765 )
766 t = self.mapper.cascade_iterator("delete", child)
767 for c, m, st_, dct_ in t:
768 uowcommit.register_object(st_, isdelete=True)
769
770 def presort_saves(self, uowcommit, states):
771 for state in states:
772 uowcommit.register_object(state, operation="add", prop=self.prop)
773 if self.cascade.delete_orphan:
774 history = uowcommit.get_attribute_history(
775 state, self.key, self._passive_delete_flag
776 )
777 if history:
778 for child in history.deleted:
779 if self.hasparent(child) is False:
780 uowcommit.register_object(
781 child,
782 isdelete=True,
783 operation="delete",
784 prop=self.prop,
785 )
786
787 t = self.mapper.cascade_iterator("delete", child)
788 for c, m, st_, dct_ in t:
789 uowcommit.register_object(st_, isdelete=True)
790
791 def process_deletes(self, uowcommit, states):
792 if (
793 self.post_update
794 and not self.cascade.delete_orphan
795 and not self.passive_deletes == "all"
796 ):
797 # post_update means we have to update our
798 # row to not reference the child object
799 # before we can DELETE the row
800 for state in states:
801 self._synchronize(state, None, None, True, uowcommit)
802 if state and self.post_update:
803 history = uowcommit.get_attribute_history(
804 state, self.key, self._passive_delete_flag
805 )
806 if history:
807 self._post_update(
808 state, uowcommit, history.sum(), is_m2o_delete=True
809 )
810
811 def process_saves(self, uowcommit, states):
812 for state in states:
813 history = uowcommit.get_attribute_history(
814 state, self.key, attributes.PASSIVE_NO_INITIALIZE
815 )
816 if history:
817 if history.added:
818 for child in history.added:
819 self._synchronize(
820 state, child, None, False, uowcommit, "add"
821 )
822 elif history.deleted:
823 self._synchronize(
824 state, None, None, True, uowcommit, "delete"
825 )
826 if self.post_update:
827 self._post_update(state, uowcommit, history.sum())
828
829 def _synchronize(
830 self,
831 state,
832 child,
833 associationrow,
834 clearkeys,
835 uowcommit,
836 operation=None,
837 ):
838 if state is None or (
839 not self.post_update and uowcommit.is_deleted(state)
840 ):
841 return
842
843 if (
844 operation is not None
845 and child is not None
846 and not uowcommit.session._contains_state(child)
847 ):
848 util.warn(
849 "Object of type %s not in session, %s "
850 "operation along '%s' won't proceed"
851 % (mapperutil.state_class_str(child), operation, self.prop)
852 )
853 return
854
855 if clearkeys or child is None:
856 sync._clear(state, self.parent, self.prop.synchronize_pairs)
857 else:
858 self._verify_canload(child)
859 sync._populate(
860 child,
861 self.mapper,
862 state,
863 self.parent,
864 self.prop.synchronize_pairs,
865 uowcommit,
866 False,
867 )
868
869
870class _DetectKeySwitch(_DependencyProcessor):
871 """For many-to-one relationships with no one-to-many backref,
872 searches for parents through the unit of work when a primary
873 key has changed and updates them.
874
875 Theoretically, this approach could be expanded to support transparent
876 deletion of objects referenced via many-to-one as well, although
877 the current attribute system doesn't do enough bookkeeping for this
878 to be efficient.
879
880 """
881
882 def per_property_preprocessors(self, uow):
883 if self.prop._reverse_property:
884 if self.passive_updates:
885 return
886 else:
887 if False in (
888 prop.passive_updates
889 for prop in self.prop._reverse_property
890 ):
891 return
892
893 uow.register_preprocessor(self, False)
894
895 def per_property_flush_actions(self, uow):
896 parent_saves = unitofwork._SaveUpdateAll(uow, self.parent.base_mapper)
897 after_save = unitofwork._ProcessAll(uow, self, False, False)
898 uow.dependencies.update([(parent_saves, after_save)])
899
900 def per_state_flush_actions(self, uow, states, isdelete):
901 pass
902
903 def presort_deletes(self, uowcommit, states):
904 pass
905
906 def presort_saves(self, uow, states):
907 if not self.passive_updates:
908 # for non-passive updates, register in the preprocess stage
909 # so that mapper save_obj() gets a hold of changes
910 self._process_key_switches(states, uow)
911
912 def prop_has_changes(self, uow, states, isdelete):
913 if not isdelete and self.passive_updates:
914 d = self._key_switchers(uow, states)
915 return bool(d)
916
917 return False
918
919 def process_deletes(self, uowcommit, states):
920 assert False
921
922 def process_saves(self, uowcommit, states):
923 # for passive updates, register objects in the process stage
924 # so that we avoid ManyToOneDP's registering the object without
925 # the listonly flag in its own preprocess stage (results in UPDATE)
926 # statements being emitted
927 assert self.passive_updates
928 self._process_key_switches(states, uowcommit)
929
930 def _key_switchers(self, uow, states):
931 switched, notswitched = uow.memo(
932 ("pk_switchers", self), lambda: (set(), set())
933 )
934
935 allstates = switched.union(notswitched)
936 for s in states:
937 if s not in allstates:
938 if self._pks_changed(uow, s):
939 switched.add(s)
940 else:
941 notswitched.add(s)
942 return switched
943
944 def _process_key_switches(self, deplist, uowcommit):
945 switchers = self._key_switchers(uowcommit, deplist)
946 if switchers:
947 # if primary key values have actually changed somewhere, perform
948 # a linear search through the UOW in search of a parent.
949 for state in uowcommit.session.identity_map.all_states():
950 if not issubclass(state.class_, self.parent.class_):
951 continue
952 dict_ = state.dict
953 related = state.get_impl(self.key).get(
954 state, dict_, passive=self._passive_update_flag
955 )
956 if (
957 related is not attributes.PASSIVE_NO_RESULT
958 and related is not None
959 ):
960 if self.prop.uselist:
961 if not related:
962 continue
963 related_obj = related[0]
964 else:
965 related_obj = related
966 related_state = attributes.instance_state(related_obj)
967 if related_state in switchers:
968 uowcommit.register_object(
969 state, False, self.passive_updates
970 )
971 sync._populate(
972 related_state,
973 self.mapper,
974 state,
975 self.parent,
976 self.prop.synchronize_pairs,
977 uowcommit,
978 self.passive_updates,
979 )
980
981 def _pks_changed(self, uowcommit, state):
982 return bool(state.key) and sync._source_modified(
983 uowcommit, state, self.mapper, self.prop.synchronize_pairs
984 )
985
986
987class _ManyToManyDP(_DependencyProcessor):
988 def per_property_dependencies(
989 self,
990 uow,
991 parent_saves,
992 child_saves,
993 parent_deletes,
994 child_deletes,
995 after_save,
996 before_delete,
997 ):
998 uow.dependencies.update(
999 [
1000 (parent_saves, after_save),
1001 (child_saves, after_save),
1002 (after_save, child_deletes),
1003 # a rowswitch on the parent from deleted to saved
1004 # can make this one occur, as the "save" may remove
1005 # an element from the
1006 # "deleted" list before we have a chance to
1007 # process its child rows
1008 (before_delete, parent_saves),
1009 (before_delete, parent_deletes),
1010 (before_delete, child_deletes),
1011 (before_delete, child_saves),
1012 ]
1013 )
1014
1015 def per_state_dependencies(
1016 self,
1017 uow,
1018 save_parent,
1019 delete_parent,
1020 child_action,
1021 after_save,
1022 before_delete,
1023 isdelete,
1024 childisdelete,
1025 ):
1026 if not isdelete:
1027 if childisdelete:
1028 uow.dependencies.update(
1029 [(save_parent, after_save), (after_save, child_action)]
1030 )
1031 else:
1032 uow.dependencies.update(
1033 [(save_parent, after_save), (child_action, after_save)]
1034 )
1035 else:
1036 uow.dependencies.update(
1037 [(before_delete, child_action), (before_delete, delete_parent)]
1038 )
1039
1040 def presort_deletes(self, uowcommit, states):
1041 # TODO: no tests fail if this whole
1042 # thing is removed !!!!
1043 if not self.passive_deletes:
1044 # if no passive deletes, load history on
1045 # the collection, so that prop_has_changes()
1046 # returns True
1047 for state in states:
1048 uowcommit.get_attribute_history(
1049 state, self.key, self._passive_delete_flag
1050 )
1051
1052 def presort_saves(self, uowcommit, states):
1053 if not self.passive_updates:
1054 # if no passive updates, load history on
1055 # each collection where parent has changed PK,
1056 # so that prop_has_changes() returns True
1057 for state in states:
1058 if self._pks_changed(uowcommit, state):
1059 uowcommit.get_attribute_history(
1060 state, self.key, attributes.PASSIVE_OFF
1061 )
1062
1063 if not self.cascade.delete_orphan:
1064 return
1065
1066 # check for child items removed from the collection
1067 # if delete_orphan check is turned on.
1068 for state in states:
1069 history = uowcommit.get_attribute_history(
1070 state, self.key, attributes.PASSIVE_NO_INITIALIZE
1071 )
1072 if history:
1073 for child in history.deleted:
1074 if self.hasparent(child) is False:
1075 uowcommit.register_object(
1076 child,
1077 isdelete=True,
1078 operation="delete",
1079 prop=self.prop,
1080 )
1081 for c, m, st_, dct_ in self.mapper.cascade_iterator(
1082 "delete", child
1083 ):
1084 uowcommit.register_object(st_, isdelete=True)
1085
1086 def process_deletes(self, uowcommit, states):
1087 secondary_delete = []
1088 secondary_insert = []
1089 secondary_update = []
1090
1091 processed = self._get_reversed_processed_set(uowcommit)
1092 tmp = set()
1093 for state in states:
1094 # this history should be cached already, as
1095 # we loaded it in preprocess_deletes
1096 history = uowcommit.get_attribute_history(
1097 state, self.key, self._passive_delete_flag
1098 )
1099 if history:
1100 for child in history.non_added():
1101 if child is None or (
1102 processed is not None and (state, child) in processed
1103 ):
1104 continue
1105 associationrow = {}
1106 if not self._synchronize(
1107 state,
1108 child,
1109 associationrow,
1110 False,
1111 uowcommit,
1112 "delete",
1113 ):
1114 continue
1115 secondary_delete.append(associationrow)
1116
1117 tmp.update((c, state) for c in history.non_added())
1118
1119 if processed is not None:
1120 processed.update(tmp)
1121
1122 self._run_crud(
1123 uowcommit, secondary_insert, secondary_update, secondary_delete
1124 )
1125
1126 def process_saves(self, uowcommit, states):
1127 secondary_delete = []
1128 secondary_insert = []
1129 secondary_update = []
1130
1131 processed = self._get_reversed_processed_set(uowcommit)
1132 tmp = set()
1133
1134 for state in states:
1135 need_cascade_pks = not self.passive_updates and self._pks_changed(
1136 uowcommit, state
1137 )
1138 if need_cascade_pks:
1139 passive = (
1140 attributes.PASSIVE_OFF
1141 | attributes.INCLUDE_PENDING_MUTATIONS
1142 )
1143 else:
1144 passive = (
1145 attributes.PASSIVE_NO_INITIALIZE
1146 | attributes.INCLUDE_PENDING_MUTATIONS
1147 )
1148 history = uowcommit.get_attribute_history(state, self.key, passive)
1149 if history:
1150 for child in history.added:
1151 if processed is not None and (state, child) in processed:
1152 continue
1153 associationrow = {}
1154 if not self._synchronize(
1155 state, child, associationrow, False, uowcommit, "add"
1156 ):
1157 continue
1158 secondary_insert.append(associationrow)
1159 for child in history.deleted:
1160 if processed is not None and (state, child) in processed:
1161 continue
1162 associationrow = {}
1163 if not self._synchronize(
1164 state,
1165 child,
1166 associationrow,
1167 False,
1168 uowcommit,
1169 "delete",
1170 ):
1171 continue
1172 secondary_delete.append(associationrow)
1173
1174 tmp.update((c, state) for c in history.added + history.deleted)
1175
1176 if need_cascade_pks:
1177 for child in history.unchanged:
1178 associationrow = {}
1179 sync._update(
1180 state,
1181 self.parent,
1182 associationrow,
1183 "old_",
1184 self.prop.synchronize_pairs,
1185 )
1186 sync._update(
1187 child,
1188 self.mapper,
1189 associationrow,
1190 "old_",
1191 self.prop.secondary_synchronize_pairs,
1192 )
1193
1194 secondary_update.append(associationrow)
1195
1196 if processed is not None:
1197 processed.update(tmp)
1198
1199 self._run_crud(
1200 uowcommit, secondary_insert, secondary_update, secondary_delete
1201 )
1202
1203 def _run_crud(
1204 self, uowcommit, secondary_insert, secondary_update, secondary_delete
1205 ):
1206 connection = uowcommit.transaction.connection(self.mapper)
1207
1208 if secondary_delete:
1209 associationrow = secondary_delete[0]
1210 statement = self.secondary.delete().where(
1211 sql.and_(
1212 *[
1213 c == sql.bindparam(c.key, type_=c.type)
1214 for c in self.secondary.c
1215 if c.key in associationrow
1216 ]
1217 )
1218 )
1219 result = connection.execute(statement, secondary_delete)
1220
1221 if (
1222 result.supports_sane_multi_rowcount()
1223 ) and result.rowcount != len(secondary_delete):
1224 raise exc.StaleDataError(
1225 "DELETE statement on table '%s' expected to delete "
1226 "%d row(s); Only %d were matched."
1227 % (
1228 self.secondary.description,
1229 len(secondary_delete),
1230 result.rowcount,
1231 )
1232 )
1233
1234 if secondary_update:
1235 associationrow = secondary_update[0]
1236 statement = self.secondary.update().where(
1237 sql.and_(
1238 *[
1239 c == sql.bindparam("old_" + c.key, type_=c.type)
1240 for c in self.secondary.c
1241 if c.key in associationrow
1242 ]
1243 )
1244 )
1245 result = connection.execute(statement, secondary_update)
1246
1247 if (
1248 result.supports_sane_multi_rowcount()
1249 ) and result.rowcount != len(secondary_update):
1250 raise exc.StaleDataError(
1251 "UPDATE statement on table '%s' expected to update "
1252 "%d row(s); Only %d were matched."
1253 % (
1254 self.secondary.description,
1255 len(secondary_update),
1256 result.rowcount,
1257 )
1258 )
1259
1260 if secondary_insert:
1261 statement = self.secondary.insert()
1262 connection.execute(statement, secondary_insert)
1263
1264 def _synchronize(
1265 self, state, child, associationrow, clearkeys, uowcommit, operation
1266 ):
1267 # this checks for None if uselist=True
1268 self._verify_canload(child)
1269
1270 # but if uselist=False we get here. If child is None,
1271 # no association row can be generated, so return.
1272 if child is None:
1273 return False
1274
1275 if child is not None and not uowcommit.session._contains_state(child):
1276 if not child.deleted:
1277 util.warn(
1278 "Object of type %s not in session, %s "
1279 "operation along '%s' won't proceed"
1280 % (mapperutil.state_class_str(child), operation, self.prop)
1281 )
1282 return False
1283
1284 sync._populate_dict(
1285 state, self.parent, associationrow, self.prop.synchronize_pairs
1286 )
1287 sync._populate_dict(
1288 child,
1289 self.mapper,
1290 associationrow,
1291 self.prop.secondary_synchronize_pairs,
1292 )
1293
1294 return True
1295
1296 def _pks_changed(self, uowcommit, state):
1297 return sync._source_modified(
1298 uowcommit, state, self.parent, self.prop.synchronize_pairs
1299 )
1300
1301
1302_direction_to_processor = {
1303 ONETOMANY: _OneToManyDP,
1304 MANYTOONE: _ManyToOneDP,
1305 MANYTOMANY: _ManyToManyDP,
1306}