Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/aiomysql.py: 46%
158 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
# mysql/aiomysql.py
# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors <see AUTHORS
# file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
r"""
.. dialect:: mysql+aiomysql
    :name: aiomysql
    :dbapi: aiomysql
    :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://github.com/aio-libs/aiomysql

.. warning:: The aiomysql dialect is not currently tested as part of
   SQLAlchemy's continuous integration.  As of September, 2021 the driver
   appears to be unmaintained and no longer functions for Python version 3.10,
   and additionally depends on a significantly outdated version of PyMySQL.
   Please refer to the :ref:`asyncmy` dialect for current MySQL/MariaDB asyncio
   functionality.

The aiomysql dialect is SQLAlchemy's second Python asyncio dialect.

Using a special asyncio mediation layer, the aiomysql dialect is usable
as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
extension package.

This dialect should normally be used only with the
:func:`_asyncio.create_async_engine` engine creation function::

    from sqlalchemy.ext.asyncio import create_async_engine
    engine = create_async_engine("mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4")


"""  # noqa
import collections

from .pymysql import MySQLDialect_pymysql
from ... import pool
from ... import util
from ...engine import AdaptedConnection
from ...util.concurrency import asyncio
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only
class AsyncAdapt_aiomysql_cursor:
    """Blocking-style cursor facade over an aiomysql cursor.

    Every awaitable produced by the wrapped driver cursor is run through the
    ``await_`` callable taken from the owning adapted connection.  For this
    (non server-side) cursor all rows are pulled from the driver immediately
    after ``execute()`` and kept in ``_rows``; the ``fetch*`` methods and
    iteration then consume that buffer without touching the driver again.

    ``_rows`` is a :class:`collections.deque` so that taking rows from the
    front is O(1) per row; a plain list with ``pop(0)`` made consuming a
    large result quadratic.
    """

    server_side = False
    __slots__ = (
        "_adapt_connection",
        "_connection",
        "await_",
        "_cursor",
        "_rows",
    )

    def __init__(self, adapt_connection):
        """Open a driver cursor on ``adapt_connection``.

        :param adapt_connection: the adapted connection; its
         ``_connection``, ``await_`` and (later) ``_execute_mutex``
         attributes are used.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_

        cursor = self._connection.cursor()

        # see https://github.com/aio-libs/aiomysql/issues/543
        self._cursor = self.await_(cursor.__aenter__())
        self._rows = collections.deque()

    @property
    def description(self):
        """``description`` of the wrapped driver cursor."""
        return self._cursor.description

    @property
    def rowcount(self):
        """``rowcount`` of the wrapped driver cursor."""
        return self._cursor.rowcount

    @property
    def arraysize(self):
        """``arraysize`` of the wrapped driver cursor; the default batch
        size used by :meth:`fetchmany`."""
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    @property
    def lastrowid(self):
        """``lastrowid`` of the wrapped driver cursor."""
        return self._cursor.lastrowid

    def close(self):
        """Discard any buffered rows; the driver cursor is left to GC."""
        # note we aren't actually closing the cursor here,
        # we are just letting GC do it.   to allow this to be async
        # we would need the Result to change how it does "Safe close cursor".
        # MySQL "cursors" don't actually have state to be "closed" besides
        # exhausting rows, which we already have done for sync cursor.
        # another option would be to emulate aiosqlite dialect and assign
        # cursor only if we are doing server side cursor operation.
        self._rows.clear()

    def execute(self, operation, parameters=None):
        """Run ``operation`` and return what the driver's ``execute()``
        returned."""
        return self.await_(self._execute_async(operation, parameters))

    def executemany(self, operation, seq_of_parameters):
        """Run ``operation`` for each parameter set and return what the
        driver's ``executemany()`` returned."""
        return self.await_(
            self._executemany_async(operation, seq_of_parameters)
        )

    async def _execute_async(self, operation, parameters):
        # the connection-wide mutex is held for the whole execute + buffer
        # step.
        async with self._adapt_connection._execute_mutex:
            if parameters is None:
                result = await self._cursor.execute(operation)
            else:
                result = await self._cursor.execute(operation, parameters)

            if not self.server_side:
                # aiomysql has a "fake" async result, so we have to pull it
                # out of that here since our default result is not async.
                # we could just as easily grab "_rows" here and be done with
                # it but this is safer.
                self._rows = collections.deque(await self._cursor.fetchall())
            return result

    async def _executemany_async(self, operation, seq_of_parameters):
        async with self._adapt_connection._execute_mutex:
            return await self._cursor.executemany(operation, seq_of_parameters)

    def setinputsizes(self, *inputsizes):
        """Accepted and ignored."""
        pass

    def __iter__(self):
        """Yield buffered rows in order, removing each one as it is
        yielded."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return and remove the next buffered row, or ``None`` when the
        buffer is empty."""
        if self._rows:
            return self._rows.popleft()
        return None

    def fetchmany(self, size=None):
        """Return and remove up to ``size`` buffered rows as a list.

        ``size`` defaults to :attr:`arraysize` when not given.
        """
        if size is None:
            size = self.arraysize

        buffered = self._rows
        return [buffered.popleft() for _ in range(min(size, len(buffered)))]

    def fetchall(self):
        """Return all remaining buffered rows as a list and empty the
        buffer."""
        remaining = list(self._rows)
        self._rows.clear()
        return remaining
class AsyncAdapt_aiomysql_ss_cursor(AsyncAdapt_aiomysql_cursor):
    """Cursor adapter opened with the driver's ``SSCursor`` class.

    Unlike the parent class, the ``fetch*`` methods here each go to the
    driver cursor through ``await_`` rather than reading a local buffer,
    and ``close()`` closes the driver cursor.
    """

    __slots__ = ()
    server_side = True

    def __init__(self, adapt_connection):
        """Open an ``SSCursor`` on the driver connection held by
        ``adapt_connection``."""
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_

        open_cursor = self._connection.cursor
        pending = open_cursor(adapt_connection.dbapi.aiomysql.SSCursor)

        self._cursor = self.await_(pending.__aenter__())

    def close(self):
        """Close the driver cursor once; later calls do nothing."""
        if self._cursor is None:
            return
        self.await_(self._cursor.close())
        self._cursor = None

    def fetchone(self):
        """Return the result of the driver cursor's ``fetchone()``."""
        row_awaitable = self._cursor.fetchone()
        return self.await_(row_awaitable)

    def fetchmany(self, size=None):
        """Return the result of the driver cursor's ``fetchmany()``;
        ``size`` is passed through unchanged."""
        rows_awaitable = self._cursor.fetchmany(size=size)
        return self.await_(rows_awaitable)

    def fetchall(self):
        """Return the result of the driver cursor's ``fetchall()``."""
        rows_awaitable = self._cursor.fetchall()
        return self.await_(rows_awaitable)
class AsyncAdapt_aiomysql_connection(AdaptedConnection):
    """Blocking-style facade over an aiomysql connection.

    Awaitables returned by the driver connection are run through
    ``await_`` (``await_only`` for this class).  ``_execute_mutex`` is an
    ``asyncio.Lock`` that the cursor adapters hold while executing.
    """

    await_ = staticmethod(await_only)
    __slots__ = ("dbapi", "_connection", "_execute_mutex")

    def __init__(self, dbapi, connection):
        """Store the dbapi adapter and the driver connection, and create
        the per-connection execute lock."""
        self.dbapi = dbapi
        self._connection = connection
        self._execute_mutex = asyncio.Lock()

    def ping(self, reconnect):
        """Ping through the driver connection and return its result."""
        ping_awaitable = self._connection.ping(reconnect)
        return self.await_(ping_awaitable)

    def character_set_name(self):
        """Return the driver connection's ``character_set_name()``
        (called directly, not awaited)."""
        return self._connection.character_set_name()

    def autocommit(self, value):
        """Set autocommit on the driver connection."""
        self.await_(self._connection.autocommit(value))

    def cursor(self, server_side=False):
        """Return a new cursor adapter; the server-side variant when
        ``server_side`` is true."""
        cursor_cls = (
            AsyncAdapt_aiomysql_ss_cursor
            if server_side
            else AsyncAdapt_aiomysql_cursor
        )
        return cursor_cls(self)

    def rollback(self):
        """Roll back on the driver connection."""
        self.await_(self._connection.rollback())

    def commit(self):
        """Commit on the driver connection."""
        self.await_(self._connection.commit())

    def close(self):
        """Close the driver connection."""
        # it's not awaitable.
        self._connection.close()
class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
    """Connection adapter identical to its parent except that awaitables
    are run through ``await_fallback`` instead of ``await_only``."""

    await_ = staticmethod(await_fallback)

    __slots__ = ()
class AsyncAdapt_aiomysql_dbapi:
    """DBAPI-module-like object assembled from aiomysql and pymysql.

    Exception classes are copied from the ``aiomysql`` module, type
    constants and ``Binary`` from the ``pymysql`` module, and
    :meth:`connect` returns an adapted connection wrapping the driver one.
    """

    def __init__(self, aiomysql, pymysql):
        """
        :param aiomysql: the imported ``aiomysql`` module (or an object
         exposing the same attributes).
        :param pymysql: the imported ``pymysql`` module (or an object
         exposing the same attributes).
        """
        self.aiomysql = aiomysql
        self.pymysql = pymysql
        self.paramstyle = "format"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self):
        """Copy the exception classes and type constants onto ``self``."""
        # each exception name is listed exactly once; the list previously
        # repeated "InterfaceError", which only caused a redundant copy.
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
        ):
            setattr(self, name, getattr(self.aiomysql, name))

        for name in (
            "NUMBER",
            "STRING",
            "DATETIME",
            "BINARY",
            "TIMESTAMP",
            "Binary",
        ):
            setattr(self, name, getattr(self.pymysql, name))

    def connect(self, *arg, **kw):
        """Connect via ``aiomysql.connect`` and wrap the result.

        The ``async_fallback`` keyword is removed from ``kw`` before the
        remaining arguments are passed to the driver.  When it evaluates
        true via ``util.asbool`` the connection is established with
        ``await_fallback`` and wrapped in the fallback adapter; otherwise
        ``await_only`` and the regular adapter are used.
        """
        async_fallback = kw.pop("async_fallback", False)

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_aiomysql_connection(
                self,
                await_fallback(self.aiomysql.connect(*arg, **kw)),
            )
        else:
            return AsyncAdapt_aiomysql_connection(
                self,
                await_only(self.aiomysql.connect(*arg, **kw)),
            )
class MySQLDialect_aiomysql(MySQLDialect_pymysql):
    """MySQL dialect that uses the aiomysql adapter classes of this module,
    built on top of the pymysql dialect."""

    driver = "aiomysql"
    supports_statement_cache = True

    supports_server_side_cursors = True
    _sscursor = AsyncAdapt_aiomysql_ss_cursor

    is_async = True

    @classmethod
    def dbapi(cls):
        """Import ``aiomysql`` and ``pymysql`` and return the adapter object
        that wraps both."""
        return AsyncAdapt_aiomysql_dbapi(
            __import__("aiomysql"),
            __import__("pymysql"),
        )

    @classmethod
    def get_pool_class(cls, url):
        """Choose the pool class from the URL's ``async_fallback`` query
        value (absent means false)."""
        wants_fallback = util.asbool(url.query.get("async_fallback", False))

        if not wants_fallback:
            return pool.AsyncAdaptedQueuePool
        return pool.FallbackAsyncAdaptedQueuePool

    def create_connect_args(self, url):
        """Delegate to the parent dialect, renaming ``username`` to ``user``
        and ``database`` to ``db``."""
        renames = dict(username="user", database="db")
        return super().create_connect_args(url, _translate_args=renames)

    def is_disconnect(self, e, connection, cursor):
        """Return True when the parent dialect reports a disconnect;
        otherwise report whether the lower-cased error text contains
        ``"not connected"``."""
        if super().is_disconnect(e, connection, cursor):
            return True

        message = str(e).lower()
        return "not connected" in message

    def _found_rows_client_flag(self):
        """Return pymysql's ``CLIENT.FOUND_ROWS`` constant."""
        # imported here rather than at module level, as in the original.
        from pymysql.constants import CLIENT

        return CLIENT.FOUND_ROWS

    def get_driver_connection(self, connection):
        """Return the ``_connection`` attribute of the given adapted
        connection."""
        return connection._connection
# module-level alias for the dialect class defined in this module;
# presumably the name looked up when the dialect is loaded -- TODO confirm.
dialect = MySQLDialect_aiomysql