1# dialects/mysql/asyncmy.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors <see AUTHORS
3# file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7r"""
8.. dialect:: mysql+asyncmy
9 :name: asyncmy
10 :dbapi: asyncmy
11 :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...]
12 :url: https://github.com/long2ice/asyncmy
13
14Using a special asyncio mediation layer, the asyncmy dialect is usable
15as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
16extension package.
17
18This dialect should normally be used only with the
19:func:`_asyncio.create_async_engine` engine creation function::
20
21 from sqlalchemy.ext.asyncio import create_async_engine
22 engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4")
23
24
25""" # noqa
26
27from .pymysql import MySQLDialect_pymysql
28from ... import pool
29from ... import util
30from ...engine import AdaptedConnection
31from ...util.concurrency import asynccontextmanager
32from ...util.concurrency import asyncio
33from ...util.concurrency import await_fallback
34from ...util.concurrency import await_only
35
36
37class AsyncAdapt_asyncmy_cursor:
38 server_side = False
39 __slots__ = (
40 "_adapt_connection",
41 "_connection",
42 "await_",
43 "_cursor",
44 "_rows",
45 )
46
47 def __init__(self, adapt_connection):
48 self._adapt_connection = adapt_connection
49 self._connection = adapt_connection._connection
50 self.await_ = adapt_connection.await_
51
52 cursor = self._connection.cursor()
53
54 self._cursor = self.await_(cursor.__aenter__())
55 self._rows = []
56
57 @property
58 def description(self):
59 return self._cursor.description
60
61 @property
62 def rowcount(self):
63 return self._cursor.rowcount
64
65 @property
66 def arraysize(self):
67 return self._cursor.arraysize
68
69 @arraysize.setter
70 def arraysize(self, value):
71 self._cursor.arraysize = value
72
73 @property
74 def lastrowid(self):
75 return self._cursor.lastrowid
76
77 def close(self):
78 # note we aren't actually closing the cursor here,
79 # we are just letting GC do it. to allow this to be async
80 # we would need the Result to change how it does "Safe close cursor".
81 # MySQL "cursors" don't actually have state to be "closed" besides
82 # exhausting rows, which we already have done for sync cursor.
83 # another option would be to emulate aiosqlite dialect and assign
84 # cursor only if we are doing server side cursor operation.
85 self._rows[:] = []
86
87 def execute(self, operation, parameters=None):
88 return self.await_(self._execute_async(operation, parameters))
89
90 def executemany(self, operation, seq_of_parameters):
91 return self.await_(
92 self._executemany_async(operation, seq_of_parameters)
93 )
94
95 async def _execute_async(self, operation, parameters):
96 async with self._adapt_connection._mutex_and_adapt_errors():
97 if parameters is None:
98 result = await self._cursor.execute(operation)
99 else:
100 result = await self._cursor.execute(operation, parameters)
101
102 if not self.server_side:
103 # asyncmy has a "fake" async result, so we have to pull it out
104 # of that here since our default result is not async.
105 # we could just as easily grab "_rows" here and be done with it
106 # but this is safer.
107 self._rows = list(await self._cursor.fetchall())
108 return result
109
110 async def _executemany_async(self, operation, seq_of_parameters):
111 async with self._adapt_connection._mutex_and_adapt_errors():
112 return await self._cursor.executemany(operation, seq_of_parameters)
113
114 def setinputsizes(self, *inputsizes):
115 pass
116
117 def __iter__(self):
118 while self._rows:
119 yield self._rows.pop(0)
120
121 def fetchone(self):
122 if self._rows:
123 return self._rows.pop(0)
124 else:
125 return None
126
127 def fetchmany(self, size=None):
128 if size is None:
129 size = self.arraysize
130
131 retval = self._rows[0:size]
132 self._rows[:] = self._rows[size:]
133 return retval
134
135 def fetchall(self):
136 retval = self._rows[:]
137 self._rows[:] = []
138 return retval
139
140
141class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
142 __slots__ = ()
143 server_side = True
144
145 def __init__(self, adapt_connection):
146 self._adapt_connection = adapt_connection
147 self._connection = adapt_connection._connection
148 self.await_ = adapt_connection.await_
149
150 cursor = self._connection.cursor(
151 adapt_connection.dbapi.asyncmy.cursors.SSCursor
152 )
153
154 self._cursor = self.await_(cursor.__aenter__())
155
156 def close(self):
157 if self._cursor is not None:
158 self.await_(self._cursor.close())
159 self._cursor = None
160
161 def fetchone(self):
162 return self.await_(self._cursor.fetchone())
163
164 def fetchmany(self, size=None):
165 return self.await_(self._cursor.fetchmany(size=size))
166
167 def fetchall(self):
168 return self.await_(self._cursor.fetchall())
169
170
171class AsyncAdapt_asyncmy_connection(AdaptedConnection):
172 await_ = staticmethod(await_only)
173 __slots__ = ("dbapi", "_connection", "_execute_mutex")
174
175 def __init__(self, dbapi, connection):
176 self.dbapi = dbapi
177 self._connection = connection
178 self._execute_mutex = asyncio.Lock()
179
180 @asynccontextmanager
181 async def _mutex_and_adapt_errors(self):
182 async with self._execute_mutex:
183 try:
184 yield
185 except AttributeError:
186 raise self.dbapi.InternalError(
187 "network operation failed due to asyncmy attribute error"
188 )
189
190 def ping(self, reconnect):
191 assert not reconnect
192 return self.await_(self._do_ping())
193
194 async def _do_ping(self):
195 async with self._mutex_and_adapt_errors():
196 return await self._connection.ping(False)
197
198 def character_set_name(self):
199 return self._connection.character_set_name()
200
201 def autocommit(self, value):
202 self.await_(self._connection.autocommit(value))
203
204 def cursor(self, server_side=False):
205 if server_side:
206 return AsyncAdapt_asyncmy_ss_cursor(self)
207 else:
208 return AsyncAdapt_asyncmy_cursor(self)
209
210 def rollback(self):
211 self.await_(self._connection.rollback())
212
213 def commit(self):
214 self.await_(self._connection.commit())
215
216 def close(self):
217 # it's not awaitable.
218 self._connection.close()
219
220
221class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
222 __slots__ = ()
223
224 await_ = staticmethod(await_fallback)
225
226
227def _Binary(x):
228 """Return x as a binary type."""
229 return bytes(x)
230
231
232class AsyncAdapt_asyncmy_dbapi:
233 def __init__(self, asyncmy):
234 self.asyncmy = asyncmy
235 self.paramstyle = "format"
236 self._init_dbapi_attributes()
237
238 def _init_dbapi_attributes(self):
239 for name in (
240 "Warning",
241 "Error",
242 "InterfaceError",
243 "DataError",
244 "DatabaseError",
245 "OperationalError",
246 "InterfaceError",
247 "IntegrityError",
248 "ProgrammingError",
249 "InternalError",
250 "NotSupportedError",
251 ):
252 setattr(self, name, getattr(self.asyncmy.errors, name))
253
254 STRING = util.symbol("STRING")
255 NUMBER = util.symbol("NUMBER")
256 BINARY = util.symbol("BINARY")
257 DATETIME = util.symbol("DATETIME")
258 TIMESTAMP = util.symbol("TIMESTAMP")
259 Binary = staticmethod(_Binary)
260
261 def connect(self, *arg, **kw):
262 async_fallback = kw.pop("async_fallback", False)
263
264 if util.asbool(async_fallback):
265 return AsyncAdaptFallback_asyncmy_connection(
266 self,
267 await_fallback(self.asyncmy.connect(*arg, **kw)),
268 )
269 else:
270 return AsyncAdapt_asyncmy_connection(
271 self,
272 await_only(self.asyncmy.connect(*arg, **kw)),
273 )
274
275
276class MySQLDialect_asyncmy(MySQLDialect_pymysql):
277 driver = "asyncmy"
278 supports_statement_cache = True
279
280 supports_server_side_cursors = True
281 _sscursor = AsyncAdapt_asyncmy_ss_cursor
282
283 is_async = True
284
285 @classmethod
286 def dbapi(cls):
287 return AsyncAdapt_asyncmy_dbapi(__import__("asyncmy"))
288
289 @classmethod
290 def get_pool_class(cls, url):
291
292 async_fallback = url.query.get("async_fallback", False)
293
294 if util.asbool(async_fallback):
295 return pool.FallbackAsyncAdaptedQueuePool
296 else:
297 return pool.AsyncAdaptedQueuePool
298
299 def create_connect_args(self, url):
300 return super(MySQLDialect_asyncmy, self).create_connect_args(
301 url, _translate_args=dict(username="user", database="db")
302 )
303
304 def is_disconnect(self, e, connection, cursor):
305 if super(MySQLDialect_asyncmy, self).is_disconnect(
306 e, connection, cursor
307 ):
308 return True
309 else:
310 str_e = str(e).lower()
311 return (
312 "not connected" in str_e or "network operation failed" in str_e
313 )
314
315 def _found_rows_client_flag(self):
316 from asyncmy.constants import CLIENT
317
318 return CLIENT.FOUND_ROWS
319
320 def get_driver_connection(self, connection):
321 return connection._connection
322
323
324dialect = MySQLDialect_asyncmy