Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10r""" 

11 

12.. dialect:: sqlite+aiosqlite 

13 :name: aiosqlite 

14 :dbapi: aiosqlite 

15 :connectstring: sqlite+aiosqlite:///file_path 

16 :url: https://pypi.org/project/aiosqlite/ 

17 

18The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

19running on top of pysqlite. 

20 

21aiosqlite is a wrapper around pysqlite that uses a background thread for 

22each connection. It does not actually use non-blocking IO, as SQLite 

23databases are not socket-based. However it does provide a working asyncio 

24interface that's useful for testing and prototyping purposes. 

25 

26Using a special asyncio mediation layer, the aiosqlite dialect is usable 

27as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

28extension package. 

29 

30This dialect should normally be used only with the 

31:func:`_asyncio.create_async_engine` engine creation function:: 

32 

33 from sqlalchemy.ext.asyncio import create_async_engine 

34 

35 engine = create_async_engine("sqlite+aiosqlite:///filename") 

36 

37The URL passes through all arguments to the ``pysqlite`` driver, so all 

38connection arguments are the same as they are for that of :ref:`pysqlite`. 

39 

40.. _aiosqlite_udfs: 

41 

42User-Defined Functions 

43---------------------- 

44 

45aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

46in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

47 

48.. _aiosqlite_serializable: 

49 

50Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

51------------------------------------------------------------------------- 

52 

53A newly revised version of this important section is now available 

54at the top level of the SQLAlchemy SQLite documentation, in the section 

55:ref:`sqlite_transactions`. 

56 

57 

58.. _aiosqlite_pooling: 

59 

60Pooling Behavior 

61---------------- 

62 

63The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

64based on the kind of SQLite database that's requested: 

65 

66* When a ``:memory:`` SQLite database is specified, the dialect by default 

67 will use :class:`.StaticPool`. This pool maintains a single 

68 connection, so that all access to the engine 

69 use the same ``:memory:`` database. 

70* When a file-based database is specified, the dialect will use 

71 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

72 

73 .. versionchanged:: 2.0.38 

74 

75 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

76 Previously, :class:`.NullPool` were used. The :class:`.NullPool` class 

77 may be used by specifying it via the 

78 :paramref:`_sa.create_engine.poolclass` parameter. 

79 

80""" # noqa 

81 

82import asyncio 

83from functools import partial 

84 

85from .base import SQLiteExecutionContext 

86from .pysqlite import SQLiteDialect_pysqlite 

87from ... import pool 

88from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

89from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

90from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

91from ...util.concurrency import await_ 

92 

93 

94class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor): 

95 __slots__ = () 

96 

97 

98class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor): 

99 __slots__ = () 

100 

101 

102class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection): 

103 __slots__ = () 

104 

105 _cursor_cls = AsyncAdapt_aiosqlite_cursor 

106 _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor 

107 

108 @property 

109 def isolation_level(self): 

110 return self._connection.isolation_level 

111 

112 @isolation_level.setter 

113 def isolation_level(self, value): 

114 # aiosqlite's isolation_level setter works outside the Thread 

115 # that it's supposed to, necessitating setting check_same_thread=False. 

116 # for improved stability, we instead invent our own awaitable version 

117 # using aiosqlite's async queue directly. 

118 

119 def set_iso(connection, value): 

120 connection.isolation_level = value 

121 

122 function = partial(set_iso, self._connection._conn, value) 

123 future = asyncio.get_event_loop().create_future() 

124 

125 self._connection._tx.put_nowait((future, function)) 

126 

127 try: 

128 return await_(future) 

129 except Exception as error: 

130 self._handle_exception(error) 

131 

132 def create_function(self, *args, **kw): 

133 try: 

134 await_(self._connection.create_function(*args, **kw)) 

135 except Exception as error: 

136 self._handle_exception(error) 

137 

138 def rollback(self): 

139 if self._connection._connection: 

140 super().rollback() 

141 

142 def commit(self): 

143 if self._connection._connection: 

144 super().commit() 

145 

146 def close(self): 

147 try: 

148 await_(self._connection.close()) 

149 except ValueError: 

150 # this is undocumented for aiosqlite, that ValueError 

151 # was raised if .close() was called more than once, which is 

152 # both not customary for DBAPI and is also not a DBAPI.Error 

153 # exception. This is now fixed in aiosqlite via my PR 

154 # https://github.com/omnilib/aiosqlite/pull/238, so we can be 

155 # assured this will not become some other kind of exception, 

156 # since it doesn't raise anymore. 

157 

158 pass 

159 except Exception as error: 

160 self._handle_exception(error) 

161 

162 def _handle_exception(self, error): 

163 if isinstance(error, ValueError) and error.args[0].lower() in ( 

164 "no active connection", 

165 "connection closed", 

166 ): 

167 raise self.dbapi.sqlite.OperationalError(error.args[0]) from error 

168 else: 

169 super()._handle_exception(error) 

170 

171 

172class AsyncAdapt_aiosqlite_dbapi: 

173 def __init__(self, aiosqlite, sqlite): 

174 self.aiosqlite = aiosqlite 

175 self.sqlite = sqlite 

176 self.paramstyle = "qmark" 

177 self._init_dbapi_attributes() 

178 

179 def _init_dbapi_attributes(self): 

180 for name in ( 

181 "DatabaseError", 

182 "Error", 

183 "IntegrityError", 

184 "NotSupportedError", 

185 "OperationalError", 

186 "ProgrammingError", 

187 "sqlite_version", 

188 "sqlite_version_info", 

189 ): 

190 setattr(self, name, getattr(self.aiosqlite, name)) 

191 

192 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 

193 setattr(self, name, getattr(self.sqlite, name)) 

194 

195 for name in ("Binary",): 

196 setattr(self, name, getattr(self.sqlite, name)) 

197 

198 def connect(self, *arg, **kw): 

199 creator_fn = kw.pop("async_creator_fn", None) 

200 if creator_fn: 

201 connection = creator_fn(*arg, **kw) 

202 else: 

203 connection = self.aiosqlite.connect(*arg, **kw) 

204 # it's a Thread. you'll thank us later 

205 connection.daemon = True 

206 

207 return AsyncAdapt_aiosqlite_connection( 

208 self, 

209 await_(connection), 

210 ) 

211 

212 

213class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 

214 def create_server_side_cursor(self): 

215 return self._dbapi_connection.cursor(server_side=True) 

216 

217 

218class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 

219 driver = "aiosqlite" 

220 supports_statement_cache = True 

221 

222 is_async = True 

223 

224 supports_server_side_cursors = True 

225 

226 execution_ctx_cls = SQLiteExecutionContext_aiosqlite 

227 

228 @classmethod 

229 def import_dbapi(cls): 

230 return AsyncAdapt_aiosqlite_dbapi( 

231 __import__("aiosqlite"), __import__("sqlite3") 

232 ) 

233 

234 @classmethod 

235 def get_pool_class(cls, url): 

236 if cls._is_url_file_db(url): 

237 return pool.AsyncAdaptedQueuePool 

238 else: 

239 return pool.StaticPool 

240 

241 def is_disconnect(self, e, connection, cursor): 

242 if isinstance(e, self.dbapi.OperationalError): 

243 err_lower = str(e).lower() 

244 if ( 

245 "no active connection" in err_lower 

246 or "connection closed" in err_lower 

247 ): 

248 return True 

249 

250 return super().is_disconnect(e, connection, cursor) 

251 

252 def get_driver_connection(self, connection): 

253 return connection._connection 

254 

255 

256dialect = SQLiteDialect_aiosqlite