Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/mariadbconnector.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

150 statements  

1# dialects/mysql/mariadbconnector.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8""" 

9 

10.. dialect:: mysql+mariadbconnector 

11 :name: MariaDB Connector/Python 

12 :dbapi: mariadb 

13 :connectstring: mariadb+mariadbconnector://<user>:<password>@<host>[:<port>]/<dbname> 

14 :url: https://pypi.org/project/mariadb/ 

15 

16Driver Status 

17------------- 

18 

19MariaDB Connector/Python enables Python programs to access MariaDB and MySQL 

20databases using an API which is compliant with the Python DB API 2.0 (PEP-249). 

21It is written in C and uses MariaDB Connector/C client library for client server 

22communication. 

23 

24Note that the default driver for a ``mariadb://`` connection URI continues to 

25be ``mysqldb``. ``mariadb+mariadbconnector://`` is required to use this driver. 

26 

27.. mariadb: https://github.com/mariadb-corporation/mariadb-connector-python 

28 

29""" # noqa 

30from __future__ import annotations 

31 

32import re 

33from typing import Any 

34from typing import Optional 

35from typing import Sequence 

36from typing import Tuple 

37from typing import TYPE_CHECKING 

38from typing import Union 

39from uuid import UUID as _python_UUID 

40 

41from .base import MySQLCompiler 

42from .base import MySQLDialect 

43from .base import MySQLExecutionContext 

44from ... import sql 

45from ... import util 

46from ...sql import sqltypes 

47 

48if TYPE_CHECKING: 

49 from ...engine.base import Connection 

50 from ...engine.interfaces import ConnectArgsType 

51 from ...engine.interfaces import DBAPIConnection 

52 from ...engine.interfaces import DBAPICursor 

53 from ...engine.interfaces import DBAPIModule 

54 from ...engine.interfaces import Dialect 

55 from ...engine.interfaces import IsolationLevel 

56 from ...engine.interfaces import PoolProxiedConnection 

57 from ...engine.url import URL 

58 from ...sql.compiler import SQLCompiler 

59 from ...sql.type_api import _ResultProcessorType 

60 

61 

62mariadb_cpy_minimum_version = (1, 0, 1) 

63 

64 

65class _MariaDBUUID(sqltypes.UUID[sqltypes._UUID_RETURN]): 

66 # work around JIRA issue 

67 # https://jira.mariadb.org/browse/CONPY-270. When that issue is fixed, 

68 # this type can be removed. 

69 def result_processor( 

70 self, dialect: Dialect, coltype: object 

71 ) -> Optional[_ResultProcessorType[Any]]: 

72 if self.as_uuid: 

73 

74 def process(value: Any) -> Any: 

75 if value is not None: 

76 if hasattr(value, "decode"): 

77 value = value.decode("ascii") 

78 value = _python_UUID(value) 

79 return value 

80 

81 return process 

82 else: 

83 

84 def process(value: Any) -> Any: 

85 if value is not None: 

86 if hasattr(value, "decode"): 

87 value = value.decode("ascii") 

88 value = str(_python_UUID(value)) 

89 return value 

90 

91 return process 

92 

93 

94class MySQLExecutionContext_mariadbconnector(MySQLExecutionContext): 

95 _lastrowid: Optional[int] = None 

96 

97 def create_server_side_cursor(self) -> DBAPICursor: 

98 return self._dbapi_connection.cursor(buffered=False) 

99 

100 def create_default_cursor(self) -> DBAPICursor: 

101 return self._dbapi_connection.cursor(buffered=True) 

102 

103 def post_exec(self) -> None: 

104 super().post_exec() 

105 

106 self._rowcount = self.cursor.rowcount 

107 

108 if TYPE_CHECKING: 

109 assert isinstance(self.compiled, SQLCompiler) 

110 if self.isinsert and self.compiled.postfetch_lastrowid: 

111 self._lastrowid = self.cursor.lastrowid 

112 

113 def get_lastrowid(self) -> int: 

114 if TYPE_CHECKING: 

115 assert self._lastrowid is not None 

116 return self._lastrowid 

117 

118 

119class MySQLCompiler_mariadbconnector(MySQLCompiler): 

120 pass 

121 

122 

123class MySQLDialect_mariadbconnector(MySQLDialect): 

124 driver = "mariadbconnector" 

125 supports_statement_cache = True 

126 

127 # set this to True at the module level to prevent the driver from running 

128 # against a backend that server detects as MySQL. currently this appears to 

129 # be unnecessary as MariaDB client libraries have always worked against 

130 # MySQL databases. However, if this changes at some point, this can be 

131 # adjusted, but PLEASE ADD A TEST in test/dialect/mysql/test_dialect.py if 

132 # this change is made at some point to ensure the correct exception 

133 # is raised at the correct point when running the driver against 

134 # a MySQL backend. 

135 # is_mariadb = True 

136 

137 supports_unicode_statements = True 

138 encoding = "utf8mb4" 

139 convert_unicode = True 

140 supports_sane_rowcount = True 

141 supports_sane_multi_rowcount = True 

142 supports_native_decimal = True 

143 default_paramstyle = "qmark" 

144 execution_ctx_cls = MySQLExecutionContext_mariadbconnector 

145 statement_compiler = MySQLCompiler_mariadbconnector 

146 

147 supports_server_side_cursors = True 

148 

149 colspecs = util.update_copy( 

150 MySQLDialect.colspecs, {sqltypes.Uuid: _MariaDBUUID} 

151 ) 

152 

153 @util.memoized_property 

154 def _dbapi_version(self) -> Tuple[int, ...]: 

155 if self.dbapi and hasattr(self.dbapi, "__version__"): 

156 return tuple( 

157 [ 

158 int(x) 

159 for x in re.findall( 

160 r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__ 

161 ) 

162 ] 

163 ) 

164 else: 

165 return (99, 99, 99) 

166 

167 def __init__(self, **kwargs: Any) -> None: 

168 super().__init__(**kwargs) 

169 self.paramstyle = "qmark" 

170 if self.dbapi is not None: 

171 if self._dbapi_version < mariadb_cpy_minimum_version: 

172 raise NotImplementedError( 

173 "The minimum required version for MariaDB " 

174 "Connector/Python is %s" 

175 % ".".join(str(x) for x in mariadb_cpy_minimum_version) 

176 ) 

177 

178 @classmethod 

179 def import_dbapi(cls) -> DBAPIModule: 

180 return __import__("mariadb") 

181 

182 def is_disconnect( 

183 self, 

184 e: DBAPIModule.Error, 

185 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]], 

186 cursor: Optional[DBAPICursor], 

187 ) -> bool: 

188 if super().is_disconnect(e, connection, cursor): 

189 return True 

190 elif isinstance(e, self.loaded_dbapi.Error): 

191 str_e = str(e).lower() 

192 return "not connected" in str_e or "isn't valid" in str_e 

193 else: 

194 return False 

195 

196 def create_connect_args(self, url: URL) -> ConnectArgsType: 

197 opts = url.translate_connect_args() 

198 opts.update(url.query) 

199 

200 int_params = [ 

201 "connect_timeout", 

202 "read_timeout", 

203 "write_timeout", 

204 "client_flag", 

205 "port", 

206 "pool_size", 

207 ] 

208 bool_params = [ 

209 "local_infile", 

210 "ssl_verify_cert", 

211 "ssl", 

212 "pool_reset_connection", 

213 "compress", 

214 ] 

215 

216 for key in int_params: 

217 util.coerce_kw_type(opts, key, int) 

218 for key in bool_params: 

219 util.coerce_kw_type(opts, key, bool) 

220 

221 # FOUND_ROWS must be set in CLIENT_FLAGS to enable 

222 # supports_sane_rowcount. 

223 client_flag = opts.get("client_flag", 0) 

224 if self.dbapi is not None: 

225 try: 

226 CLIENT_FLAGS = __import__( 

227 self.dbapi.__name__ + ".constants.CLIENT" 

228 ).constants.CLIENT 

229 client_flag |= CLIENT_FLAGS.FOUND_ROWS 

230 except (AttributeError, ImportError): 

231 self.supports_sane_rowcount = False 

232 opts["client_flag"] = client_flag 

233 return [], opts 

234 

235 def _extract_error_code(self, exception: DBAPIModule.Error) -> int: 

236 try: 

237 rc: int = exception.errno 

238 except: 

239 rc = -1 

240 return rc 

241 

242 def _detect_charset(self, connection: Connection) -> str: 

243 return "utf8mb4" 

244 

245 def get_isolation_level_values( 

246 self, dbapi_conn: DBAPIConnection 

247 ) -> Sequence[IsolationLevel]: 

248 return ( 

249 "SERIALIZABLE", 

250 "READ UNCOMMITTED", 

251 "READ COMMITTED", 

252 "REPEATABLE READ", 

253 "AUTOCOMMIT", 

254 ) 

255 

256 def detect_autocommit_setting(self, dbapi_conn: DBAPIConnection) -> bool: 

257 return bool(dbapi_conn.autocommit) 

258 

259 def set_isolation_level( 

260 self, dbapi_connection: DBAPIConnection, level: IsolationLevel 

261 ) -> None: 

262 if level == "AUTOCOMMIT": 

263 dbapi_connection.autocommit = True 

264 else: 

265 dbapi_connection.autocommit = False 

266 super().set_isolation_level(dbapi_connection, level) 

267 

268 def do_begin_twophase(self, connection: Connection, xid: Any) -> None: 

269 connection.execute( 

270 sql.text("XA BEGIN :xid").bindparams( 

271 sql.bindparam("xid", xid, literal_execute=True) 

272 ) 

273 ) 

274 

275 def do_prepare_twophase(self, connection: Connection, xid: Any) -> None: 

276 connection.execute( 

277 sql.text("XA END :xid").bindparams( 

278 sql.bindparam("xid", xid, literal_execute=True) 

279 ) 

280 ) 

281 connection.execute( 

282 sql.text("XA PREPARE :xid").bindparams( 

283 sql.bindparam("xid", xid, literal_execute=True) 

284 ) 

285 ) 

286 

287 def do_rollback_twophase( 

288 self, 

289 connection: Connection, 

290 xid: Any, 

291 is_prepared: bool = True, 

292 recover: bool = False, 

293 ) -> None: 

294 if not is_prepared: 

295 connection.execute( 

296 sql.text("XA END :xid").bindparams( 

297 sql.bindparam("xid", xid, literal_execute=True) 

298 ) 

299 ) 

300 connection.execute( 

301 sql.text("XA ROLLBACK :xid").bindparams( 

302 sql.bindparam("xid", xid, literal_execute=True) 

303 ) 

304 ) 

305 

306 def do_commit_twophase( 

307 self, 

308 connection: Connection, 

309 xid: Any, 

310 is_prepared: bool = True, 

311 recover: bool = False, 

312 ) -> None: 

313 if not is_prepared: 

314 self.do_prepare_twophase(connection, xid) 

315 connection.execute( 

316 sql.text("XA COMMIT :xid").bindparams( 

317 sql.bindparam("xid", xid, literal_execute=True) 

318 ) 

319 ) 

320 

321 

322dialect = MySQLDialect_mariadbconnector