Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/mariadbconnector.py: 45%
100 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# mysql/mariadbconnector.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8"""
10.. dialect:: mysql+mariadbconnector
11 :name: MariaDB Connector/Python
12 :dbapi: mariadb
13 :connectstring: mariadb+mariadbconnector://<user>:<password>@<host>[:<port>]/<dbname>
14 :url: https://pypi.org/project/mariadb/
16Driver Status
17-------------
19MariaDB Connector/Python enables Python programs to access MariaDB and MySQL
20databases using an API which is compliant with the Python DB API 2.0 (PEP-249).
21It is written in C and uses MariaDB Connector/C client library for client server
22communication.
24Note that the default driver for a ``mariadb://`` connection URI continues to
25be ``mysqldb``. ``mariadb+mariadbconnector://`` is required to use this driver.
27.. mariadb: https://github.com/mariadb-corporation/mariadb-connector-python
29""" # noqa
30import re
32from .base import MySQLCompiler
33from .base import MySQLDialect
34from .base import MySQLExecutionContext
35from ... import sql
36from ... import util
38mariadb_cpy_minimum_version = (1, 0, 1)
41class MySQLExecutionContext_mariadbconnector(MySQLExecutionContext):
42 _lastrowid = None
44 def create_server_side_cursor(self):
45 return self._dbapi_connection.cursor(buffered=False)
47 def create_default_cursor(self):
48 return self._dbapi_connection.cursor(buffered=True)
50 def post_exec(self):
51 if self.isinsert and self.compiled.postfetch_lastrowid:
52 self._lastrowid = self.cursor.lastrowid
54 def get_lastrowid(self):
55 return self._lastrowid
58class MySQLCompiler_mariadbconnector(MySQLCompiler):
59 pass
62class MySQLDialect_mariadbconnector(MySQLDialect):
63 driver = "mariadbconnector"
64 supports_statement_cache = True
66 # set this to True at the module level to prevent the driver from running
67 # against a backend that server detects as MySQL. currently this appears to
68 # be unnecessary as MariaDB client libraries have always worked against
69 # MySQL databases. However, if this changes at some point, this can be
70 # adjusted, but PLEASE ADD A TEST in test/dialect/mysql/test_dialect.py if
71 # this change is made at some point to ensure the correct exception
72 # is raised at the correct point when running the driver against
73 # a MySQL backend.
74 # is_mariadb = True
76 supports_unicode_statements = True
77 encoding = "utf8mb4"
78 convert_unicode = True
79 supports_sane_rowcount = True
80 supports_sane_multi_rowcount = True
81 supports_native_decimal = True
82 default_paramstyle = "qmark"
83 execution_ctx_cls = MySQLExecutionContext_mariadbconnector
84 statement_compiler = MySQLCompiler_mariadbconnector
86 supports_server_side_cursors = True
88 @util.memoized_property
89 def _dbapi_version(self):
90 if self.dbapi and hasattr(self.dbapi, "__version__"):
91 return tuple(
92 [
93 int(x)
94 for x in re.findall(
95 r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
96 )
97 ]
98 )
99 else:
100 return (99, 99, 99)
102 def __init__(self, **kwargs):
103 super(MySQLDialect_mariadbconnector, self).__init__(**kwargs)
104 self.paramstyle = "qmark"
105 if self.dbapi is not None:
106 if self._dbapi_version < mariadb_cpy_minimum_version:
107 raise NotImplementedError(
108 "The minimum required version for MariaDB "
109 "Connector/Python is %s"
110 % ".".join(str(x) for x in mariadb_cpy_minimum_version)
111 )
113 @classmethod
114 def dbapi(cls):
115 return __import__("mariadb")
117 def is_disconnect(self, e, connection, cursor):
118 if super(MySQLDialect_mariadbconnector, self).is_disconnect(
119 e, connection, cursor
120 ):
121 return True
122 elif isinstance(e, self.dbapi.Error):
123 str_e = str(e).lower()
124 return "not connected" in str_e or "isn't valid" in str_e
125 else:
126 return False
128 def create_connect_args(self, url):
129 opts = url.translate_connect_args()
131 int_params = [
132 "connect_timeout",
133 "read_timeout",
134 "write_timeout",
135 "client_flag",
136 "port",
137 "pool_size",
138 ]
139 bool_params = [
140 "local_infile",
141 "ssl_verify_cert",
142 "ssl",
143 "pool_reset_connection",
144 ]
146 for key in int_params:
147 util.coerce_kw_type(opts, key, int)
148 for key in bool_params:
149 util.coerce_kw_type(opts, key, bool)
151 # FOUND_ROWS must be set in CLIENT_FLAGS to enable
152 # supports_sane_rowcount.
153 client_flag = opts.get("client_flag", 0)
154 if self.dbapi is not None:
155 try:
156 CLIENT_FLAGS = __import__(
157 self.dbapi.__name__ + ".constants.CLIENT"
158 ).constants.CLIENT
159 client_flag |= CLIENT_FLAGS.FOUND_ROWS
160 except (AttributeError, ImportError):
161 self.supports_sane_rowcount = False
162 opts["client_flag"] = client_flag
163 return [[], opts]
165 def _extract_error_code(self, exception):
166 try:
167 rc = exception.errno
168 except:
169 rc = -1
170 return rc
172 def _detect_charset(self, connection):
173 return "utf8mb4"
175 _isolation_lookup = set(
176 [
177 "SERIALIZABLE",
178 "READ UNCOMMITTED",
179 "READ COMMITTED",
180 "REPEATABLE READ",
181 "AUTOCOMMIT",
182 ]
183 )
185 def _set_isolation_level(self, connection, level):
186 if level == "AUTOCOMMIT":
187 connection.autocommit = True
188 else:
189 connection.autocommit = False
190 super(MySQLDialect_mariadbconnector, self)._set_isolation_level(
191 connection, level
192 )
194 def do_begin_twophase(self, connection, xid):
195 connection.execute(
196 sql.text("XA BEGIN :xid").bindparams(
197 sql.bindparam("xid", xid, literal_execute=True)
198 )
199 )
201 def do_prepare_twophase(self, connection, xid):
202 connection.execute(
203 sql.text("XA END :xid").bindparams(
204 sql.bindparam("xid", xid, literal_execute=True)
205 )
206 )
207 connection.execute(
208 sql.text("XA PREPARE :xid").bindparams(
209 sql.bindparam("xid", xid, literal_execute=True)
210 )
211 )
213 def do_rollback_twophase(
214 self, connection, xid, is_prepared=True, recover=False
215 ):
216 if not is_prepared:
217 connection.execute(
218 sql.text("XA END :xid").bindparams(
219 sql.bindparam("xid", xid, literal_execute=True)
220 )
221 )
222 connection.execute(
223 sql.text("XA ROLLBACK :xid").bindparams(
224 sql.bindparam("xid", xid, literal_execute=True)
225 )
226 )
228 def do_commit_twophase(
229 self, connection, xid, is_prepared=True, recover=False
230 ):
231 if not is_prepared:
232 self.do_prepare_twophase(connection, xid)
233 connection.execute(
234 sql.text("XA COMMIT :xid").bindparams(
235 sql.bindparam("xid", xid, literal_execute=True)
236 )
237 )
240dialect = MySQLDialect_mariadbconnector