1# dialects/mysql/mariadbconnector.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""
9
10.. dialect:: mysql+mariadbconnector
11 :name: MariaDB Connector/Python
12 :dbapi: mariadb
13 :connectstring: mariadb+mariadbconnector://<user>:<password>@<host>[:<port>]/<dbname>
14 :url: https://pypi.org/project/mariadb/
15
16Driver Status
17-------------
18
19MariaDB Connector/Python enables Python programs to access MariaDB and MySQL
20databases using an API which is compliant with the Python DB API 2.0 (PEP-249).
21It is written in C and uses MariaDB Connector/C client library for client server
22communication.
23
24Note that the default driver for a ``mariadb://`` connection URI continues to
25be ``mysqldb``. ``mariadb+mariadbconnector://`` is required to use this driver.
26
27.. mariadb: https://github.com/mariadb-corporation/mariadb-connector-python
28
29""" # noqa
30from __future__ import annotations
31
32import re
33from typing import Any
34from typing import Optional
35from typing import Sequence
36from typing import Tuple
37from typing import TYPE_CHECKING
38from typing import Union
39from uuid import UUID as _python_UUID
40
41from .base import MySQLCompiler
42from .base import MySQLDialect
43from .base import MySQLExecutionContext
44from ... import sql
45from ... import util
46from ...sql import sqltypes
47
48if TYPE_CHECKING:
49 from ...engine.base import Connection
50 from ...engine.interfaces import ConnectArgsType
51 from ...engine.interfaces import DBAPIConnection
52 from ...engine.interfaces import DBAPICursor
53 from ...engine.interfaces import DBAPIModule
54 from ...engine.interfaces import Dialect
55 from ...engine.interfaces import IsolationLevel
56 from ...engine.interfaces import PoolProxiedConnection
57 from ...engine.url import URL
58 from ...sql.compiler import SQLCompiler
59 from ...sql.type_api import _ResultProcessorType
60
61
62mariadb_cpy_minimum_version = (1, 0, 1)
63
64
65class _MariaDBUUID(sqltypes.UUID[sqltypes._UUID_RETURN]):
66 # work around JIRA issue
67 # https://jira.mariadb.org/browse/CONPY-270. When that issue is fixed,
68 # this type can be removed.
69 def result_processor(
70 self, dialect: Dialect, coltype: object
71 ) -> Optional[_ResultProcessorType[Any]]:
72 if self.as_uuid:
73
74 def process(value: Any) -> Any:
75 if value is not None:
76 if hasattr(value, "decode"):
77 value = value.decode("ascii")
78 value = _python_UUID(value)
79 return value
80
81 return process
82 else:
83
84 def process(value: Any) -> Any:
85 if value is not None:
86 if hasattr(value, "decode"):
87 value = value.decode("ascii")
88 value = str(_python_UUID(value))
89 return value
90
91 return process
92
93
94class MySQLExecutionContext_mariadbconnector(MySQLExecutionContext):
95 _lastrowid: Optional[int] = None
96
97 def create_server_side_cursor(self) -> DBAPICursor:
98 return self._dbapi_connection.cursor(buffered=False)
99
100 def create_default_cursor(self) -> DBAPICursor:
101 return self._dbapi_connection.cursor(buffered=True)
102
103 def post_exec(self) -> None:
104 super().post_exec()
105
106 self._rowcount = self.cursor.rowcount
107
108 if TYPE_CHECKING:
109 assert isinstance(self.compiled, SQLCompiler)
110 if self.isinsert and self.compiled.postfetch_lastrowid:
111 self._lastrowid = self.cursor.lastrowid
112
113 def get_lastrowid(self) -> int:
114 if TYPE_CHECKING:
115 assert self._lastrowid is not None
116 return self._lastrowid
117
118
119class MySQLCompiler_mariadbconnector(MySQLCompiler):
120 pass
121
122
123class MySQLDialect_mariadbconnector(MySQLDialect):
124 driver = "mariadbconnector"
125 supports_statement_cache = True
126
127 # set this to True at the module level to prevent the driver from running
128 # against a backend that server detects as MySQL. currently this appears to
129 # be unnecessary as MariaDB client libraries have always worked against
130 # MySQL databases. However, if this changes at some point, this can be
131 # adjusted, but PLEASE ADD A TEST in test/dialect/mysql/test_dialect.py if
132 # this change is made at some point to ensure the correct exception
133 # is raised at the correct point when running the driver against
134 # a MySQL backend.
135 # is_mariadb = True
136
137 supports_unicode_statements = True
138 encoding = "utf8mb4"
139 convert_unicode = True
140 supports_sane_rowcount = True
141 supports_sane_multi_rowcount = True
142 supports_native_decimal = True
143 default_paramstyle = "qmark"
144 execution_ctx_cls = MySQLExecutionContext_mariadbconnector
145 statement_compiler = MySQLCompiler_mariadbconnector
146
147 supports_server_side_cursors = True
148
149 colspecs = util.update_copy(
150 MySQLDialect.colspecs, {sqltypes.Uuid: _MariaDBUUID}
151 )
152
153 @util.memoized_property
154 def _dbapi_version(self) -> Tuple[int, ...]:
155 if self.dbapi and hasattr(self.dbapi, "__version__"):
156 return tuple(
157 [
158 int(x)
159 for x in re.findall(
160 r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
161 )
162 ]
163 )
164 else:
165 return (99, 99, 99)
166
167 def __init__(self, **kwargs: Any) -> None:
168 super().__init__(**kwargs)
169 self.paramstyle = "qmark"
170 if self.dbapi is not None:
171 if self._dbapi_version < mariadb_cpy_minimum_version:
172 raise NotImplementedError(
173 "The minimum required version for MariaDB "
174 "Connector/Python is %s"
175 % ".".join(str(x) for x in mariadb_cpy_minimum_version)
176 )
177
178 @classmethod
179 def import_dbapi(cls) -> DBAPIModule:
180 return __import__("mariadb")
181
182 def is_disconnect(
183 self,
184 e: DBAPIModule.Error,
185 connection: Optional[Union[PoolProxiedConnection, DBAPIConnection]],
186 cursor: Optional[DBAPICursor],
187 ) -> bool:
188 if super().is_disconnect(e, connection, cursor):
189 return True
190 elif isinstance(e, self.loaded_dbapi.Error):
191 str_e = str(e).lower()
192 return "not connected" in str_e or "isn't valid" in str_e
193 else:
194 return False
195
196 def create_connect_args(self, url: URL) -> ConnectArgsType:
197 opts = url.translate_connect_args()
198 opts.update(url.query)
199
200 int_params = [
201 "connect_timeout",
202 "read_timeout",
203 "write_timeout",
204 "client_flag",
205 "port",
206 "pool_size",
207 ]
208 bool_params = [
209 "local_infile",
210 "ssl_verify_cert",
211 "ssl",
212 "pool_reset_connection",
213 "compress",
214 ]
215
216 for key in int_params:
217 util.coerce_kw_type(opts, key, int)
218 for key in bool_params:
219 util.coerce_kw_type(opts, key, bool)
220
221 # FOUND_ROWS must be set in CLIENT_FLAGS to enable
222 # supports_sane_rowcount.
223 client_flag = opts.get("client_flag", 0)
224 if self.dbapi is not None:
225 try:
226 CLIENT_FLAGS = __import__(
227 self.dbapi.__name__ + ".constants.CLIENT"
228 ).constants.CLIENT
229 client_flag |= CLIENT_FLAGS.FOUND_ROWS
230 except (AttributeError, ImportError):
231 self.supports_sane_rowcount = False
232 opts["client_flag"] = client_flag
233 return [], opts
234
235 def _extract_error_code(self, exception: DBAPIModule.Error) -> int:
236 try:
237 rc: int = exception.errno
238 except:
239 rc = -1
240 return rc
241
242 def _detect_charset(self, connection: Connection) -> str:
243 return "utf8mb4"
244
245 def get_isolation_level_values(
246 self, dbapi_conn: DBAPIConnection
247 ) -> Sequence[IsolationLevel]:
248 return (
249 "SERIALIZABLE",
250 "READ UNCOMMITTED",
251 "READ COMMITTED",
252 "REPEATABLE READ",
253 "AUTOCOMMIT",
254 )
255
256 def detect_autocommit_setting(self, dbapi_conn: DBAPIConnection) -> bool:
257 return bool(dbapi_conn.autocommit)
258
259 def set_isolation_level(
260 self, dbapi_connection: DBAPIConnection, level: IsolationLevel
261 ) -> None:
262 if level == "AUTOCOMMIT":
263 dbapi_connection.autocommit = True
264 else:
265 dbapi_connection.autocommit = False
266 super().set_isolation_level(dbapi_connection, level)
267
268 def do_begin_twophase(self, connection: Connection, xid: Any) -> None:
269 connection.execute(
270 sql.text("XA BEGIN :xid").bindparams(
271 sql.bindparam("xid", xid, literal_execute=True)
272 )
273 )
274
275 def do_prepare_twophase(self, connection: Connection, xid: Any) -> None:
276 connection.execute(
277 sql.text("XA END :xid").bindparams(
278 sql.bindparam("xid", xid, literal_execute=True)
279 )
280 )
281 connection.execute(
282 sql.text("XA PREPARE :xid").bindparams(
283 sql.bindparam("xid", xid, literal_execute=True)
284 )
285 )
286
287 def do_rollback_twophase(
288 self,
289 connection: Connection,
290 xid: Any,
291 is_prepared: bool = True,
292 recover: bool = False,
293 ) -> None:
294 if not is_prepared:
295 connection.execute(
296 sql.text("XA END :xid").bindparams(
297 sql.bindparam("xid", xid, literal_execute=True)
298 )
299 )
300 connection.execute(
301 sql.text("XA ROLLBACK :xid").bindparams(
302 sql.bindparam("xid", xid, literal_execute=True)
303 )
304 )
305
306 def do_commit_twophase(
307 self,
308 connection: Connection,
309 xid: Any,
310 is_prepared: bool = True,
311 recover: bool = False,
312 ) -> None:
313 if not is_prepared:
314 self.do_prepare_twophase(connection, xid)
315 connection.execute(
316 sql.text("XA COMMIT :xid").bindparams(
317 sql.bindparam("xid", xid, literal_execute=True)
318 )
319 )
320
321
322dialect = MySQLDialect_mariadbconnector