Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/sync.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

71 statements  

1# orm/sync.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9 

10"""private module containing functions used for copying data 

11between instances based on join conditions. 

12 

13""" 

14 

15from __future__ import annotations 

16 

17from . import exc 

18from . import util as orm_util 

19from .base import PassiveFlag 

20 

21 

22def populate( 

23 source, 

24 source_mapper, 

25 dest, 

26 dest_mapper, 

27 synchronize_pairs, 

28 uowcommit, 

29 flag_cascaded_pks, 

30): 

31 source_dict = source.dict 

32 dest_dict = dest.dict 

33 

34 for l, r in synchronize_pairs: 

35 try: 

36 # inline of source_mapper._get_state_attr_by_column 

37 prop = source_mapper._columntoproperty[l] 

38 value = source.manager[prop.key].impl.get( 

39 source, source_dict, PassiveFlag.PASSIVE_OFF 

40 ) 

41 except exc.UnmappedColumnError as err: 

42 _raise_col_to_prop(False, source_mapper, l, dest_mapper, r, err) 

43 

44 try: 

45 # inline of dest_mapper._set_state_attr_by_column 

46 prop = dest_mapper._columntoproperty[r] 

47 dest.manager[prop.key].impl.set(dest, dest_dict, value, None) 

48 except exc.UnmappedColumnError as err: 

49 _raise_col_to_prop(True, source_mapper, l, dest_mapper, r, err) 

50 

51 # technically the "r.primary_key" check isn't 

52 # needed here, but we check for this condition to limit 

53 # how often this logic is invoked for memory/performance 

54 # reasons, since we only need this info for a primary key 

55 # destination. 

56 if ( 

57 flag_cascaded_pks 

58 and l.primary_key 

59 and r.primary_key 

60 and r.references(l) 

61 ): 

62 uowcommit.attributes[("pk_cascaded", dest, r)] = True 

63 

64 

65def bulk_populate_inherit_keys(source_dict, source_mapper, synchronize_pairs): 

66 # a simplified version of populate() used by bulk insert mode 

67 for l, r in synchronize_pairs: 

68 try: 

69 prop = source_mapper._columntoproperty[l] 

70 value = source_dict[prop.key] 

71 except exc.UnmappedColumnError as err: 

72 _raise_col_to_prop(False, source_mapper, l, source_mapper, r, err) 

73 

74 try: 

75 prop = source_mapper._columntoproperty[r] 

76 source_dict[prop.key] = value 

77 except exc.UnmappedColumnError as err: 

78 _raise_col_to_prop(True, source_mapper, l, source_mapper, r, err) 

79 

80 

81def clear(dest, dest_mapper, synchronize_pairs): 

82 for l, r in synchronize_pairs: 

83 if ( 

84 r.primary_key 

85 and dest_mapper._get_state_attr_by_column(dest, dest.dict, r) 

86 not in orm_util._none_set 

87 ): 

88 raise AssertionError( 

89 f"Dependency rule on column '{l}' " 

90 "tried to blank-out primary key " 

91 f"column '{r}' on instance '{orm_util.state_str(dest)}'" 

92 ) 

93 try: 

94 dest_mapper._set_state_attr_by_column(dest, dest.dict, r, None) 

95 except exc.UnmappedColumnError as err: 

96 _raise_col_to_prop(True, None, l, dest_mapper, r, err) 

97 

98 

99def update(source, source_mapper, dest, old_prefix, synchronize_pairs): 

100 for l, r in synchronize_pairs: 

101 try: 

102 oldvalue = source_mapper._get_committed_attr_by_column( 

103 source.obj(), l 

104 ) 

105 value = source_mapper._get_state_attr_by_column( 

106 source, source.dict, l, passive=PassiveFlag.PASSIVE_OFF 

107 ) 

108 except exc.UnmappedColumnError as err: 

109 _raise_col_to_prop(False, source_mapper, l, None, r, err) 

110 dest[r.key] = value 

111 dest[old_prefix + r.key] = oldvalue 

112 

113 

114def populate_dict(source, source_mapper, dict_, synchronize_pairs): 

115 for l, r in synchronize_pairs: 

116 try: 

117 value = source_mapper._get_state_attr_by_column( 

118 source, source.dict, l, passive=PassiveFlag.PASSIVE_OFF 

119 ) 

120 except exc.UnmappedColumnError as err: 

121 _raise_col_to_prop(False, source_mapper, l, None, r, err) 

122 

123 dict_[r.key] = value 

124 

125 

126def source_modified(uowcommit, source, source_mapper, synchronize_pairs): 

127 """return true if the source object has changes from an old to a 

128 new value on the given synchronize pairs 

129 

130 """ 

131 for l, r in synchronize_pairs: 

132 try: 

133 prop = source_mapper._columntoproperty[l] 

134 except exc.UnmappedColumnError as err: 

135 _raise_col_to_prop(False, source_mapper, l, None, r, err) 

136 history = uowcommit.get_attribute_history( 

137 source, prop.key, PassiveFlag.PASSIVE_NO_INITIALIZE 

138 ) 

139 if bool(history.deleted): 

140 return True 

141 else: 

142 return False 

143 

144 

145def _raise_col_to_prop( 

146 isdest, source_mapper, source_column, dest_mapper, dest_column, err 

147): 

148 if isdest: 

149 raise exc.UnmappedColumnError( 

150 "Can't execute sync rule for " 

151 "destination column '%s'; mapper '%s' does not map " 

152 "this column. Try using an explicit `foreign_keys` " 

153 "collection which does not include this column (or use " 

154 "a viewonly=True relation)." % (dest_column, dest_mapper) 

155 ) from err 

156 else: 

157 raise exc.UnmappedColumnError( 

158 "Can't execute sync rule for " 

159 "source column '%s'; mapper '%s' does not map this " 

160 "column. Try using an explicit `foreign_keys` " 

161 "collection which does not include destination column " 

162 "'%s' (or use a viewonly=True relation)." 

163 % (source_column, source_mapper, dest_column) 

164 ) from err