Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py: 36%

180 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# sqlite/aiosqlite.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8r""" 

9 

10.. dialect:: sqlite+aiosqlite 

11 :name: aiosqlite 

12 :dbapi: aiosqlite 

13 :connectstring: sqlite+aiosqlite:///file_path 

14 :url: https://pypi.org/project/aiosqlite/ 

15 

16The aiosqlite dialect provides support for the SQLAlchemy asyncio interface 

17running on top of pysqlite. 

18 

19aiosqlite is a wrapper around pysqlite that uses a background thread for 

20each connection. It does not actually use non-blocking IO, as SQLite 

21databases are not socket-based. However it does provide a working asyncio 

22interface that's useful for testing and prototyping purposes. 

23 

24Using a special asyncio mediation layer, the aiosqlite dialect is usable 

25as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

26extension package. 

27 

28This dialect should normally be used only with the 

29:func:`_asyncio.create_async_engine` engine creation function:: 

30 

31 from sqlalchemy.ext.asyncio import create_async_engine 

32 engine = create_async_engine("sqlite+aiosqlite:///filename") 

33 

34The URL passes through all arguments to the ``pysqlite`` driver, so all 

35connection arguments are the same as they are for that of :ref:`pysqlite`. 

36 

37.. _aiosqlite_udfs: 

38 

39User-Defined Functions 

40---------------------- 

41 

42aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) 

43in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. 

44 

45 

46""" # noqa 

47 

48from .base import SQLiteExecutionContext 

49from .pysqlite import SQLiteDialect_pysqlite 

50from ... import pool 

51from ... import util 

52from ...engine import AdaptedConnection 

53from ...util.concurrency import await_fallback 

54from ...util.concurrency import await_only 

55 

56 

57class AsyncAdapt_aiosqlite_cursor: 

58 __slots__ = ( 

59 "_adapt_connection", 

60 "_connection", 

61 "description", 

62 "await_", 

63 "_rows", 

64 "arraysize", 

65 "rowcount", 

66 "lastrowid", 

67 ) 

68 

69 server_side = False 

70 

71 def __init__(self, adapt_connection): 

72 self._adapt_connection = adapt_connection 

73 self._connection = adapt_connection._connection 

74 self.await_ = adapt_connection.await_ 

75 self.arraysize = 1 

76 self.rowcount = -1 

77 self.description = None 

78 self._rows = [] 

79 

80 def close(self): 

81 self._rows[:] = [] 

82 

83 def execute(self, operation, parameters=None): 

84 try: 

85 _cursor = self.await_(self._connection.cursor()) 

86 

87 if parameters is None: 

88 self.await_(_cursor.execute(operation)) 

89 else: 

90 self.await_(_cursor.execute(operation, parameters)) 

91 

92 if _cursor.description: 

93 self.description = _cursor.description 

94 self.lastrowid = self.rowcount = -1 

95 

96 if not self.server_side: 

97 self._rows = self.await_(_cursor.fetchall()) 

98 else: 

99 self.description = None 

100 self.lastrowid = _cursor.lastrowid 

101 self.rowcount = _cursor.rowcount 

102 

103 if not self.server_side: 

104 self.await_(_cursor.close()) 

105 else: 

106 self._cursor = _cursor 

107 except Exception as error: 

108 self._adapt_connection._handle_exception(error) 

109 

110 def executemany(self, operation, seq_of_parameters): 

111 try: 

112 _cursor = self.await_(self._connection.cursor()) 

113 self.await_(_cursor.executemany(operation, seq_of_parameters)) 

114 self.description = None 

115 self.lastrowid = _cursor.lastrowid 

116 self.rowcount = _cursor.rowcount 

117 self.await_(_cursor.close()) 

118 except Exception as error: 

119 self._adapt_connection._handle_exception(error) 

120 

121 def setinputsizes(self, *inputsizes): 

122 pass 

123 

124 def __iter__(self): 

125 while self._rows: 

126 yield self._rows.pop(0) 

127 

128 def fetchone(self): 

129 if self._rows: 

130 return self._rows.pop(0) 

131 else: 

132 return None 

133 

134 def fetchmany(self, size=None): 

135 if size is None: 

136 size = self.arraysize 

137 

138 retval = self._rows[0:size] 

139 self._rows[:] = self._rows[size:] 

140 return retval 

141 

142 def fetchall(self): 

143 retval = self._rows[:] 

144 self._rows[:] = [] 

145 return retval 

146 

147 

148class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor): 

149 __slots__ = "_cursor" 

150 

151 server_side = True 

152 

153 def __init__(self, *arg, **kw): 

154 super().__init__(*arg, **kw) 

155 self._cursor = None 

156 

157 def close(self): 

158 if self._cursor is not None: 

159 self.await_(self._cursor.close()) 

160 self._cursor = None 

161 

162 def fetchone(self): 

163 return self.await_(self._cursor.fetchone()) 

164 

165 def fetchmany(self, size=None): 

166 if size is None: 

167 size = self.arraysize 

168 return self.await_(self._cursor.fetchmany(size=size)) 

169 

170 def fetchall(self): 

171 return self.await_(self._cursor.fetchall()) 

172 

173 

174class AsyncAdapt_aiosqlite_connection(AdaptedConnection): 

175 await_ = staticmethod(await_only) 

176 __slots__ = ("dbapi", "_connection") 

177 

178 def __init__(self, dbapi, connection): 

179 self.dbapi = dbapi 

180 self._connection = connection 

181 

182 @property 

183 def isolation_level(self): 

184 return self._connection.isolation_level 

185 

186 @isolation_level.setter 

187 def isolation_level(self, value): 

188 try: 

189 self._connection.isolation_level = value 

190 except Exception as error: 

191 self._handle_exception(error) 

192 

193 def create_function(self, *args, **kw): 

194 try: 

195 self.await_(self._connection.create_function(*args, **kw)) 

196 except Exception as error: 

197 self._handle_exception(error) 

198 

199 def cursor(self, server_side=False): 

200 if server_side: 

201 return AsyncAdapt_aiosqlite_ss_cursor(self) 

202 else: 

203 return AsyncAdapt_aiosqlite_cursor(self) 

204 

205 def execute(self, *args, **kw): 

206 return self.await_(self._connection.execute(*args, **kw)) 

207 

208 def rollback(self): 

209 try: 

210 self.await_(self._connection.rollback()) 

211 except Exception as error: 

212 self._handle_exception(error) 

213 

214 def commit(self): 

215 try: 

216 self.await_(self._connection.commit()) 

217 except Exception as error: 

218 self._handle_exception(error) 

219 

220 def close(self): 

221 try: 

222 self.await_(self._connection.close()) 

223 except Exception as error: 

224 self._handle_exception(error) 

225 

226 def _handle_exception(self, error): 

227 if ( 

228 isinstance(error, ValueError) 

229 and error.args[0] == "no active connection" 

230 ): 

231 util.raise_( 

232 self.dbapi.sqlite.OperationalError("no active connection"), 

233 from_=error, 

234 ) 

235 else: 

236 raise error 

237 

238 

239class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection): 

240 __slots__ = () 

241 

242 await_ = staticmethod(await_fallback) 

243 

244 

245class AsyncAdapt_aiosqlite_dbapi: 

246 def __init__(self, aiosqlite, sqlite): 

247 self.aiosqlite = aiosqlite 

248 self.sqlite = sqlite 

249 self.paramstyle = "qmark" 

250 self._init_dbapi_attributes() 

251 

252 def _init_dbapi_attributes(self): 

253 for name in ( 

254 "DatabaseError", 

255 "Error", 

256 "IntegrityError", 

257 "NotSupportedError", 

258 "OperationalError", 

259 "ProgrammingError", 

260 "sqlite_version", 

261 "sqlite_version_info", 

262 ): 

263 setattr(self, name, getattr(self.aiosqlite, name)) 

264 

265 for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): 

266 setattr(self, name, getattr(self.sqlite, name)) 

267 

268 for name in ("Binary",): 

269 setattr(self, name, getattr(self.sqlite, name)) 

270 

271 def connect(self, *arg, **kw): 

272 async_fallback = kw.pop("async_fallback", False) 

273 

274 # Q. WHY do we need this? 

275 # A. Because there is no way to set connection.isolation_level 

276 # otherwise 

277 # Q. BUT HOW do you know it is SAFE ????? 

278 # A. The only operation that isn't safe is the isolation level set 

279 # operation which aiosqlite appears to have let slip through even 

280 # though pysqlite appears to do check_same_thread for this. 

281 # All execute operations etc. should be safe because they all 

282 # go through the single executor thread. 

283 

284 kw["check_same_thread"] = False 

285 

286 connection = self.aiosqlite.connect(*arg, **kw) 

287 

288 # it's a Thread. you'll thank us later 

289 connection.daemon = True 

290 

291 if util.asbool(async_fallback): 

292 return AsyncAdaptFallback_aiosqlite_connection( 

293 self, 

294 await_fallback(connection), 

295 ) 

296 else: 

297 return AsyncAdapt_aiosqlite_connection( 

298 self, 

299 await_only(connection), 

300 ) 

301 

302 

303class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): 

304 def create_server_side_cursor(self): 

305 return self._dbapi_connection.cursor(server_side=True) 

306 

307 

308class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): 

309 driver = "aiosqlite" 

310 supports_statement_cache = True 

311 

312 is_async = True 

313 

314 supports_server_side_cursors = True 

315 

316 execution_ctx_cls = SQLiteExecutionContext_aiosqlite 

317 

318 @classmethod 

319 def dbapi(cls): 

320 return AsyncAdapt_aiosqlite_dbapi( 

321 __import__("aiosqlite"), __import__("sqlite3") 

322 ) 

323 

324 @classmethod 

325 def get_pool_class(cls, url): 

326 if cls._is_url_file_db(url): 

327 return pool.NullPool 

328 else: 

329 return pool.StaticPool 

330 

331 def is_disconnect(self, e, connection, cursor): 

332 if isinstance( 

333 e, self.dbapi.OperationalError 

334 ) and "no active connection" in str(e): 

335 return True 

336 

337 return super().is_disconnect(e, connection, cursor) 

338 

339 def get_driver_connection(self, connection): 

340 return connection._connection 

341 

342 

343dialect = SQLiteDialect_aiosqlite