Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/asyncmy.py: 48%

175 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# mysql/asyncmy.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7r""" 

8.. dialect:: mysql+asyncmy 

9 :name: asyncmy 

10 :dbapi: asyncmy 

11 :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...] 

12 :url: https://github.com/long2ice/asyncmy 

13 

14.. note:: The asyncmy dialect as of September, 2021 was added to provide 

15 MySQL/MariaDB asyncio compatibility given that the :ref:`aiomysql` database 

16 driver has become unmaintained, however asyncmy is itself very new. 

17 

18Using a special asyncio mediation layer, the asyncmy dialect is usable 

19as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

20extension package. 

21 

22This dialect should normally be used only with the 

23:func:`_asyncio.create_async_engine` engine creation function:: 

24 

25 from sqlalchemy.ext.asyncio import create_async_engine 

26 engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4") 

27 

28 

29""" # noqa 

30 

31from .pymysql import MySQLDialect_pymysql 

32from ... import pool 

33from ... import util 

34from ...engine import AdaptedConnection 

35from ...util.concurrency import asynccontextmanager 

36from ...util.concurrency import asyncio 

37from ...util.concurrency import await_fallback 

38from ...util.concurrency import await_only 

39 

40 

41class AsyncAdapt_asyncmy_cursor: 

42 server_side = False 

43 __slots__ = ( 

44 "_adapt_connection", 

45 "_connection", 

46 "await_", 

47 "_cursor", 

48 "_rows", 

49 ) 

50 

51 def __init__(self, adapt_connection): 

52 self._adapt_connection = adapt_connection 

53 self._connection = adapt_connection._connection 

54 self.await_ = adapt_connection.await_ 

55 

56 cursor = self._connection.cursor() 

57 

58 self._cursor = self.await_(cursor.__aenter__()) 

59 self._rows = [] 

60 

61 @property 

62 def description(self): 

63 return self._cursor.description 

64 

65 @property 

66 def rowcount(self): 

67 return self._cursor.rowcount 

68 

69 @property 

70 def arraysize(self): 

71 return self._cursor.arraysize 

72 

73 @arraysize.setter 

74 def arraysize(self, value): 

75 self._cursor.arraysize = value 

76 

77 @property 

78 def lastrowid(self): 

79 return self._cursor.lastrowid 

80 

81 def close(self): 

82 # note we aren't actually closing the cursor here, 

83 # we are just letting GC do it. to allow this to be async 

84 # we would need the Result to change how it does "Safe close cursor". 

85 # MySQL "cursors" don't actually have state to be "closed" besides 

86 # exhausting rows, which we already have done for sync cursor. 

87 # another option would be to emulate aiosqlite dialect and assign 

88 # cursor only if we are doing server side cursor operation. 

89 self._rows[:] = [] 

90 

91 def execute(self, operation, parameters=None): 

92 return self.await_(self._execute_async(operation, parameters)) 

93 

94 def executemany(self, operation, seq_of_parameters): 

95 return self.await_( 

96 self._executemany_async(operation, seq_of_parameters) 

97 ) 

98 

99 async def _execute_async(self, operation, parameters): 

100 async with self._adapt_connection._mutex_and_adapt_errors(): 

101 if parameters is None: 

102 result = await self._cursor.execute(operation) 

103 else: 

104 result = await self._cursor.execute(operation, parameters) 

105 

106 if not self.server_side: 

107 # asyncmy has a "fake" async result, so we have to pull it out 

108 # of that here since our default result is not async. 

109 # we could just as easily grab "_rows" here and be done with it 

110 # but this is safer. 

111 self._rows = list(await self._cursor.fetchall()) 

112 return result 

113 

114 async def _executemany_async(self, operation, seq_of_parameters): 

115 async with self._adapt_connection._mutex_and_adapt_errors(): 

116 return await self._cursor.executemany(operation, seq_of_parameters) 

117 

118 def setinputsizes(self, *inputsizes): 

119 pass 

120 

121 def __iter__(self): 

122 while self._rows: 

123 yield self._rows.pop(0) 

124 

125 def fetchone(self): 

126 if self._rows: 

127 return self._rows.pop(0) 

128 else: 

129 return None 

130 

131 def fetchmany(self, size=None): 

132 if size is None: 

133 size = self.arraysize 

134 

135 retval = self._rows[0:size] 

136 self._rows[:] = self._rows[size:] 

137 return retval 

138 

139 def fetchall(self): 

140 retval = self._rows[:] 

141 self._rows[:] = [] 

142 return retval 

143 

144 

145class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor): 

146 __slots__ = () 

147 server_side = True 

148 

149 def __init__(self, adapt_connection): 

150 self._adapt_connection = adapt_connection 

151 self._connection = adapt_connection._connection 

152 self.await_ = adapt_connection.await_ 

153 

154 cursor = self._connection.cursor( 

155 adapt_connection.dbapi.asyncmy.cursors.SSCursor 

156 ) 

157 

158 self._cursor = self.await_(cursor.__aenter__()) 

159 

160 def close(self): 

161 if self._cursor is not None: 

162 self.await_(self._cursor.close()) 

163 self._cursor = None 

164 

165 def fetchone(self): 

166 return self.await_(self._cursor.fetchone()) 

167 

168 def fetchmany(self, size=None): 

169 return self.await_(self._cursor.fetchmany(size=size)) 

170 

171 def fetchall(self): 

172 return self.await_(self._cursor.fetchall()) 

173 

174 

175class AsyncAdapt_asyncmy_connection(AdaptedConnection): 

176 await_ = staticmethod(await_only) 

177 __slots__ = ("dbapi", "_connection", "_execute_mutex") 

178 

179 def __init__(self, dbapi, connection): 

180 self.dbapi = dbapi 

181 self._connection = connection 

182 self._execute_mutex = asyncio.Lock() 

183 

184 @asynccontextmanager 

185 async def _mutex_and_adapt_errors(self): 

186 async with self._execute_mutex: 

187 try: 

188 yield 

189 except AttributeError: 

190 raise self.dbapi.InternalError( 

191 "network operation failed due to asyncmy attribute error" 

192 ) 

193 

194 def ping(self, reconnect): 

195 assert not reconnect 

196 return self.await_(self._do_ping()) 

197 

198 async def _do_ping(self): 

199 async with self._mutex_and_adapt_errors(): 

200 return await self._connection.ping(False) 

201 

202 def character_set_name(self): 

203 return self._connection.character_set_name() 

204 

205 def autocommit(self, value): 

206 self.await_(self._connection.autocommit(value)) 

207 

208 def cursor(self, server_side=False): 

209 if server_side: 

210 return AsyncAdapt_asyncmy_ss_cursor(self) 

211 else: 

212 return AsyncAdapt_asyncmy_cursor(self) 

213 

214 def rollback(self): 

215 self.await_(self._connection.rollback()) 

216 

217 def commit(self): 

218 self.await_(self._connection.commit()) 

219 

220 def close(self): 

221 # it's not awaitable. 

222 self._connection.close() 

223 

224 

225class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection): 

226 __slots__ = () 

227 

228 await_ = staticmethod(await_fallback) 

229 

230 

231def _Binary(x): 

232 """Return x as a binary type.""" 

233 return bytes(x) 

234 

235 

236class AsyncAdapt_asyncmy_dbapi: 

237 def __init__(self, asyncmy): 

238 self.asyncmy = asyncmy 

239 self.paramstyle = "format" 

240 self._init_dbapi_attributes() 

241 

242 def _init_dbapi_attributes(self): 

243 for name in ( 

244 "Warning", 

245 "Error", 

246 "InterfaceError", 

247 "DataError", 

248 "DatabaseError", 

249 "OperationalError", 

250 "InterfaceError", 

251 "IntegrityError", 

252 "ProgrammingError", 

253 "InternalError", 

254 "NotSupportedError", 

255 ): 

256 setattr(self, name, getattr(self.asyncmy.errors, name)) 

257 

258 STRING = util.symbol("STRING") 

259 NUMBER = util.symbol("NUMBER") 

260 BINARY = util.symbol("BINARY") 

261 DATETIME = util.symbol("DATETIME") 

262 TIMESTAMP = util.symbol("TIMESTAMP") 

263 Binary = staticmethod(_Binary) 

264 

265 def connect(self, *arg, **kw): 

266 async_fallback = kw.pop("async_fallback", False) 

267 

268 if util.asbool(async_fallback): 

269 return AsyncAdaptFallback_asyncmy_connection( 

270 self, 

271 await_fallback(self.asyncmy.connect(*arg, **kw)), 

272 ) 

273 else: 

274 return AsyncAdapt_asyncmy_connection( 

275 self, 

276 await_only(self.asyncmy.connect(*arg, **kw)), 

277 ) 

278 

279 

280class MySQLDialect_asyncmy(MySQLDialect_pymysql): 

281 driver = "asyncmy" 

282 supports_statement_cache = True 

283 

284 supports_server_side_cursors = True 

285 _sscursor = AsyncAdapt_asyncmy_ss_cursor 

286 

287 is_async = True 

288 

289 @classmethod 

290 def dbapi(cls): 

291 return AsyncAdapt_asyncmy_dbapi(__import__("asyncmy")) 

292 

293 @classmethod 

294 def get_pool_class(cls, url): 

295 

296 async_fallback = url.query.get("async_fallback", False) 

297 

298 if util.asbool(async_fallback): 

299 return pool.FallbackAsyncAdaptedQueuePool 

300 else: 

301 return pool.AsyncAdaptedQueuePool 

302 

303 def create_connect_args(self, url): 

304 return super(MySQLDialect_asyncmy, self).create_connect_args( 

305 url, _translate_args=dict(username="user", database="db") 

306 ) 

307 

308 def is_disconnect(self, e, connection, cursor): 

309 if super(MySQLDialect_asyncmy, self).is_disconnect( 

310 e, connection, cursor 

311 ): 

312 return True 

313 else: 

314 str_e = str(e).lower() 

315 return ( 

316 "not connected" in str_e or "network operation failed" in str_e 

317 ) 

318 

319 def _found_rows_client_flag(self): 

320 from asyncmy.constants import CLIENT 

321 

322 return CLIENT.FOUND_ROWS 

323 

324 def get_driver_connection(self, connection): 

325 return connection._connection 

326 

327 

328dialect = MySQLDialect_asyncmy