Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

120 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9r""" 

10 

11.. dialect:: sqlite+aiosqlite 

12 :name: aiosqlite 

13 :dbapi: aiosqlite 

14 :connectstring: sqlite+aiosqlite:///file_path 

15 :url: https://pypi.org/project/aiosqlite/ 

16 

17The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

18running on top of pysqlite. 

19 

20aiosqlite is a wrapper around pysqlite that uses a background thread for 

21each connection. It does not actually use non-blocking IO, as SQLite 

22databases are not socket-based. However it does provide a working asyncio 

23interface that's useful for testing and prototyping purposes. 

24 

25Using a special asyncio mediation layer, the aiosqlite dialect is usable 

26as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

27extension package. 

28 

29This dialect should normally be used only with the 

30:func:`_asyncio.create_async_engine` engine creation function:: 

31 

32 from sqlalchemy.ext.asyncio import create_async_engine 

33 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52A newly revised version of this important section is now available 

53at the top level of the SQLAlchemy SQLite documentation, in the section 

54:ref:`sqlite_transactions`. 

55 

56 

57.. _aiosqlite_pooling: 

58 

59Pooling Behavior 

60---------------- 

61 

62The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

63based on the kind of SQLite database that's requested: 

64 

65* When a ``:memory:`` SQLite database is specified, the dialect by default 

66 will use :class:`.StaticPool`. This pool maintains a single 

67 connection, so that all access to the engine 

68 uses the same ``:memory:`` database. 

69* When a file-based database is specified, the dialect will use 

70 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

71 

72 .. versionchanged:: 2.0.38 

73 

74 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

75 Previously, :class:`.NullPool` was used. The :class:`.NullPool` class 

76 may be used by specifying it via the 

77 :paramref:`_sa.create_engine.poolclass` parameter. 

78 

79""" # noqa 

80from __future__ import annotations 

81 

82import asyncio 

83from functools import partial 

84from types import ModuleType 

85from typing import Any 

86from typing import cast 

87from typing import NoReturn 

88from typing import Optional 

89from typing import TYPE_CHECKING 

90from typing import Union 

91 

92from .base import SQLiteExecutionContext 

93from .pysqlite import SQLiteDialect_pysqlite 

94from ... import pool 

95from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

96from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

97from ...connectors.asyncio import AsyncAdapt_dbapi_module 

98from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

99from ...util.concurrency import await_ 

100 

101if TYPE_CHECKING: 

102 from ...connectors.asyncio import AsyncIODBAPIConnection 

103 from ...engine.interfaces import DBAPIConnection 

104 from ...engine.interfaces import DBAPICursor 

105 from ...engine.interfaces import DBAPIModule 

106 from ...engine.url import URL 

107 from ...pool.base import PoolProxiedConnection 

108 

109 

class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter used by the aiosqlite connection adapter.

    Adds nothing to ``AsyncAdapt_dbapi_cursor``; the empty ``__slots__``
    declares no additional per-instance attributes.
    """

    __slots__ = ()

112 

113 

class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
    """Server-side cursor adapter used by the aiosqlite connection adapter.

    Adds nothing to ``AsyncAdapt_dbapi_ss_cursor``; the empty ``__slots__``
    declares no additional per-instance attributes.
    """

    __slots__ = ()

116 

117 

class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection):
    """Blocking-style connection adapter over an aiosqlite connection.

    The methods defined here run the wrapped connection's awaitables via
    ``await_()`` and pass any failure to ``_handle_exception()``.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_aiosqlite_cursor
    _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor

    @property
    def isolation_level(self) -> Optional[str]:
        """Return ``isolation_level`` as reported by the wrapped connection."""
        # cast to the declared return type; the value may be None
        return cast(Optional[str], self._connection.isolation_level)

    @isolation_level.setter
    def isolation_level(self, value: Optional[str]) -> None:
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection: Any, value: Optional[str]) -> None:
            # ``connection`` is the object stored in aiosqlite's ``_conn``,
            # not this adapter class.
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # enqueue the (future, callable) pair on aiosqlite's queue, then
        # wait for the future to be resolved
        self._connection._tx.put_nowait((future, function))

        try:
            await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args: Any, **kw: Any) -> None:
        """Call ``create_function()`` on the wrapped connection and wait.

        All positional and keyword arguments are passed through unchanged.
        """
        try:
            await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self) -> None:
        """Roll back, but only while the wrapped connection is still set."""
        if self._connection._connection:
            super().rollback()

    def commit(self) -> None:
        """Commit, but only while the wrapped connection is still set."""
        if self._connection._connection:
            super().commit()

    def close(self) -> None:
        """Close the wrapped connection, ignoring ``ValueError``."""
        try:
            await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    @classmethod
    def _handle_exception_no_connection(
        cls, dbapi: Any, error: Exception
    ) -> NoReturn:
        """Translate "connection gone" ``ValueError`` into OperationalError.

        A ``ValueError`` whose first argument is the string
        "no active connection" or "connection closed" (compared
        case-insensitively) is re-raised as ``dbapi.sqlite.OperationalError``
        chained to the original error.  Every other error, including a
        ``ValueError`` with no arguments or a non-string first argument, is
        delegated to the superclass handler.
        """
        # Guard the message lookup: an argument-less ValueError or one
        # carrying a non-string payload must not raise IndexError /
        # AttributeError here and mask the original error.
        message = error.args[0] if error.args else None
        if (
            isinstance(error, ValueError)
            and isinstance(message, str)
            and message.lower()
            in (
                "no active connection",
                "connection closed",
            )
        ):
            raise dbapi.sqlite.OperationalError(message) from error
        else:
            super()._handle_exception_no_connection(dbapi, error)

191 

192 

class AsyncAdapt_aiosqlite_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module facade combining ``aiosqlite`` and ``sqlite3``.

    Selected attributes of both modules are copied onto the instance, and
    ``connect()`` returns an ``AsyncAdapt_aiosqlite_connection``.
    """

    def __init__(self, aiosqlite: ModuleType, sqlite: ModuleType):
        super().__init__(aiosqlite, dbapi_module=sqlite)
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy exception classes, version info and constants onto self."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        # aiosqlite-provided names are copied first, then the sqlite3 ones
        for attr in from_aiosqlite:
            setattr(self, attr, getattr(self.aiosqlite, attr))
        for attr in from_sqlite:
            setattr(self, attr, getattr(self.sqlite, attr))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiosqlite_connection:
        """Create a connection and wrap it in the connection adapter.

        If a truthy ``async_creator_fn`` keyword is given, it is removed from
        ``kw`` and called with the remaining arguments to produce the
        awaitable; otherwise ``aiosqlite.connect()`` is called.
        """
        async_creator = kw.pop("async_creator_fn", None)
        if not async_creator:
            pending = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread. you'll thank us later
            pending.daemon = True
        else:
            pending = async_creator(*arg, **kw)

        return AsyncAdapt_aiosqlite_connection(self, await_(pending))

233 

234 

class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context for the aiosqlite dialect."""

    def create_server_side_cursor(self) -> DBAPICursor:
        """Return a cursor requested with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)

238 

239 

class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect variant that uses the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiosqlite_dbapi:
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them in the adapter."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type[pool.Pool]:
        """Return ``AsyncAdaptedQueuePool`` for file URLs, else ``StaticPool``."""
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` indicates a lost connection.

        An ``OperationalError`` whose lower-cased text contains
        "no active connection" or "connection closed" counts as a
        disconnect; anything else is decided by the superclass.
        """
        self.dbapi = cast("DBAPIModule", self.dbapi)
        if isinstance(e, self.dbapi.OperationalError):
            message = str(e).lower()
            markers = ("no active connection", "connection closed")
            if any(marker in message for marker in markers):
                return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the given connection."""
        return connection._connection  # type: ignore[no-any-return]

284 

285 

# Module-level ``dialect`` name bound to the aiosqlite dialect class.
dialect = SQLiteDialect_aiosqlite