Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/identity.py: 26%

163 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# orm/identity.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8import weakref 

9 

10from . import util as orm_util 

11from .. import exc as sa_exc 

12from .. import util 

13 

14 

class IdentityMap(object):
    """Base class for a dict-like registry of instance states.

    The map keeps two containers: ``_dict`` (the key -> state storage read
    by ``keys()`` and ``__len__()``) and ``_modified`` (the set of states
    whose ``modified`` flag was true when they were handed to the map).

    Insertion is left to subclasses via :meth:`add` / :meth:`replace`.
    The generic mutable-mapping mutators (``update``, ``clear``, ``pop``,
    ``popitem``, ``setdefault``, ``__setitem__``, ``__delitem__``) are
    deliberately unsupported and raise ``NotImplementedError``.
    """

    def __init__(self):
        self._dict = {}
        self._modified = set()
        # weak reference to this map; stored on every incoming state as
        # ``state._instance_dict`` (see _manage_incoming_state)
        self._wr = weakref.ref(self)

    def _kill(self):
        """Disable ``_add_unpresent`` on this map instance.

        The module-level ``_killed`` function is stored as an instance
        attribute, shadowing the method; later calls to
        ``self._add_unpresent(state, key)`` invoke ``_killed(state, key)``.
        """
        self._add_unpresent = _killed

    def keys(self):
        """Return the keys of the underlying dictionary."""
        return self._dict.keys()

    def replace(self, state):
        """Insert ``state``, replacing any entry under the same key.

        Not implemented at this level.
        """
        raise NotImplementedError()

    def add(self, state):
        """Insert ``state`` into the map.

        Not implemented at this level.
        """
        raise NotImplementedError()

    def _add_unpresent(self, state, key):
        """optional inlined form of add() which can assume item isn't present
        in the map"""
        self.add(state)

    def update(self, dict_):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses add() to insert data")

    def clear(self):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")

    def _manage_incoming_state(self, state):
        """Link ``state`` to this map and track it if it is modified."""
        state._instance_dict = self._wr

        if state.modified:
            self._modified.add(state)

    def _manage_removed_state(self, state):
        """Unlink ``state`` from this map and stop tracking it.

        The ``_instance_dict`` attribute is deleted from the state (an
        ``AttributeError`` propagates if it is not set); the state is then
        dropped from the modified set when its ``modified`` flag is true.
        """
        del state._instance_dict
        if state.modified:
            self._modified.discard(state)

    def _dirty_states(self):
        """Return the set of states tracked as modified (not a copy)."""
        return self._modified

    def check_modified(self):
        """return True if any InstanceStates present have been marked
        as 'modified'.

        """
        return True if self._modified else False

    def has_key(self, key):
        """Return the result of ``key in self``."""
        return key in self

    def popitem(self):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")

    def pop(self, key, *args):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")

    def setdefault(self, key, default=None):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses add() to insert data")

    def __len__(self):
        """Return the number of entries in the underlying dictionary."""
        return len(self._dict)

    def copy(self):
        """Not implemented at this level."""
        raise NotImplementedError()

    def __setitem__(self, key, value):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses add() to insert data")

    def __delitem__(self, key):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")

88 

89 

class WeakInstanceDict(IdentityMap):
    """Identity map that stores states and hands back ``state.obj()``.

    ``_dict`` maps an identity key to a state.  Object-returning lookups
    (``__getitem__``, ``get``, ``items``, ``values``, ``__contains__``)
    call ``state.obj()`` and treat a ``None`` result as "not present".

    Several methods test ``key in self._dict`` and then still guard the
    following lookup with ``except KeyError``; per the inline comments this
    covers an entry being removed by garbage collection between the two
    steps.
    """

    def __getitem__(self, key):
        """Return the object stored under ``key``.

        :raises KeyError: if ``key`` is absent, or if the stored state's
         ``obj()`` returns ``None``.
        """
        state = self._dict[key]
        o = state.obj()
        if o is None:
            raise KeyError(key)
        return o

    def __contains__(self, key):
        """Return True if ``key`` maps to a state whose object is not None."""
        try:
            if key in self._dict:
                state = self._dict[key]
                o = state.obj()
            else:
                return False
        except KeyError:
            return False
        else:
            return o is not None

    def contains_state(self, state):
        """Return True if this exact ``state`` is stored under its key."""
        if state.key in self._dict:
            try:
                return self._dict[state.key] is state
            except KeyError:
                return False
        else:
            return False

    def replace(self, state):
        """Store ``state`` under ``state.key``, displacing any other state.

        :return: the state previously stored under the key, or ``None`` if
         there was none or if ``state`` itself was already stored (in which
         case nothing is changed).
        """
        if state.key in self._dict:
            try:
                existing = self._dict[state.key]
            except KeyError:
                # catch gc removed the key after we just checked for it.
                # ``existing`` must still be bound here; otherwise the
                # final ``return existing`` raises UnboundLocalError.
                existing = None
            else:
                if existing is not state:
                    self._manage_removed_state(existing)
                else:
                    return None
        else:
            existing = None

        self._dict[state.key] = state
        self._manage_incoming_state(state)
        return existing

    def add(self, state):
        """Add ``state`` to the map under ``state.key``.

        :return: ``True`` if the state was stored, ``False`` if this same
         state was already present.
        :raises sqlalchemy.exc.InvalidRequestError: if a different state
         whose ``obj()`` is not ``None`` is already stored under the key.
        """
        key = state.key
        # inline of self.__contains__
        if key in self._dict:
            try:
                existing_state = self._dict[key]
            except KeyError:
                # catch gc removed the key after we just checked for it
                pass
            else:
                if existing_state is not state:
                    o = existing_state.obj()
                    if o is not None:
                        raise sa_exc.InvalidRequestError(
                            "Can't attach instance "
                            "%s; another instance with key %s is already "
                            "present in this session."
                            % (orm_util.state_str(state), state.key)
                        )
                    # a different state whose object is None is simply
                    # overwritten below
                else:
                    return False
        self._dict[key] = state
        self._manage_incoming_state(state)
        return True

    def _add_unpresent(self, state, key):
        """Store ``state`` under ``key`` without any presence check.

        Unlike :meth:`add`, this does not consult ``state.modified`` and
        never touches the modified set.
        """
        # inlined form of add() called by loading.py
        self._dict[key] = state
        state._instance_dict = self._wr

    def get(self, key, default=None):
        """Return the object stored under ``key``, else ``default``.

        ``default`` is also returned when the stored state's ``obj()``
        returns ``None``.
        """
        if key not in self._dict:
            return default
        try:
            state = self._dict[key]
        except KeyError:
            # catch gc removed the key after we just checked for it
            return default
        else:
            o = state.obj()
            if o is None:
                return default
            return o

    def items(self):
        """Return a list of ``(key, object)`` pairs for non-None objects."""
        values = self.all_states()
        result = []
        for state in values:
            value = state.obj()
            if value is not None:
                result.append((state.key, value))
        return result

    def values(self):
        """Return a list of the objects of all states, skipping None."""
        values = self.all_states()
        result = []
        for state in values:
            value = state.obj()
            if value is not None:
                result.append(value)

        return result

    def __iter__(self):
        """Iterate over the keys of the underlying dictionary."""
        return iter(self.keys())

    if util.py2k:

        def iteritems(self):
            return iter(self.items())

        def itervalues(self):
            return iter(self.values())

    def all_states(self):
        """Return the stored states.

        When ``util.py2k`` is false the dictionary values are copied into
        a new list; otherwise ``self._dict.values()`` is returned as is.
        """
        if util.py2k:
            return self._dict.values()
        else:
            return list(self._dict.values())

    def _fast_discard(self, state):
        """Remove ``state`` from the dictionary if it is the stored entry.

        Only ``_dict`` is touched: ``state._instance_dict`` and the
        modified set are left alone.
        """
        # used by InstanceState for state being
        # GC'ed, inlines _manage_removed_state
        try:
            st = self._dict[state.key]
        except KeyError:
            # the key is not (or no longer) present; nothing to discard
            pass
        else:
            if st is state:
                self._dict.pop(state.key, None)

    def discard(self, state):
        """Remove ``state`` from the map; delegates to :meth:`safe_discard`."""
        self.safe_discard(state)

    def safe_discard(self, state):
        """Remove ``state`` if it is the entry stored under ``state.key``.

        A missing key, or a different state stored under the key, is
        silently ignored.
        """
        if state.key in self._dict:
            try:
                st = self._dict[state.key]
            except KeyError:
                # catch gc removed the key after we just checked for it
                pass
            else:
                if st is state:
                    self._dict.pop(state.key, None)
                    self._manage_removed_state(state)

244 

245 

def _killed(state, key):
    """Stand-in for ``IdentityMap._add_unpresent`` on a killed map.

    Always raises ``InvalidRequestError`` naming ``state``; ``key`` is
    accepted only so the signature matches ``_add_unpresent`` when called
    as a plain instance attribute.
    """
    # external function to avoid creating cycles when assigned to
    # the IdentityMap
    message = (
        "Object %s cannot be converted to 'persistent' state, as this "
        "identity map is no longer valid. Has the owning Session "
        "been closed?"
    ) % orm_util.state_str(state)
    raise sa_exc.InvalidRequestError(message, code="lkrp")