Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/sync.py: 17%

71 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# orm/sync.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""private module containing functions used for copying data 

9between instances based on join conditions. 

10 

11""" 

12 

13from . import attributes 

14from . import exc 

15from . import util as orm_util 

16from .. import util 

17 

18 

19def populate( 

20 source, 

21 source_mapper, 

22 dest, 

23 dest_mapper, 

24 synchronize_pairs, 

25 uowcommit, 

26 flag_cascaded_pks, 

27): 

28 source_dict = source.dict 

29 dest_dict = dest.dict 

30 

31 for l, r in synchronize_pairs: 

32 try: 

33 # inline of source_mapper._get_state_attr_by_column 

34 prop = source_mapper._columntoproperty[l] 

35 value = source.manager[prop.key].impl.get( 

36 source, source_dict, attributes.PASSIVE_OFF 

37 ) 

38 except exc.UnmappedColumnError as err: 

39 _raise_col_to_prop(False, source_mapper, l, dest_mapper, r, err) 

40 

41 try: 

42 # inline of dest_mapper._set_state_attr_by_column 

43 prop = dest_mapper._columntoproperty[r] 

44 dest.manager[prop.key].impl.set(dest, dest_dict, value, None) 

45 except exc.UnmappedColumnError as err: 

46 _raise_col_to_prop(True, source_mapper, l, dest_mapper, r, err) 

47 

48 # technically the "r.primary_key" check isn't 

49 # needed here, but we check for this condition to limit 

50 # how often this logic is invoked for memory/performance 

51 # reasons, since we only need this info for a primary key 

52 # destination. 

53 if ( 

54 flag_cascaded_pks 

55 and l.primary_key 

56 and r.primary_key 

57 and r.references(l) 

58 ): 

59 uowcommit.attributes[("pk_cascaded", dest, r)] = True 

60 

61 

62def bulk_populate_inherit_keys(source_dict, source_mapper, synchronize_pairs): 

63 # a simplified version of populate() used by bulk insert mode 

64 for l, r in synchronize_pairs: 

65 try: 

66 prop = source_mapper._columntoproperty[l] 

67 value = source_dict[prop.key] 

68 except exc.UnmappedColumnError as err: 

69 _raise_col_to_prop(False, source_mapper, l, source_mapper, r, err) 

70 

71 try: 

72 prop = source_mapper._columntoproperty[r] 

73 source_dict[prop.key] = value 

74 except exc.UnmappedColumnError: 

75 _raise_col_to_prop(True, source_mapper, l, source_mapper, r) 

76 

77 

78def clear(dest, dest_mapper, synchronize_pairs): 

79 for l, r in synchronize_pairs: 

80 if ( 

81 r.primary_key 

82 and dest_mapper._get_state_attr_by_column(dest, dest.dict, r) 

83 not in orm_util._none_set 

84 ): 

85 

86 raise AssertionError( 

87 "Dependency rule tried to blank-out primary key " 

88 "column '%s' on instance '%s'" % (r, orm_util.state_str(dest)) 

89 ) 

90 try: 

91 dest_mapper._set_state_attr_by_column(dest, dest.dict, r, None) 

92 except exc.UnmappedColumnError as err: 

93 _raise_col_to_prop(True, None, l, dest_mapper, r, err) 

94 

95 

96def update(source, source_mapper, dest, old_prefix, synchronize_pairs): 

97 for l, r in synchronize_pairs: 

98 try: 

99 oldvalue = source_mapper._get_committed_attr_by_column( 

100 source.obj(), l 

101 ) 

102 value = source_mapper._get_state_attr_by_column( 

103 source, source.dict, l, passive=attributes.PASSIVE_OFF 

104 ) 

105 except exc.UnmappedColumnError as err: 

106 _raise_col_to_prop(False, source_mapper, l, None, r, err) 

107 dest[r.key] = value 

108 dest[old_prefix + r.key] = oldvalue 

109 

110 

111def populate_dict(source, source_mapper, dict_, synchronize_pairs): 

112 for l, r in synchronize_pairs: 

113 try: 

114 value = source_mapper._get_state_attr_by_column( 

115 source, source.dict, l, passive=attributes.PASSIVE_OFF 

116 ) 

117 except exc.UnmappedColumnError as err: 

118 _raise_col_to_prop(False, source_mapper, l, None, r, err) 

119 

120 dict_[r.key] = value 

121 

122 

123def source_modified(uowcommit, source, source_mapper, synchronize_pairs): 

124 """return true if the source object has changes from an old to a 

125 new value on the given synchronize pairs 

126 

127 """ 

128 for l, r in synchronize_pairs: 

129 try: 

130 prop = source_mapper._columntoproperty[l] 

131 except exc.UnmappedColumnError as err: 

132 _raise_col_to_prop(False, source_mapper, l, None, r, err) 

133 history = uowcommit.get_attribute_history( 

134 source, prop.key, attributes.PASSIVE_NO_INITIALIZE 

135 ) 

136 if bool(history.deleted): 

137 return True 

138 else: 

139 return False 

140 

141 

142def _raise_col_to_prop( 

143 isdest, source_mapper, source_column, dest_mapper, dest_column, err 

144): 

145 if isdest: 

146 util.raise_( 

147 exc.UnmappedColumnError( 

148 "Can't execute sync rule for " 

149 "destination column '%s'; mapper '%s' does not map " 

150 "this column. Try using an explicit `foreign_keys` " 

151 "collection which does not include this column (or use " 

152 "a viewonly=True relation)." % (dest_column, dest_mapper) 

153 ), 

154 replace_context=err, 

155 ) 

156 else: 

157 util.raise_( 

158 exc.UnmappedColumnError( 

159 "Can't execute sync rule for " 

160 "source column '%s'; mapper '%s' does not map this " 

161 "column. Try using an explicit `foreign_keys` " 

162 "collection which does not include destination column " 

163 "'%s' (or use a viewonly=True relation)." 

164 % (source_column, source_mapper, dest_column) 

165 ), 

166 replace_context=err, 

167 )