Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

192 statements  

1# dialects/sqlite/aiosqlite.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10r""" 

11 

12.. dialect:: sqlite+aiosqlite 

13 :name: aiosqlite 

14 :dbapi: aiosqlite 

15 :connectstring: sqlite+aiosqlite:///file_path 

16 :url: https://pypi.org/project/aiosqlite/ 

17 

18The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

19running on top of pysqlite. 

20 

21aiosqlite is a wrapper around pysqlite that uses a background thread for 

22each connection. It does not actually use non-blocking IO, as SQLite 

23databases are not socket-based. However it does provide a working asyncio 

24interface that's useful for testing and prototyping purposes. 

25 

26Using a special asyncio mediation layer, the aiosqlite dialect is usable 

27as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

28extension package. 

29 

30This dialect should normally be used only with the 

31:func:`_asyncio.create_async_engine` engine creation function:: 

32 

33 from sqlalchemy.ext.asyncio import create_async_engine 

34 

35 engine = create_async_engine("sqlite+aiosqlite:///filename") 

36 

37The URL passes through all arguments to the ``pysqlite`` driver, so all 

38connection arguments are the same as they are for that of :ref:`pysqlite`. 

39 

40.. _aiosqlite_udfs: 

41 

42User-Defined Functions 

43---------------------- 

44 

45aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

46in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

47 

48.. _aiosqlite_serializable: 

49 

50Serializable isolation / Savepoints / Transactional DDL (asyncio version) 

51------------------------------------------------------------------------- 

52 

53A newly revised version of this important section is now available 

54at the top level of the SQLAlchemy SQLite documentation, in the section 

55:ref:`sqlite_transactions`. 

56 

57 

58.. _aiosqlite_pooling: 

59 

60Pooling Behavior 

61---------------- 

62 

63The SQLAlchemy ``aiosqlite`` DBAPI establishes the connection pool differently 

64based on the kind of SQLite database that's requested: 

65 

66* When a ``:memory:`` SQLite database is specified, the dialect by default 

67 will use :class:`.StaticPool`. This pool maintains a single 

68 connection, so that all access to the engine 

69 uses the same ``:memory:`` database. 

70* When a file-based database is specified, the dialect will use 

71 :class:`.AsyncAdaptedQueuePool` as the source of connections. 

72 

73 .. versionchanged:: 2.0.38 

74 

75 SQLite file database engines now use :class:`.AsyncAdaptedQueuePool` by default. 

76 Previously, :class:`.NullPool` was used. The :class:`.NullPool` class 

77 may be used by specifying it via the 

78 :paramref:`_sa.create_engine.poolclass` parameter. 

79 

80""" # noqa 

81 

82import asyncio 

83from collections import deque 

84from functools import partial 

85 

86from .base import SQLiteExecutionContext 

87from .pysqlite import SQLiteDialect_pysqlite 

88from ... import pool 

89from ... import util 

90from ...engine import AdaptedConnection 

91from ...util.concurrency import await_fallback 

92from ...util.concurrency import await_only 

93 

94 

95class AsyncAdapt_aiosqlite_cursor: 

96 # TODO: base on connectors/asyncio.py 

97 # see #10415 

98 

99 __slots__ = ( 

100 "_adapt_connection", 

101 "_connection", 

102 "description", 

103 "await_", 

104 "_rows", 

105 "arraysize", 

106 "rowcount", 

107 "lastrowid", 

108 ) 

109 

110 server_side = False 

111 

112 def __init__(self, adapt_connection): 

113 self._adapt_connection = adapt_connection 

114 self._connection = adapt_connection._connection 

115 self.await_ = adapt_connection.await_ 

116 self.arraysize = 1 

117 self.rowcount = -1 

118 self.description = None 

119 self._rows = deque() 

120 

121 def close(self): 

122 self._rows.clear() 

123 

124 def execute(self, operation, parameters=None): 

125 try: 

126 _cursor = self.await_(self._connection.cursor()) 

127 

128 if parameters is None: 

129 self.await_(_cursor.execute(operation)) 

130 else: 

131 self.await_(_cursor.execute(operation, parameters)) 

132 

133 if _cursor.description: 

134 self.description = _cursor.description 

135 self.lastrowid = self.rowcount = -1 

136 

137 if not self.server_side: 

138 self._rows = deque(self.await_(_cursor.fetchall())) 

139 else: 

140 self.description = None 

141 self.lastrowid = _cursor.lastrowid 

142 self.rowcount = _cursor.rowcount 

143 

144 if not self.server_side: 

145 self.await_(_cursor.close()) 

146 else: 

147 self._cursor = _cursor 

148 except Exception as error: 

149 self._adapt_connection._handle_exception(error) 

150 

151 def executemany(self, operation, seq_of_parameters): 

152 try: 

153 _cursor = self.await_(self._connection.cursor()) 

154 self.await_(_cursor.executemany(operation, seq_of_parameters)) 

155 self.description = None 

156 self.lastrowid = _cursor.lastrowid 

157 self.rowcount = _cursor.rowcount 

158 self.await_(_cursor.close()) 

159 except Exception as error: 

160 self._adapt_connection._handle_exception(error) 

161 

162 def setinputsizes(self, *inputsizes): 

163 pass 

164 

165 def __iter__(self): 

166 while self._rows: 

167 yield self._rows.popleft() 

168 

169 def fetchone(self): 

170 if self._rows: 

171 return self._rows.popleft() 

172 else: 

173 return None 

174 

175 def fetchmany(self, size=None): 

176 if size is None: 

177 size = self.arraysize 

178 

179 rr = self._rows 

180 return [rr.popleft() for _ in range(min(size, len(rr)))] 

181 

182 def fetchall(self): 

183 retval = list(self._rows) 

184 self._rows.clear() 

185 return retval 

186 

187 

class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
    """Unbuffered ("server side") variant of the adapted cursor.

    With ``server_side`` set to true, the inherited ``execute()`` does not
    prefetch rows and stores the open aiosqlite cursor in ``_cursor``.
    The fetch methods and ``__iter__`` defined here read from that cursor
    on demand, so they require ``execute()`` to have been called first.
    """

    # TODO: base on connectors/asyncio.py
    # see #10415
    __slots__ = ("_cursor",)

    server_side = True

    def __init__(self, *arg, **kw):
        """Initialise the base cursor; no aiosqlite cursor is open yet."""
        super().__init__(*arg, **kw)
        self._cursor = None

    def close(self):
        """Close the underlying aiosqlite cursor if one is open."""
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def __iter__(self):
        """Yield rows from the open aiosqlite cursor one at a time.

        The inherited ``__iter__`` only drains the in-memory row buffer,
        which is never filled for a server side cursor, so it would yield
        nothing here.
        """
        while True:
            row = self.await_(self._cursor.fetchone())
            if row is None:
                break
            yield row

    def fetchone(self):
        """Return the result of the aiosqlite cursor's ``fetchone()``."""
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size=None):
        """Return the result of the aiosqlite cursor's ``fetchmany()``.

        ``size`` defaults to ``self.arraysize`` when it is ``None``.
        """
        if size is None:
            size = self.arraysize
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self):
        """Return the result of the aiosqlite cursor's ``fetchall()``."""
        return self.await_(self._cursor.fetchall())

214 

215 

class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
    """Blocking DBAPI-style facade over an aiosqlite connection.

    Each method hands the corresponding aiosqlite awaitable to ``await_``
    (``await_only`` for this class).  Except for ``execute()``, the
    methods route exceptions through :meth:`_handle_exception`.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi",)

    def __init__(self, dbapi, connection):
        """Store the adapted DBAPI object and the aiosqlite connection."""
        self.dbapi = dbapi
        self._connection = connection

    @property
    def isolation_level(self):
        """The ``isolation_level`` of the wrapped aiosqlite connection."""
        return self._connection.isolation_level

    @isolation_level.setter
    def isolation_level(self, value):
        # aiosqlite's isolation_level setter works outside the Thread
        # that it's supposed to, necessitating setting check_same_thread=False.
        # for improved stability, we instead invent our own awaitable version
        # using aiosqlite's async queue directly.

        def set_iso(connection, value):
            connection.isolation_level = value

        function = partial(set_iso, self._connection._conn, value)
        future = asyncio.get_event_loop().create_future()

        # enqueue the (future, callable) pair on aiosqlite's queue, then
        # wait on the future below
        self._connection._tx.put_nowait((future, function))

        try:
            return self.await_(future)
        except Exception as error:
            self._handle_exception(error)

    def create_function(self, *args, **kw):
        """Call the aiosqlite connection's ``create_function()``.

        All arguments are passed through unchanged.
        """
        try:
            self.await_(self._connection.create_function(*args, **kw))
        except Exception as error:
            self._handle_exception(error)

    def cursor(self, server_side=False):
        """Return a new adapted cursor for this connection.

        :param server_side: when true, return the unbuffered
         ``AsyncAdapt_aiosqlite_ss_cursor``; otherwise the buffered
         ``AsyncAdapt_aiosqlite_cursor``.
        """
        if server_side:
            return AsyncAdapt_aiosqlite_ss_cursor(self)
        else:
            return AsyncAdapt_aiosqlite_cursor(self)

    def execute(self, *args, **kw):
        """Call the aiosqlite connection's ``execute()`` and return its result.

        Unlike the other methods, exceptions raised here are not passed
        through :meth:`_handle_exception`.
        """
        return self.await_(self._connection.execute(*args, **kw))

    def rollback(self):
        """Roll back via the aiosqlite connection."""
        try:
            self.await_(self._connection.rollback())
        except Exception as error:
            self._handle_exception(error)

    def commit(self):
        """Commit via the aiosqlite connection."""
        try:
            self.await_(self._connection.commit())
        except Exception as error:
            self._handle_exception(error)

    def close(self):
        """Close the aiosqlite connection; a ``ValueError`` is ignored."""
        try:
            self.await_(self._connection.close())
        except ValueError:
            # this is undocumented for aiosqlite, that ValueError
            # was raised if .close() was called more than once, which is
            # both not customary for DBAPI and is also not a DBAPI.Error
            # exception. This is now fixed in aiosqlite via my PR
            # https://github.com/omnilib/aiosqlite/pull/238, so we can be
            # assured this will not become some other kind of exception,
            # since it doesn't raise anymore.

            pass
        except Exception as error:
            self._handle_exception(error)

    def _handle_exception(self, error):
        """Re-raise ``error``, translating the "no active connection" case.

        A ``ValueError`` whose first argument is ``"no active connection"``
        is re-raised as ``self.dbapi.sqlite.OperationalError`` chained to
        the original; any other exception is re-raised unchanged.
        """
        # compare a slice of ``args`` instead of indexing it: a ValueError
        # raised without arguments has an empty ``args`` tuple, and
        # ``args[0]`` would replace it with an unrelated IndexError.
        if isinstance(error, ValueError) and error.args[:1] == (
            "no active connection",
        ):
            raise self.dbapi.sqlite.OperationalError(
                "no active connection"
            ) from error
        else:
            raise error

301 

302 

class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
    """Adapted aiosqlite connection whose ``await_`` is ``await_fallback``."""

    await_ = staticmethod(await_fallback)

    __slots__ = ()

307 

308 

class AsyncAdapt_aiosqlite_dbapi:
    """DBAPI-module-like object assembled from ``aiosqlite`` and ``sqlite3``.

    Selected module attributes are copied onto the instance, and
    ``connect()`` returns adapted connection objects.
    """

    def __init__(self, aiosqlite, sqlite):
        """Keep both modules and copy their DBAPI attributes over.

        :param aiosqlite: object providing the exception classes, the
         SQLite version attributes and ``connect()``.
        :param sqlite: object providing ``PARSE_COLNAMES``,
         ``PARSE_DECLTYPES`` and ``Binary``.
        """
        self.aiosqlite = aiosqlite
        self.sqlite = sqlite
        self.paramstyle = "qmark"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy the DBAPI-level names from the two modules onto ``self``."""
        copied_names = (
            (
                self.aiosqlite,
                (
                    "DatabaseError",
                    "Error",
                    "IntegrityError",
                    "NotSupportedError",
                    "OperationalError",
                    "ProgrammingError",
                    "sqlite_version",
                    "sqlite_version_info",
                ),
            ),
            (
                self.sqlite,
                ("PARSE_COLNAMES", "PARSE_DECLTYPES", "Binary"),
            ),
        )
        for source_module, attribute_names in copied_names:
            for attribute_name in attribute_names:
                setattr(
                    self,
                    attribute_name,
                    getattr(source_module, attribute_name),
                )

    def connect(self, *arg, **kw):
        """Create a connection and wrap it in an adapted connection object.

        Two keyword arguments are consumed here and not forwarded:
        ``async_fallback`` selects the fallback adapter when true, and
        ``async_creator_fn``, when given, is called in place of
        ``aiosqlite.connect()``.  All remaining arguments are passed to
        whichever callable creates the connection.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop("async_creator_fn", None)

        if not creator_fn:
            connection = self.aiosqlite.connect(*arg, **kw)
            # it's a Thread. you'll thank us later
            connection.daemon = True
        else:
            connection = creator_fn(*arg, **kw)

        if util.asbool(async_fallback):
            adapter_cls = AsyncAdaptFallback_aiosqlite_connection
            run_awaitable = await_fallback
        else:
            adapter_cls = AsyncAdapt_aiosqlite_connection
            run_awaitable = await_only

        return adapter_cls(self, run_awaitable(connection))

356 

357 

class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
    """Execution context used by the aiosqlite dialect."""

    def create_server_side_cursor(self):
        """Return a cursor requested with ``server_side=True``."""
        dbapi_connection = self._dbapi_connection
        return dbapi_connection.cursor(server_side=True)

361 

362 

class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
    """SQLite dialect for the ``aiosqlite`` driver."""

    driver = "aiosqlite"
    supports_statement_cache = True

    is_async = True

    supports_server_side_cursors = True

    execution_ctx_cls = SQLiteExecutionContext_aiosqlite

    @classmethod
    def import_dbapi(cls):
        """Import ``aiosqlite`` and ``sqlite3`` and wrap them together."""
        aiosqlite_module = __import__("aiosqlite")
        sqlite3_module = __import__("sqlite3")
        return AsyncAdapt_aiosqlite_dbapi(aiosqlite_module, sqlite3_module)

    @classmethod
    def get_pool_class(cls, url):
        """Return the pool class to use for ``url``.

        ``AsyncAdaptedQueuePool`` is returned when ``_is_url_file_db()``
        is true for the URL, ``StaticPool`` otherwise.
        """
        return (
            pool.AsyncAdaptedQueuePool
            if cls._is_url_file_db(url)
            else pool.StaticPool
        )

    def is_disconnect(self, e, connection, cursor):
        """Report whether ``e`` indicates a lost connection.

        An ``OperationalError`` whose text contains
        ``"no active connection"`` counts as a disconnect; every other
        case is decided by the superclass.
        """
        if isinstance(e, self.dbapi.OperationalError):
            if "no active connection" in str(e):
                return True

        return super().is_disconnect(e, connection, cursor)

    def get_driver_connection(self, connection):
        """Return the ``_connection`` attribute of ``connection``."""
        return connection._connection


# name under which the dialect class is exported from this module
dialect = SQLiteDialect_aiosqlite