Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/aiomysql.py: 46%

158 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# mysql/aiomysql.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7r""" 

8.. dialect:: mysql+aiomysql 

9 :name: aiomysql 

10 :dbapi: aiomysql 

11 :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...] 

12 :url: https://github.com/aio-libs/aiomysql 

13 

14.. warning:: The aiomysql dialect is not currently tested as part of 

15 SQLAlchemy’s continuous integration. As of September, 2021 the driver 

16 appears to be unmaintained and no longer functions for Python version 3.10, 

17 and additionally depends on a significantly outdated version of PyMySQL. 

18 Please refer to the :ref:`asyncmy` dialect for current MySQL/MariaDB asyncio 

19 functionality. 

20 

21The aiomysql dialect is SQLAlchemy's second Python asyncio dialect. 

22 

23Using a special asyncio mediation layer, the aiomysql dialect is usable 

24as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

25extension package. 

26 

27This dialect should normally be used only with the 

28:func:`_asyncio.create_async_engine` engine creation function:: 

29 

30 from sqlalchemy.ext.asyncio import create_async_engine 

31 engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4") 

32 

33 

34""" # noqa 

35 

36from .pymysql import MySQLDialect_pymysql 

37from ... import pool 

38from ... import util 

39from ...engine import AdaptedConnection 

40from ...util.concurrency import asyncio 

41from ...util.concurrency import await_fallback 

42from ...util.concurrency import await_only 

43 

44 

class AsyncAdapt_aiomysql_cursor:
    """Blocking-style facade over an aiomysql cursor.

    Coroutines of the wrapped cursor are driven through the ``await_``
    callable taken from the owning adapted connection.  For this
    (non server-side) cursor, ``execute()`` fetches the complete result
    immediately and stores it in ``_rows``; ``fetchone()``,
    ``fetchmany()``, ``fetchall()`` and iteration then consume that
    buffer without awaiting anything.
    """

    server_side = False
    __slots__ = (
        "_adapt_connection",
        "_connection",
        "await_",
        "_cursor",
        "_rows",
    )

    def __init__(self, adapt_connection):
        """Open a driver cursor on ``adapt_connection``'s connection.

        :param adapt_connection: the adapted connection; its
         ``_connection`` and ``await_`` attributes are copied onto this
         cursor.
        """
        # imported here so that this class does not rely on a new
        # module-level import.
        from collections import deque

        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_

        cursor = self._connection.cursor()

        # see https://github.com/aio-libs/aiomysql/issues/543
        self._cursor = self.await_(cursor.__aenter__())

        # rows are always consumed from the front; a deque makes each
        # removal O(1) where ``list.pop(0)`` is O(n).
        self._rows = deque()

    @property
    def description(self):
        """The ``description`` of the wrapped cursor."""
        return self._cursor.description

    @property
    def rowcount(self):
        """The ``rowcount`` of the wrapped cursor."""
        return self._cursor.rowcount

    @property
    def arraysize(self):
        """The ``arraysize`` of the wrapped cursor (readable, writable)."""
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    @property
    def lastrowid(self):
        """The ``lastrowid`` of the wrapped cursor."""
        return self._cursor.lastrowid

    def close(self):
        """Discard any buffered rows; the driver cursor is not closed."""
        # note we aren't actually closing the cursor here,
        # we are just letting GC do it.   to allow this to be async
        # we would need the Result to change how it does "Safe close cursor".
        # MySQL "cursors" don't actually have state to be "closed" besides
        # exhausting rows, which we already have done for sync cursor.
        # another option would be to emulate aiosqlite dialect and assign
        # cursor only if we are doing server side cursor operation.
        self._rows.clear()

    def execute(self, operation, parameters=None):
        """Run ``operation`` and return what the driver's execute returned."""
        return self.await_(self._execute_async(operation, parameters))

    def executemany(self, operation, seq_of_parameters):
        """Run ``operation`` once per parameter set via the driver."""
        return self.await_(
            self._executemany_async(operation, seq_of_parameters)
        )

    async def _execute_async(self, operation, parameters):
        """Execute under the connection's mutex and buffer the result."""
        async with self._adapt_connection._execute_mutex:
            if parameters is None:
                result = await self._cursor.execute(operation)
            else:
                result = await self._cursor.execute(operation, parameters)

            if not self.server_side:
                # aiomysql has a "fake" async result, so we have to pull it out
                # of that here since our default result is not async.
                # we could just as easily grab "_rows" here and be done with it
                # but this is safer.
                fetched = await self._cursor.fetchall()
                # swap the buffer contents only after the fetch succeeded
                self._rows.clear()
                self._rows.extend(fetched)
            return result

    async def _executemany_async(self, operation, seq_of_parameters):
        """Run the driver's ``executemany`` under the connection's mutex."""
        async with self._adapt_connection._execute_mutex:
            return await self._cursor.executemany(operation, seq_of_parameters)

    def setinputsizes(self, *inputsizes):
        """Accept and ignore input sizes."""
        pass

    def __iter__(self):
        """Yield buffered rows front to back, removing each one yielded."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return and remove the next buffered row, or ``None`` if empty."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Return and remove up to ``size`` buffered rows as a list.

        :param size: maximum number of rows; when ``None`` the cursor's
         ``arraysize`` is used.
        """
        if size is None:
            size = self.arraysize

        rows = self._rows
        return [rows.popleft() for _ in range(min(size, len(rows)))]

    def fetchall(self):
        """Return all remaining buffered rows as a list and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval

148 

149 

class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
    """Server-side variant of the adapted aiomysql cursor.

    The driver cursor is created with the dbapi's ``aiomysql.SSCursor``
    class.  Rows are not buffered on this object: every ``fetch*()``
    call is forwarded to the driver cursor through ``await_``.
    """

    __slots__ = ()
    server_side = True

    def __init__(self, adapt_connection):
        """Open a server-side driver cursor on ``adapt_connection``.

        :param adapt_connection: the adapted connection; its
         ``_connection``, ``await_`` and ``dbapi`` attributes are used.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_

        cursor = self._connection.cursor(
            adapt_connection.dbapi.aiomysql.SSCursor
        )

        self._cursor = self.await_(cursor.__aenter__())

    def close(self):
        """Close the driver cursor once; later calls do nothing."""
        if self._cursor is not None:
            self.await_(self._cursor.close())
            self._cursor = None

    def __iter__(self):
        """Yield rows one at a time until ``fetchone()`` returns ``None``.

        The inherited ``__iter__`` drains the ``_rows`` buffer, which this
        class never assigns, so it would fail with ``AttributeError``;
        rows are streamed through ``fetchone()`` instead.
        """
        while True:
            row = self.fetchone()
            if row is None:
                return
            yield row

    def fetchone(self):
        """Return the result of the driver cursor's ``fetchone()``."""
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size=None):
        """Return the result of the driver cursor's ``fetchmany(size=size)``."""
        return self.await_(self._cursor.fetchmany(size=size))

    def fetchall(self):
        """Return the result of the driver cursor's ``fetchall()``."""
        return self.await_(self._cursor.fetchall())

178 

179 

class AsyncAdapt_aiomysql_connection(AdaptedConnection):
    """Blocking-style facade over an aiomysql connection.

    Awaitable driver calls are passed through ``await_``, which for this
    class is ``await_only``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi", "_connection", "_execute_mutex")

    def __init__(self, dbapi, connection):
        """Store the dbapi facade and the driver connection.

        :param dbapi: the object stored as ``self.dbapi``.
        :param connection: the driver connection being adapted.
        """
        self.dbapi = dbapi
        self._connection = connection
        # cursors created from this connection enter this lock around
        # their execute() / executemany() calls.
        self._execute_mutex = asyncio.Lock()

    def cursor(self, server_side=False):
        """Return a new adapted cursor bound to this connection.

        :param server_side: when true, build the server-side cursor
         class; otherwise the buffered one.
        """
        cursor_cls = (
            AsyncAdapt_aiomysql_ss_cursor
            if server_side
            else AsyncAdapt_aiomysql_cursor
        )
        return cursor_cls(self)

    def ping(self, reconnect):
        """Await the driver's ``ping(reconnect)`` and return its result."""
        pending = self._connection.ping(reconnect)
        return self.await_(pending)

    def character_set_name(self):
        """Return the driver connection's ``character_set_name()``."""
        return self._connection.character_set_name()

    def autocommit(self, value):
        """Await the driver's ``autocommit(value)``."""
        pending = self._connection.autocommit(value)
        self.await_(pending)

    def rollback(self):
        """Await the driver's ``rollback()``."""
        pending = self._connection.rollback()
        self.await_(pending)

    def commit(self):
        """Await the driver's ``commit()``."""
        pending = self._connection.commit()
        self.await_(pending)

    def close(self):
        """Close the driver connection."""
        # it's not awaitable.
        self._connection.close()

213 

214 

class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
    """Adapted connection whose ``await_`` is ``await_fallback``.

    Everything else is inherited from
    ``AsyncAdapt_aiomysql_connection``.
    """

    # no additional per-instance attributes
    __slots__ = ()

    await_ = staticmethod(await_fallback)

219 

220 

class AsyncAdapt_aiomysql_dbapi:
    """DBAPI-style module facade built from ``aiomysql`` and ``pymysql``.

    Exception classes are copied from the ``aiomysql`` module and the
    type objects / ``Binary`` constructor from the ``pymysql`` module, so
    that they are available as attributes of this object.
    """

    def __init__(self, aiomysql, pymysql):
        """Store both driver modules and copy their DBAPI attributes.

        :param aiomysql: the imported ``aiomysql`` module (or an object
         exposing the same attributes).
        :param pymysql: the imported ``pymysql`` module (or an object
         exposing the same attributes).
        """
        self.aiomysql = aiomysql
        self.pymysql = pymysql
        self.paramstyle = "format"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy exception classes and type objects onto this instance.

        :raises AttributeError: if a listed name is missing from the
         corresponding module.
        """
        # each exception name appears exactly once
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
        ):
            setattr(self, name, getattr(self.aiomysql, name))

        for name in (
            "NUMBER",
            "STRING",
            "DATETIME",
            "BINARY",
            "TIMESTAMP",
            "Binary",
        ):
            setattr(self, name, getattr(self.pymysql, name))

    def connect(self, *arg, **kw):
        """Open an aiomysql connection and wrap it in an adapted connection.

        The ``async_fallback`` keyword (default ``False``) is removed from
        ``kw`` before the driver is called.  When it evaluates true under
        ``util.asbool``, the connection is awaited with ``await_fallback``
        and wrapped in ``AsyncAdaptFallback_aiomysql_connection``;
        otherwise ``await_only`` and ``AsyncAdapt_aiomysql_connection``
        are used.  All remaining arguments go to ``aiomysql.connect``.
        """
        async_fallback = kw.pop("async_fallback", False)

        if util.asbool(async_fallback):
            connection_cls = AsyncAdaptFallback_aiomysql_connection
            await_ = await_fallback
        else:
            connection_cls = AsyncAdapt_aiomysql_connection
            await_ = await_only

        return connection_cls(
            self,
            await_(self.aiomysql.connect(*arg, **kw)),
        )

267 

268 

class MySQLDialect_aiomysql(MySQLDialect_pymysql):
    """MySQL dialect that talks to the database through aiomysql.

    Extends the pymysql dialect, substituting the adapted dbapi,
    server-side cursor class and async-aware pool classes.
    """

    driver = "aiomysql"
    supports_statement_cache = True

    supports_server_side_cursors = True
    _sscursor = AsyncAdapt_aiomysql_ss_cursor

    is_async = True

    @classmethod
    def dbapi(cls):
        """Import ``aiomysql`` and ``pymysql`` and return the dbapi facade."""
        aiomysql_module = __import__("aiomysql")
        pymysql_module = __import__("pymysql")
        return AsyncAdapt_aiomysql_dbapi(aiomysql_module, pymysql_module)

    @classmethod
    def get_pool_class(cls, url):
        """Pick the pool class from the URL's ``async_fallback`` query value.

        :param url: URL object whose ``query`` mapping is consulted.
        """
        wants_fallback = util.asbool(url.query.get("async_fallback", False))
        return (
            pool.FallbackAsyncAdaptedQueuePool
            if wants_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def create_connect_args(self, url):
        """Build connect arguments, renaming ``username`` and ``database``.

        Delegates to the parent implementation with ``username``
        translated to ``user`` and ``database`` translated to ``db``.
        """
        translations = dict(username="user", database="db")
        return super().create_connect_args(url, _translate_args=translations)

    def is_disconnect(self, e, connection, cursor):
        """Report whether ``e`` indicates a dropped connection.

        True when the parent dialect says so, or when the lower-cased
        message of ``e`` contains ``"not connected"``.
        """
        if super().is_disconnect(e, connection, cursor):
            return True

        message = str(e).lower()
        return "not connected" in message

    def _found_rows_client_flag(self):
        """Return pymysql's ``CLIENT.FOUND_ROWS`` constant."""
        from pymysql.constants import CLIENT

        return CLIENT.FOUND_ROWS

    def get_driver_connection(self, connection):
        """Return the ``_connection`` attribute of ``connection``."""
        return connection._connection

315 

316 

# Module-level alias for the dialect class defined in this module.
# NOTE(review): presumably the name looked up when this module is loaded
# as a dialect -- confirm against the dialect registry.
dialect = MySQLDialect_aiomysql