Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/sync.py: 17%
71 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# orm/sync.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8"""private module containing functions used for copying data
9between instances based on join conditions.
11"""
13from . import attributes
14from . import exc
15from . import util as orm_util
16from .. import util
def populate(
    source,
    source_mapper,
    dest,
    dest_mapper,
    synchronize_pairs,
    uowcommit,
    flag_cascaded_pks,
):
    """Copy synchronized column values from ``source`` onto ``dest``.

    For every ``(source column, destination column)`` pair in
    ``synchronize_pairs``, the value of the source column is read from
    ``source`` and written to the attribute of ``dest`` that
    ``dest_mapper`` maps to the destination column.

    :param source: object exposing ``.dict`` and ``.manager``; values
     are read from it (presumably an instance state -- confirm against
     callers).
    :param source_mapper: mapper whose ``_columntoproperty`` resolves
     the source columns.
    :param dest: object exposing ``.dict`` and ``.manager``; values are
     written to it.
    :param dest_mapper: mapper whose ``_columntoproperty`` resolves the
     destination columns.
    :param synchronize_pairs: iterable of ``(source column,
     destination column)`` pairs.
    :param uowcommit: object whose ``attributes`` mapping receives the
     ``("pk_cascaded", dest, column)`` markers.
    :param flag_cascaded_pks: when true, record a marker for each pair
     where both columns are primary keys and the destination column
     references the source column.
    :raises UnmappedColumnError: re-raised through
     ``_raise_col_to_prop`` when either mapper does not map a column.
    """
    src_values = source.dict
    dst_values = dest.dict

    for src_col, dst_col in synchronize_pairs:
        try:
            # same steps as source_mapper._get_state_attr_by_column,
            # written out inline
            src_prop = source_mapper._columntoproperty[src_col]
            src_impl = source.manager[src_prop.key].impl
            value = src_impl.get(source, src_values, attributes.PASSIVE_OFF)
        except exc.UnmappedColumnError as err:
            _raise_col_to_prop(
                False, source_mapper, src_col, dest_mapper, dst_col, err
            )

        try:
            # same steps as dest_mapper._set_state_attr_by_column,
            # written out inline
            dst_prop = dest_mapper._columntoproperty[dst_col]
            dst_impl = dest.manager[dst_prop.key].impl
            dst_impl.set(dest, dst_values, value, None)
        except exc.UnmappedColumnError as err:
            _raise_col_to_prop(
                True, source_mapper, src_col, dest_mapper, dst_col, err
            )

        if not flag_cascaded_pks:
            continue

        # Strictly speaking the destination "primary_key" test is
        # redundant; it is kept so this bookkeeping runs as rarely as
        # possible (memory/performance), because the marker only
        # matters for a primary key destination.
        if (
            src_col.primary_key
            and dst_col.primary_key
            and dst_col.references(src_col)
        ):
            uowcommit.attributes[("pk_cascaded", dest, dst_col)] = True
def bulk_populate_inherit_keys(source_dict, source_mapper, synchronize_pairs):
    """Copy synchronized column values within a single row dictionary.

    A simplified version of populate() used by bulk insert mode: both
    the source and the destination column of every pair are resolved
    through ``source_mapper`` and the value is copied from one key of
    ``source_dict`` to another, in place.

    :param source_dict: mutable mapping of attribute key to value; it
     is both read from and written to.
    :param source_mapper: mapper whose ``_columntoproperty`` resolves
     both columns of each pair.
    :param synchronize_pairs: iterable of ``(source column,
     destination column)`` pairs.
    :raises UnmappedColumnError: re-raised through
     ``_raise_col_to_prop`` when the mapper does not map a column.
    """
    for l, r in synchronize_pairs:
        try:
            prop = source_mapper._columntoproperty[l]
            value = source_dict[prop.key]
        except exc.UnmappedColumnError as err:
            _raise_col_to_prop(False, source_mapper, l, source_mapper, r, err)

        try:
            prop = source_mapper._columntoproperty[r]
            source_dict[prop.key] = value
        except exc.UnmappedColumnError as err:
            # _raise_col_to_prop requires the original exception as its
            # last argument; omitting it turned this error path into a
            # TypeError instead of the intended UnmappedColumnError.
            _raise_col_to_prop(True, source_mapper, l, source_mapper, r, err)
def clear(dest, dest_mapper, synchronize_pairs):
    """Set every synchronized destination column on ``dest`` to None.

    :param dest: object exposing ``.dict``; passed through to the
     mapper's attribute accessors.
    :param dest_mapper: mapper used to read and reset the destination
     columns.
    :param synchronize_pairs: iterable of ``(source column,
     destination column)`` pairs.
    :raises AssertionError: when a destination column is a primary key
     whose current value is not in ``orm_util._none_set``.
    :raises UnmappedColumnError: re-raised through
     ``_raise_col_to_prop`` when ``dest_mapper`` does not map a
     destination column.
    """
    for src_col, dst_col in synchronize_pairs:
        if dst_col.primary_key:
            # a populated primary key must never be blanked out
            current = dest_mapper._get_state_attr_by_column(
                dest, dest.dict, dst_col
            )
            if current not in orm_util._none_set:
                raise AssertionError(
                    "Dependency rule tried to blank-out primary key "
                    "column '%s' on instance '%s'"
                    % (dst_col, orm_util.state_str(dest))
                )

        try:
            dest_mapper._set_state_attr_by_column(
                dest, dest.dict, dst_col, None
            )
        except exc.UnmappedColumnError as err:
            _raise_col_to_prop(True, None, src_col, dest_mapper, dst_col, err)
def update(source, source_mapper, dest, old_prefix, synchronize_pairs):
    """Store current and committed source values into ``dest``.

    For each pair, the current value of the source column is stored
    under the destination column's ``key`` and the committed value
    under ``old_prefix + key``.

    :param source: object exposing ``.obj()`` and ``.dict``.
    :param source_mapper: mapper used to read both values.
    :param dest: mutable mapping that receives the values.
    :param old_prefix: string prepended to the key under which the
     committed value is stored.
    :param synchronize_pairs: iterable of ``(source column,
     destination column)`` pairs.
    :raises UnmappedColumnError: re-raised through
     ``_raise_col_to_prop`` when ``source_mapper`` does not map a
     source column.
    """
    for src_col, dst_col in synchronize_pairs:
        try:
            committed = source_mapper._get_committed_attr_by_column(
                source.obj(), src_col
            )
            current = source_mapper._get_state_attr_by_column(
                source, source.dict, src_col, passive=attributes.PASSIVE_OFF
            )
        except exc.UnmappedColumnError as err:
            _raise_col_to_prop(False, source_mapper, src_col, None, dst_col, err)

        dest[dst_col.key] = current
        dest[old_prefix + dst_col.key] = committed
def populate_dict(source, source_mapper, dict_, synchronize_pairs):
    """Store the current value of each source column into ``dict_``.

    Each value is stored under the ``key`` of the pair's destination
    column.

    :param source: object exposing ``.dict``.
    :param source_mapper: mapper used to read the source columns.
    :param dict_: mutable mapping that receives the values.
    :param synchronize_pairs: iterable of ``(source column,
     destination column)`` pairs.
    :raises UnmappedColumnError: re-raised through
     ``_raise_col_to_prop`` when ``source_mapper`` does not map a
     source column.
    """
    for src_col, dst_col in synchronize_pairs:
        try:
            current = source_mapper._get_state_attr_by_column(
                source, source.dict, src_col, passive=attributes.PASSIVE_OFF
            )
        except exc.UnmappedColumnError as err:
            _raise_col_to_prop(False, source_mapper, src_col, None, dst_col, err)

        dict_[dst_col.key] = current
def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
    """Report whether a synchronized source column changed its value.

    Returns True as soon as the attribute history of one source column
    has a non-empty ``deleted`` collection, i.e. an old value was
    replaced; returns False when no pair shows such a change or when
    ``synchronize_pairs`` is empty.

    :raises UnmappedColumnError: re-raised through
     ``_raise_col_to_prop`` when ``source_mapper`` does not map a
     source column.
    """
    for src_col, dst_col in synchronize_pairs:
        try:
            prop = source_mapper._columntoproperty[src_col]
        except exc.UnmappedColumnError as err:
            _raise_col_to_prop(False, source_mapper, src_col, None, dst_col, err)

        changes = uowcommit.get_attribute_history(
            source, prop.key, attributes.PASSIVE_NO_INITIALIZE
        )
        if changes.deleted:
            return True

    # the loop never breaks, so reaching this point means that no pair
    # reported a replaced value
    return False
def _raise_col_to_prop(
    isdest, source_mapper, source_column, dest_mapper, dest_column, err
):
    """Raise an ``UnmappedColumnError`` describing a failed sync rule.

    :param isdest: true when the destination column is the unmapped
     one, false when it is the source column; selects the message.
    :param source_mapper: mapper named in the source-column message.
    :param source_column: column named in the source-column message.
    :param dest_mapper: mapper named in the destination-column message.
    :param dest_column: column named in both messages.
    :param err: original exception, handed to ``util.raise_`` as
     ``replace_context``.
    """
    if isdest:
        message = (
            "Can't execute sync rule for "
            "destination column '%s'; mapper '%s' does not map "
            "this column. Try using an explicit `foreign_keys` "
            "collection which does not include this column (or use "
            "a viewonly=True relation)." % (dest_column, dest_mapper)
        )
    else:
        message = (
            "Can't execute sync rule for "
            "source column '%s'; mapper '%s' does not map this "
            "column. Try using an explicit `foreign_keys` "
            "collection which does not include destination column "
            "'%s' (or use a viewonly=True relation)."
            % (source_column, source_mapper, dest_column)
        )

    util.raise_(exc.UnmappedColumnError(message), replace_context=err)