Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/asyncmy.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

117 statements  

1# dialects/mysql/asyncmy.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8r""" 

9.. dialect:: mysql+asyncmy 

10 :name: asyncmy 

11 :dbapi: asyncmy 

12 :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...] 

13 :url: https://github.com/long2ice/asyncmy 

14 

15Using a special asyncio mediation layer, the asyncmy dialect is usable 

16as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` 

17extension package. 

18 

19This dialect should normally be used only with the 

20:func:`_asyncio.create_async_engine` engine creation function:: 

21 

22 from sqlalchemy.ext.asyncio import create_async_engine 

23 

24 engine = create_async_engine( 

25 "mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4" 

26 ) 

27 

28""" # noqa 

29from __future__ import annotations 

30 

31from types import ModuleType 

32from typing import Any 

33from typing import NoReturn 

34from typing import Optional 

35from typing import TYPE_CHECKING 

36from typing import Union 

37 

38from .pymysql import MySQLDialect_pymysql 

39from ... import pool 

40from ... import util 

41from ...connectors.asyncio import AsyncAdapt_dbapi_connection 

42from ...connectors.asyncio import AsyncAdapt_dbapi_cursor 

43from ...connectors.asyncio import AsyncAdapt_dbapi_module 

44from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor 

45from ...connectors.asyncio import AsyncAdapt_terminate 

46from ...util.concurrency import await_fallback 

47from ...util.concurrency import await_only 

48 

49if TYPE_CHECKING: 

50 from ...connectors.asyncio import AsyncIODBAPIConnection 

51 from ...connectors.asyncio import AsyncIODBAPICursor 

52 from ...engine.interfaces import ConnectArgsType 

53 from ...engine.interfaces import DBAPIConnection 

54 from ...engine.interfaces import DBAPICursor 

55 from ...engine.interfaces import DBAPIModule 

56 from ...engine.interfaces import PoolProxiedConnection 

57 from ...engine.url import URL 

58 

59 

60class AsyncAdapt_asyncmy_cursor(AsyncAdapt_dbapi_cursor): 

61 __slots__ = () 

62 

63 

64class AsyncAdapt_asyncmy_ss_cursor( 

65 AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_asyncmy_cursor 

66): 

67 __slots__ = () 

68 

69 def _make_new_cursor( 

70 self, connection: AsyncIODBAPIConnection 

71 ) -> AsyncIODBAPICursor: 

72 return connection.cursor( 

73 self._adapt_connection.dbapi.asyncmy.cursors.SSCursor 

74 ) 

75 

76 

77class AsyncAdapt_asyncmy_connection( 

78 AsyncAdapt_terminate, AsyncAdapt_dbapi_connection 

79): 

80 __slots__ = () 

81 

82 _cursor_cls = AsyncAdapt_asyncmy_cursor 

83 _ss_cursor_cls = AsyncAdapt_asyncmy_ss_cursor 

84 

85 def _handle_exception(self, error: Exception) -> NoReturn: 

86 if isinstance(error, AttributeError): 

87 raise self.dbapi.InternalError( 

88 "network operation failed due to asyncmy attribute error" 

89 ) 

90 

91 raise error 

92 

93 def ping(self, reconnect: bool) -> None: 

94 assert not reconnect 

95 return self.await_(self._do_ping()) 

96 

97 async def _do_ping(self) -> None: 

98 try: 

99 async with self._execute_mutex: 

100 await self._connection.ping(False) 

101 except Exception as error: 

102 self._handle_exception(error) 

103 

104 def character_set_name(self) -> Optional[str]: 

105 return self._connection.character_set_name() # type: ignore[no-any-return] # noqa: E501 

106 

107 def autocommit(self, value: Any) -> None: 

108 self.await_(self._connection.autocommit(value)) 

109 

110 def get_autocommit(self) -> bool: 

111 return self._connection.get_autocommit() # type: ignore 

112 

113 def close(self) -> None: 

114 self.await_(self._connection.ensure_closed()) 

115 

116 async def _terminate_graceful_close(self) -> None: 

117 await self._connection.ensure_closed() 

118 

119 def _terminate_force_close(self) -> None: 

120 # it's not awaitable. 

121 self._connection.close() 

122 

123 

124class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection): 

125 __slots__ = () 

126 

127 await_ = staticmethod(await_fallback) 

128 

129 

130class AsyncAdapt_asyncmy_dbapi(AsyncAdapt_dbapi_module): 

131 def __init__(self, asyncmy: ModuleType): 

132 self.asyncmy = asyncmy 

133 self.paramstyle = "format" 

134 self._init_dbapi_attributes() 

135 

136 def _init_dbapi_attributes(self) -> None: 

137 for name in ( 

138 "Warning", 

139 "Error", 

140 "InterfaceError", 

141 "DataError", 

142 "DatabaseError", 

143 "OperationalError", 

144 "InterfaceError", 

145 "IntegrityError", 

146 "ProgrammingError", 

147 "InternalError", 

148 "NotSupportedError", 

149 ): 

150 setattr(self, name, getattr(self.asyncmy.errors, name)) 

151 

152 STRING = util.symbol("STRING") 

153 NUMBER = util.symbol("NUMBER") 

154 BINARY = util.symbol("BINARY") 

155 DATETIME = util.symbol("DATETIME") 

156 TIMESTAMP = util.symbol("TIMESTAMP") 

157 Binary = staticmethod(bytes) 

158 

159 def connect(self, *arg: Any, **kw: Any) -> AsyncAdapt_asyncmy_connection: 

160 async_fallback = kw.pop("async_fallback", False) 

161 creator_fn = kw.pop("async_creator_fn", self.asyncmy.connect) 

162 

163 if util.asbool(async_fallback): 

164 return AsyncAdaptFallback_asyncmy_connection( 

165 self, 

166 await_fallback(creator_fn(*arg, **kw)), 

167 ) 

168 else: 

169 return AsyncAdapt_asyncmy_connection( 

170 self, 

171 await_only(creator_fn(*arg, **kw)), 

172 ) 

173 

174 

175class MySQLDialect_asyncmy(MySQLDialect_pymysql): 

176 driver = "asyncmy" 

177 supports_statement_cache = True 

178 

179 supports_server_side_cursors = True 

180 _sscursor = AsyncAdapt_asyncmy_ss_cursor 

181 

182 is_async = True 

183 has_terminate = True 

184 

185 @classmethod 

186 def import_dbapi(cls) -> DBAPIModule: 

187 return AsyncAdapt_asyncmy_dbapi(__import__("asyncmy")) 

188 

189 @classmethod 

190 def get_pool_class(cls, url: URL) -> type: 

191 async_fallback = url.query.get("async_fallback", False) 

192 

193 if util.asbool(async_fallback): 

194 return pool.FallbackAsyncAdaptedQueuePool 

195 else: 

196 return pool.AsyncAdaptedQueuePool 

197 

198 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: 

199 dbapi_connection.terminate() 

200 

201 def create_connect_args(self, url: URL) -> ConnectArgsType: # type: ignore[override] # noqa: E501 

202 return super().create_connect_args( 

203 url, _translate_args=dict(username="user", database="db") 

204 ) 

205 

206 def is_disconnect( 

207 self, 

208 e: DBAPIModule.Error, 

209 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 

210 cursor: Optional[DBAPICursor], 

211 ) -> bool: 

212 if super().is_disconnect(e, connection, cursor): 

213 return True 

214 else: 

215 str_e = str(e).lower() 

216 return ( 

217 "not connected" in str_e or "network operation failed" in str_e 

218 ) 

219 

220 def _found_rows_client_flag(self) -> int: 

221 from asyncmy.constants import CLIENT # type: ignore 

222 

223 return CLIENT.FOUND_ROWS # type: ignore[no-any-return] 

224 

225 def get_driver_connection( 

226 self, connection: DBAPIConnection 

227 ) -> AsyncIODBAPIConnection: 

228 return connection._connection # type: ignore[no-any-return] 

229 

230 

231dialect = MySQLDialect_asyncmy