Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/aiomysql.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

116 statements  

1# dialects/mysql/aiomysql.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8r""" 

9.. dialect:: mysql+aiomysql 

10 :name: aiomysql 

11 :dbapi: aiomysql 

12 :connectstring: mysql+aiomysql://user:password@host:port/dbname[?key=value&key=value...] 

13 :url: https://github.com/aio-libs/aiomysql 

14 

15The aiomysql dialect is SQLAlchemy's second Python asyncio dialect. 

16 

17Using a special asyncio mediation layer, the aiomysql dialect is usable 

18as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

19extension package. 

20 

21This dialect should normally be used only with the 

22:func:`_asyncio.create_async_engine` engine creation function:: 

23 

24 from sqlalchemy.ext.asyncio import create_async_engine 

25 

26 engine = create_async_engine( 

27 "mysql+aiomysql://user:pass@hostname/dbname?charset=utf8mb4" 

28 ) 

29 

30""" # noqa 

31from __future__ import annotations 

32 

33from types import ModuleType 

34from typing import Any 

35from typing import Dict 

36from typing import Optional 

37from typing import Tuple 

38from typing import TYPE_CHECKING 

39from typing import Union 

40 

41from .pymysql import MySQLDialect_pymysql 

42from ... import pool 

43from ... import util 

44from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

45from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

46from ...connectors.asyncio import AsyncAdapt_dbapi_module 

47from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

48from ...connectors.asyncio import AsyncAdapt_terminate 

49from ...util.concurrency import await_fallback 

50from ...util.concurrency import await_only 

51 

52if TYPE_CHECKING: 

53 

54 from ...connectors.asyncio import AsyncIODBAPIConnection 

55 from ...connectors.asyncio import AsyncIODBAPICursor 

56 from ...engine.interfaces import ConnectArgsType 

57 from ...engine.interfaces import DBAPIConnection 

58 from ...engine.interfaces import DBAPICursor 

59 from ...engine.interfaces import DBAPIModule 

60 from ...engine.interfaces import PoolProxiedConnection 

61 from ...engine.url import URL 

62 

63 

class AsyncAdapt_aiomysql_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter whose driver cursor comes from the dbapi wrapper.

    The driver-level cursor class is read from the ``Cursor`` attribute of
    the adapted connection's ``dbapi`` object.
    """

    __slots__ = ()

    def _make_new_cursor(
        self, connection: AsyncIODBAPIConnection
    ) -> AsyncIODBAPICursor:
        """Open a new driver cursor on ``connection``.

        :param connection: the driver connection to create the cursor on.
        :return: whatever ``connection.cursor()`` returns for the wrapper's
         ``Cursor`` class.
        """
        cursor_cls = self._adapt_connection.dbapi.Cursor
        return connection.cursor(cursor_cls)

71 

72 

class AsyncAdapt_aiomysql_ss_cursor(
    AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_aiomysql_cursor
):
    """Server-side cursor adapter for aiomysql.

    Mirrors :class:`AsyncAdapt_aiomysql_cursor`, but opens the dbapi
    wrapper's ``SSCursor`` class instead of its ``Cursor`` class.
    """

    __slots__ = ()

    def _make_new_cursor(
        self, connection: AsyncIODBAPIConnection
    ) -> AsyncIODBAPICursor:
        """Open a new server-side driver cursor on ``connection``.

        :param connection: the driver connection to create the cursor on.
        :return: whatever ``connection.cursor()`` returns for the wrapper's
         ``SSCursor`` class.
        """
        # Use the wrapper's own SSCursor subclass (the one that overrides
        # ``_show_warnings``), consistent with how the plain cursor uses
        # ``dbapi.Cursor``.  Reaching for the raw
        # ``aiomysql.cursors.SSCursor`` would bypass that override and
        # leave the wrapper's ``SSCursor`` attribute unused.
        ss_cursor_cls = self._adapt_connection.dbapi.SSCursor
        return connection.cursor(ss_cursor_cls)

84 

85 

class AsyncAdapt_aiomysql_connection(
    AsyncAdapt_terminate, AsyncAdapt_dbapi_connection
):
    """Connection adapter that drives an aiomysql connection via ``await_``.

    Coroutine-returning driver calls are passed through ``self.await_``;
    plain driver calls are invoked directly.
    """

    __slots__ = ()

    # cursor adapter classes used for regular and server-side cursors
    _cursor_cls = AsyncAdapt_aiomysql_cursor
    _ss_cursor_cls = AsyncAdapt_aiomysql_ss_cursor

    def ping(self, reconnect: bool) -> None:
        """Ping the server through the driver connection.

        :param reconnect: must be falsy; this is enforced with an
         ``assert`` before the value is forwarded to the driver.
        """
        assert not reconnect
        pending_ping = self._connection.ping(reconnect)
        self.await_(pending_ping)

    def character_set_name(self) -> Optional[str]:
        """Return the driver connection's ``character_set_name()``."""
        charset = self._connection.character_set_name()
        return charset  # type: ignore[no-any-return]

    def autocommit(self, value: Any) -> None:
        """Forward ``value`` to the driver's ``autocommit()`` and await it."""
        pending_change = self._connection.autocommit(value)
        self.await_(pending_change)

    def get_autocommit(self) -> bool:
        """Return the driver connection's ``get_autocommit()`` result."""
        autocommit_state = self._connection.get_autocommit()
        return autocommit_state  # type: ignore

    def close(self) -> None:
        """Close by awaiting the driver's ``ensure_closed()``."""
        pending_close = self._connection.ensure_closed()
        self.await_(pending_close)

    async def _terminate_graceful_close(self) -> None:
        """Await the driver's ``ensure_closed()`` coroutine."""
        await self._connection.ensure_closed()

    def _terminate_force_close(self) -> None:
        """Call the driver's ``close()`` directly."""
        # close() is a plain call on the driver connection; it is not
        # awaitable, so it is invoked without await_.
        self._connection.close()

116 

117 

class AsyncAdaptFallback_aiomysql_connection(AsyncAdapt_aiomysql_connection):
    """Variant of the aiomysql connection adapter using ``await_fallback``.

    Identical to :class:`AsyncAdapt_aiomysql_connection` except that the
    ``await_`` hook is bound to ``await_fallback``.
    """

    __slots__ = ()

    # staticmethod keeps the function from being bound to the instance, so
    # ``self.await_(x)`` calls ``await_fallback(x)``.
    await_ = staticmethod(await_fallback)

122 

123 

class AsyncAdapt_aiomysql_dbapi(AsyncAdapt_dbapi_module):
    """DBAPI-style module facade built from ``aiomysql`` and ``pymysql``.

    Exception classes are copied from the ``aiomysql`` module, type objects
    and ``Binary`` are copied from the ``pymysql`` module, and two cursor
    subclasses with a no-op ``_show_warnings`` are exposed as ``Cursor``
    and ``SSCursor``.
    """

    def __init__(self, aiomysql: ModuleType, pymysql: ModuleType):
        """Store both driver modules and populate the facade attributes.

        :param aiomysql: the imported ``aiomysql`` module.
        :param pymysql: the imported ``pymysql`` module.
        """
        self.aiomysql = aiomysql
        self.pymysql = pymysql
        self.paramstyle = "format"
        self._init_dbapi_attributes()
        self.Cursor, self.SSCursor = self._init_cursors_subclasses()

    def _init_dbapi_attributes(self) -> None:
        """Copy exception classes and type objects onto this facade."""
        # exception hierarchy comes from aiomysql; each name is listed once
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
        ):
            setattr(self, name, getattr(self.aiomysql, name))

        # type objects and the Binary constructor come from pymysql
        for name in (
            "NUMBER",
            "STRING",
            "DATETIME",
            "BINARY",
            "TIMESTAMP",
            "Binary",
        ):
            setattr(self, name, getattr(self.pymysql, name))

    def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_aiomysql_connection:
        """Create a driver connection and wrap it in a connection adapter.

        Two keyword arguments are consumed here and not forwarded:

        * ``async_fallback`` -- when it evaluates true via ``util.asbool``,
          the connection is awaited with ``await_fallback`` and wrapped in
          :class:`AsyncAdaptFallback_aiomysql_connection`; otherwise
          ``await_only`` and :class:`AsyncAdapt_aiomysql_connection` are
          used.  Defaults to ``False``.
        * ``async_creator_fn`` -- callable invoked with the remaining
          arguments to produce the awaitable connection.  Defaults to
          ``aiomysql.connect``.

        :return: the adapted connection.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop("async_creator_fn", self.aiomysql.connect)

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_aiomysql_connection(
                self,
                await_fallback(creator_fn(*arg, **kw)),
            )
        else:
            return AsyncAdapt_aiomysql_connection(
                self,
                await_only(creator_fn(*arg, **kw)),
            )

    def _init_cursors_subclasses(
        self,
    ) -> Tuple[AsyncIODBAPICursor, AsyncIODBAPICursor]:
        """Build ``Cursor`` / ``SSCursor`` subclasses with warnings disabled.

        :return: a ``(Cursor, SSCursor)`` tuple of classes derived from
         ``aiomysql.Cursor`` and ``aiomysql.SSCursor``.
        """

        # suppress unconditional warning emitted by aiomysql
        class Cursor(self.aiomysql.Cursor):  # type: ignore[misc, name-defined]
            async def _show_warnings(
                self, conn: AsyncIODBAPIConnection
            ) -> None:
                pass

        class SSCursor(self.aiomysql.SSCursor):  # type: ignore[misc, name-defined]  # noqa: E501
            async def _show_warnings(
                self, conn: AsyncIODBAPIConnection
            ) -> None:
                pass

        return Cursor, SSCursor  # type: ignore[return-value]

190 

191 

class MySQLDialect_aiomysql(MySQLDialect_pymysql):
    """MySQL dialect for the ``aiomysql`` driver.

    Extends the pymysql dialect with the async adapter classes defined in
    this module.
    """

    driver = "aiomysql"
    supports_statement_cache = True

    supports_server_side_cursors = True
    _sscursor = AsyncAdapt_aiomysql_ss_cursor

    is_async = True
    has_terminate = True

    @classmethod
    def import_dbapi(cls) -> AsyncAdapt_aiomysql_dbapi:
        """Import ``aiomysql`` and ``pymysql`` and wrap them in the facade."""
        aiomysql_module = __import__("aiomysql")
        pymysql_module = __import__("pymysql")
        return AsyncAdapt_aiomysql_dbapi(aiomysql_module, pymysql_module)

    @classmethod
    def get_pool_class(cls, url: URL) -> type:
        """Pick the pool class from the URL's ``async_fallback`` query flag.

        :param url: the connection URL whose ``query`` mapping is consulted.
        :return: ``pool.FallbackAsyncAdaptedQueuePool`` when the flag
         evaluates true via ``util.asbool``, otherwise
         ``pool.AsyncAdaptedQueuePool``.
        """
        wants_fallback = util.asbool(url.query.get("async_fallback", False))
        return (
            pool.FallbackAsyncAdaptedQueuePool
            if wants_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Call ``terminate()`` on the given adapted connection."""
        dbapi_connection.terminate()

    def create_connect_args(
        self, url: URL, _translate_args: Optional[Dict[str, Any]] = None
    ) -> ConnectArgsType:
        """Build connect arguments using aiomysql's parameter names.

        The ``_translate_args`` value received here is not forwarded; a
        fixed mapping of ``username`` to ``user`` and ``database`` to
        ``db`` is always passed to the parent implementation.
        """
        aiomysql_names = {"username": "user", "database": "db"}
        return super().create_connect_args(
            url, _translate_args=aiomysql_names
        )

    def is_disconnect(
        self,
        e: DBAPIModule.Error,
        connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
        cursor: Optional[DBAPICursor],
    ) -> bool:
        """Report whether ``e`` indicates a lost connection.

        Returns ``True`` when the parent dialect says so; otherwise checks
        whether the lowercased message of ``e`` contains ``not connected``.
        """
        if super().is_disconnect(e, connection, cursor):
            return True

        message = str(e).lower()
        return "not connected" in message

    def _found_rows_client_flag(self) -> int:
        """Return pymysql's ``CLIENT.FOUND_ROWS`` flag value."""
        # imported lazily, at call time rather than at module import
        from pymysql.constants import CLIENT  # type: ignore

        found_rows_flag = CLIENT.FOUND_ROWS
        return found_rows_flag  # type: ignore[no-any-return]

    def get_driver_connection(
        self, connection: DBAPIConnection
    ) -> AsyncIODBAPIConnection:
        """Return the ``_connection`` attribute of the adapted connection."""
        driver_connection = connection._connection
        return driver_connection  # type: ignore[no-any-return]

248 

249 

# Module-level alias binding the name ``dialect`` to the dialect class
# defined in this module.
# NOTE(review): presumably this is the attribute SQLAlchemy's dialect loader
# looks up when resolving "mysql+aiomysql" -- confirm against the registry.
250dialect = MySQLDialect_aiomysql