1# orm/dependency.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""Relationship dependencies.
11
12"""
13
14from __future__ import annotations
15
16from . import attributes
17from . import exc
18from . import sync
19from . import unitofwork
20from . import util as mapperutil
21from .interfaces import MANYTOMANY
22from .interfaces import MANYTOONE
23from .interfaces import ONETOMANY
24from .. import exc as sa_exc
25from .. import sql
26from .. import util
27
28
29class DependencyProcessor:
30 def __init__(self, prop):
31 self.prop = prop
32 self.cascade = prop.cascade
33 self.mapper = prop.mapper
34 self.parent = prop.parent
35 self.secondary = prop.secondary
36 self.direction = prop.direction
37 self.post_update = prop.post_update
38 self.passive_deletes = prop.passive_deletes
39 self.passive_updates = prop.passive_updates
40 self.enable_typechecks = prop.enable_typechecks
41 if self.passive_deletes:
42 self._passive_delete_flag = attributes.PASSIVE_NO_INITIALIZE
43 else:
44 self._passive_delete_flag = attributes.PASSIVE_OFF
45 if self.passive_updates:
46 self._passive_update_flag = attributes.PASSIVE_NO_INITIALIZE
47 else:
48 self._passive_update_flag = attributes.PASSIVE_OFF
49
50 self.sort_key = "%s_%s" % (self.parent._sort_key, prop.key)
51 self.key = prop.key
52 if not self.prop.synchronize_pairs:
53 raise sa_exc.ArgumentError(
54 "Can't build a DependencyProcessor for relationship %s. "
55 "No target attributes to populate between parent and "
56 "child are present" % self.prop
57 )
58
59 @classmethod
60 def from_relationship(cls, prop):
61 return _direction_to_processor[prop.direction](prop)
62
63 def hasparent(self, state):
64 """return True if the given object instance has a parent,
65 according to the ``InstrumentedAttribute`` handled by this
66 ``DependencyProcessor``.
67
68 """
69 return self.parent.class_manager.get_impl(self.key).hasparent(state)
70
71 def per_property_preprocessors(self, uow):
72 """establish actions and dependencies related to a flush.
73
74 These actions will operate on all relevant states in
75 the aggregate.
76
77 """
78 uow.register_preprocessor(self, True)
79
80 def per_property_flush_actions(self, uow):
81 after_save = unitofwork.ProcessAll(uow, self, False, True)
82 before_delete = unitofwork.ProcessAll(uow, self, True, True)
83
84 parent_saves = unitofwork.SaveUpdateAll(
85 uow, self.parent.primary_base_mapper
86 )
87 child_saves = unitofwork.SaveUpdateAll(
88 uow, self.mapper.primary_base_mapper
89 )
90
91 parent_deletes = unitofwork.DeleteAll(
92 uow, self.parent.primary_base_mapper
93 )
94 child_deletes = unitofwork.DeleteAll(
95 uow, self.mapper.primary_base_mapper
96 )
97
98 self.per_property_dependencies(
99 uow,
100 parent_saves,
101 child_saves,
102 parent_deletes,
103 child_deletes,
104 after_save,
105 before_delete,
106 )
107
108 def per_state_flush_actions(self, uow, states, isdelete):
109 """establish actions and dependencies related to a flush.
110
111 These actions will operate on all relevant states
112 individually. This occurs only if there are cycles
113 in the 'aggregated' version of events.
114
115 """
116
117 child_base_mapper = self.mapper.primary_base_mapper
118 child_saves = unitofwork.SaveUpdateAll(uow, child_base_mapper)
119 child_deletes = unitofwork.DeleteAll(uow, child_base_mapper)
120
121 # locate and disable the aggregate processors
122 # for this dependency
123
124 if isdelete:
125 before_delete = unitofwork.ProcessAll(uow, self, True, True)
126 before_delete.disabled = True
127 else:
128 after_save = unitofwork.ProcessAll(uow, self, False, True)
129 after_save.disabled = True
130
131 # check if the "child" side is part of the cycle
132
133 if child_saves not in uow.cycles:
134 # based on the current dependencies we use, the saves/
135 # deletes should always be in the 'cycles' collection
136 # together. if this changes, we will have to break up
137 # this method a bit more.
138 assert child_deletes not in uow.cycles
139
140 # child side is not part of the cycle, so we will link per-state
141 # actions to the aggregate "saves", "deletes" actions
142 child_actions = [(child_saves, False), (child_deletes, True)]
143 child_in_cycles = False
144 else:
145 child_in_cycles = True
146
147 # check if the "parent" side is part of the cycle
148 if not isdelete:
149 parent_saves = unitofwork.SaveUpdateAll(
150 uow, self.parent.base_mapper
151 )
152 parent_deletes = before_delete = None
153 if parent_saves in uow.cycles:
154 parent_in_cycles = True
155 else:
156 parent_deletes = unitofwork.DeleteAll(uow, self.parent.base_mapper)
157 parent_saves = after_save = None
158 if parent_deletes in uow.cycles:
159 parent_in_cycles = True
160
161 # now create actions /dependencies for each state.
162
163 for state in states:
164 # detect if there's anything changed or loaded
165 # by a preprocessor on this state/attribute. In the
166 # case of deletes we may try to load missing items here as well.
167 sum_ = state.manager[self.key].impl.get_all_pending(
168 state,
169 state.dict,
170 (
171 self._passive_delete_flag
172 if isdelete
173 else attributes.PASSIVE_NO_INITIALIZE
174 ),
175 )
176
177 if not sum_:
178 continue
179
180 if isdelete:
181 before_delete = unitofwork.ProcessState(uow, self, True, state)
182 if parent_in_cycles:
183 parent_deletes = unitofwork.DeleteState(uow, state)
184 else:
185 after_save = unitofwork.ProcessState(uow, self, False, state)
186 if parent_in_cycles:
187 parent_saves = unitofwork.SaveUpdateState(uow, state)
188
189 if child_in_cycles:
190 child_actions = []
191 for child_state, child in sum_:
192 if child_state not in uow.states:
193 child_action = (None, None)
194 else:
195 (deleted, listonly) = uow.states[child_state]
196 if deleted:
197 child_action = (
198 unitofwork.DeleteState(uow, child_state),
199 True,
200 )
201 else:
202 child_action = (
203 unitofwork.SaveUpdateState(uow, child_state),
204 False,
205 )
206 child_actions.append(child_action)
207
208 # establish dependencies between our possibly per-state
209 # parent action and our possibly per-state child action.
210 for child_action, childisdelete in child_actions:
211 self.per_state_dependencies(
212 uow,
213 parent_saves,
214 parent_deletes,
215 child_action,
216 after_save,
217 before_delete,
218 isdelete,
219 childisdelete,
220 )
221
222 def presort_deletes(self, uowcommit, states):
223 return False
224
225 def presort_saves(self, uowcommit, states):
226 return False
227
228 def process_deletes(self, uowcommit, states):
229 pass
230
231 def process_saves(self, uowcommit, states):
232 pass
233
234 def prop_has_changes(self, uowcommit, states, isdelete):
235 if not isdelete or self.passive_deletes:
236 passive = (
237 attributes.PASSIVE_NO_INITIALIZE
238 | attributes.INCLUDE_PENDING_MUTATIONS
239 )
240 elif self.direction is MANYTOONE:
241 # here, we were hoping to optimize having to fetch many-to-one
242 # for history and ignore it, if there's no further cascades
243 # to take place. however there are too many less common conditions
244 # that still take place and tests in test_relationships /
245 # test_cascade etc. will still fail.
246 passive = attributes.PASSIVE_NO_FETCH_RELATED
247 else:
248 passive = (
249 attributes.PASSIVE_OFF | attributes.INCLUDE_PENDING_MUTATIONS
250 )
251
252 for s in states:
253 # TODO: add a high speed method
254 # to InstanceState which returns: attribute
255 # has a non-None value, or had one
256 history = uowcommit.get_attribute_history(s, self.key, passive)
257 if history and not history.empty():
258 return True
259 else:
260 return (
261 states
262 and not self.prop._is_self_referential
263 and self.mapper in uowcommit.mappers
264 )
265
266 def _verify_canload(self, state):
267 if self.prop.uselist and state is None:
268 raise exc.FlushError(
269 "Can't flush None value found in "
270 "collection %s" % (self.prop,)
271 )
272 elif state is not None and not self.mapper._canload(
273 state, allow_subtypes=not self.enable_typechecks
274 ):
275 if self.mapper._canload(state, allow_subtypes=True):
276 raise exc.FlushError(
277 "Attempting to flush an item of type "
278 "%(x)s as a member of collection "
279 '"%(y)s". Expected an object of type '
280 "%(z)s or a polymorphic subclass of "
281 "this type. If %(x)s is a subclass of "
282 '%(z)s, configure mapper "%(zm)s" to '
283 "load this subtype polymorphically, or "
284 "set enable_typechecks=False to allow "
285 "any subtype to be accepted for flush. "
286 % {
287 "x": state.class_,
288 "y": self.prop,
289 "z": self.mapper.class_,
290 "zm": self.mapper,
291 }
292 )
293 else:
294 raise exc.FlushError(
295 "Attempting to flush an item of type "
296 "%(x)s as a member of collection "
297 '"%(y)s". Expected an object of type '
298 "%(z)s or a polymorphic subclass of "
299 "this type."
300 % {
301 "x": state.class_,
302 "y": self.prop,
303 "z": self.mapper.class_,
304 }
305 )
306
307 def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
308 raise NotImplementedError()
309
310 def _get_reversed_processed_set(self, uow):
311 if not self.prop._reverse_property:
312 return None
313
314 process_key = tuple(
315 sorted([self.key] + [p.key for p in self.prop._reverse_property])
316 )
317 return uow.memo(("reverse_key", process_key), set)
318
319 def _post_update(self, state, uowcommit, related, is_m2o_delete=False):
320 for x in related:
321 if not is_m2o_delete or x is not None:
322 uowcommit.register_post_update(
323 state, [r for l, r in self.prop.synchronize_pairs]
324 )
325 break
326
327 def _pks_changed(self, uowcommit, state):
328 raise NotImplementedError()
329
330 def __repr__(self):
331 return "%s(%s)" % (self.__class__.__name__, self.prop)
332
333
334class OneToManyDP(DependencyProcessor):
335 def per_property_dependencies(
336 self,
337 uow,
338 parent_saves,
339 child_saves,
340 parent_deletes,
341 child_deletes,
342 after_save,
343 before_delete,
344 ):
345 if self.post_update:
346 child_post_updates = unitofwork.PostUpdateAll(
347 uow, self.mapper.primary_base_mapper, False
348 )
349 child_pre_updates = unitofwork.PostUpdateAll(
350 uow, self.mapper.primary_base_mapper, True
351 )
352
353 uow.dependencies.update(
354 [
355 (child_saves, after_save),
356 (parent_saves, after_save),
357 (after_save, child_post_updates),
358 (before_delete, child_pre_updates),
359 (child_pre_updates, parent_deletes),
360 (child_pre_updates, child_deletes),
361 ]
362 )
363 else:
364 uow.dependencies.update(
365 [
366 (parent_saves, after_save),
367 (after_save, child_saves),
368 (after_save, child_deletes),
369 (child_saves, parent_deletes),
370 (child_deletes, parent_deletes),
371 (before_delete, child_saves),
372 (before_delete, child_deletes),
373 ]
374 )
375
376 def per_state_dependencies(
377 self,
378 uow,
379 save_parent,
380 delete_parent,
381 child_action,
382 after_save,
383 before_delete,
384 isdelete,
385 childisdelete,
386 ):
387 if self.post_update:
388 child_post_updates = unitofwork.PostUpdateAll(
389 uow, self.mapper.primary_base_mapper, False
390 )
391 child_pre_updates = unitofwork.PostUpdateAll(
392 uow, self.mapper.primary_base_mapper, True
393 )
394
395 # TODO: this whole block is not covered
396 # by any tests
397 if not isdelete:
398 if childisdelete:
399 uow.dependencies.update(
400 [
401 (child_action, after_save),
402 (after_save, child_post_updates),
403 ]
404 )
405 else:
406 uow.dependencies.update(
407 [
408 (save_parent, after_save),
409 (child_action, after_save),
410 (after_save, child_post_updates),
411 ]
412 )
413 else:
414 if childisdelete:
415 uow.dependencies.update(
416 [
417 (before_delete, child_pre_updates),
418 (child_pre_updates, delete_parent),
419 ]
420 )
421 else:
422 uow.dependencies.update(
423 [
424 (before_delete, child_pre_updates),
425 (child_pre_updates, delete_parent),
426 ]
427 )
428 elif not isdelete:
429 uow.dependencies.update(
430 [
431 (save_parent, after_save),
432 (after_save, child_action),
433 (save_parent, child_action),
434 ]
435 )
436 else:
437 uow.dependencies.update(
438 [(before_delete, child_action), (child_action, delete_parent)]
439 )
440
441 def presort_deletes(self, uowcommit, states):
442 # head object is being deleted, and we manage its list of
443 # child objects the child objects have to have their
444 # foreign key to the parent set to NULL
445 should_null_fks = (
446 not self.cascade.delete and not self.passive_deletes == "all"
447 )
448
449 for state in states:
450 history = uowcommit.get_attribute_history(
451 state, self.key, self._passive_delete_flag
452 )
453 if history:
454 for child in history.deleted:
455 if child is not None and self.hasparent(child) is False:
456 if self.cascade.delete_orphan:
457 uowcommit.register_object(child, isdelete=True)
458 else:
459 uowcommit.register_object(child)
460
461 if should_null_fks:
462 for child in history.unchanged:
463 if child is not None:
464 uowcommit.register_object(
465 child, operation="delete", prop=self.prop
466 )
467
468 def presort_saves(self, uowcommit, states):
469 children_added = uowcommit.memo(("children_added", self), set)
470
471 should_null_fks = (
472 not self.cascade.delete_orphan
473 and not self.passive_deletes == "all"
474 )
475
476 for state in states:
477 pks_changed = self._pks_changed(uowcommit, state)
478
479 if not pks_changed or self.passive_updates:
480 passive = (
481 attributes.PASSIVE_NO_INITIALIZE
482 | attributes.INCLUDE_PENDING_MUTATIONS
483 )
484 else:
485 passive = (
486 attributes.PASSIVE_OFF
487 | attributes.INCLUDE_PENDING_MUTATIONS
488 )
489
490 history = uowcommit.get_attribute_history(state, self.key, passive)
491 if history:
492 for child in history.added:
493 if child is not None:
494 uowcommit.register_object(
495 child,
496 cancel_delete=True,
497 operation="add",
498 prop=self.prop,
499 )
500
501 children_added.update(history.added)
502
503 for child in history.deleted:
504 if not self.cascade.delete_orphan:
505 if should_null_fks:
506 uowcommit.register_object(
507 child,
508 isdelete=False,
509 operation="delete",
510 prop=self.prop,
511 )
512 elif self.hasparent(child) is False:
513 uowcommit.register_object(
514 child,
515 isdelete=True,
516 operation="delete",
517 prop=self.prop,
518 )
519 for c, m, st_, dct_ in self.mapper.cascade_iterator(
520 "delete", child
521 ):
522 uowcommit.register_object(st_, isdelete=True)
523
524 if pks_changed:
525 if history:
526 for child in history.unchanged:
527 if child is not None:
528 uowcommit.register_object(
529 child,
530 False,
531 self.passive_updates,
532 operation="pk change",
533 prop=self.prop,
534 )
535
536 def process_deletes(self, uowcommit, states):
537 # head object is being deleted, and we manage its list of
538 # child objects the child objects have to have their foreign
539 # key to the parent set to NULL this phase can be called
540 # safely for any cascade but is unnecessary if delete cascade
541 # is on.
542
543 if self.post_update or not self.passive_deletes == "all":
544 children_added = uowcommit.memo(("children_added", self), set)
545
546 for state in states:
547 history = uowcommit.get_attribute_history(
548 state, self.key, self._passive_delete_flag
549 )
550 if history:
551 for child in history.deleted:
552 if (
553 child is not None
554 and self.hasparent(child) is False
555 ):
556 self._synchronize(
557 state, child, None, True, uowcommit, False
558 )
559 if self.post_update and child:
560 self._post_update(child, uowcommit, [state])
561
562 if self.post_update or not self.cascade.delete:
563 for child in set(history.unchanged).difference(
564 children_added
565 ):
566 if child is not None:
567 self._synchronize(
568 state, child, None, True, uowcommit, False
569 )
570 if self.post_update and child:
571 self._post_update(
572 child, uowcommit, [state]
573 )
574
575 # technically, we can even remove each child from the
576 # collection here too. but this would be a somewhat
577 # inconsistent behavior since it wouldn't happen
578 # if the old parent wasn't deleted but child was moved.
579
580 def process_saves(self, uowcommit, states):
581 should_null_fks = (
582 not self.cascade.delete_orphan
583 and not self.passive_deletes == "all"
584 )
585
586 for state in states:
587 history = uowcommit.get_attribute_history(
588 state, self.key, attributes.PASSIVE_NO_INITIALIZE
589 )
590 if history:
591 for child in history.added:
592 self._synchronize(
593 state, child, None, False, uowcommit, False
594 )
595 if child is not None and self.post_update:
596 self._post_update(child, uowcommit, [state])
597
598 for child in history.deleted:
599 if (
600 should_null_fks
601 and not self.cascade.delete_orphan
602 and not self.hasparent(child)
603 ):
604 self._synchronize(
605 state, child, None, True, uowcommit, False
606 )
607
608 if self._pks_changed(uowcommit, state):
609 for child in history.unchanged:
610 self._synchronize(
611 state, child, None, False, uowcommit, True
612 )
613
614 def _synchronize(
615 self, state, child, associationrow, clearkeys, uowcommit, pks_changed
616 ):
617 source = state
618 dest = child
619 self._verify_canload(child)
620 if dest is None or (
621 not self.post_update and uowcommit.is_deleted(dest)
622 ):
623 return
624 if clearkeys:
625 sync.clear(dest, self.mapper, self.prop.synchronize_pairs)
626 else:
627 sync.populate(
628 source,
629 self.parent,
630 dest,
631 self.mapper,
632 self.prop.synchronize_pairs,
633 uowcommit,
634 self.passive_updates and pks_changed,
635 )
636
637 def _pks_changed(self, uowcommit, state):
638 return sync.source_modified(
639 uowcommit, state, self.parent, self.prop.synchronize_pairs
640 )
641
642
643class ManyToOneDP(DependencyProcessor):
644 def __init__(self, prop):
645 DependencyProcessor.__init__(self, prop)
646 for mapper in self.mapper.self_and_descendants:
647 mapper._dependency_processors.append(DetectKeySwitch(prop))
648
649 def per_property_dependencies(
650 self,
651 uow,
652 parent_saves,
653 child_saves,
654 parent_deletes,
655 child_deletes,
656 after_save,
657 before_delete,
658 ):
659 if self.post_update:
660 parent_post_updates = unitofwork.PostUpdateAll(
661 uow, self.parent.primary_base_mapper, False
662 )
663 parent_pre_updates = unitofwork.PostUpdateAll(
664 uow, self.parent.primary_base_mapper, True
665 )
666
667 uow.dependencies.update(
668 [
669 (child_saves, after_save),
670 (parent_saves, after_save),
671 (after_save, parent_post_updates),
672 (after_save, parent_pre_updates),
673 (before_delete, parent_pre_updates),
674 (parent_pre_updates, child_deletes),
675 (parent_pre_updates, parent_deletes),
676 ]
677 )
678 else:
679 uow.dependencies.update(
680 [
681 (child_saves, after_save),
682 (after_save, parent_saves),
683 (parent_saves, child_deletes),
684 (parent_deletes, child_deletes),
685 ]
686 )
687
688 def per_state_dependencies(
689 self,
690 uow,
691 save_parent,
692 delete_parent,
693 child_action,
694 after_save,
695 before_delete,
696 isdelete,
697 childisdelete,
698 ):
699 if self.post_update:
700 if not isdelete:
701 parent_post_updates = unitofwork.PostUpdateAll(
702 uow, self.parent.primary_base_mapper, False
703 )
704 if childisdelete:
705 uow.dependencies.update(
706 [
707 (after_save, parent_post_updates),
708 (parent_post_updates, child_action),
709 ]
710 )
711 else:
712 uow.dependencies.update(
713 [
714 (save_parent, after_save),
715 (child_action, after_save),
716 (after_save, parent_post_updates),
717 ]
718 )
719 else:
720 parent_pre_updates = unitofwork.PostUpdateAll(
721 uow, self.parent.primary_base_mapper, True
722 )
723
724 uow.dependencies.update(
725 [
726 (before_delete, parent_pre_updates),
727 (parent_pre_updates, delete_parent),
728 (parent_pre_updates, child_action),
729 ]
730 )
731
732 elif not isdelete:
733 if not childisdelete:
734 uow.dependencies.update(
735 [(child_action, after_save), (after_save, save_parent)]
736 )
737 else:
738 uow.dependencies.update([(after_save, save_parent)])
739
740 else:
741 if childisdelete:
742 uow.dependencies.update([(delete_parent, child_action)])
743
744 def presort_deletes(self, uowcommit, states):
745 if self.cascade.delete or self.cascade.delete_orphan:
746 for state in states:
747 history = uowcommit.get_attribute_history(
748 state, self.key, self._passive_delete_flag
749 )
750 if history:
751 if self.cascade.delete_orphan:
752 todelete = history.sum()
753 else:
754 todelete = history.non_deleted()
755 for child in todelete:
756 if child is None:
757 continue
758 uowcommit.register_object(
759 child,
760 isdelete=True,
761 operation="delete",
762 prop=self.prop,
763 )
764 t = self.mapper.cascade_iterator("delete", child)
765 for c, m, st_, dct_ in t:
766 uowcommit.register_object(st_, isdelete=True)
767
768 def presort_saves(self, uowcommit, states):
769 for state in states:
770 uowcommit.register_object(state, operation="add", prop=self.prop)
771 if self.cascade.delete_orphan:
772 history = uowcommit.get_attribute_history(
773 state, self.key, self._passive_delete_flag
774 )
775 if history:
776 for child in history.deleted:
777 if self.hasparent(child) is False:
778 uowcommit.register_object(
779 child,
780 isdelete=True,
781 operation="delete",
782 prop=self.prop,
783 )
784
785 t = self.mapper.cascade_iterator("delete", child)
786 for c, m, st_, dct_ in t:
787 uowcommit.register_object(st_, isdelete=True)
788
789 def process_deletes(self, uowcommit, states):
790 if (
791 self.post_update
792 and not self.cascade.delete_orphan
793 and not self.passive_deletes == "all"
794 ):
795 # post_update means we have to update our
796 # row to not reference the child object
797 # before we can DELETE the row
798 for state in states:
799 self._synchronize(state, None, None, True, uowcommit)
800 if state and self.post_update:
801 history = uowcommit.get_attribute_history(
802 state, self.key, self._passive_delete_flag
803 )
804 if history:
805 self._post_update(
806 state, uowcommit, history.sum(), is_m2o_delete=True
807 )
808
809 def process_saves(self, uowcommit, states):
810 for state in states:
811 history = uowcommit.get_attribute_history(
812 state, self.key, attributes.PASSIVE_NO_INITIALIZE
813 )
814 if history:
815 if history.added:
816 for child in history.added:
817 self._synchronize(
818 state, child, None, False, uowcommit, "add"
819 )
820 elif history.deleted:
821 self._synchronize(
822 state, None, None, True, uowcommit, "delete"
823 )
824 if self.post_update:
825 self._post_update(state, uowcommit, history.sum())
826
827 def _synchronize(
828 self,
829 state,
830 child,
831 associationrow,
832 clearkeys,
833 uowcommit,
834 operation=None,
835 ):
836 if state is None or (
837 not self.post_update and uowcommit.is_deleted(state)
838 ):
839 return
840
841 if (
842 operation is not None
843 and child is not None
844 and not uowcommit.session._contains_state(child)
845 ):
846 util.warn(
847 "Object of type %s not in session, %s "
848 "operation along '%s' won't proceed"
849 % (mapperutil.state_class_str(child), operation, self.prop)
850 )
851 return
852
853 if clearkeys or child is None:
854 sync.clear(state, self.parent, self.prop.synchronize_pairs)
855 else:
856 self._verify_canload(child)
857 sync.populate(
858 child,
859 self.mapper,
860 state,
861 self.parent,
862 self.prop.synchronize_pairs,
863 uowcommit,
864 False,
865 )
866
867
868class DetectKeySwitch(DependencyProcessor):
869 """For many-to-one relationships with no one-to-many backref,
870 searches for parents through the unit of work when a primary
871 key has changed and updates them.
872
873 Theoretically, this approach could be expanded to support transparent
874 deletion of objects referenced via many-to-one as well, although
875 the current attribute system doesn't do enough bookkeeping for this
876 to be efficient.
877
878 """
879
880 def per_property_preprocessors(self, uow):
881 if self.prop._reverse_property:
882 if self.passive_updates:
883 return
884 else:
885 if False in (
886 prop.passive_updates
887 for prop in self.prop._reverse_property
888 ):
889 return
890
891 uow.register_preprocessor(self, False)
892
893 def per_property_flush_actions(self, uow):
894 parent_saves = unitofwork.SaveUpdateAll(uow, self.parent.base_mapper)
895 after_save = unitofwork.ProcessAll(uow, self, False, False)
896 uow.dependencies.update([(parent_saves, after_save)])
897
898 def per_state_flush_actions(self, uow, states, isdelete):
899 pass
900
901 def presort_deletes(self, uowcommit, states):
902 pass
903
904 def presort_saves(self, uow, states):
905 if not self.passive_updates:
906 # for non-passive updates, register in the preprocess stage
907 # so that mapper save_obj() gets a hold of changes
908 self._process_key_switches(states, uow)
909
910 def prop_has_changes(self, uow, states, isdelete):
911 if not isdelete and self.passive_updates:
912 d = self._key_switchers(uow, states)
913 return bool(d)
914
915 return False
916
917 def process_deletes(self, uowcommit, states):
918 assert False
919
920 def process_saves(self, uowcommit, states):
921 # for passive updates, register objects in the process stage
922 # so that we avoid ManyToOneDP's registering the object without
923 # the listonly flag in its own preprocess stage (results in UPDATE)
924 # statements being emitted
925 assert self.passive_updates
926 self._process_key_switches(states, uowcommit)
927
928 def _key_switchers(self, uow, states):
929 switched, notswitched = uow.memo(
930 ("pk_switchers", self), lambda: (set(), set())
931 )
932
933 allstates = switched.union(notswitched)
934 for s in states:
935 if s not in allstates:
936 if self._pks_changed(uow, s):
937 switched.add(s)
938 else:
939 notswitched.add(s)
940 return switched
941
942 def _process_key_switches(self, deplist, uowcommit):
943 switchers = self._key_switchers(uowcommit, deplist)
944 if switchers:
945 # if primary key values have actually changed somewhere, perform
946 # a linear search through the UOW in search of a parent.
947 for state in uowcommit.session.identity_map.all_states():
948 if not issubclass(state.class_, self.parent.class_):
949 continue
950 dict_ = state.dict
951 related = state.get_impl(self.key).get(
952 state, dict_, passive=self._passive_update_flag
953 )
954 if (
955 related is not attributes.PASSIVE_NO_RESULT
956 and related is not None
957 ):
958 if self.prop.uselist:
959 if not related:
960 continue
961 related_obj = related[0]
962 else:
963 related_obj = related
964 related_state = attributes.instance_state(related_obj)
965 if related_state in switchers:
966 uowcommit.register_object(
967 state, False, self.passive_updates
968 )
969 sync.populate(
970 related_state,
971 self.mapper,
972 state,
973 self.parent,
974 self.prop.synchronize_pairs,
975 uowcommit,
976 self.passive_updates,
977 )
978
979 def _pks_changed(self, uowcommit, state):
980 return bool(state.key) and sync.source_modified(
981 uowcommit, state, self.mapper, self.prop.synchronize_pairs
982 )
983
984
985class ManyToManyDP(DependencyProcessor):
986 def per_property_dependencies(
987 self,
988 uow,
989 parent_saves,
990 child_saves,
991 parent_deletes,
992 child_deletes,
993 after_save,
994 before_delete,
995 ):
996 uow.dependencies.update(
997 [
998 (parent_saves, after_save),
999 (child_saves, after_save),
1000 (after_save, child_deletes),
1001 # a rowswitch on the parent from deleted to saved
1002 # can make this one occur, as the "save" may remove
1003 # an element from the
1004 # "deleted" list before we have a chance to
1005 # process its child rows
1006 (before_delete, parent_saves),
1007 (before_delete, parent_deletes),
1008 (before_delete, child_deletes),
1009 (before_delete, child_saves),
1010 ]
1011 )
1012
1013 def per_state_dependencies(
1014 self,
1015 uow,
1016 save_parent,
1017 delete_parent,
1018 child_action,
1019 after_save,
1020 before_delete,
1021 isdelete,
1022 childisdelete,
1023 ):
1024 if not isdelete:
1025 if childisdelete:
1026 uow.dependencies.update(
1027 [(save_parent, after_save), (after_save, child_action)]
1028 )
1029 else:
1030 uow.dependencies.update(
1031 [(save_parent, after_save), (child_action, after_save)]
1032 )
1033 else:
1034 uow.dependencies.update(
1035 [(before_delete, child_action), (before_delete, delete_parent)]
1036 )
1037
1038 def presort_deletes(self, uowcommit, states):
1039 # TODO: no tests fail if this whole
1040 # thing is removed !!!!
1041 if not self.passive_deletes:
1042 # if no passive deletes, load history on
1043 # the collection, so that prop_has_changes()
1044 # returns True
1045 for state in states:
1046 uowcommit.get_attribute_history(
1047 state, self.key, self._passive_delete_flag
1048 )
1049
1050 def presort_saves(self, uowcommit, states):
1051 if not self.passive_updates:
1052 # if no passive updates, load history on
1053 # each collection where parent has changed PK,
1054 # so that prop_has_changes() returns True
1055 for state in states:
1056 if self._pks_changed(uowcommit, state):
1057 history = uowcommit.get_attribute_history(
1058 state, self.key, attributes.PASSIVE_OFF
1059 )
1060
1061 if not self.cascade.delete_orphan:
1062 return
1063
1064 # check for child items removed from the collection
1065 # if delete_orphan check is turned on.
1066 for state in states:
1067 history = uowcommit.get_attribute_history(
1068 state, self.key, attributes.PASSIVE_NO_INITIALIZE
1069 )
1070 if history:
1071 for child in history.deleted:
1072 if self.hasparent(child) is False:
1073 uowcommit.register_object(
1074 child,
1075 isdelete=True,
1076 operation="delete",
1077 prop=self.prop,
1078 )
1079 for c, m, st_, dct_ in self.mapper.cascade_iterator(
1080 "delete", child
1081 ):
1082 uowcommit.register_object(st_, isdelete=True)
1083
1084 def process_deletes(self, uowcommit, states):
1085 secondary_delete = []
1086 secondary_insert = []
1087 secondary_update = []
1088
1089 processed = self._get_reversed_processed_set(uowcommit)
1090 tmp = set()
1091 for state in states:
1092 # this history should be cached already, as
1093 # we loaded it in preprocess_deletes
1094 history = uowcommit.get_attribute_history(
1095 state, self.key, self._passive_delete_flag
1096 )
1097 if history:
1098 for child in history.non_added():
1099 if child is None or (
1100 processed is not None and (state, child) in processed
1101 ):
1102 continue
1103 associationrow = {}
1104 if not self._synchronize(
1105 state,
1106 child,
1107 associationrow,
1108 False,
1109 uowcommit,
1110 "delete",
1111 ):
1112 continue
1113 secondary_delete.append(associationrow)
1114
1115 tmp.update((c, state) for c in history.non_added())
1116
1117 if processed is not None:
1118 processed.update(tmp)
1119
1120 self._run_crud(
1121 uowcommit, secondary_insert, secondary_update, secondary_delete
1122 )
1123
1124 def process_saves(self, uowcommit, states):
1125 secondary_delete = []
1126 secondary_insert = []
1127 secondary_update = []
1128
1129 processed = self._get_reversed_processed_set(uowcommit)
1130 tmp = set()
1131
1132 for state in states:
1133 need_cascade_pks = not self.passive_updates and self._pks_changed(
1134 uowcommit, state
1135 )
1136 if need_cascade_pks:
1137 passive = (
1138 attributes.PASSIVE_OFF
1139 | attributes.INCLUDE_PENDING_MUTATIONS
1140 )
1141 else:
1142 passive = (
1143 attributes.PASSIVE_NO_INITIALIZE
1144 | attributes.INCLUDE_PENDING_MUTATIONS
1145 )
1146 history = uowcommit.get_attribute_history(state, self.key, passive)
1147 if history:
1148 for child in history.added:
1149 if processed is not None and (state, child) in processed:
1150 continue
1151 associationrow = {}
1152 if not self._synchronize(
1153 state, child, associationrow, False, uowcommit, "add"
1154 ):
1155 continue
1156 secondary_insert.append(associationrow)
1157 for child in history.deleted:
1158 if processed is not None and (state, child) in processed:
1159 continue
1160 associationrow = {}
1161 if not self._synchronize(
1162 state,
1163 child,
1164 associationrow,
1165 False,
1166 uowcommit,
1167 "delete",
1168 ):
1169 continue
1170 secondary_delete.append(associationrow)
1171
1172 tmp.update((c, state) for c in history.added + history.deleted)
1173
1174 if need_cascade_pks:
1175 for child in history.unchanged:
1176 associationrow = {}
1177 sync.update(
1178 state,
1179 self.parent,
1180 associationrow,
1181 "old_",
1182 self.prop.synchronize_pairs,
1183 )
1184 sync.update(
1185 child,
1186 self.mapper,
1187 associationrow,
1188 "old_",
1189 self.prop.secondary_synchronize_pairs,
1190 )
1191
1192 secondary_update.append(associationrow)
1193
1194 if processed is not None:
1195 processed.update(tmp)
1196
1197 self._run_crud(
1198 uowcommit, secondary_insert, secondary_update, secondary_delete
1199 )
1200
1201 def _run_crud(
1202 self, uowcommit, secondary_insert, secondary_update, secondary_delete
1203 ):
1204 connection = uowcommit.transaction.connection(self.mapper)
1205
1206 if secondary_delete:
1207 associationrow = secondary_delete[0]
1208 statement = self.secondary.delete().where(
1209 sql.and_(
1210 *[
1211 c == sql.bindparam(c.key, type_=c.type)
1212 for c in self.secondary.c
1213 if c.key in associationrow
1214 ]
1215 )
1216 )
1217 result = connection.execute(statement, secondary_delete)
1218
1219 if (
1220 result.supports_sane_multi_rowcount()
1221 ) and result.rowcount != len(secondary_delete):
1222 raise exc.StaleDataError(
1223 "DELETE statement on table '%s' expected to delete "
1224 "%d row(s); Only %d were matched."
1225 % (
1226 self.secondary.description,
1227 len(secondary_delete),
1228 result.rowcount,
1229 )
1230 )
1231
1232 if secondary_update:
1233 associationrow = secondary_update[0]
1234 statement = self.secondary.update().where(
1235 sql.and_(
1236 *[
1237 c == sql.bindparam("old_" + c.key, type_=c.type)
1238 for c in self.secondary.c
1239 if c.key in associationrow
1240 ]
1241 )
1242 )
1243 result = connection.execute(statement, secondary_update)
1244
1245 if (
1246 result.supports_sane_multi_rowcount()
1247 ) and result.rowcount != len(secondary_update):
1248 raise exc.StaleDataError(
1249 "UPDATE statement on table '%s' expected to update "
1250 "%d row(s); Only %d were matched."
1251 % (
1252 self.secondary.description,
1253 len(secondary_update),
1254 result.rowcount,
1255 )
1256 )
1257
1258 if secondary_insert:
1259 statement = self.secondary.insert()
1260 connection.execute(statement, secondary_insert)
1261
1262 def _synchronize(
1263 self, state, child, associationrow, clearkeys, uowcommit, operation
1264 ):
1265 # this checks for None if uselist=True
1266 self._verify_canload(child)
1267
1268 # but if uselist=False we get here. If child is None,
1269 # no association row can be generated, so return.
1270 if child is None:
1271 return False
1272
1273 if child is not None and not uowcommit.session._contains_state(child):
1274 if not child.deleted:
1275 util.warn(
1276 "Object of type %s not in session, %s "
1277 "operation along '%s' won't proceed"
1278 % (mapperutil.state_class_str(child), operation, self.prop)
1279 )
1280 return False
1281
1282 sync.populate_dict(
1283 state, self.parent, associationrow, self.prop.synchronize_pairs
1284 )
1285 sync.populate_dict(
1286 child,
1287 self.mapper,
1288 associationrow,
1289 self.prop.secondary_synchronize_pairs,
1290 )
1291
1292 return True
1293
1294 def _pks_changed(self, uowcommit, state):
1295 return sync.source_modified(
1296 uowcommit, state, self.parent, self.prop.synchronize_pairs
1297 )
1298
1299
1300_direction_to_processor = {
1301 ONETOMANY: OneToManyDP,
1302 MANYTOONE: ManyToOneDP,
1303 MANYTOMANY: ManyToManyDP,
1304}