1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r"""
11
12.. dialect:: sqlite+aiosqlite
13 :name: aiosqlite
14 :dbapi: aiosqlite
15 :connectstring: sqlite+aiosqlite:///file_path
16 :url: https://pypi.org/project/aiosqlite/
17
18The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
19running on top of pysqlite.
20
21aiosqlite is a wrapper around pysqlite that uses a background thread for
22each connection. It does not actually use non-blocking IO, as SQLite
23databases are not socket-based. However it does provide a working asyncio
24interface that's useful for testing and prototyping purposes.
25
26Using a special asyncio mediation layer, the aiosqlite dialect is usable
27as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
28extension package.
29
30This dialect should normally be used only with the
31:func:`_asyncio.create_async_engine` engine creation function::
32
33 from sqlalchemy.ext.asyncio import create_async_engine
34
35 engine = create_async_engine("sqlite+aiosqlite:///filename")
36
37The URL passes through all arguments to the ``pysqlite`` driver, so all
38connection arguments are the same as they are for that of :ref:`pysqlite`.
39
40.. _aiosqlite_udfs:
41
42User-Defined Functions
43----------------------
44
45aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
46in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
47
48.. _aiosqlite_serializable:
49
50Serializable isolation / Savepoints / Transactional DDL (asyncio version)
51-------------------------------------------------------------------------
52
53A newly revised version of this important section is now available
54at the top level of the SQLAlchemy SQLite documentation, in the section
55:ref:`sqlite_transactions`.
56
57
58.. _aiosqlite_pooling:
59
60Pooling Behavior
61----------------
62
63The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
64based on the kind of SQLite database that's requested:
65
* When a ``:memory:`` SQLite database is specified, the dialect by default
  will use :class:`.StaticPool`. This pool maintains a single
  connection, so that all access to the engine
  uses the same ``:memory:`` database.
70* When a file-based database is specified, the dialect will use
71 :class:`.AsyncAdaptedQueuePool` as the source of connections.
72
73 .. versionchanged:: 2.0.38
74
    SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
    Previously, :class:`.NullPool` was used. The :class:`.NullPool` class
    may still be used by specifying it via the
    :paramref:`_sa.create_engine.poolclass` parameter.
79
80""" # noqa
81
82import asyncio
83from functools import partial
84
85from .base import SQLiteExecutionContext
86from .pysqlite import SQLiteDialect_pysqlite
87from ... import pool
88from ...connectors.asyncio import AsyncAdapt_dbapi_connection
89from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
90from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
91from ...util.concurrency import await_
92
93
class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter used for aiosqlite connections.

    Adds no behavior of its own on top of ``AsyncAdapt_dbapi_cursor``; the
    empty ``__slots__`` declares that instances carry no extra attributes.
    """

    __slots__ = ()
96
97
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
    """Server-side cursor adapter used for aiosqlite connections.

    Adds no behavior of its own on top of ``AsyncAdapt_dbapi_ss_cursor``; the
    empty ``__slots__`` declares that instances carry no extra attributes.
    """

    __slots__ = ()
100
101
class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection):
    """Blocking-style facade over an aiosqlite connection.

    The wrapped aiosqlite connection is available as ``self._connection``.
    Methods defined here drive its awaitables to completion with ``await_``
    and route failures through :meth:`_handle_exception`.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_aiosqlite_cursor
    _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor

    @property
    def isolation_level(self):
        """Return ``isolation_level`` as reported by the wrapped connection."""
        return self._connection.isolation_level

    @isolation_level.setter
    def isolation_level(self, value):
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection, value):
            connection.isolation_level = value

        # the callable is bound to aiosqlite's inner ``_conn`` object and is
        # placed on aiosqlite's ``_tx`` queue paired with the future that is
        # awaited below.
        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        self._connection._tx.put_nowait((future, function))

        try:
            return await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args, **kw):
        """Await the wrapped connection's ``create_function()``.

        All positional and keyword arguments are passed through unchanged;
        any exception is routed through :meth:`_handle_exception`.
        """
        try:
            await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self):
        """Roll back, but only while an inner connection is still present.

        When the wrapped object's ``_connection`` attribute is falsy the
        call is a no-op instead of delegating to the base class.
        """
        if self._connection._connection:
            super().rollback()

    def commit(self):
        """Commit, but only while an inner connection is still present.

        When the wrapped object's ``_connection`` attribute is falsy the
        call is a no-op instead of delegating to the base class.
        """
        if self._connection._connection:
            super().commit()

    def close(self):
        """Await the wrapped connection's ``close()``.

        A ``ValueError`` raised by the close is ignored (see the comment in
        the handler); any other exception is routed through
        :meth:`_handle_exception`.
        """
        try:
            await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error):
        """Re-raise *error*, translating "connection gone" ``ValueError``\\ s.

        A ``ValueError`` whose first argument is a string equal
        (case-insensitively) to ``"no active connection"`` or
        ``"connection closed"`` is re-raised as
        ``self.dbapi.sqlite.OperationalError`` carrying the same message and
        chained to the original. Every other exception is handed to the
        base-class handler unchanged.
        """
        if isinstance(error, ValueError) and error.args:
            message = error.args[0]
            # A ValueError with no arguments, or with a non-string first
            # argument, must reach the generic handler as-is; indexing or
            # lower-casing it blindly would replace the real error with an
            # IndexError / AttributeError raised from this method.
            if isinstance(message, str) and message.lower() in (
                "no active connection",
                "connection closed",
            ):
                raise self.dbapi.sqlite.OperationalError(message) from error

        super()._handle_exception(error)
170
171
class AsyncAdapt_aiosqlite_dbapi:
    """DBAPI-module-like facade built from ``aiosqlite`` and ``sqlite3``.

    :param aiosqlite: the imported ``aiosqlite`` module; source of the
     exception classes, the SQLite version attributes and ``connect()``.
    :param sqlite: the imported ``sqlite3`` module; source of the
     ``PARSE_*`` constants and ``Binary``.
    """

    def __init__(self, aiosqlite, sqlite):
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy selected module-level names onto this object."""
        for name in (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        ):
            setattr(self, name, getattr(self.aiosqlite, name))

        # these come from the sqlite3 module rather than from aiosqlite
        for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary"):
            setattr(self, name, getattr(self.sqlite, name))

    @staticmethod
    def _set_worker_daemon(connection):
        """Flag the worker thread behind *connection* as a daemon thread.

        Older aiosqlite connection objects are themselves ``Thread``
        instances, so the flag goes directly on *connection*. When the
        object instead exposes its worker as a ``_thread`` attribute
        (NOTE(review): believed to be the layout since aiosqlite 0.22 --
        confirm against the installed version), the flag is set there;
        setting it on the wrapper would be a silent no-op and a connection
        left open could keep the interpreter from exiting.
        """
        worker = getattr(connection, "_thread", None)
        if worker is None:
            worker = connection
        worker.daemon = True

    def connect(self, *arg, **kw):
        """Open a connection and return it wrapped in the sync-style adapter.

        :param async_creator_fn: optional keyword argument; when given and
         truthy it is called with the remaining arguments in place of
         ``aiosqlite.connect()``, and its result is awaited as-is.

        All other positional and keyword arguments are passed through to
        whichever callable creates the connection.
        """
        creator_fn = kw.pop("async_creator_fn", None)
        if creator_fn:
            connection = creator_fn(*arg, **kw)
        else:
            connection = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread. you'll thank us later
            self._set_worker_daemon(connection)

        return AsyncAdapt_aiosqlite_connection(
            self,
            await_(connection),
        )
211
212
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context used by the aiosqlite dialect."""

    def create_server_side_cursor(self):
        """Return a cursor requested with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)
216
217
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """pysqlite-derived dialect that talks to SQLite through aiosqlite."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls):
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them in the facade."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url):
        """Choose the pool class for *url*.

        ``AsyncAdaptedQueuePool`` is returned when ``_is_url_file_db()``
        reports a file database, ``StaticPool`` otherwise.
        """
        if not cls._is_url_file_db(url):
            return pool.StaticPool
        return pool.AsyncAdaptedQueuePool

    def is_disconnect(self, e, connection, cursor):
        """Report whether *e* indicates a lost connection.

        An ``OperationalError`` whose text contains, case-insensitively,
        "no active connection" or "connection closed" counts as a
        disconnect; anything else is decided by the parent dialect.
        """
        if isinstance(e, self.dbapi.OperationalError):
            message = str(e).lower()
            markers = ("no active connection", "connection closed")
            if any(marker in message for marker in markers):
                return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(self, connection):
        """Return the object stored as ``connection._connection``."""
        return connection._connection
254
255
# Module-level alias exposing the aiosqlite dialect class as ``dialect``.
# NOTE(review): presumably the name the dialect loader looks for -- confirm.
dialect = SQLiteDialect_aiosqlite