1# dialects/mysql/asyncmy.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS
3# file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8r"""
9.. dialect:: mysql+asyncmy
10 :name: asyncmy
11 :dbapi: asyncmy
12 :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...]
13 :url: https://github.com/long2ice/asyncmy
14
15Using a special asyncio mediation layer, the asyncmy dialect is usable
16as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
17extension package.
18
19This dialect should normally be used only with the
20:func:`_asyncio.create_async_engine` engine creation function::
21
22 from sqlalchemy.ext.asyncio import create_async_engine
23
24 engine = create_async_engine(
25 "mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4"
26 )
27
28""" # noqa
29from __future__ import annotations
30
31from types import ModuleType
32from typing import Any
33from typing import NoReturn
34from typing import Optional
35from typing import TYPE_CHECKING
36from typing import Union
37
38from .pymysql import MySQLDialect_pymysql
39from ... import pool
40from ... import util
41from ...connectors.asyncio import AsyncAdapt_dbapi_connection
42from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
43from ...connectors.asyncio import AsyncAdapt_dbapi_module
44from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
45from ...connectors.asyncio import AsyncAdapt_terminate
46from ...util.concurrency import await_fallback
47from ...util.concurrency import await_only
48
49if TYPE_CHECKING:
50 from ...connectors.asyncio import AsyncIODBAPIConnection
51 from ...connectors.asyncio import AsyncIODBAPICursor
52 from ...engine.interfaces import ConnectArgsType
53 from ...engine.interfaces import DBAPIConnection
54 from ...engine.interfaces import DBAPICursor
55 from ...engine.interfaces import DBAPIModule
56 from ...engine.interfaces import PoolProxiedConnection
57 from ...engine.url import URL
58
59
class AsyncAdapt_asyncmy_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter used for asyncmy connections.

    Adds nothing to ``AsyncAdapt_dbapi_cursor``; all behavior is inherited.
    """

    __slots__ = ()
62
63
class AsyncAdapt_asyncmy_ss_cursor(
    AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_asyncmy_cursor
):
    """Server-side cursor adapter for asyncmy connections."""

    __slots__ = ()

    def _make_new_cursor(
        self, connection: AsyncIODBAPIConnection
    ) -> AsyncIODBAPICursor:
        """Return a new driver cursor built from the driver's ``SSCursor``.

        :param connection: the driver-level connection to open the cursor
         on.
        :return: the result of ``connection.cursor()`` called with the
         ``cursors.SSCursor`` class of the wrapped asyncmy module.
        """
        # the asyncmy module is reached through the adapted dbapi object
        # that the owning adapted connection holds
        asyncmy_module = self._adapt_connection.dbapi.asyncmy
        ss_cursor_cls = asyncmy_module.cursors.SSCursor
        return connection.cursor(ss_cursor_cls)
75
76
class AsyncAdapt_asyncmy_connection(
    AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
):
    """Adapted connection wrapping an asyncmy connection.

    The methods defined here forward to the wrapped ``self._connection``;
    where the driver call is a coroutine it is run to completion through
    ``self.await_``.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_asyncmy_cursor
    _ss_cursor_cls = AsyncAdapt_asyncmy_ss_cursor

    def _handle_exception(self, error: Exception) -> NoReturn:
        """Re-raise ``error``, translating ``AttributeError``.

        An ``AttributeError`` is replaced by the DBAPI ``InternalError``
        carrying a "network operation failed" message, which is one of the
        strings ``MySQLDialect_asyncmy.is_disconnect`` searches for.  Any
        other exception is re-raised unchanged.

        :param error: the exception caught around a driver operation.
        :raises InternalError: when ``error`` is an ``AttributeError``.
        """
        if isinstance(error, AttributeError):
            # NOTE(review): presumably asyncmy surfaces a lost transport as
            # an AttributeError -- confirm against the driver.
            # Chain explicitly so the original AttributeError is retained
            # as ``__cause__`` of the replacement error.
            raise self.dbapi.InternalError(
                "network operation failed due to asyncmy attribute error"
            ) from error

        raise error

    def ping(self, reconnect: bool) -> None:
        """Ping the server over the wrapped connection.

        :param reconnect: must be false; reconnecting is not supported
         here and is guarded by an assertion.
        """
        assert not reconnect
        return self.await_(self._do_ping())

    async def _do_ping(self) -> None:
        """Await the driver's ``ping(False)`` while holding the execute
        mutex, routing any exception through ``_handle_exception``.
        """
        try:
            async with self._execute_mutex:
                await self._connection.ping(False)
        except Exception as error:
            self._handle_exception(error)

    def character_set_name(self) -> Optional[str]:
        """Return the driver connection's ``character_set_name()``."""
        return self._connection.character_set_name()  # type: ignore[no-any-return]  # noqa: E501

    def autocommit(self, value: Any) -> None:
        """Set autocommit on the driver connection to ``value``."""
        self.await_(self._connection.autocommit(value))

    def get_autocommit(self) -> bool:
        """Return the driver connection's ``get_autocommit()``."""
        return self._connection.get_autocommit()  # type: ignore

    def close(self) -> None:
        """Close the driver connection by awaiting ``ensure_closed()``."""
        self.await_(self._connection.ensure_closed())

    async def _terminate_graceful_close(self) -> None:
        """Awaitable close: await the driver's ``ensure_closed()``."""
        await self._connection.ensure_closed()

    def _terminate_force_close(self) -> None:
        """Non-awaitable close: call the driver's ``close()`` directly."""
        # it's not awaitable.
        self._connection.close()
122
123
class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
    """Variant of ``AsyncAdapt_asyncmy_connection`` whose ``await_`` is
    ``await_fallback``.

    Returned by ``AsyncAdapt_asyncmy_dbapi.connect`` when the
    ``async_fallback`` option is true.
    """

    __slots__ = ()

    # wrapped in staticmethod() so the function is not bound to the instance
    await_ = staticmethod(await_fallback)
128
129
class AsyncAdapt_asyncmy_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module facade over the ``asyncmy`` package.

    Exposes ``paramstyle``, the exception classes taken from
    ``asyncmy.errors``, a set of type symbols, and a ``connect()`` that
    returns an adapted connection.
    """

    def __init__(self, asyncmy: ModuleType):
        """Store the ``asyncmy`` module and populate DBAPI attributes.

        :param asyncmy: the imported ``asyncmy`` module.
        """
        self.asyncmy = asyncmy
        self.paramstyle = "format"
        self._init_dbapi_attributes()

    def _init_dbapi_attributes(self) -> None:
        """Copy the exception classes from ``asyncmy.errors`` onto self."""
        # each name is listed once; the attribute is looked up under the
        # same name on ``asyncmy.errors``
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
        ):
            setattr(self, name, getattr(self.asyncmy.errors, name))

    # type symbols and the Binary constructor, defined locally rather than
    # taken from the asyncmy module
    STRING = util.symbol("STRING")
    NUMBER = util.symbol("NUMBER")
    BINARY = util.symbol("BINARY")
    DATETIME = util.symbol("DATETIME")
    TIMESTAMP = util.symbol("TIMESTAMP")
    Binary = staticmethod(bytes)

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_asyncmy_connection:
        """Open a driver connection and return it wrapped in an adapter.

        Two keyword arguments are consumed here and not passed on to the
        driver:

        * ``async_fallback`` - when true (per ``util.asbool``), the
          connection coroutine is run with ``await_fallback`` and an
          ``AsyncAdaptFallback_asyncmy_connection`` is returned.
          Defaults to ``False``.
        * ``async_creator_fn`` - callable used in place of
          ``asyncmy.connect`` to produce the connection awaitable.

        All remaining positional and keyword arguments are passed to the
        creator callable.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop("async_creator_fn", self.asyncmy.connect)

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_asyncmy_connection(
                self,
                await_fallback(creator_fn(*arg, **kw)),
            )

        return AsyncAdapt_asyncmy_connection(
            self,
            await_only(creator_fn(*arg, **kw)),
        )
173
174
class MySQLDialect_asyncmy(MySQLDialect_pymysql):
    """MySQL dialect for the asyncmy driver, built on the pymysql dialect."""

    driver = "asyncmy"
    supports_statement_cache = True

    supports_server_side_cursors = True
    _sscursor = AsyncAdapt_asyncmy_ss_cursor

    is_async = True
    has_terminate = True

    @classmethod
    def import_dbapi(cls) -> DBAPIModule:
        """Import ``asyncmy`` and return it wrapped in the adapted DBAPI."""
        asyncmy_module = __import__("asyncmy")
        return AsyncAdapt_asyncmy_dbapi(asyncmy_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type:
        """Choose the pool class from the URL's ``async_fallback`` query
        option (treated as false when absent).
        """
        fallback_requested = util.asbool(
            url.query.get("async_fallback", False)
        )
        if fallback_requested:
            return pool.FallbackAsyncAdaptedQueuePool
        return pool.AsyncAdaptedQueuePool

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Call ``terminate()`` on the given DBAPI connection."""
        dbapi_connection.terminate()

    def create_connect_args(self, url: URL) -> ConnectArgsType:  # type: ignore[override]  # noqa: E501
        """Build connect arguments via the parent dialect, translating
        ``username`` to ``user`` and ``database`` to ``db``.
        """
        translations = {"username": "user", "database": "db"}
        return super().create_connect_args(url, _translate_args=translations)

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Return True if ``e`` indicates a lost connection.

        The parent dialect's check is consulted first; otherwise the
        lower-cased error text is searched for the marker phrases below.
        """
        if super().is_disconnect(e, connection, cursor):
            return True

        message = str(e).lower()
        disconnect_markers = ("not connected", "network operation failed")
        return any(marker in message for marker in disconnect_markers)

    def _found_rows_client_flag(self) -> int:
        """Return asyncmy's ``CLIENT.FOUND_ROWS`` constant."""
        # imported lazily, inside the method, as in the original code
        from asyncmy.constants import CLIENT  # type: ignore

        return CLIENT.FOUND_ROWS  # type: ignore[no-any-return]

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the given connection."""
        return connection._connection  # type: ignore[no-any-return]
229
230
# Module-level name ``dialect`` bound to the dialect class defined above;
# presumably the attribute the dialect loader looks up -- TODO confirm.
dialect = MySQLDialect_asyncmy