Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10r""" 

11 

12.. dialect:: sqlite+aiosqlite 

13 :name: aiosqlite 

14 :dbapi: aiosqlite 

15 :connectstring: sqlite+aiosqlite:///file_path 

16 :url: https://pypi.org/project/aiosqlite/ 

17 

18The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

19running on top of pysqlite. 

20 

21aiosqlite is a wrapper around pysqlite that uses a background thread for 

22each connection. It does not actually use non-blocking IO, as SQLite 

23databases are not socket-based. However it does provide a working asyncio 

24interface that's useful for testing and prototyping purposes. 

25 

26Using a special asyncio mediation layer, the aiosqlite dialect is usable 

27as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

28extension package. 

29 

30This dialect should normally be used only with the 

31:func:`_asyncio.create_async_engine` engine creation function:: 

32 

33 from sqlalchemy.ext.asyncio import create_async_engine 

34 engine = create_async_engine("sqlite+aiosqlite:///filename") 

35 

36The URL passes through all arguments to the ``pysqlite`` driver, so all 

37connection arguments are the same as they are for that of :ref:`pysqlite`. 

38 

39.. _aiosqlite_udfs: 

40 

41User-Defined Functions 

42---------------------- 

43 

44aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

45in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

46 

47.. _aiosqlite_serializable: 

48 

49Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

50------------------------------------------------------------------------- 

51 

52Similarly to pysqlite, aiosqlite does not support the SAVEPOINT feature. 

53 

54The solution is similar to :ref:`pysqlite_serializable`. With asyncio, it is achieved by attaching event listeners to the async engine's ``sync_engine``:: 

55 

56 from sqlalchemy import create_engine, event 

57 from sqlalchemy.ext.asyncio import create_async_engine 

58 

59 engine = create_async_engine("sqlite+aiosqlite:///myfile.db") 

60 

61 @event.listens_for(engine.sync_engine, "connect") 

62 def do_connect(dbapi_connection, connection_record): 

63 # disable aiosqlite's emitting of the BEGIN statement entirely. 

64 # also stops it from emitting COMMIT before any DDL. 

65 dbapi_connection.isolation_level = None 

66 

67 @event.listens_for(engine.sync_engine, "begin") 

68 def do_begin(conn): 

69 # emit our own BEGIN 

70 conn.exec_driver_sql("BEGIN") 

71 

72.. warning:: When using the above recipe, it is advised to not use the 

73 :paramref:`.Connection.execution_options.isolation_level` setting on 

74 :class:`_engine.Connection` and :func:`_sa.create_engine` 

75 with the SQLite driver, 

76 as this function necessarily will also alter the ".isolation_level" setting. 

77 

78""" # noqa 

79 

80import asyncio 

81from functools import partial 

82 

83from .base import SQLiteExecutionContext 

84from .pysqlite import SQLiteDialect_pysqlite 

85from ... import pool 

86from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

87from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

88from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

89from ...util.concurrency import await_ 

90 

91 

class AsyncAdapt_aiosqlite_cursor(AsyncAdapt_dbapi_cursor):
    """aiosqlite flavor of the adapted DBAPI cursor; adds no behavior."""

    # Declares no additional per-instance slots beyond the base class.
    __slots__ = ()

94 

95 

class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_dbapi_ss_cursor):
    """aiosqlite flavor of the adapted server-side cursor; adds no behavior."""

    # Declares no additional per-instance slots beyond the base class.
    __slots__ = ()

98 

99 

class AsyncAdapt_aiosqlite_connection(AsyncAdapt_dbapi_connection):
    """Blocking-style facade over an aiosqlite connection.

    Each method drives the corresponding aiosqlite coroutine (or queued
    callable) to completion with ``await_`` and routes failures through
    :meth:`_handle_exception`.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_aiosqlite_cursor
    _ss_cursor_cls = AsyncAdapt_aiosqlite_ss_cursor

    @property
    def isolation_level(self):
        """Return the ``isolation_level`` of the wrapped aiosqlite connection."""
        return self._connection.isolation_level

    @isolation_level.setter
    def isolation_level(self, value):
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection, value):
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # hand the callable to aiosqlite's queue together with the future
        # that is awaited below.
        self._connection._tx.put_nowait((future, function))

        try:
            return await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args, **kw):
        """Register a user-defined function on the wrapped connection.

        All arguments are passed through unchanged to aiosqlite's
        ``create_function``.
        """
        try:
            await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def rollback(self):
        """Roll back, but only while the aiosqlite connection is still set."""
        if self._connection._connection:
            super().rollback()

    def commit(self):
        """Commit, but only while the aiosqlite connection is still set."""
        if self._connection._connection:
            super().commit()

    def close(self):
        """Close the wrapped connection, ignoring ``ValueError``."""
        try:
            await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error):
        """Translate "connection gone" ``ValueError``s, else defer to the base.

        A ``ValueError`` whose first argument is the string
        "no active connection" or "connection closed" (case-insensitive) is
        re-raised as ``self.dbapi.sqlite.OperationalError`` chained to the
        original error.  Every other error is passed to the base class
        handler.
        """
        if isinstance(error, ValueError):
            # A ValueError may carry no args, or a non-string first arg;
            # inspecting it blindly would raise IndexError/AttributeError
            # here and hide the real error from the caller.
            message = error.args[0] if error.args else None
            if isinstance(message, str) and message.lower() in (
                "no active connection",
                "connection closed",
            ):
                raise self.dbapi.sqlite.OperationalError(message) from error

        super()._handle_exception(error)

168 

169 

class AsyncAdapt_aiosqlite_dbapi:
    """DBAPI-style module facade assembled from ``aiosqlite`` and ``sqlite3``.

    Exception classes and version information are copied from the aiosqlite
    module, parse flags and ``Binary`` from the sqlite module, so that they
    are all reachable as attributes of this one object.
    """

    def __init__(self, aiosqlite, sqlite):
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy the DBAPI-level names from the two modules onto ``self``."""
        from_aiosqlite = (
            "DatabaseError",
            "Error",
            "IntegrityError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "sqlite_version",
            "sqlite_version_info",
        )
        from_sqlite = ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary")

        for attr in from_aiosqlite:
            setattr(self, attr, getattr(self.aiosqlite, attr))

        for attr in from_sqlite:
            setattr(self, attr, getattr(self.sqlite, attr))

    def connect(self, *arg, **kw):
        """Open a connection and wrap it in the adapted connection class.

        If an ``async_creator_fn`` keyword is supplied it is removed from
        ``kw`` and called with the remaining arguments; otherwise
        ``aiosqlite.connect`` is called with them.
        """
        creator_fn = kw.pop("async_creator_fn", None)
        if not creator_fn:
            pending = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread. you'll thank us later
            pending.daemon = True
        else:
            pending = creator_fn(*arg, **kw)

        return AsyncAdapt_aiosqlite_connection(self, await_(pending))

209 

210 

class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context used by the aiosqlite dialect."""

    def create_server_side_cursor(self):
        """Return a cursor requested from the DBAPI connection with ``server_side=True``."""
        dbapi_conn = self._dbapi_connection
        return dbapi_conn.cursor(server_side=True)

214 

215 

class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """pysqlite-derived dialect for the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls):
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them in the adapter."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite3_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite3_module)

    @classmethod
    def get_pool_class(cls, url):
        """Return ``NullPool`` when ``_is_url_file_db(url)`` is true, else ``StaticPool``."""
        return pool.NullPool if cls._is_url_file_db(url) else pool.StaticPool

    def is_disconnect(self, e, connection, cursor):
        """Report whether ``e`` indicates a lost connection.

        An ``OperationalError`` whose message contains "no active connection"
        or "connection closed" (case-insensitive) counts as a disconnect;
        anything else is decided by the parent dialect.
        """
        if isinstance(e, self.dbapi.OperationalError):
            message = str(e).lower()
            markers = ("no active connection", "connection closed")
            if any(marker in message for marker in markers):
                return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(self, connection):
        """Return the ``_connection`` attribute of the given connection."""
        return connection._connection

252 

253 

# Module-level name ``dialect`` bound to this module's dialect class.
dialect = SQLiteDialect_aiosqlite