Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/util.py: 19%
108 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
# engine/util.py
# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
8from .. import exc
9from .. import util
10from ..util import collections_abc
11from ..util import immutabledict
def connection_memoize(key):
    """Decorator, memoize a function in a connection.info stash.

    Only applicable to functions which take no arguments other than a
    connection.  The memo will be stored in ``connection.info[key]``.

    :param key: key under which the computed value is stored in the
     ``info`` dictionary of the connection.
    :return: the ``decorated`` wrapper after being passed through
     ``util.decorator``.

    """

    @util.decorator
    def decorated(fn, self, connection):
        # The memo lives on whatever connect() hands back, which is not
        # necessarily the object that was passed in.
        connection = connection.connect()
        try:
            return connection.info[key]
        except KeyError:
            # First call for this connection: compute, stash and return.
            connection.info[key] = val = fn(self, connection)
            return val

    return decorated
# Shared empty "no parameters" values; _distill_params_20() returns this
# pair when it is called with params=None.
_no_tuple = ()
_no_kw = util.immutabledict()
def _distill_params(connection, multiparams, params):
    r"""Given arguments from the calling form \*multiparams, \**params,
    return a list of bind parameter structures, usually a list of
    dictionaries.

    In the case of 'raw' execution which accepts positional parameters,
    it may be a list of tuples or lists.

    :param connection: object whose ``_warn_for_legacy_exec_format()``
     method is invoked for the calling forms treated as legacy below.
    :param multiparams: sequence of positional arguments.
    :param params: mapping of keyword arguments; only consulted when
     ``multiparams`` is empty.
    :return: a sequence of parameter sets.  Depending on the calling form
     this is a new list, or the very object that was passed in.

    """

    if not multiparams:
        if params:
            # execute(stmt, key=value)
            connection._warn_for_legacy_exec_format()
            return [params]
        else:
            return []
    elif len(multiparams) == 1:
        zero = multiparams[0]
        if isinstance(zero, (list, tuple)):
            # "iterable but has no .strip" means: a non-string iterable.
            # Note ``and`` binds tighter than ``or`` here.
            if (
                not zero
                or hasattr(zero[0], "__iter__")
                and not hasattr(zero[0], "strip")
            ):
                # execute(stmt, [{}, {}, {}, ...])
                # execute(stmt, [(), (), (), ...])
                return zero
            else:
                # this is used by exec_driver_sql only, so a deprecation
                # warning would already be coming from passing a plain
                # textual statement with positional parameters to
                # execute().
                # execute(stmt, ("value", "value"))
                return [zero]
        elif hasattr(zero, "keys"):
            # execute(stmt, {"key":"value"})
            return [zero]
        else:
            connection._warn_for_legacy_exec_format()
            # execute(stmt, "value")
            return [[zero]]
    else:
        # More than one positional argument is always the legacy form.
        connection._warn_for_legacy_exec_format()
        if hasattr(multiparams[0], "__iter__") and not hasattr(
            multiparams[0], "strip"
        ):
            # execute(stmt, {...}, {...}) / execute(stmt, (...), (...))
            return multiparams
        else:
            # execute(stmt, "value", "value")
            return [multiparams]
def _distill_cursor_params(connection, multiparams, params):
    """_distill_params without any warnings.  more appropriate for
    "cursor" params that can include tuple arguments, lists of tuples,
    etc.

    :param connection: not used by this function.
    :param multiparams: sequence of positional arguments.
    :param params: mapping of keyword arguments; only consulted when
     ``multiparams`` is empty.
    :return: a sequence of parameter sets.  Depending on the calling form
     this is a new list, or the very object that was passed in.

    """

    if not multiparams:
        if params:
            return [params]
        else:
            return []
    elif len(multiparams) == 1:
        zero = multiparams[0]
        if isinstance(zero, (list, tuple)):
            # "iterable but has no .strip" means: a non-string iterable.
            # Note ``and`` binds tighter than ``or`` here.
            if (
                not zero
                or hasattr(zero[0], "__iter__")
                and not hasattr(zero[0], "strip")
            ):
                # execute(stmt, [{}, {}, {}, ...])
                # execute(stmt, [(), (), (), ...])
                return zero
            else:
                # this is used by exec_driver_sql only, so a deprecation
                # warning would already be coming from passing a plain
                # textual statement with positional parameters to
                # execute().
                # execute(stmt, ("value", "value"))

                return [zero]
        elif hasattr(zero, "keys"):
            # execute(stmt, {"key":"value"})
            return [zero]
        else:
            # execute(stmt, "value")
            return [[zero]]
    else:
        if hasattr(multiparams[0], "__iter__") and not hasattr(
            multiparams[0], "strip"
        ):
            # execute(stmt, {...}, {...}) / execute(stmt, (...), (...))
            return multiparams
        else:
            # execute(stmt, "value", "value")
            return [multiparams]
def _distill_params_20(params):
    """Normalize a single ``params`` argument into a
    ``(multiparams, params)`` pair.

    :param params: ``None``, a list of tuples / mappings, or a single
     tuple or mapping.
    :return: ``(_no_tuple, _no_kw)`` when ``params`` is ``None``,
     otherwise ``((params,), _no_kw)``.
    :raises ArgumentError: if ``params`` is a non-empty list whose first
     element is neither a mapping nor a tuple, or if ``params`` is not a
     list, tuple or mapping at all.

    """
    if params is None:
        return _no_tuple, _no_kw
    elif isinstance(params, list):
        # collections_abc.MutableSequence): # avoid abc.__instancecheck__
        # Only the first element is inspected, not the whole list.
        if params and not isinstance(
            params[0], (collections_abc.Mapping, tuple)
        ):
            raise exc.ArgumentError(
                "List argument must consist only of tuples or dictionaries"
            )

        return (params,), _no_kw
    elif isinstance(
        params,
        (tuple, dict, immutabledict),
        # only do abc.__instancecheck__ for Mapping after we've checked
        # for plain dictionaries and would otherwise raise
    ) or isinstance(params, collections_abc.Mapping):
        return (params,), _no_kw
    else:
        raise exc.ArgumentError("mapping or sequence expected for parameters")
class TransactionalContext(object):
    """Apply Python context manager behavior to transaction objects.

    Performs validation to ensure the subject of the transaction is not
    used if the transaction were ended prematurely.

    Subclasses supply the ``_transaction_is_active()``,
    ``_transaction_is_closed()``, ``_rollback_can_be_called()`` and
    ``_get_subject()`` hooks below; ``__exit__`` additionally calls
    ``commit()``, ``rollback()`` and ``close()`` on ``self``, which are not
    defined in this class.

    """

    # Subject captured by __enter__(); stays None until then and is reset
    # to None again by __exit__().
    _trans_subject = None

    def _transaction_is_active(self):
        """Hook for subclasses; raises NotImplementedError here."""
        raise NotImplementedError()

    def _transaction_is_closed(self):
        """Hook for subclasses; raises NotImplementedError here."""
        raise NotImplementedError()

    def _rollback_can_be_called(self):
        """indicates the object is in a state that is known to be acceptable
        for rollback() to be called.

        This does not necessarily mean rollback() will succeed or not raise
        an error, just that there is currently no state detected that indicates
        rollback() would fail or emit warnings.

        It also does not mean that there's a transaction in progress, as
        it is usually safe to call rollback() even if no transaction is
        present.

        .. versionadded:: 1.4.28

        """
        raise NotImplementedError()

    def _get_subject(self):
        """Hook for subclasses; raises NotImplementedError here.

        ``__enter__`` expects the returned object to carry a
        ``_trans_context_manager`` attribute.
        """
        raise NotImplementedError()

    @classmethod
    def _trans_ctx_check(cls, subject):
        """Raise if ``subject`` has a context manager registered whose
        transaction is no longer active.

        :param subject: object with a ``_trans_context_manager`` attribute.
        :raises InvalidRequestError: if that attribute is truthy and its
         ``_transaction_is_active()`` returns a false value.
        """
        trans_context = subject._trans_context_manager
        if trans_context:
            if not trans_context._transaction_is_active():
                raise exc.InvalidRequestError(
                    "Can't operate on closed transaction inside context "
                    "manager.  Please complete the context manager "
                    "before emitting further commands."
                )

    def __enter__(self):
        """Register this object as the subject's current context manager,
        remembering the previous one, and return ``self``.
        """
        subject = self._get_subject()

        # none for outer transaction, may be non-None for nested
        # savepoint, legacy nesting cases
        trans_context = subject._trans_context_manager
        self._outer_trans_ctx = trans_context

        self._trans_subject = subject
        subject._trans_context_manager = self
        return self

    def __exit__(self, type_, value, traceback):
        """Commit, roll back or close on leaving the ``with`` block, then
        restore the subject's previous context manager.

        Nothing is returned, so an exception raised inside the ``with``
        block is never suppressed.
        """
        subject = self._trans_subject

        # simplistically we could assume that
        # "subject._trans_context_manager is self".  However, any calling
        # code that is manipulating __exit__ directly would break this
        # assumption.  alembic context manager
        # is an example of partial use that just calls __exit__ and
        # not __enter__ at the moment.  it's safe to assume this is being done
        # in the wild also
        out_of_band_exit = (
            subject is None or subject._trans_context_manager is not self
        )

        if type_ is None and self._transaction_is_active():
            # Clean exit with a live transaction: commit it.
            try:
                self.commit()
            except:
                # Bare except is deliberate: the rollback attempt must run
                # for any failure of commit().  safe_reraise() presumably
                # re-raises the original error afterwards -- confirm
                # against util.safe_reraise.
                with util.safe_reraise():
                    if self._rollback_can_be_called():
                        self.rollback()
            finally:
                # Only touch the subject if __enter__ registered us on it.
                if not out_of_band_exit:
                    subject._trans_context_manager = self._outer_trans_ctx
                self._trans_subject = self._outer_trans_ctx = None
        else:
            # Either an exception is propagating or the transaction is
            # already inactive.
            try:
                if not self._transaction_is_active():
                    if not self._transaction_is_closed():
                        self.close()
                else:
                    if self._rollback_can_be_called():
                        self.rollback()
            finally:
                if not out_of_band_exit:
                    subject._trans_context_manager = self._outer_trans_ctx
                self._trans_subject = self._outer_trans_ctx = None