Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/asyncmy.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

175 statements  

1# dialects/mysql/asyncmy.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7r""" 

8.. dialect:: mysql+asyncmy 

9 :name: asyncmy 

10 :dbapi: asyncmy 

11 :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...] 

12 :url: https://github.com/long2ice/asyncmy 

13 

14Using a special asyncio mediation layer, the asyncmy dialect is usable 

15as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

16extension package. 

17 

18This dialect should normally be used only with the 

19:func:`_asyncio.create_async_engine` engine creation function:: 

20 

21 from sqlalchemy.ext.asyncio import create_async_engine 

22 engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4") 

23 

24 

25""" # noqa 

26 

27from .pymysql import MySQLDialect_pymysql 

28from ... import pool 

29from ... import util 

30from ...engine import AdaptedConnection 

31from ...util.concurrency import asynccontextmanager 

32from ...util.concurrency import asyncio 

33from ...util.concurrency import await_fallback 

34from ...util.concurrency import await_only 

35 

36 

37class AsyncAdapt_asyncmy_cursor: 

38 server_side = False 

39 __slots__ = ( 

40 "_adapt_connection", 

41 "_connection", 

42 "await_", 

43 "_cursor", 

44 "_rows", 

45 ) 

46 

47 def __init__(self, adapt_connection): 

48 self._adapt_connection = adapt_connection 

49 self._connection = adapt_connection._connection 

50 self.await_ = adapt_connection.await_ 

51 

52 cursor = self._connection.cursor() 

53 

54 self._cursor = self.await_(cursor.__aenter__()) 

55 self._rows = [] 

56 

57 @property 

58 def description(self): 

59 return self._cursor.description 

60 

61 @property 

62 def rowcount(self): 

63 return self._cursor.rowcount 

64 

65 @property 

66 def arraysize(self): 

67 return self._cursor.arraysize 

68 

69 @arraysize.setter 

70 def arraysize(self, value): 

71 self._cursor.arraysize = value 

72 

73 @property 

74 def lastrowid(self): 

75 return self._cursor.lastrowid 

76 

77 def close(self): 

78 # note we aren't actually closing the cursor here, 

79 # we are just letting GC do it. to allow this to be async 

80 # we would need the Result to change how it does "Safe close cursor". 

81 # MySQL "cursors" don't actually have state to be "closed" besides 

82 # exhausting rows, which we already have done for sync cursor. 

83 # another option would be to emulate aiosqlite dialect and assign 

84 # cursor only if we are doing server side cursor operation. 

85 self._rows[:] = [] 

86 

87 def execute(self, operation, parameters=None): 

88 return self.await_(self._execute_async(operation, parameters)) 

89 

90 def executemany(self, operation, seq_of_parameters): 

91 return self.await_( 

92 self._executemany_async(operation, seq_of_parameters) 

93 ) 

94 

95 async def _execute_async(self, operation, parameters): 

96 async with self._adapt_connection._mutex_and_adapt_errors(): 

97 if parameters is None: 

98 result = await self._cursor.execute(operation) 

99 else: 

100 result = await self._cursor.execute(operation, parameters) 

101 

102 if not self.server_side: 

103 # asyncmy has a "fake" async result, so we have to pull it out 

104 # of that here since our default result is not async. 

105 # we could just as easily grab "_rows" here and be done with it 

106 # but this is safer. 

107 self._rows = list(await self._cursor.fetchall()) 

108 return result 

109 

110 async def _executemany_async(self, operation, seq_of_parameters): 

111 async with self._adapt_connection._mutex_and_adapt_errors(): 

112 return await self._cursor.executemany(operation, seq_of_parameters) 

113 

114 def setinputsizes(self, *inputsizes): 

115 pass 

116 

117 def __iter__(self): 

118 while self._rows: 

119 yield self._rows.pop(0) 

120 

121 def fetchone(self): 

122 if self._rows: 

123 return self._rows.pop(0) 

124 else: 

125 return None 

126 

127 def fetchmany(self, size=None): 

128 if size is None: 

129 size = self.arraysize 

130 

131 retval = self._rows[0:size] 

132 self._rows[:] = self._rows[size:] 

133 return retval 

134 

135 def fetchall(self): 

136 retval = self._rows[:] 

137 self._rows[:] = [] 

138 return retval 

139 

140 

141class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor): 

142 __slots__ = () 

143 server_side = True 

144 

145 def __init__(self, adapt_connection): 

146 self._adapt_connection = adapt_connection 

147 self._connection = adapt_connection._connection 

148 self.await_ = adapt_connection.await_ 

149 

150 cursor = self._connection.cursor( 

151 adapt_connection.dbapi.asyncmy.cursors.SSCursor 

152 ) 

153 

154 self._cursor = self.await_(cursor.__aenter__()) 

155 

156 def close(self): 

157 if self._cursor is not None: 

158 self.await_(self._cursor.close()) 

159 self._cursor = None 

160 

161 def fetchone(self): 

162 return self.await_(self._cursor.fetchone()) 

163 

164 def fetchmany(self, size=None): 

165 return self.await_(self._cursor.fetchmany(size=size)) 

166 

167 def fetchall(self): 

168 return self.await_(self._cursor.fetchall()) 

169 

170 

171class AsyncAdapt_asyncmy_connection(AdaptedConnection): 

172 await_ = staticmethod(await_only) 

173 __slots__ = ("dbapi", "_connection", "_execute_mutex") 

174 

175 def __init__(self, dbapi, connection): 

176 self.dbapi = dbapi 

177 self._connection = connection 

178 self._execute_mutex = asyncio.Lock() 

179 

180 @asynccontextmanager 

181 async def _mutex_and_adapt_errors(self): 

182 async with self._execute_mutex: 

183 try: 

184 yield 

185 except AttributeError: 

186 raise self.dbapi.InternalError( 

187 "network operation failed due to asyncmy attribute error" 

188 ) 

189 

190 def ping(self, reconnect): 

191 assert not reconnect 

192 return self.await_(self._do_ping()) 

193 

194 async def _do_ping(self): 

195 async with self._mutex_and_adapt_errors(): 

196 return await self._connection.ping(False) 

197 

198 def character_set_name(self): 

199 return self._connection.character_set_name() 

200 

201 def autocommit(self, value): 

202 self.await_(self._connection.autocommit(value)) 

203 

204 def cursor(self, server_side=False): 

205 if server_side: 

206 return AsyncAdapt_asyncmy_ss_cursor(self) 

207 else: 

208 return AsyncAdapt_asyncmy_cursor(self) 

209 

210 def rollback(self): 

211 self.await_(self._connection.rollback()) 

212 

213 def commit(self): 

214 self.await_(self._connection.commit()) 

215 

216 def close(self): 

217 # it's not awaitable. 

218 self._connection.close() 

219 

220 

221class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection): 

222 __slots__ = () 

223 

224 await_ = staticmethod(await_fallback) 

225 

226 

227def _Binary(x): 

228 """Return x as a binary type.""" 

229 return bytes(x) 

230 

231 

232class AsyncAdapt_asyncmy_dbapi: 

233 def __init__(self, asyncmy): 

234 self.asyncmy = asyncmy 

235 self.paramstyle = "format" 

236 self._init_dbapi_attributes() 

237 

238 def _init_dbapi_attributes(self): 

239 for name in ( 

240 "Warning", 

241 "Error", 

242 "InterfaceError", 

243 "DataError", 

244 "DatabaseError", 

245 "OperationalError", 

246 "InterfaceError", 

247 "IntegrityError", 

248 "ProgrammingError", 

249 "InternalError", 

250 "NotSupportedError", 

251 ): 

252 setattr(self, name, getattr(self.asyncmy.errors, name)) 

253 

254 STRING = util.symbol("STRING") 

255 NUMBER = util.symbol("NUMBER") 

256 BINARY = util.symbol("BINARY") 

257 DATETIME = util.symbol("DATETIME") 

258 TIMESTAMP = util.symbol("TIMESTAMP") 

259 Binary = staticmethod(_Binary) 

260 

261 def connect(self, *arg, **kw): 

262 async_fallback = kw.pop("async_fallback", False) 

263 

264 if util.asbool(async_fallback): 

265 return AsyncAdaptFallback_asyncmy_connection( 

266 self, 

267 await_fallback(self.asyncmy.connect(*arg, **kw)), 

268 ) 

269 else: 

270 return AsyncAdapt_asyncmy_connection( 

271 self, 

272 await_only(self.asyncmy.connect(*arg, **kw)), 

273 ) 

274 

275 

276class MySQLDialect_asyncmy(MySQLDialect_pymysql): 

277 driver = "asyncmy" 

278 supports_statement_cache = True 

279 

280 supports_server_side_cursors = True 

281 _sscursor = AsyncAdapt_asyncmy_ss_cursor 

282 

283 is_async = True 

284 

285 @classmethod 

286 def dbapi(cls): 

287 return AsyncAdapt_asyncmy_dbapi(__import__("asyncmy")) 

288 

289 @classmethod 

290 def get_pool_class(cls, url): 

291 

292 async_fallback = url.query.get("async_fallback", False) 

293 

294 if util.asbool(async_fallback): 

295 return pool.FallbackAsyncAdaptedQueuePool 

296 else: 

297 return pool.AsyncAdaptedQueuePool 

298 

299 def create_connect_args(self, url): 

300 return super(MySQLDialect_asyncmy, self).create_connect_args( 

301 url, _translate_args=dict(username="user", database="db") 

302 ) 

303 

304 def is_disconnect(self, e, connection, cursor): 

305 if super(MySQLDialect_asyncmy, self).is_disconnect( 

306 e, connection, cursor 

307 ): 

308 return True 

309 else: 

310 str_e = str(e).lower() 

311 return ( 

312 "not connected" in str_e or "network operation failed" in str_e 

313 ) 

314 

315 def _found_rows_client_flag(self): 

316 from asyncmy.constants import CLIENT 

317 

318 return CLIENT.FOUND_ROWS 

319 

320 def get_driver_connection(self, connection): 

321 return connection._connection 

322 

323 

324dialect = MySQLDialect_asyncmy