1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r"""
11
12.. dialect:: sqlite+aiosqlite
13 :name: aiosqlite
14 :dbapi: aiosqlite
15 :connectstring: sqlite+aiosqlite:///file_path
16 :url: https://pypi.org/project/aiosqlite/
17
18The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
19running on top of pysqlite.
20
21aiosqlite is a wrapper around pysqlite that uses a background thread for
22each connection. It does not actually use non-blocking IO, as SQLite
23databases are not socket-based. However it does provide a working asyncio
24interface that's useful for testing and prototyping purposes.
25
26Using a special asyncio mediation layer, the aiosqlite dialect is usable
27as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
28extension package.
29
30This dialect should normally be used only with the
31:func:`_asyncio.create_async_engine` engine creation function::
32
33 from sqlalchemy.ext.asyncio import create_async_engine
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52Similarly to pysqlite, aiosqlite does not support SAVEPOINT feature.
53
54The solution is similar to :ref:`pysqlite_serializable`. This is achieved by the event listeners in async::
55
56 from sqlalchemy import create_engine, event
57 from sqlalchemy.ext.asyncio import create_async_engine
58
59 engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
60
61 @event.listens_for(engine.sync_engine, "connect")
62 def do_connect(dbapi_connection, connection_record):
63 # disable aiosqlite's emitting of the BEGIN statement entirely.
64 # also stops it from emitting COMMIT before any DDL.
65 dbapi_connection.isolation_level = None
66
67 @event.listens_for(engine.sync_engine, "begin")
68 def do_begin(conn):
69 # emit our own BEGIN
70 conn.exec_driver_sql("BEGIN")
71
72.. warning:: When using the above recipe, it is advised to not use the
73 :paramref:`.Connection.execution_options.isolation_level` setting on
74 :class:`_engine.Connection` and :func:`_sa.create_engine`
75 with the SQLite driver,
76 as this function necessarily will also alter the ".isolation_level" setting.
77
78""" # noqa
79
80import asyncio
81from functools import partial
82
83from .base import SQLiteExecutionContext
84from .pysqlite import SQLiteDialect_pysqlite
85from ... import pool
86from ...connectors.asyncio import AsyncAdapt_dbapi_connection
87from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
88from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
89from ...util.concurrency import await_
90
91
92class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
93 __slots__ = ()
94
95
96class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
97 __slots__ = ()
98
99
100class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection):
101 __slots__ = ()
102
103 _cursor_cls = AsyncAdapt_aiosqlite_cursor
104 _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor
105
106 @property
107 def isolation_level(self):
108 return self._connection.isolation_level
109
110 @isolation_level.setter
111 def isolation_level(self, value):
112 # aiosqlite's isolation_level setter works outside the Thread
113 # that it's supposed to, necessitating setting check_same_thread=False.
114 # for improved stability, we instead invent our own awaitable version
115 # using aiosqlite's async queue directly.
116
117 def set_iso(connection, value):
118 connection.isolation_level = value
119
120 function = partial(set_iso, self._connection._conn, value)
121 future = asyncio.get_event_loop().create_future()
122
123 self._connection._tx.put_nowait((future, function))
124
125 try:
126 return await_(future)
127 except Exception as error:
128 self._handle_exception(error)
129
130 def create_function(self, *args, **kw):
131 try:
132 await_(self._connection.create_function(*args, **kw))
133 except Exception as error:
134 self._handle_exception(error)
135
136 def rollback(self):
137 if self._connection._connection:
138 super().rollback()
139
140 def commit(self):
141 if self._connection._connection:
142 super().commit()
143
144 def close(self):
145 try:
146 await_(self._connection.close())
147 except ValueError:
148 # this is undocumented for aiosqlite, that ValueError
149 # was raised if .close() was called more than once, which is
150 # both not customary for DBAPI and is also not a DBAPI.Error
151 # exception. This is now fixed in aiosqlite via my PR
152 # https://github.com/omnilib/aiosqlite/pull/238, so we can be
153 # assured this will not become some other kind of exception,
154 # since it doesn't raise anymore.
155
156 pass
157 except Exception as error:
158 self._handle_exception(error)
159
160 def _handle_exception(self, error):
161 if isinstance(error, ValueError) and error.args[0].lower() in (
162 "no active connection",
163 "connection closed",
164 ):
165 raise self.dbapi.sqlite.OperationalError(error.args[0]) from error
166 else:
167 super()._handle_exception(error)
168
169
170class AsyncAdapt_aiosqlite_dbapi:
171 def __init__(self, aiosqlite, sqlite):
172 self.aiosqlite = aiosqlite
173 self.sqlite = sqlite
174 self.paramstyle = "qmark"
175 self._init_dbapi_attributes()
176
177 def _init_dbapi_attributes(self):
178 for name in (
179 "DatabaseError",
180 "Error",
181 "IntegrityError",
182 "NotSupportedError",
183 "OperationalError",
184 "ProgrammingError",
185 "sqlite_version",
186 "sqlite_version_info",
187 ):
188 setattr(self, name, getattr(self.aiosqlite, name))
189
190 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
191 setattr(self, name, getattr(self.sqlite, name))
192
193 for name in ("Binary",):
194 setattr(self, name, getattr(self.sqlite, name))
195
196 def connect(self, *arg, **kw):
197 creator_fn = kw.pop("async_creator_fn", None)
198 if creator_fn:
199 connection = creator_fn(*arg, **kw)
200 else:
201 connection = self.aiosqlite.connect(*arg, **kw)
202 # it's a Thread. you'll thank us later
203 connection.daemon = True
204
205 return AsyncAdapt_aiosqlite_connection(
206 self,
207 await_(connection),
208 )
209
210
211class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
212 def create_server_side_cursor(self):
213 return self._dbapi_connection.cursor(server_side=True)
214
215
216class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
217 driver = "aiosqlite"
218 supports_statement_cache = True
219
220 is_async = True
221
222 supports_server_side_cursors = True
223
224 execution_ctx_cls = SQLiteExecutionContext_aiosqlite
225
226 @classmethod
227 def import_dbapi(cls):
228 return AsyncAdapt_aiosqlite_dbapi(
229 __import__("aiosqlite"), __import__("sqlite3")
230 )
231
232 @classmethod
233 def get_pool_class(cls, url):
234 if cls._is_url_file_db(url):
235 return pool.NullPool
236 else:
237 return pool.StaticPool
238
239 def is_disconnect(self, e, connection, cursor):
240 if isinstance(e, self.dbapi.OperationalError):
241 err_lower = str(e).lower()
242 if (
243 "no active connection" in err_lower
244 or "connection closed" in err_lower
245 ):
246 return True
247
248 return super().is_disconnect(e, connection, cursor)
249
250 def get_driver_connection(self, connection):
251 return connection._connection
252
253
254dialect = SQLiteDialect_aiosqlite