Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/asyncmy.py: 48%
175 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# mysql/asyncmy.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors <see AUTHORS
3# file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7r"""
8.. dialect:: mysql+asyncmy
9 :name: asyncmy
10 :dbapi: asyncmy
11 :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...]
12 :url: https://github.com/long2ice/asyncmy
14.. note:: The asyncmy dialect as of September, 2021 was added to provide
15 MySQL/MariaDB asyncio compatibility given that the :ref:`aiomysql` database
16 driver has become unmaintained, however asyncmy is itself very new.
18Using a special asyncio mediation layer, the asyncmy dialect is usable
19as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
20extension package.
22This dialect should normally be used only with the
23:func:`_asyncio.create_async_engine` engine creation function::
25 from sqlalchemy.ext.asyncio import create_async_engine
26 engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4")
29""" # noqa
from collections import deque

from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
from ...engine import AdaptedConnection
from ...util.concurrency import asynccontextmanager
from ...util.concurrency import asyncio
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
class AsyncAdapt_asyncmy_cursor:
    """Blocking-style cursor facade over an asyncmy cursor.

    Every awaitable produced by the underlying cursor is driven through the
    ``await_`` callable borrowed from the owning adapted connection.  For
    this (non server-side) cursor, all rows are fetched right after
    ``execute()`` and kept in an in-memory buffer; the ``fetch*`` methods
    and iteration then consume that buffer without touching the driver.

    The buffer is a :class:`collections.deque` so that taking rows from the
    front is O(1) per row; with a plain list, ``pop(0)`` made draining a
    large result set quadratic.
    """

    server_side = False
    __slots__ = (
        "_adapt_connection",
        "_connection",
        "await_",
        "_cursor",
        "_rows",
    )

    def __init__(self, adapt_connection):
        """Open a driver cursor on ``adapt_connection._connection``.

        :param adapt_connection: the adapted connection object; it supplies
         ``_connection``, ``await_`` and ``_mutex_and_adapt_errors()``.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_

        cursor = self._connection.cursor()

        # the driver cursor is entered as an async context manager; the
        # value returned by __aenter__() is what we operate on afterwards.
        self._cursor = self.await_(cursor.__aenter__())
        self._rows = deque()

    @property
    def description(self):
        """The ``description`` attribute of the underlying cursor."""
        return self._cursor.description

    @property
    def rowcount(self):
        """The ``rowcount`` attribute of the underlying cursor."""
        return self._cursor.rowcount

    @property
    def arraysize(self):
        """The ``arraysize`` of the underlying cursor; default batch size
        used by :meth:`fetchmany`."""
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    @property
    def lastrowid(self):
        """The ``lastrowid`` attribute of the underlying cursor."""
        return self._cursor.lastrowid

    def close(self):
        """Discard any buffered rows.

        The driver cursor itself is not closed here.
        """
        # note we aren't actually closing the cursor here,
        # we are just letting GC do it.   to allow this to be async
        # we would need the Result to change how it does "Safe close cursor".
        # MySQL "cursors" don't actually have state to be "closed" besides
        # exhausting rows, which we already have done for sync cursor.
        # another option would be to emulate aiosqlite dialect and assign
        # cursor only if we are doing server side cursor operation.
        self._rows.clear()

    def execute(self, operation, parameters=None):
        """Run ``operation`` and return whatever the driver's ``execute()``
        returned.

        :param operation: statement passed to the driver cursor.
        :param parameters: optional parameters; when ``None`` the driver is
         called with the statement only.
        """
        return self.await_(self._execute_async(operation, parameters))

    def executemany(self, operation, seq_of_parameters):
        """Run ``operation`` once per parameter set via the driver's
        ``executemany()`` and return its result."""
        return self.await_(
            self._executemany_async(operation, seq_of_parameters)
        )

    async def _execute_async(self, operation, parameters):
        """Coroutine behind :meth:`execute`; runs under the connection's
        ``_mutex_and_adapt_errors()`` context."""
        async with self._adapt_connection._mutex_and_adapt_errors():
            if parameters is None:
                result = await self._cursor.execute(operation)
            else:
                result = await self._cursor.execute(operation, parameters)

            if not self.server_side:
                # asyncmy has a "fake" async result, so we have to pull it out
                # of that here since our default result is not async.
                # we could just as easily grab "_rows" here and be done with it
                # but this is safer.
                self._rows = deque(await self._cursor.fetchall())
            return result

    async def _executemany_async(self, operation, seq_of_parameters):
        """Coroutine behind :meth:`executemany`; runs under the connection's
        ``_mutex_and_adapt_errors()`` context."""
        async with self._adapt_connection._mutex_and_adapt_errors():
            return await self._cursor.executemany(operation, seq_of_parameters)

    def setinputsizes(self, *inputsizes):
        """Accepted and ignored."""
        pass

    def __iter__(self):
        """Yield buffered rows one at a time, removing each as it is
        yielded."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return the next buffered row, or ``None`` when the buffer is
        empty."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Return up to ``size`` buffered rows as a list.

        :param size: maximum number of rows; defaults to :attr:`arraysize`
         when ``None``.  A size of zero or less yields an empty list.
        """
        if size is None:
            size = self.arraysize

        rows = self._rows
        # only the returned rows are touched; the remainder of the buffer
        # is not copied.
        return [rows.popleft() for _ in range(min(size, len(rows)))]

    def fetchall(self):
        """Return all remaining buffered rows as a list and empty the
        buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
    """Server-side variant of the adapted asyncmy cursor.

    The driver cursor is created with asyncmy's ``SSCursor`` class and rows
    are not buffered locally: each ``fetch*`` call is forwarded to the
    driver cursor and awaited through ``await_``.
    """

    __slots__ = ()
    server_side = True

    def __init__(self, adapt_connection):
        """Open an ``SSCursor`` on ``adapt_connection._connection``."""
        driver_connection = adapt_connection._connection
        run = adapt_connection.await_

        self._adapt_connection = adapt_connection
        self._connection = driver_connection
        self.await_ = run

        ss_cursor_cls = adapt_connection.dbapi.asyncmy.cursors.SSCursor
        entered = driver_connection.cursor(ss_cursor_cls).__aenter__()
        self._cursor = run(entered)

    def close(self):
        """Close the driver cursor once; later calls are no-ops."""
        active = self._cursor
        if active is None:
            return

        self.await_(active.close())
        self._cursor = None

    def fetchone(self):
        """Await and return the driver cursor's ``fetchone()``."""
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size=None):
        """Await and return the driver cursor's ``fetchmany(size=size)``."""
        pending = self._cursor.fetchmany(size=size)
        return self.await_(pending)

    def fetchall(self):
        """Await and return the driver cursor's ``fetchall()``."""
        pending = self._cursor.fetchall()
        return self.await_(pending)
class AsyncAdapt_asyncmy_connection(AdaptedConnection):
    """Blocking-style facade over an asyncmy connection.

    Awaitables returned by the driver connection are driven through
    ``await_`` (``await_only`` for this class).  Cursor execution and
    ``ping()`` run while holding a per-connection ``asyncio.Lock``.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi", "_connection", "_execute_mutex")

    def __init__(self, dbapi, connection):
        """Store the DBAPI facade and driver connection and create the
        execution lock."""
        self._execute_mutex = asyncio.Lock()
        self._connection = connection
        self.dbapi = dbapi

    @asynccontextmanager
    async def _mutex_and_adapt_errors(self):
        """Async context manager that holds the execution lock around the
        enclosed block and re-raises ``AttributeError`` coming out of it as
        the DBAPI's ``InternalError``."""
        async with self._execute_mutex:
            try:
                yield
            except AttributeError:
                # surfaced with a fixed message; the dialect's
                # is_disconnect() looks for this text.
                raise self.dbapi.InternalError(
                    "network operation failed due to asyncmy attribute error"
                )

    def ping(self, reconnect):
        """Ping the server; ``reconnect`` must be false."""
        assert not reconnect
        pending = self._do_ping()
        return self.await_(pending)

    async def _do_ping(self):
        """Coroutine behind :meth:`ping`; calls the driver's ``ping(False)``
        under the lock / error-adaption context."""
        async with self._mutex_and_adapt_errors():
            return await self._connection.ping(False)

    def character_set_name(self):
        """Return the driver connection's ``character_set_name()``."""
        return self._connection.character_set_name()

    def autocommit(self, value):
        """Await the driver connection's ``autocommit(value)``."""
        pending = self._connection.autocommit(value)
        self.await_(pending)

    def cursor(self, server_side=False):
        """Return a new adapted cursor bound to this connection.

        :param server_side: when true, the server-side cursor class is used.
        """
        cursor_cls = (
            AsyncAdapt_asyncmy_ss_cursor
            if server_side
            else AsyncAdapt_asyncmy_cursor
        )
        return cursor_cls(self)

    def rollback(self):
        """Await the driver connection's ``rollback()``."""
        pending = self._connection.rollback()
        self.await_(pending)

    def commit(self):
        """Await the driver connection's ``commit()``."""
        pending = self._connection.commit()
        self.await_(pending)

    def close(self):
        """Close the driver connection."""
        # it's not awaitable.
        self._connection.close()
class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
    """Adapted asyncmy connection whose ``await_`` is ``await_fallback``
    instead of ``await_only``; everything else is inherited."""

    __slots__ = ()

    await_ = staticmethod(await_fallback)
def _Binary(x):
    """Return x as a binary type.

    The conversion is exactly ``bytes(x)``.
    """
    as_bytes = bytes(x)
    return as_bytes
class AsyncAdapt_asyncmy_dbapi:
    """DBAPI-module-like facade around the ``asyncmy`` package.

    Exposes ``paramstyle``, the exception classes found on
    ``asyncmy.errors``, a handful of type symbols, ``Binary`` and a
    ``connect()`` that returns an adapted (blocking-style) connection.
    """

    def __init__(self, asyncmy):
        """
        :param asyncmy: the imported ``asyncmy`` module (or an object with
         the same ``errors`` / ``connect`` attributes).
        """
        self.asyncmy = asyncmy
        self.paramstyle = "format"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy the exception classes from ``asyncmy.errors`` onto this
        object under the same names."""
        # each name listed once; the list previously repeated
        # "InterfaceError", which only re-assigned the same attribute.
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
        ):
            setattr(self, name, getattr(self.asyncmy.errors, name))

    STRING = util.symbol("STRING")
    NUMBER = util.symbol("NUMBER")
    BINARY = util.symbol("BINARY")
    DATETIME = util.symbol("DATETIME")
    TIMESTAMP = util.symbol("TIMESTAMP")
    Binary = staticmethod(_Binary)

    def connect(self, *arg, **kw):
        """Open an asyncmy connection and wrap it in an adapted connection.

        The ``async_fallback`` keyword is removed from ``kw`` before the
        remaining arguments are passed to ``asyncmy.connect()``.  When it is
        truthy (per ``util.asbool``), the connection coroutine is run with
        ``await_fallback`` and wrapped in the fallback connection class;
        otherwise ``await_only`` and the regular adapted connection class
        are used.
        """
        async_fallback = kw.pop("async_fallback", False)

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_asyncmy_connection(
                self,
                await_fallback(self.asyncmy.connect(*arg, **kw)),
            )

        return AsyncAdapt_asyncmy_connection(
            self,
            await_only(self.asyncmy.connect(*arg, **kw)),
        )
class MySQLDialect_asyncmy(MySQLDialect_pymysql):
    """MySQL dialect that uses the asyncmy driver through the adapted
    (blocking-style) DBAPI facade defined in this module."""

    driver = "asyncmy"
    supports_statement_cache = True

    supports_server_side_cursors = True
    _sscursor = AsyncAdapt_asyncmy_ss_cursor

    is_async = True

    @classmethod
    def dbapi(cls):
        """Import ``asyncmy`` and return it wrapped in the DBAPI facade."""
        driver_module = __import__("asyncmy")
        return AsyncAdapt_asyncmy_dbapi(driver_module)

    @classmethod
    def get_pool_class(cls, url):
        """Pick the pool class based on the URL's ``async_fallback`` query
        value."""
        fallback_flag = url.query.get("async_fallback", False)

        if util.asbool(fallback_flag):
            return pool.FallbackAsyncAdaptedQueuePool
        return pool.AsyncAdaptedQueuePool

    def create_connect_args(self, url):
        """Delegate to the parent dialect, translating ``username`` to
        ``user`` and ``database`` to ``db``."""
        translated = {"username": "user", "database": "db"}
        return super().create_connect_args(url, _translate_args=translated)

    def is_disconnect(self, e, connection, cursor):
        """Return True when the parent dialect reports a disconnect, or when
        the lowercased error text contains one of two known phrases."""
        if super().is_disconnect(e, connection, cursor):
            return True

        message = str(e).lower()
        return (
            "not connected" in message
            or "network operation failed" in message
        )

    def _found_rows_client_flag(self):
        """Return asyncmy's ``CLIENT.FOUND_ROWS`` constant."""
        from asyncmy.constants import CLIENT

        return CLIENT.FOUND_ROWS

    def get_driver_connection(self, connection):
        """Return the driver connection held by the adapted connection."""
        return connection._connection


# module-level alias for the dialect class defined above.
dialect = MySQLDialect_asyncmy