Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 36%
180 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
# sqlite/aiosqlite.py
# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
r"""

.. dialect:: sqlite+aiosqlite
    :name: aiosqlite
    :dbapi: aiosqlite
    :connectstring: sqlite+aiosqlite:///file_path
    :url: https://pypi.org/project/aiosqlite/

The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
running on top of pysqlite.

aiosqlite is a wrapper around pysqlite that uses a background thread for
each connection.  It does not actually use non-blocking IO, as SQLite
databases are not socket-based.  However, it does provide a working asyncio
interface that's useful for testing and prototyping purposes.

Using a special asyncio mediation layer, the aiosqlite dialect is usable
as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
extension package.

This dialect should normally be used only with the
:func:`_asyncio.create_async_engine` engine creation function::

    from sqlalchemy.ext.asyncio import create_async_engine
    engine = create_async_engine("sqlite+aiosqlite:///filename")

The URL passes through all arguments to the ``pysqlite`` driver, so all
connection arguments are the same as they are for that of :ref:`pysqlite`.

.. _aiosqlite_udfs:

User-Defined Functions
----------------------

aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.


"""  # noqa
from collections import deque

from .base import SQLiteExecutionContext
from .pysqlite import SQLiteDialect_pysqlite
from ... import pool
from ... import util
from ...engine import AdaptedConnection
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
class AsyncAdapt_aiosqlite_cursor:
    """Synchronous-looking cursor that drives an aiosqlite connection.

    Each ``execute()`` / ``executemany()`` call opens a fresh driver cursor
    on the wrapped connection and runs every awaitable through
    ``await_``, the callable supplied by the owning adapted connection.
    When ``server_side`` is false (the case for this class) the complete
    result set is fetched immediately into an in-memory buffer and the
    driver cursor is closed; the ``fetch*`` methods then only consume that
    buffer.
    """

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection):
        """Bind the cursor to *adapt_connection*.

        :param adapt_connection: adapted connection object; must expose
         ``_connection``, ``await_`` and ``_handle_exception()``.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # Same "no value" marker execute() assigns for statements that
        # return rows; without it, reading ``lastrowid`` before the first
        # execute() raises AttributeError because the slot is unset.
        self.lastrowid = -1
        self.description = None
        # deque gives O(1) removal from the front; a list would make
        # draining the buffer row-by-row quadratic.
        self._rows = deque()

    def close(self):
        """Discard any rows that have been buffered but not fetched."""
        self._rows.clear()

    def execute(self, operation, parameters=None):
        """Run *operation* on a new driver cursor and record its outcome.

        :param operation: statement passed through to the driver cursor.
        :param parameters: optional bind parameters; when ``None`` the
         driver's ``execute()`` is called with the statement only.

        Any exception is passed to the owning connection's
        ``_handle_exception()``.
        """
        try:
            _cursor = self.await_(self._connection.cursor())

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                # statement returned rows
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                # no result columns; copy the counters from the driver
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                self.await_(_cursor.close())
            else:
                # the server-side subclass keeps the driver cursor open
                # and fetches from it lazily
                self._cursor = _cursor
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(self, operation, seq_of_parameters):
        """Run *operation* once per parameter set in *seq_of_parameters*.

        ``description`` is reset to ``None``; ``lastrowid`` and
        ``rowcount`` are copied from the driver cursor, which is then
        closed.  Any exception is passed to the owning connection's
        ``_handle_exception()``.
        """
        try:
            _cursor = self.await_(self._connection.cursor())
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes):
        """Accept and ignore input size hints."""
        pass

    def __iter__(self):
        """Yield buffered rows in order, removing each one as it is read."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return the next buffered row, or ``None`` when none remain."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Return up to *size* buffered rows as a list.

        :param size: maximum number of rows; defaults to ``arraysize``.
         A size of zero or less returns an empty list.
        """
        if size is None:
            size = self.arraysize

        rows = self._rows
        return [rows.popleft() for _ in range(min(size, len(rows)))]

    def fetchall(self):
        """Return every remaining buffered row as a list and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Cursor variant that fetches rows lazily from the driver cursor.

    With ``server_side`` set to true, the inherited ``execute()`` does not
    pre-fetch rows and instead stores the open driver cursor on
    ``_cursor``; the fetch methods here forward to that cursor through
    ``await_``.
    """

    __slots__ = ("_cursor",)

    server_side = True

    def __init__(self, *arg, **kw):
        super().__init__(*arg, **kw)
        # assigned by execute(); None until then and again after close()
        self._cursor = None

    def close(self):
        """Close the driver cursor if one is open; safe to call twice."""
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def __iter__(self):
        """Yield rows one at a time from the driver cursor.

        The inherited ``__iter__`` reads the in-memory row buffer, which
        is never filled for a server-side cursor, so iteration would
        silently produce nothing.  Assumes the driver cursor's
        ``fetchone()`` returns ``None`` once the result is exhausted.
        """
        yield from iter(self.fetchone, None)

    def fetchone(self):
        """Return the next row as reported by the driver cursor."""
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size=None):
        """Return up to *size* rows (default ``arraysize``) from the driver."""
        if size is None:
            size = self.arraysize
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self):
        """Return all remaining rows from the driver cursor."""
        return self.await_(self._cursor.fetchall())
class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    """Synchronous-looking facade over an aiosqlite connection.

    Every awaitable returned by the wrapped connection is run through
    ``await_`` (``await_only`` for this class).  Errors raised by the
    wrapped connection are funnelled through ``_handle_exception()`` so
    that a "no active connection" ``ValueError`` surfaces as the DBAPI's
    ``OperationalError``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi", "_connection")

    def __init__(self, dbapi, connection):
        """
        :param dbapi: the adapted DBAPI object; its ``sqlite`` attribute
         supplies the ``OperationalError`` class raised on disconnect.
        :param connection: the underlying driver connection.
        """
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self):
        """Isolation level as reported by the wrapped connection."""
        return self._connection.isolation_level

    @isolation_level.setter
    def isolation_level(self, value):
        try:
            self._connection.isolation_level = value
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args, **kw):
        """Forward all arguments to the wrapped ``create_function()``."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side=False):
        """Return a new adapted cursor bound to this connection.

        :param server_side: when true, return the lazily-fetching cursor
         class instead of the buffering one.
        """
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args, **kw):
        """Forward to the wrapped ``execute()`` and return its result.

        Errors go through ``_handle_exception()`` like every other
        awaiting method on this class, so a lost connection is reported
        with the same exception type regardless of which method hit it.
        """
        try:
            return self.await_(self._connection.execute(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self):
        """Roll back the wrapped connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self):
        """Commit the wrapped connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self):
        """Close the wrapped connection."""
        try:
            self.await_(self._connection.close())
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error):
        """Re-raise *error*, translating the "no active connection" case.

        A ``ValueError`` whose first argument is exactly
        ``"no active connection"`` is re-raised as
        ``self.dbapi.sqlite.OperationalError`` chained from the original;
        anything else is re-raised unchanged.  Always raises.
        """
        if (
            isinstance(error, ValueError)
            # guard: an argument-less ValueError must not turn into an
            # IndexError that hides the real failure
            and error.args
            and error.args[0] == "no active connection"
        ):
            util.raise_(
                self.dbapi.sqlite.OperationalError("no active connection"),
                from_=error,
            )
        else:
            raise error
class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Adapted connection whose awaitables are run with ``await_fallback``.

    Identical to the parent class apart from the ``await_`` callable.
    """

    await_ = staticmethod(await_fallback)

    # no per-instance attributes beyond those of the parent
    __slots__ = ()
class AsyncAdapt_aiosqlite_dbapi:
    """Module-like DBAPI object built from ``aiosqlite`` and ``sqlite3``.

    Exposes the exception classes and version attributes of the
    ``aiosqlite`` module, selected constants of the ``sqlite`` module, a
    ``paramstyle`` of ``"qmark"`` and a ``connect()`` that returns an
    adapted connection.
    """

    def __init__(self, aiosqlite, sqlite):
        """
        :param aiosqlite: object providing the exception classes,
         ``sqlite_version`` / ``sqlite_version_info`` and ``connect()``.
        :param sqlite: object providing ``PARSE_COLNAMES``,
         ``PARSE_DECLTYPES`` and ``Binary``.
        """
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy the module-level DBAPI names onto this object."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        for attr in from_aiosqlite:
            setattr(self, attr, getattr(self.aiosqlite, attr))
        for attr in from_sqlite:
            setattr(self, attr, getattr(self.sqlite, attr))

    def connect(self, *arg, **kw):
        """Open a connection and wrap it in an adapted connection.

        :param async_fallback: optional keyword, removed from ``kw``
         before connecting; when it evaluates true under
         ``util.asbool`` the fallback connection class and
         ``await_fallback`` are used, otherwise the regular class and
         ``await_only``.

        All remaining arguments are passed to ``aiosqlite.connect()``.
        """
        use_fallback = kw.pop("async_fallback", False)

        # check_same_thread is forced off because there is otherwise no
        # way to set connection.isolation_level.  The isolation level
        # assignment is the only operation that is not routed through the
        # connection's single executor thread (aiosqlite appears to let it
        # slip through, whereas pysqlite appears to apply
        # check_same_thread to it); execute operations etc. all go
        # through that one thread and so should be safe.
        kw["check_same_thread"] = False

        pending = self.aiosqlite.connect(*arg, **kw)

        # the returned object is a Thread; mark it as a daemon thread
        pending.daemon = True

        if not util.asbool(use_fallback):
            return AsyncAdapt_aiosqlite_connection(
                self,
                await_only(pending),
            )

        return AsyncAdaptFallback_aiosqlite_connection(
            self,
            await_fallback(pending),
        )
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that can hand out server-side cursors."""

    def create_server_side_cursor(self):
        """Return a cursor from the DBAPI connection with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect using the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def dbapi(cls):
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them as one DBAPI."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url):
        """Return ``NullPool`` for file databases, ``StaticPool`` otherwise.

        Whether *url* is a file database is decided by
        ``cls._is_url_file_db(url)``.
        """
        return pool.NullPool if cls._is_url_file_db(url) else pool.StaticPool

    def is_disconnect(self, e, connection, cursor):
        """Report whether *e* indicates a lost connection.

        An ``OperationalError`` whose message contains
        ``"no active connection"`` counts as a disconnect; every other
        case is decided by the parent implementation.
        """
        if not isinstance(e, self.dbapi.OperationalError):
            return super().is_disconnect(e, connection, cursor)

        if "no active connection" in str(e):
            return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(self, connection):
        """Return the ``_connection`` attribute of the adapted connection."""
        return connection._connection


# module-level name bound to the dialect class
dialect = SQLiteDialect_aiosqlite