1# dialects/sqlite/aiosqlite.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r"""
11
12.. dialect:: sqlite+aiosqlite
13 :name: aiosqlite
14 :dbapi: aiosqlite
15 :connectstring: sqlite+aiosqlite:///file_path
16 :url: https://pypi.org/project/aiosqlite/
17
18The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
19running on top of pysqlite.
20
21aiosqlite is a wrapper around pysqlite that uses a background thread for
22each connection. It does not actually use non-blocking IO, as SQLite
23databases are not socket-based. However it does provide a working asyncio
24interface that's useful for testing and prototyping purposes.
25
26Using a special asyncio mediation layer, the aiosqlite dialect is usable
27as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
28extension package.
29
30This dialect should normally be used only with the
31:func:`_asyncio.create_async_engine` engine creation function::
32
33 from sqlalchemy.ext.asyncio import create_async_engine
34
35 engine = create_async_engine("sqlite+aiosqlite:///filename")
36
37The URL passes through all arguments to the ``pysqlite`` driver, so all
38connection arguments are the same as they are for that of :ref:`pysqlite`.
39
40.. _aiosqlite_udfs:
41
42User-Defined Functions
43----------------------
44
45aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
46in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
47
48.. _aiosqlite_serializable:
49
50Serializable isolation / Savepoints / Transactional DDL (asyncio version)
51-------------------------------------------------------------------------
52
53A newly revised version of this important section is now available
54at the top level of the SQLAlchemy SQLite documentation, in the section
55:ref:`sqlite_transactions`.
56
57
58.. _aiosqlite_pooling:
59
60Pooling Behavior
61----------------
62
63The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently
64based on the kind of SQLite database that's requested:
65
* When a ``:memory:`` SQLite database is specified, the dialect by default
  will use :class:`.StaticPool`. This pool maintains a single
  connection, so that all access to the engine
  uses the same ``:memory:`` database.
70* When a file-based database is specified, the dialect will use
71 :class:`.AsyncAdaptedQueuePool` as the source of connections.
72
73 .. versionchanged:: 2.0.38
74
    SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default.
    Previously, :class:`.NullPool` was used. The :class:`.NullPool` class
    may be used by specifying it via the
    :paramref:`_sa.create_engine.poolclass` parameter.
79
80""" # noqa
81
82import asyncio
83from collections import deque
84from functools import partial
85
86from .base import SQLiteExecutionContext
87from .pysqlite import SQLiteDialect_pysqlite
88from ... import pool
89from ... import util
90from ...engine import AdaptedConnection
91from ...util.concurrency import await_fallback
92from ...util.concurrency import await_only
93
94
class AsyncAdapt_aiosqlite_cursor:
    """Buffered, blocking-style cursor facade over an aiosqlite cursor.

    Every ``execute()`` / ``executemany()`` call opens a new aiosqlite
    cursor on the underlying connection and drives its awaitables through
    ``await_``.  Unless ``server_side`` is true (set by a subclass), all
    result rows are fetched into a local ``deque`` and the aiosqlite cursor
    is closed immediately; the ``fetch*()`` methods and ``__iter__`` then
    serve rows from that buffer without awaiting anything.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415

    __slots__ = (
        "_adapt_connection",
        "_connection",
        "description",
        "await_",
        "_rows",
        "arraysize",
        "rowcount",
        "lastrowid",
    )

    server_side = False

    def __init__(self, adapt_connection):
        """Bind the cursor to ``adapt_connection``.

        :param adapt_connection: the adapted connection; its
         ``_connection``, ``await_`` and ``_handle_exception`` members
         are used by this cursor.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_
        self.arraysize = 1
        self.rowcount = -1
        # ``lastrowid`` is a slot, so it has no value until assigned;
        # without this, reading it before the first execute() raised
        # AttributeError.  ``None`` matches the initial value of
        # sqlite3's own cursor attribute.
        self.lastrowid = None
        self.description = None
        self._rows = deque()

    def close(self):
        """Discard any rows still held in the local buffer."""
        self._rows.clear()

    def execute(self, operation, parameters=None):
        """Run a single statement and capture its result state.

        :param operation: SQL string passed to the aiosqlite cursor.
        :param parameters: optional bound parameters; when ``None`` the
         statement is executed without a parameters argument.

        Any exception is passed to the connection's ``_handle_exception``.
        """
        try:
            _cursor = self.await_(self._connection.cursor())

            if parameters is None:
                self.await_(_cursor.execute(operation))
            else:
                self.await_(_cursor.execute(operation, parameters))

            if _cursor.description:
                # statement returns rows
                self.description = _cursor.description
                self.lastrowid = self.rowcount = -1

                if not self.server_side:
                    # buffer everything now so fetches need no awaiting
                    self._rows = deque(self.await_(_cursor.fetchall()))
            else:
                # statement returns no rows; copy the DML counters
                self.description = None
                self.lastrowid = _cursor.lastrowid
                self.rowcount = _cursor.rowcount

            if not self.server_side:
                self.await_(_cursor.close())
            else:
                # server-side subclass keeps the live cursor for fetching
                self._cursor = _cursor
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def executemany(self, operation, seq_of_parameters):
        """Run ``operation`` once per parameter set.

        No rows are buffered; ``description`` is reset to ``None`` and
        ``lastrowid`` / ``rowcount`` are copied from the aiosqlite cursor,
        which is then closed.  Any exception is passed to the connection's
        ``_handle_exception``.
        """
        try:
            _cursor = self.await_(self._connection.cursor())
            self.await_(_cursor.executemany(operation, seq_of_parameters))
            self.description = None
            self.lastrowid = _cursor.lastrowid
            self.rowcount = _cursor.rowcount
            self.await_(_cursor.close())
        except Exception as error:
            self._adapt_connection._handle_exception(error)

    def setinputsizes(self, *inputsizes):
        """Accept and ignore input size hints."""
        pass

    def __iter__(self):
        """Yield buffered rows, removing each one from the buffer."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return the next buffered row, or ``None`` when none remain."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Return up to ``size`` buffered rows as a list.

        ``size`` defaults to ``self.arraysize``; fewer rows are returned
        when the buffer holds fewer than ``size``.
        """
        if size is None:
            size = self.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self):
        """Return all remaining buffered rows as a list, emptying the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
186
187
class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Unbuffered cursor variant that fetches from a live aiosqlite cursor.

    With ``server_side`` set, the parent's ``execute()`` stores the open
    aiosqlite cursor on ``_cursor`` instead of buffering rows; every fetch
    here is forwarded to that cursor through ``await_``.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = "_cursor"

    server_side = True

    def __init__(self, *arg, **kw):
        super().__init__(*arg, **kw)
        # no live cursor until a statement has been executed
        self._cursor = None

    def close(self):
        """Close the live aiosqlite cursor, if there is one."""
        live_cursor = self._cursor
        if live_cursor is None:
            return
        self.await_(live_cursor.close())
        self._cursor = None

    def fetchone(self):
        """Await and return the next row from the live cursor."""
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size=None):
        """Await and return up to ``size`` rows (default ``arraysize``)."""
        count = self.arraysize if size is None else size
        pending = self._cursor.fetchmany(size=count)
        return self.await_(pending)

    def fetchall(self):
        """Await and return all remaining rows from the live cursor."""
        pending = self._cursor.fetchall()
        return self.await_(pending)
214
215
class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    """Blocking-style connection facade over an aiosqlite connection.

    Each method awaits the corresponding aiosqlite coroutine through
    ``await_`` and funnels failures through :meth:`_handle_exception`, so
    that aiosqlite's ``ValueError("no active connection")`` surfaces as the
    DBAPI's ``OperationalError``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi, connection):
        """Store the adapted DBAPI namespace and the aiosqlite connection."""
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self):
        """The ``isolation_level`` of the wrapped aiosqlite connection."""
        return self._connection.isolation_level

    @isolation_level.setter
    def isolation_level(self, value):
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection, value):
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # hand the assignment to aiosqlite's queue, then wait on the future
        self._connection._tx.put_nowait((future, function))

        try:
            return self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args, **kw):
        """Await aiosqlite's ``create_function`` with the given arguments."""
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side=False):
        """Return a new adapted cursor.

        :param server_side: when true, return the unbuffered
         ``AsyncAdapt_aiosqlite_ss_cursor``; otherwise the buffered
         ``AsyncAdapt_aiosqlite_cursor``.
        """
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args, **kw):
        """Await aiosqlite's ``execute`` and return its result.

        Failures go through :meth:`_handle_exception`, the same as every
        other awaiting method on this class, so a closed connection is
        reported as ``OperationalError`` here too.
        """
        try:
            return self.await_(self._connection.execute(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self):
        """Await a rollback on the aiosqlite connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self):
        """Await a commit on the aiosqlite connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self):
        """Await closing the aiosqlite connection; ``ValueError`` is ignored."""
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error):
        """Re-raise ``error``, translating the "no active connection" case.

        A ``ValueError`` whose first argument is ``"no active connection"``
        is re-raised as ``self.dbapi.sqlite.OperationalError`` chained to
        the original; every other exception is re-raised unchanged.
        """
        # compare a slice rather than args[0] so that an exception created
        # with no arguments cannot raise IndexError here and hide the
        # original error
        if isinstance(error, ValueError) and error.args[:1] == (
            "no active connection",
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error
301
302
class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Connection adapter that awaits through ``await_fallback``.

    Identical to its parent apart from the ``await_`` callable.
    """

    await_ = staticmethod(await_fallback)

    __slots__ = ()
307
308
class AsyncAdapt_aiosqlite_dbapi:
    """DBAPI-style module facade assembled from ``aiosqlite`` and ``sqlite3``.

    Exception classes and version information are copied from the
    ``aiosqlite`` module; the ``PARSE_*`` constants and ``Binary`` are
    copied from the ``sqlite`` module passed alongside it.
    """

    def __init__(self, aiosqlite, sqlite):
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy the DBAPI-level names onto this object."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        sources = (
            (self.aiosqlite, from_aiosqlite),
            (self.sqlite, from_sqlite),
        )
        for module, attr_names in sources:
            for attr in attr_names:
                setattr(self, attr, getattr(module, attr))

    def connect(self, *arg, **kw):
        """Create an aiosqlite connection and wrap it in an adapter.

        Two keyword arguments are consumed here and not forwarded:

        * ``async_fallback`` - when truthy per ``util.asbool``, the
          fallback adapter and ``await_fallback`` are used.
        * ``async_creator_fn`` - optional callable used in place of
          ``aiosqlite.connect`` to produce the connection awaitable.

        All remaining arguments are forwarded to whichever callable
        creates the connection.
        """
        fallback_flag = kw.pop("async_fallback", False)
        custom_creator = kw.pop("async_creator_fn", None)

        if not custom_creator:
            connection = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread. you'll thank us later
            connection.daemon = True
        else:
            connection = custom_creator(*arg, **kw)

        if not util.asbool(fallback_flag):
            return AsyncAdapt_aiosqlite_connection(
                self,
                await_only(connection),
            )

        return AsyncAdaptFallback_aiosqlite_connection(
            self,
            await_fallback(connection),
        )
356
357
class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context that can request a server-side cursor."""

    def create_server_side_cursor(self):
        """Return a cursor created with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)
361
362
class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """pysqlite-derived dialect for the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    is_async = True

    supports_statement_cache = True
    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls):
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them as one DBAPI."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite3_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite3_module)

    @classmethod
    def get_pool_class(cls, url):
        """Pick the pool class: queue pool for file URLs, else static pool."""
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(self, e, connection, cursor):
        """Treat an ``OperationalError`` saying "no active connection" as a
        disconnect; otherwise defer to the parent dialect."""
        connection_lost = isinstance(
            e, self.dbapi.OperationalError
        ) and "no active connection" in str(e)

        return connection_lost or super().is_disconnect(
            e, connection, cursor
        )

    def get_driver_connection(self, connection):
        """Return the object held on the adapted connection's
        ``_connection`` attribute."""
        return connection._connection
396
397
# Module-level alias naming the dialect class defined in this module.
dialect = SQLiteDialect_aiosqlite