1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
* When a ``:memory:`` SQLite database is specified, the dialect by default
  will use :class:`.StaticPool`. This pool maintains a single
  connection, so that all access to the engine
  uses the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
  .. versionchanged:: 2.0.38

    SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
    Previously, :class:`.NullPool` was used. The :class:`.NullPool` class
    may be used by specifying it via the
    :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80from __future__ import annotations
81
82import asyncio
83from collections import deque
84from functools import partial
85from types import ModuleType
86from typing import Any
87from typing import cast
88from typing import Deque
89from typing import Iterator
90from typing import NoReturn
91from typing import Optional
92from typing import Sequence
93from typing import TYPE_CHECKING
94from typing import Union
95
96from .base import SQLiteExecutionContext
97from .pysqlite import SQLiteDialect_pysqlite
98from ... import pool
99from ... import util
100from ...connectors.asyncio import AsyncAdapt_dbapi_module
101from ...engine import AdaptedConnection
102from ...util.concurrency import await_fallback
103from ...util.concurrency import await_only
104
105if TYPE_CHECKING:
106 from ...connectors.asyncio import AsyncIODBAPIConnection
107 from ...connectors.asyncio import AsyncIODBAPICursor
108 from ...engine.interfaces import _DBAPICursorDescription
109 from ...engine.interfaces import _DBAPIMultiExecuteParams
110 from ...engine.interfaces import _DBAPISingleExecuteParams
111 from ...engine.interfaces import DBAPIConnection
112 from ...engine.interfaces import DBAPICursor
113 from ...engine.interfaces import DBAPIModule
114 from ...engine.url import URL
115 from ...pool.base import PoolProxiedConnection
116
117
class AsyncAdapt_aiosqlite_cursor:
    """Blocking-style cursor that drives an aiosqlite cursor.

    Every awaitable produced by the driver is resolved through the
    ``await_`` callable taken from the owning adapted connection.  For this
    non server-side variant, ``execute()`` fetches the complete result set
    into ``_rows`` and closes the driver cursor right away; the ``fetch*()``
    methods and iteration then only drain that local buffer.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
        """Bind the cursor to ``adapt_connection`` and reset its state.

        :param adapt_connection: adapted connection supplying the driver
         connection (``_connection``), the ``await_`` callable and
         ``_handle_exception()``.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # a slot that was never assigned raises AttributeError on read, so
        # ``lastrowid`` needs a value before the first execute().  -1 is the
        # same "no rowid" marker execute() stores for statements that
        # return rows.
        self.lastrowid = -1
        self.description: Optional[_DBAPICursorDescription] = None
        self._rows: Deque[Any] = deque()

    def close(self) -> None:
        """Discard any rows still held in the local buffer."""
        self._rows.clear()

    def execute(
        self,
        operation: Any,
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Run ``operation`` on a new driver cursor.

        When the driver cursor reports a description, ``description`` is
        copied, ``lastrowid`` / ``rowcount`` are set to -1 and, unless the
        cursor is server side, all rows are buffered.  Otherwise
        ``description`` is cleared and ``lastrowid`` / ``rowcount`` are
        copied from the driver cursor.

        :param operation: statement passed through to the driver.
        :param parameters: optional bound parameters; omitted from the
         driver call entirely when ``None``.
        :raises Exception: any failure is handed to the connection's
         ``_handle_exception()``, which re-raises it.
        """

        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                # everything needed was copied above, so the driver cursor
                # can be released immediately
                self.await_(_cursor.close())
            else:
                # server side subclasses keep the driver cursor open and
                # fetch from it on demand
                self._cursor = _cursor  # type: ignore[misc]
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(
        self,
        operation: Any,
        seq_of_parameters: _DBAPIMultiExecuteParams,
    ) -> Any:
        """Run ``operation`` once per parameter set on a new driver cursor.

        No rows are buffered; ``description`` is cleared and ``lastrowid``
        / ``rowcount`` are copied from the driver cursor before it is
        closed.

        :raises Exception: any failure is handed to the connection's
         ``_handle_exception()``, which re-raises it.
        """
        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes: Any) -> None:
        """Accept and ignore input size hints."""
        pass

    def __iter__(self) -> Iterator[Any]:
        """Yield buffered rows, removing each one from the buffer."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self) -> Optional[Any]:
        """Return the next buffered row, or ``None`` when none are left."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Return up to ``size`` buffered rows (``arraysize`` if omitted)."""
        if size is None:
            size = self.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self) -> Sequence[Any]:
        """Return every remaining buffered row and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
218
219
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Server side flavor of the adapted cursor.

    ``server_side`` is true here, so the inherited ``execute()`` does not
    buffer rows and instead leaves the open driver cursor in ``_cursor``.
    All fetch methods therefore go to that driver cursor through
    ``await_``.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = "_cursor"

    server_side = True

    def __init__(self, *arg: Any, **kw: Any) -> None:
        """Initialise the base cursor; no driver cursor is held yet."""
        super().__init__(*arg, **kw)
        self._cursor: Optional[AsyncIODBAPICursor] = None

    def close(self) -> None:
        """Close the retained driver cursor, if there is one."""
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def __iter__(self) -> Iterator[Any]:
        """Yield rows one at a time from the driver cursor.

        The inherited implementation only drains ``_rows``, which is never
        filled for a server side cursor, so iterating used to produce no
        rows at all.  Rows are pulled through ``fetchone()`` until it
        returns ``None``.
        """
        while True:
            row = self.fetchone()
            if row is None:
                break
            yield row

    def fetchone(self) -> Optional[Any]:
        """Fetch the next row from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Fetch up to ``size`` rows (``arraysize`` if omitted)."""
        assert self._cursor is not None
        if size is None:
            size = self.arraysize
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self) -> Sequence[Any]:
        """Fetch all remaining rows from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchall())
249
250
class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    """Blocking-style connection facade over an aiosqlite connection.

    Each method hands the driver's awaitable to ``await_`` (``await_only``
    for this class).  All of them except ``execute()`` route failures
    through ``_handle_exception()``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
        """Store the adapted DBAPI module and the driver connection.

        :param dbapi: object whose ``sqlite.OperationalError`` is raised
         by ``_handle_exception()``.
        :param connection: the driver-level connection being wrapped.
        """
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self) -> Optional[str]:
        """Isolation level as reported by the driver connection."""
        return cast(str, self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        # ``connection`` here is ``self._connection._conn``, not this
        # adapter -- presumably the sqlite3 connection owned by aiosqlite's
        # worker thread (TODO confirm against aiosqlite internals).
        def set_iso(connection: Any, value: Optional[str]) -> None:
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # queue the assignment on the driver's ``_tx`` queue; ``future`` is
        # what gets awaited below
        self._connection._tx.put_nowait((future, function))

        try:
            self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Forward ``create_function()`` to the driver connection."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
        """Return a new adapted cursor.

        :param server_side: when true, return the server side cursor
         class instead of the buffering one.
        """
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args: Any, **kw: Any) -> Any:
        """Forward ``execute()`` to the driver connection.

        Unlike the other methods, exceptions propagate as raised and are
        not passed through ``_handle_exception()``.
        """
        return self.await_(self._connection.execute(*args, **kw))

    def rollback(self) -> None:
        """Roll back the driver connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self) -> None:
        """Commit the driver connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self) -> None:
        """Close the driver connection, ignoring ``ValueError``."""
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error: Exception) -> NoReturn:
        """Re-raise ``error``, translating the closed-connection case.

        A ``ValueError`` whose first argument is ``"no active connection"``
        is re-raised as ``self.dbapi.sqlite.OperationalError`` chained to
        the original; every other exception is re-raised unchanged.

        :raises Exception: always.
        """
        if (
            isinstance(error, ValueError)
            # guard the index: a ValueError created without arguments has
            # an empty ``args`` tuple, and an IndexError raised here would
            # hide the real error
            and error.args
            and error.args[0] == "no active connection"
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error
338
339
class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Adapted connection that resolves awaitables via ``await_fallback``.

    Identical to the parent class apart from the ``await_`` callable.
    """

    await_ = staticmethod(await_fallback)

    __slots__ = ()
344
345
class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module style facade combining ``aiosqlite`` and ``sqlite3``.

    Exception classes and version attributes are copied from the
    ``aiosqlite`` module; the ``PARSE_*`` constants and ``Binary`` come
    from the ``sqlite`` module passed to the constructor.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        """Keep both modules and publish the DBAPI attributes.

        :param aiosqlite: module used for ``connect()`` and for the
         exception / version attributes.
        :param sqlite: module supplying ``PARSE_COLNAMES``,
         ``PARSE_DECLTYPES`` and ``Binary``.
        """
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy the module-level DBAPI names onto this object."""
        aiosqlite_attrs = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        sqlite_attrs = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        for attr in aiosqlite_attrs:
            setattr(self, attr, getattr(self.aiosqlite, attr))

        for attr in sqlite_attrs:
            setattr(self, attr, getattr(self.sqlite, attr))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Open a driver connection and wrap it in an adapted connection.

        Two keyword arguments are consumed here and not forwarded:

        * ``async_fallback`` - when true (per ``util.asbool``), the
          awaitable is resolved with ``await_fallback`` and the fallback
          connection class is returned.
        * ``async_creator_fn`` - optional callable used instead of
          ``aiosqlite.connect`` to produce the connection awaitable.

        Remaining arguments go to whichever callable creates the
        connection.
        """
        use_fallback = kw.pop("async_fallback", False)
        custom_creator = kw.pop("async_creator_fn", None)

        if not custom_creator:
            connection = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread. you'll thank us later
            connection.daemon = True
        else:
            connection = custom_creator(*arg, **kw)

        if util.asbool(use_fallback):
            return AsyncAdaptFallback_aiosqlite_connection(
                self, await_fallback(connection)
            )

        return AsyncAdapt_aiosqlite_connection(self, await_only(connection))
393
394
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that can hand out server side cursors."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor created with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)
398
399
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """pysqlite-derived dialect for the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them together."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite3_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite3_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Choose the pool class for ``url``.

        ``AsyncAdaptedQueuePool`` is used when ``_is_url_file_db()``
        reports a file database, ``StaticPool`` otherwise.
        """
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` indicates a lost connection.

        An ``OperationalError`` whose text contains
        ``"no active connection"`` counts as a disconnect; anything else
        is decided by the parent dialect.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)

        is_operational = isinstance(e, self.dbapi.OperationalError)
        if is_operational and "no active connection" in str(e):
            return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of ``connection``."""
        return connection._connection  # type: ignore[no-any-return]
441
442
# Module-level alias for the dialect class.
# NOTE(review): presumably the name SQLAlchemy's dialect loading looks up in
# this module -- confirm against the registry code.
dialect = SQLiteDialect_aiosqlite