Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/identity.py: 26%
163 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# orm/identity.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8import weakref
10from . import util as orm_util
11from .. import exc as sa_exc
12from .. import util
class IdentityMap(object):
    """Base container mapping keys to states and tracking modified states.

    The mutating entry points (``add``, ``replace``) and the dict-style
    mutators are not implemented here: they raise ``NotImplementedError``
    so that a subclass must supply ``add()`` / ``replace()`` and callers
    cannot bypass them through the plain mapping API.
    """

    def __init__(self):
        # weak reference to this map; handed to every incoming state
        self._wr = weakref.ref(self)
        # states that reported ``modified`` when they were registered
        self._modified = set()
        # key -> state storage
        self._dict = {}

    def _kill(self):
        """Shadow ``_add_unpresent`` on this instance with ``_killed``."""
        # a plain function stored on the instance is not bound, so it is
        # invoked as ``_killed(state, key)``
        self._add_unpresent = _killed

    def keys(self):
        """Return the keys of the underlying dictionary."""
        return self._dict.keys()

    def replace(self, state):
        """Not implemented on the base class."""
        raise NotImplementedError()

    def add(self, state):
        """Not implemented on the base class."""
        raise NotImplementedError()

    def _add_unpresent(self, state, key):
        """Optional inlined form of add() for a state known to be absent.

        The base implementation ignores ``key`` and delegates to ``add()``.
        """
        self.add(state)

    def update(self, dict_):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses add() to insert data")

    def clear(self):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")

    def _manage_incoming_state(self, state):
        """Link ``state`` to this map and record it if it is modified."""
        state._instance_dict = self._wr

        if not state.modified:
            return
        self._modified.add(state)

    def _manage_removed_state(self, state):
        """Unlink ``state`` from this map and drop it from the modified set."""
        delattr(state, "_instance_dict")
        if not state.modified:
            return
        self._modified.discard(state)

    def _dirty_states(self):
        """Return the set of states recorded as modified."""
        return self._modified

    def check_modified(self):
        """Return True if at least one state is recorded as modified."""
        return True if self._modified else False

    def has_key(self, key):
        """Return the result of ``key in self``."""
        return key in self

    def popitem(self):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")

    def pop(self, key, *args):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")

    def setdefault(self, key, default=None):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses add() to insert data")

    def __len__(self):
        """Return the number of entries in the underlying dictionary."""
        return len(self._dict)

    def copy(self):
        """Not implemented on the base class."""
        raise NotImplementedError()

    def __setitem__(self, key, value):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses add() to insert data")

    def __delitem__(self, key):
        """Unsupported; always raises ``NotImplementedError``."""
        raise NotImplementedError("IdentityMap uses remove() to remove data")
class WeakInstanceDict(IdentityMap):
    """Identity map that stores states and hands out the objects they
    refer to.

    Values kept in ``self._dict`` are state objects; the mapping-style
    accessors return ``state.obj()`` and treat a ``None`` result as
    "not present" (presumably the referent has been garbage collected --
    confirm against InstanceState).

    Lookups use a single ``try: self._dict[key]`` instead of a membership
    test followed by a subscript, so an entry disappearing between the two
    steps cannot produce an inconsistent result.
    """

    def __getitem__(self, key):
        """Return the object stored under ``key``.

        Raises ``KeyError`` if no state is stored under ``key`` or if the
        stored state's ``obj()`` returns None.
        """
        state = self._dict[key]
        o = state.obj()
        if o is None:
            raise KeyError(key)
        return o

    def __contains__(self, key):
        """Return True if ``key`` maps to a state whose object is not None."""
        try:
            state = self._dict[key]
        except KeyError:
            return False
        return state.obj() is not None

    def contains_state(self, state):
        """Return True if ``state`` itself is the entry under its key."""
        try:
            return self._dict[state.key] is state
        except KeyError:
            return False

    def replace(self, state):
        """Store ``state`` under its key, displacing any other state.

        Returns the displaced state, or None when there was no entry for
        the key or when ``state`` was already the stored entry (in which
        case nothing is changed).
        """
        key = state.key
        try:
            existing = self._dict[key]
        except KeyError:
            # No entry (or it vanished mid-lookup): nothing is displaced.
            # Binding the name here keeps the final ``return existing``
            # from raising UnboundLocalError.
            existing = None
        else:
            if existing is state:
                return None
            self._manage_removed_state(existing)

        self._dict[key] = state
        self._manage_incoming_state(state)
        return existing

    def add(self, state):
        """Store ``state`` under its key.

        Returns True if the state was stored, False if this exact state
        was already the entry for its key.

        Raises ``InvalidRequestError`` if a different state is stored
        under the same key and its ``obj()`` is not None.  A different
        state whose ``obj()`` is None is silently overwritten.
        """
        key = state.key
        try:
            existing_state = self._dict[key]
        except KeyError:
            pass
        else:
            if existing_state is state:
                return False
            if existing_state.obj() is not None:
                raise sa_exc.InvalidRequestError(
                    "Can't attach instance "
                    "%s; another instance with key %s is already "
                    "present in this session."
                    % (orm_util.state_str(state), state.key)
                )
        self._dict[key] = state
        self._manage_incoming_state(state)
        return True

    def _add_unpresent(self, state, key):
        """Inlined form of add() called by loading.py.

        Performs no presence check and does not touch the modified set.
        """
        self._dict[key] = state
        state._instance_dict = self._wr

    def get(self, key, default=None):
        """Return the object stored under ``key``, else ``default``.

        ``default`` is also returned when the stored state's ``obj()``
        returns None.
        """
        try:
            state = self._dict[key]
        except KeyError:
            return default
        o = state.obj()
        if o is None:
            return default
        return o

    def items(self):
        """Return a list of ``(key, object)`` pairs, skipping states whose
        ``obj()`` returns None."""
        result = []
        for state in self.all_states():
            value = state.obj()
            if value is not None:
                result.append((state.key, value))
        return result

    def values(self):
        """Return a list of objects, skipping states whose ``obj()``
        returns None."""
        result = []
        for state in self.all_states():
            value = state.obj()
            if value is not None:
                result.append(value)
        return result

    def __iter__(self):
        """Iterate over the keys of the map."""
        return iter(self.keys())

    if util.py2k:

        def iteritems(self):
            """Return an iterator over ``items()``."""
            return iter(self.items())

        def itervalues(self):
            """Return an iterator over ``values()``."""
            return iter(self.values())

    def all_states(self):
        """Return every stored state, including those whose object is None."""
        if util.py2k:
            return self._dict.values()
        else:
            # materialize the view so callers hold a stable list
            return list(self._dict.values())

    def _fast_discard(self, state):
        """Remove ``state`` from the dictionary if it is the stored entry.

        Used by InstanceState for a state being GC'ed.  Unlike
        ``safe_discard`` this does not call ``_manage_removed_state``.
        """
        key = state.key
        try:
            st = self._dict[key]
        except KeyError:
            # entry is already gone; nothing to discard
            pass
        else:
            if st is state:
                self._dict.pop(key, None)

    def discard(self, state):
        """Alias for ``safe_discard``."""
        self.safe_discard(state)

    def safe_discard(self, state):
        """Remove ``state`` if it is the stored entry for its key.

        When removed, the state is also unlinked via
        ``_manage_removed_state``.  Does nothing if the key is absent or
        maps to a different state.
        """
        key = state.key
        try:
            st = self._dict[key]
        except KeyError:
            # entry is already gone; nothing to discard
            return
        if st is state:
            self._dict.pop(key, None)
            self._manage_removed_state(state)
def _killed(state, key):
    """Always raise ``InvalidRequestError`` for ``state``; ``key`` is unused.

    Installed by ``IdentityMap._kill`` in place of ``_add_unpresent``.
    """
    # external function to avoid creating cycles when assigned to
    # the IdentityMap
    message = (
        "Object %s cannot be converted to 'persistent' state, as this "
        "identity map is no longer valid. Has the owning Session "
        "been closed?"
    ) % orm_util.state_str(state)
    raise sa_exc.InvalidRequestError(message, code="lkrp")