1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9r"""
10
11.. dialect:: sqlite+aiosqlite
12 :name: aiosqlite
13 :dbapi: aiosqlite
14 :connectstring: sqlite+aiosqlite:///file_path
15 :url: https://pypi.org/project/aiosqlite/
16
17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
18running on top of pysqlite.
19
20aiosqlite is a wrapper around pysqlite that uses a background thread for
21each connection. It does not actually use non-blocking IO, as SQLite
22databases are not socket-based. However it does provide a working asyncio
23interface that's useful for testing and prototyping purposes.
24
25Using a special asyncio mediation layer, the aiosqlite dialect is usable
26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
27extension package.
28
29This dialect should normally be used only with the
30:func:`_asyncio.create_async_engine` engine creation function::
31
32 from sqlalchemy.ext.asyncio import create_async_engine
33
34 engine = create_async_engine("sqlite+aiosqlite:///filename")
35
36The URL passes through all arguments to the ``pysqlite`` driver, so all
37connection arguments are the same as they are for that of :ref:`pysqlite`.
38
39.. _aiosqlite_udfs:
40
41User-Defined Functions
42----------------------
43
44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
46
47.. _aiosqlite_serializable:
48
49Serializable isolation / Savepoints / Transactional DDL (asyncio version)
50-------------------------------------------------------------------------
51
52A newly revised version of this important section is now available
53at the top level of the SQLAlchemy SQLite documentation, in the section
54:ref:`sqlite_transactions`.
55
56
57.. _aiosqlite_pooling:
58
59Pooling Behavior
60----------------
61
62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
63based on the kind of SQLite database that's requested:
64
65* When a ``:memory:`` SQLite database is specified, the dialect by default
66 will use :class:`.StaticPool`. This pool maintains a single
  connection, so that all access to the engine
  uses the same ``:memory:`` database.
69* When a file-based database is specified, the dialect will use
70 :class:`.AsyncAdaptedQueuePool` as the source of connections.
71
72 .. versionchanged:: 2.0.38
73
74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
    Previously, :class:`.NullPool` was used. The :class:`.NullPool` class
76 may be used by specifying it via the
77 :paramref:`_sa.create_engine.poolclass` parameter.
78
79""" # noqa
80
81from __future__ import annotations
82
83import asyncio
84from collections import deque
85from functools import partial
86from threading import Thread
87from types import ModuleType
88from typing import Any
89from typing import cast
90from typing import Deque
91from typing import Iterator
92from typing import NoReturn
93from typing import Optional
94from typing import Sequence
95from typing import TYPE_CHECKING
96from typing import Union
97
98from .base import SQLiteExecutionContext
99from .pysqlite import SQLiteDialect_pysqlite
100from ... import pool
101from ... import util
102from ...connectors.asyncio import AsyncAdapt_dbapi_module
103from ...connectors.asyncio import AsyncAdapt_terminate
104from ...engine import AdaptedConnection
105from ...util.concurrency import await_fallback
106from ...util.concurrency import await_only
107
108if TYPE_CHECKING:
109 from ...connectors.asyncio import AsyncIODBAPIConnection
110 from ...connectors.asyncio import AsyncIODBAPICursor
111 from ...engine.interfaces import _DBAPICursorDescription
112 from ...engine.interfaces import _DBAPIMultiExecuteParams
113 from ...engine.interfaces import _DBAPISingleExecuteParams
114 from ...engine.interfaces import DBAPIConnection
115 from ...engine.interfaces import DBAPICursor
116 from ...engine.interfaces import DBAPIModule
117 from ...engine.url import URL
118 from ...pool.base import PoolProxiedConnection
119
120
class AsyncAdapt_aiosqlite_cursor:
    """Blocking-style cursor facade over an aiosqlite connection.

    Every ``execute()`` / ``executemany()`` call obtains a new driver cursor
    from the adapted connection and runs each awaitable through the
    connection's ``await_`` callable.  When ``server_side`` is false (this
    class), all rows of a row-returning statement are fetched immediately
    into an in-memory deque and the driver cursor is closed before the call
    returns; the ``fetch*()`` methods and iteration then consume that deque.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection: AsyncAdapt_aiosqlite_connection):
        """Bind the cursor to *adapt_connection* and reset its state.

        :param adapt_connection: the adapted connection; its
         ``_connection`` and ``await_`` attributes are copied onto the
         cursor.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # ``lastrowid`` is a declared slot, so without an initial value a
        # read before the first execute() raises AttributeError.  -1 is the
        # value execute() itself assigns for row-returning statements.
        self.lastrowid = -1
        self.description: Optional[_DBAPICursorDescription] = None
        self._rows: Deque[Any] = deque()

    async def _async_soft_close(self) -> None:
        """No-op coroutine; this cursor has nothing to release here."""
        return

    def close(self) -> None:
        """Discard any rows still held in the local buffer."""
        self._rows.clear()

    def execute(
        self,
        operation: Any,
        parameters: Optional[_DBAPISingleExecuteParams] = None,
    ) -> Any:
        """Run a single statement on a fresh driver cursor.

        :param operation: the statement passed to the driver cursor.
        :param parameters: optional bound parameters; when ``None`` the
         driver's ``execute()`` is called with the statement only.

        Any exception raised along the way is passed to the connection's
        ``_handle_exception()``, which re-raises it (possibly translated).
        """

        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                # statement returns rows
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    # buffer everything now so the driver cursor can be
                    # closed below
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                # no result set; expose the driver's counters
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                self.await_(_cursor.close())
            else:
                # server-side subclasses keep the driver cursor open and
                # fetch from it on demand
                self._cursor = _cursor  # type: ignore[misc]
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(
        self,
        operation: Any,
        seq_of_parameters: _DBAPIMultiExecuteParams,
    ) -> Any:
        """Run *operation* once per parameter set on a fresh driver cursor.

        No rows are buffered; ``description`` is reset to ``None`` and
        ``lastrowid`` / ``rowcount`` are copied from the driver cursor,
        which is then closed.  Exceptions are routed through the
        connection's ``_handle_exception()``.
        """
        try:
            _cursor: AsyncIODBAPICursor = self.await_(self._connection.cursor())  # type: ignore[arg-type]  # noqa: E501
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes: Any) -> None:
        """Accept and ignore input size hints."""
        pass

    def __iter__(self) -> Iterator[Any]:
        """Yield buffered rows, removing each one as it is produced."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self) -> Optional[Any]:
        """Return the next buffered row, or ``None`` when none remain."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Return up to *size* buffered rows (default ``arraysize``)."""
        if size is None:
            size = self.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self) -> Sequence[Any]:
        """Return all remaining buffered rows and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
224
225
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Cursor variant that keeps the driver cursor open after execute.

    With ``server_side`` set to true, the parent's ``execute()`` does not
    buffer rows or close the driver cursor; it stores the driver cursor on
    ``_cursor`` instead.  The fetch methods and iteration defined here then
    read from that driver cursor through ``await_``.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = "_cursor"

    server_side = True

    def __init__(self, *arg: Any, **kw: Any) -> None:
        """Initialise the parent cursor state with no driver cursor yet."""
        super().__init__(*arg, **kw)
        self._cursor: Optional[AsyncIODBAPICursor] = None

    def close(self) -> None:
        """Close the retained driver cursor, if any, and forget it."""
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def __iter__(self) -> Iterator[Any]:
        """Yield rows one at a time from the driver cursor.

        The inherited ``__iter__`` drains the local row buffer, which is
        never filled for a server-side cursor, so it would yield nothing.
        """
        assert self._cursor is not None
        while True:
            row = self.await_(self._cursor.fetchone())
            if row is None:
                return
            yield row

    def fetchone(self) -> Optional[Any]:
        """Return the next row from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size: Optional[int] = None) -> Sequence[Any]:
        """Return up to *size* rows (default ``arraysize``) from the driver."""
        assert self._cursor is not None
        if size is None:
            size = self.arraysize
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self) -> Sequence[Any]:
        """Return all remaining rows from the driver cursor."""
        assert self._cursor is not None
        return self.await_(self._cursor.fetchall())
255
256
class AsyncAdapt_aiosqlite_connection(AsyncAdapt_terminate, AdaptedConnection):
    """Blocking-style connection facade over an aiosqlite connection.

    Each method runs the corresponding awaitable of the wrapped connection
    through ``await_`` and routes failures through ``_handle_exception()``,
    which turns aiosqlite's ``ValueError("no active connection")`` into the
    DBAPI ``OperationalError``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi: Any, connection: AsyncIODBAPIConnection) -> None:
        """Store the DBAPI shim and the wrapped driver connection."""
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self) -> Optional[str]:
        """Isolation level reported by the wrapped connection."""
        return cast(str, self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        # ``connection`` here is the object held in aiosqlite's ``_conn``
        # attribute, not this adapter, hence the loose annotation.
        def set_iso(connection: Any, value: Optional[str]) -> None:
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        self._connection._tx.put_nowait((future, function))

        try:
            self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Forward to the wrapped connection's ``create_function()``."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side: bool = False) -> AsyncAdapt_aiosqlite_cursor:
        """Return a new adapted cursor.

        :param server_side: when true, return the server-side cursor
         variant; otherwise the buffering cursor.
        """
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args: Any, **kw: Any) -> Any:
        """Forward to the wrapped connection's ``execute()``.

        Failures go through ``_handle_exception()`` like the other
        awaiting methods on this class.
        """
        try:
            return self.await_(self._connection.execute(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self) -> None:
        """Roll back the wrapped connection's current transaction."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self) -> None:
        """Commit the wrapped connection's current transaction."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self) -> None:
        """Close the wrapped connection, ignoring ``ValueError``."""
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error: Exception) -> NoReturn:
        """Re-raise *error*, translating the "no active connection" case.

        :raises OperationalError: (from ``self.dbapi.sqlite``) when *error*
         is a ``ValueError`` whose first argument is
         ``"no active connection"``.
        :raises Exception: *error* itself in every other case.
        """
        # Slice rather than index so a ValueError raised with no arguments
        # is re-raised as-is instead of being masked by an IndexError.
        if isinstance(error, ValueError) and error.args[:1] == (
            "no active connection",
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error

    async def _terminate_graceful_close(self) -> None:
        """Try to close connection gracefully"""
        await self._connection.close()

    def _terminate_force_close(self) -> None:
        """Terminate the connection"""

        # this was added in aiosqlite 0.22.1. if stop() is not present,
        # the dialect should indicate has_terminate=False
        try:
            meth = self._connection.stop
        except AttributeError as ae:
            raise NotImplementedError(
                "terminate_force_close() not implemented by this DBAPI shim"
            ) from ae
        else:
            meth()
362
363
class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Connection adapter whose ``await_`` callable is ``await_fallback``."""

    await_ = staticmethod(await_fallback)

    __slots__ = ()
368
369
class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module shim combining ``aiosqlite`` and ``sqlite3`` names."""

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        """Keep both modules and copy the DBAPI-level names onto ``self``.

        ``has_stop`` records whether ``aiosqlite.Connection`` has a
        ``stop`` attribute.
        """
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self.has_stop = hasattr(aiosqlite.Connection, "stop")
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Mirror selected module attributes as attributes of this shim."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        # exceptions and version info come from aiosqlite; the parse flags
        # and Binary come from the sqlite module
        for source, attr_names in (
            (self.aiosqlite, from_aiosqlite),
            (self.sqlite, from_sqlite),
        ):
            for attr in attr_names:
                setattr(self, attr, getattr(source, attr))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Create a driver connection and wrap it in an adapter.

        The ``async_fallback`` and ``async_creator_fn`` keywords are
        consumed here; everything else is passed to the creator function
        or to ``aiosqlite.connect()``.
        """
        use_fallback = kw.pop("async_fallback", False)
        creator = kw.pop("async_creator_fn", None)

        connection = (
            creator(*arg, **kw)
            if creator
            else self.aiosqlite.connect(*arg, **kw)
        )

        # aiosqlite uses a Thread. you'll thank us later
        # Connection itself was a thread in version prior to 0.22;
        # in 0.22+ instead it contains a thread.
        worker = (
            connection
            if isinstance(connection, Thread)
            else connection._thread
        )
        worker.daemon = True

        if util.asbool(use_fallback):
            return AsyncAdaptFallback_aiosqlite_connection(
                self, await_fallback(connection)
            )
        return AsyncAdapt_aiosqlite_connection(self, await_only(connection))
424
425
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that can request server-side cursors."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor created with ``server_side=True``."""
        dbapi_conn = self._dbapi_connection
        return dbapi_conn.cursor(server_side=True)
429
430
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect variant using the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True
    has_terminate = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    def __init__(self, **kwargs: Any):
        """Initialise the dialect; disable terminate if ``stop`` is absent."""
        super().__init__(**kwargs)
        dbapi = self.dbapi
        if dbapi and not dbapi.has_stop:
            self.has_terminate = False

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them in the shim."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Pick the queue pool for file databases, else the static pool."""
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether *e* indicates a lost connection.

        An ``OperationalError`` whose text contains "no active connection"
        counts as a disconnect; anything else is delegated to the parent
        dialect.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        if not isinstance(
            e, self.dbapi.OperationalError
        ) or "no active connection" not in str(e):
            return super().is_disconnect(e, connection, cursor)

        return True

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the object stored on the adapter's ``_connection``."""
        driver_connection = connection._connection
        return driver_connection  # type: ignore[no-any-return]

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Call ``terminate()`` on the given DBAPI connection."""
        dbapi_connection.terminate()
481
482
# Module-level alias for the dialect class defined above.
# NOTE(review): presumably the name looked up when this module is loaded
# as a dialect -- confirm against the dialect registry.
dialect = SQLiteDialect_aiosqlite