1# dialects/mysql/aiomysql.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS
3# file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8r"""
9.. dialect:: mysql+aiomysql
10 :name: aiomysql
11 :dbapi: aiomysql
12 :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...]
13 :url: https://github.com/aio-libs/aiomysql
14
15The aiomysql dialect is SQLAlchemy's second Python asyncio dialect.
16
17Using a special asyncio mediation layer, the aiomysql dialect is usable
18as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
19extension package.
20
21This dialect should normally be used only with the
22:func:`_asyncio.create_async_engine` engine creation function::
23
24 from sqlalchemy.ext.asyncio import create_async_engine
25
26 engine = create_async_engine(
27 "mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4"
28 )
29
30""" # noqa
31from __future__ import annotations
32
33from types import ModuleType
34from typing import Any
35from typing import Dict
36from typing import Optional
37from typing import Tuple
38from typing import TYPE_CHECKING
39from typing import Union
40
41from .pymysql import MySQLDialect_pymysql
42from ... import pool
43from ... import util
44from ...connectors.asyncio import AsyncAdapt_dbapi_connection
45from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
46from ...connectors.asyncio import AsyncAdapt_dbapi_module
47from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor
48from ...connectors.asyncio import AsyncAdapt_terminate
49from ...util.concurrency import await_fallback
50from ...util.concurrency import await_only
51
52if TYPE_CHECKING:
53
54 from ...connectors.asyncio import AsyncIODBAPIConnection
55 from ...connectors.asyncio import AsyncIODBAPICursor
56 from ...engine.interfaces import ConnectArgsType
57 from ...engine.interfaces import DBAPIConnection
58 from ...engine.interfaces import DBAPICursor
59 from ...engine.interfaces import DBAPIModule
60 from ...engine.interfaces import PoolProxiedConnection
61 from ...engine.url import URL
62
63
class AsyncAdapt_aiomysql_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter for aiomysql built on ``AsyncAdapt_dbapi_cursor``.

    The only customization made here is the cursor class handed to the
    driver connection when a new cursor is created.
    """

    __slots__ = ()

    def _make_new_cursor(
        self, connection: AsyncIODBAPIConnection
    ) -> AsyncIODBAPICursor:
        """Return a new driver cursor created from ``connection``.

        The ``Cursor`` attribute of the adapted DBAPI module is passed as
        the cursor class argument to ``connection.cursor()``.
        """
        cursor_cls = self._adapt_connection.dbapi.Cursor
        return connection.cursor(cursor_cls)
71
72
class AsyncAdapt_aiomysql_ss_cursor(
    AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_aiomysql_cursor
):
    """Cursor adapter that asks the driver for an ``SSCursor``.

    Combines ``AsyncAdapt_dbapi_ss_cursor`` with the aiomysql cursor
    adapter and overrides only the cursor class requested from the
    driver connection.
    """

    __slots__ = ()

    def _make_new_cursor(
        self, connection: AsyncIODBAPIConnection
    ) -> AsyncIODBAPICursor:
        """Return a new ``SSCursor``-based driver cursor for ``connection``.

        The cursor class is the ``SSCursor`` attribute of the adapted
        DBAPI module, i.e. the subclass whose ``_show_warnings`` is
        overridden to do nothing.  This mirrors the non-SS cursor
        adapter, which uses the module's ``Cursor`` attribute; using the
        raw ``aiomysql.cursors.SSCursor`` here would bypass that warning
        suppression.
        """
        return connection.cursor(self._adapt_connection.dbapi.SSCursor)
84
85
class AsyncAdapt_aiomysql_connection(
    AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
):
    """Adapter exposing an aiomysql connection through blocking-style calls.

    Awaitable driver methods are run through ``self.await_``; the
    remaining methods call straight through to the wrapped
    ``self._connection``.
    """

    __slots__ = ()

    # cursor adapter classes associated with this connection adapter
    _cursor_cls = AsyncAdapt_aiomysql_cursor
    _ss_cursor_cls = AsyncAdapt_aiomysql_ss_cursor

    def ping(self, reconnect: bool) -> None:
        """Ping the server; ``reconnect`` must be false."""
        assert not reconnect
        ping_awaitable = self._connection.ping(reconnect)
        self.await_(ping_awaitable)

    def character_set_name(self) -> Optional[str]:
        """Return the driver connection's ``character_set_name()``."""
        charset = self._connection.character_set_name()
        return charset  # type: ignore[no-any-return]

    def autocommit(self, value: Any) -> None:
        """Set autocommit on the driver connection to ``value``."""
        autocommit_awaitable = self._connection.autocommit(value)
        self.await_(autocommit_awaitable)

    def get_autocommit(self) -> bool:
        """Return the driver connection's ``get_autocommit()`` value."""
        current = self._connection.get_autocommit()
        return current  # type: ignore

    def close(self) -> None:
        """Close the driver connection via ``ensure_closed()``."""
        closing = self._connection.ensure_closed()
        self.await_(closing)

    async def _terminate_graceful_close(self) -> None:
        """Await ``ensure_closed()`` on the driver connection."""
        await self._connection.ensure_closed()

    def _terminate_force_close(self) -> None:
        """Call the driver connection's ``close()`` directly."""
        # close() on the driver connection is not awaitable, so it is
        # invoked without going through await_.
        self._connection.close()
116
117
class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
    """Variant of the aiomysql connection adapter using ``await_fallback``.

    Identical to ``AsyncAdapt_aiomysql_connection`` except that
    ``await_`` is bound to ``await_fallback``.
    """

    __slots__ = ()

    # staticmethod so the function is not bound to the instance when
    # accessed as self.await_
    await_ = staticmethod(await_fallback)
122
123
class AsyncAdapt_aiomysql_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-module-like facade over the ``aiomysql`` and ``pymysql`` modules.

    Exception classes are copied from ``aiomysql``, type objects from
    ``pymysql``, and ``connect()`` returns an adapted connection object.
    """

    def __init__(self, aiomysql: ModuleType, pymysql: ModuleType):
        """Store both driver modules and populate the DBAPI attributes.

        :param aiomysql: the imported ``aiomysql`` module.
        :param pymysql: the imported ``pymysql`` module.
        """
        self.aiomysql = aiomysql
        self.pymysql = pymysql
        self.paramstyle = "format"
        self._init_dbapi_attributes()
        self.Cursor, self.SSCursor = self._init_cursors_subclasses()

    def _init_dbapi_attributes(self) -> None:
        """Copy exception classes and type objects onto this object."""
        # exception hierarchy comes from aiomysql; each name is listed
        # exactly once
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
        ):
            setattr(self, name, getattr(self.aiomysql, name))

        # type objects / constructors come from pymysql
        for name in (
            "NUMBER",
            "STRING",
            "DATETIME",
            "BINARY",
            "TIMESTAMP",
            "Binary",
        ):
            setattr(self, name, getattr(self.pymysql, name))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiomysql_connection:
        """Create a driver connection and wrap it in an adapter.

        Two keyword arguments are consumed here and not forwarded:

        * ``async_fallback`` - when truthy per ``util.asbool()``, the
          connection is awaited with ``await_fallback`` and wrapped in
          ``AsyncAdaptFallback_aiomysql_connection``.
        * ``async_creator_fn`` - callable used in place of
          ``aiomysql.connect`` to produce the connection awaitable.

        All remaining positional and keyword arguments are passed to the
        creator function.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop("async_creator_fn", self.aiomysql.connect)

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_aiomysql_connection(
                self,
                await_fallback(creator_fn(*arg, **kw)),
            )
        else:
            return AsyncAdapt_aiomysql_connection(
                self,
                await_only(creator_fn(*arg, **kw)),
            )

    def _init_cursors_subclasses(
        self,
    ) -> Tuple[AsyncIODBAPICursor, AsyncIODBAPICursor]:
        """Build ``Cursor`` / ``SSCursor`` subclasses with warnings disabled.

        Returns a ``(Cursor, SSCursor)`` tuple of classes derived from the
        aiomysql classes of the same names, each with ``_show_warnings``
        replaced by a no-op coroutine.
        """

        # suppress unconditional warning emitted by aiomysql
        class Cursor(self.aiomysql.Cursor):  # type: ignore[misc, name-defined]
            async def _show_warnings(
                self, conn: AsyncIODBAPIConnection
            ) -> None:
                pass

        class SSCursor(self.aiomysql.SSCursor):  # type: ignore[misc, name-defined]  # noqa: E501
            async def _show_warnings(
                self, conn: AsyncIODBAPIConnection
            ) -> None:
                pass

        return Cursor, SSCursor  # type: ignore[return-value]
190
191
class MySQLDialect_aiomysql(MySQLDialect_pymysql):
    """MySQL dialect that uses aiomysql through the asyncio adapter layer.

    Extends ``MySQLDialect_pymysql``, swapping in the adapted DBAPI
    module, an async-adapted pool class and connection termination.
    """

    driver = "aiomysql"
    supports_statement_cache = True

    supports_server_side_cursors = True
    _sscursor = AsyncAdapt_aiomysql_ss_cursor

    is_async = True
    has_terminate = True

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiomysql_dbapi:
        """Import ``aiomysql`` and ``pymysql`` and wrap them in the adapter."""
        aiomysql_module = __import__("aiomysql")
        pymysql_module = __import__("pymysql")
        return AsyncAdapt_aiomysql_dbapi(aiomysql_module, pymysql_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type:
        """Choose the pool class from the URL's ``async_fallback`` option.

        A truthy ``async_fallback`` query value (per ``util.asbool()``)
        selects ``FallbackAsyncAdaptedQueuePool``; otherwise
        ``AsyncAdaptedQueuePool`` is returned.
        """
        wants_fallback = util.asbool(url.query.get("async_fallback", False))
        return (
            pool.FallbackAsyncAdaptedQueuePool
            if wants_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Terminate the connection by calling its ``terminate()`` method."""
        dbapi_connection.terminate()

    def create_connect_args(
        self, url: URL, _translate_args: Optional[Dict[str, Any]] = None
    ) -> ConnectArgsType:
        """Build connect arguments via the parent implementation.

        The parent is always called with a fixed translation map
        (``username`` -> ``user``, ``database`` -> ``db``); the
        ``_translate_args`` value passed to this method is not used.
        """
        translation = {"username": "user", "database": "db"}
        return super().create_connect_args(url, _translate_args=translation)

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` indicates a disconnected connection.

        Returns ``True`` when the parent implementation says so;
        otherwise checks whether the lower-cased error text contains
        ``"not connected"``.
        """
        if super().is_disconnect(e, connection, cursor):
            return True

        message = str(e).lower()
        return "not connected" in message

    def _found_rows_client_flag(self) -> int:
        """Return pymysql's ``CLIENT.FOUND_ROWS`` constant."""
        from pymysql.constants import CLIENT  # type: ignore

        return CLIENT.FOUND_ROWS  # type: ignore[no-any-return]

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute held by ``connection``."""
        driver_connection = connection._connection
        return driver_connection  # type: ignore[no-any-return]
248
249
# Module-level alias bound to the dialect class defined in this file.
# NOTE(review): presumably the attribute the dialect loader resolves for
# "mysql+aiomysql" URLs -- confirm against the dialect registry.
dialect = MySQLDialect_aiomysql