Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 use the same ``:memory:`` database. 

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class 

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80from __future__ import annotations 

81 

82import asyncio 

83from functools import partial 

84from types import ModuleType 

85from typing import Any 

86from typing import cast 

87from typing import NoReturn 

88from typing import Optional 

89from typing import TYPE_CHECKING 

90from typing import Union 

91 

92from .base import SQLiteExecutionContext 

93from .pysqlite import SQLiteDialect_pysqlite 

94from ... import pool 

95from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

96from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

97from ...connectors.asyncio import AsyncAdapt_dbapi_module 

98from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

99from ...util.concurrency import await_ 

100 

101if TYPE_CHECKING: 

102 from ...connectors.asyncio import AsyncIODBAPIConnection 

103 from ...engine.interfaces import DBAPIConnection 

104 from ...engine.interfaces import DBAPICursor 

105 from ...engine.interfaces import DBAPIModule 

106 from ...engine.url import URL 

107 from ...pool.base import PoolProxiedConnection 

108 

109 

110class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor): 

111 __slots__ = () 

112 

113 

114class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor): 

115 __slots__ = () 

116 

117 

118class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection): 

119 __slots__ = () 

120 

121 _cursor_cls = AsyncAdapt_aiosqlite_cursor 

122 _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor 

123 

124 @property 

125 def isolation_level(self) -> Optional[str]: 

126 return cast(str, self._connection.isolation_level) 

127 

128 @isolation_level.setter 

129 def isolation_level(self, value: Optional[str]) -> None: 

130 # aiosqlite's isolation_level setter works outside the Thread 

131 # that it's supposed to, necessitating setting check_same_thread=False. 

132 # for improved stability, we instead invent our own awaitable version 

133 # using aiosqlite's async queue directly. 

134 

135 def set_iso( 

136 connection: AsyncAdapt_aiosqlite_connection, value: Optional[str] 

137 ) -> None: 

138 connection.isolation_level = value 

139 

140 function = partial(set_iso, self._connection._conn, value) 

141 future = asyncio.get_event_loop().create_future() 

142 

143 self._connection._tx.put_nowait((future, function)) 

144 

145 try: 

146 await_(future) 

147 except Exception as error: 

148 self._handle_exception(error) 

149 

150 def create_function(self, *args: Any, **kw: Any) -> None: 

151 try: 

152 await_(self._connection.create_function(*args, **kw)) 

153 except Exception as error: 

154 self._handle_exception(error) 

155 

156 def rollback(self) -> None: 

157 if self._connection._connection: 

158 super().rollback() 

159 

160 def commit(self) -> None: 

161 if self._connection._connection: 

162 super().commit() 

163 

164 def close(self) -> None: 

165 try: 

166 await_(self._connection.close()) 

167 except ValueError: 

168 # this is undocumented for aiosqlite, that ValueError 

169 # was raised if .close() was called more than once, which is 

170 # both not customary for DBAPI and is also not a DBAPI.Error 

171 # exception. This is now fixed in aiosqlite via my PR 

172 # https://github.com/omnilib/aiosqlite/pull/238, so we can be 

173 # assured this will not become some other kind of exception, 

174 # since it doesn't raise anymore. 

175 

176 pass 

177 except Exception as error: 

178 self._handle_exception(error) 

179 

180 def _handle_exception(self, error: Exception) -> NoReturn: 

181 if isinstance(error, ValueError) and error.args[0].lower() in ( 

182 "no active connection", 

183 "connection closed", 

184 ): 

185 raise self.dbapi.sqlite.OperationalError(error.args[0]) from error 

186 else: 

187 super()._handle_exception(error) 

188 

189 

190class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module): 

191 def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType): 

192 self.aiosqlite = aiosqlite 

193 self.sqlite = sqlite 

194 self.paramstyle = "qmark" 

195 self._init_dbapi_attributes() 

196 

197 def _init_dbapi_attributes(self) -> None: 

198 for name in ( 

199 "DatabaseError", 

200 "Error", 

201 "IntegrityError", 

202 "NotSupportedError", 

203 "OperationalError", 

204 "ProgrammingError", 

205 "sqlite_version", 

206 "sqlite_version_info", 

207 ): 

208 setattr(self, name, getattr(self.aiosqlite, name)) 

209 

210 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 

211 setattr(self, name, getattr(self.sqlite, name)) 

212 

213 for name in ("Binary",): 

214 setattr(self, name, getattr(self.sqlite, name)) 

215 

216 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection: 

217 creator_fn = kw.pop("async_creator_fn", None) 

218 if creator_fn: 

219 connection = creator_fn(*arg, **kw) 

220 else: 

221 connection = self.aiosqlite.connect(*arg, **kw) 

222 # it's a Thread. you'll thank us later 

223 connection.daemon = True 

224 

225 return AsyncAdapt_aiosqlite_connection( 

226 self, 

227 await_(connection), 

228 ) 

229 

230 

231class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 

232 def create_server_side_cursor(self) -> DBAPICursor: 

233 return self._dbapi_connection.cursor(server_side=True) 

234 

235 

236class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 

237 driver = "aiosqlite" 

238 supports_statement_cache = True 

239 

240 is_async = True 

241 

242 supports_server_side_cursors = True 

243 

244 execution_ctx_cls = SQLiteExecutionContext_aiosqlite 

245 

246 @classmethod 

247 def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi: 

248 return AsyncAdapt_aiosqlite_dbapi( 

249 __import__("aiosqlite"), __import__("sqlite3") 

250 ) 

251 

252 @classmethod 

253 def get_pool_class(cls, url: URL) -> type[pool.Pool]: 

254 if cls._is_url_file_db(url): 

255 return pool.AsyncAdaptedQueuePool 

256 else: 

257 return pool.StaticPool 

258 

259 def is_disconnect( 

260 self, 

261 e: DBAPIModule.Error, 

262 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 

263 cursor: Optional[DBAPICursor], 

264 ) -> bool: 

265 self.dbapi = cast("DBAPIModule", self.dbapi) 

266 if isinstance(e, self.dbapi.OperationalError): 

267 err_lower = str(e).lower() 

268 if ( 

269 "no active connection" in err_lower 

270 or "connection closed" in err_lower 

271 ): 

272 return True 

273 

274 return super().is_disconnect(e, connection, cursor) 

275 

276 def get_driver_connection( 

277 self, connection: DBAPIConnection 

278 ) -> AsyncIODBAPIConnection: 

279 return connection._connection # type: ignore[no-any-return] 

280 

281 

282dialect = SQLiteDialect_aiosqlite