Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/mariadbconnector.py: 45%

100 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# mysql/mariadbconnector.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8""" 

9 

10.. dialect:: mysql+mariadbconnector 

11 :name: MariaDB Connector/Python 

12 :dbapi: mariadb 

13 :connectstring: mariadb+mariadbconnector://<user>:<password>@<host>[:<port>]/<dbname> 

14 :url: https://pypi.org/project/mariadb/ 

15 

16Driver Status 

17------------- 

18 

19MariaDB Connector/Python enables Python programs to access MariaDB and MySQL 

20databases using an API which is compliant with the Python DB API 2.0 (PEP-249). 

21It is written in C and uses MariaDB Connector/C client library for client server 

22communication. 

23 

24Note that the default driver for a ``mariadb://`` connection URI continues to 

25be ``mysqldb``. ``mariadb+mariadbconnector://`` is required to use this driver. 

26 

27.. mariadb: https://github.com/mariadb-corporation/mariadb-connector-python 

28 

29""" # noqa 

30import re 

31 

32from .base import MySQLCompiler 

33from .base import MySQLDialect 

34from .base import MySQLExecutionContext 

35from ... import sql 

36from ... import util 

37 

38mariadb_cpy_minimum_version = (1, 0, 1) 

39 

40 

41class MySQLExecutionContext_mariadbconnector(MySQLExecutionContext): 

42 _lastrowid = None 

43 

44 def create_server_side_cursor(self): 

45 return self._dbapi_connection.cursor(buffered=False) 

46 

47 def create_default_cursor(self): 

48 return self._dbapi_connection.cursor(buffered=True) 

49 

50 def post_exec(self): 

51 if self.isinsert and self.compiled.postfetch_lastrowid: 

52 self._lastrowid = self.cursor.lastrowid 

53 

54 def get_lastrowid(self): 

55 return self._lastrowid 

56 

57 

58class MySQLCompiler_mariadbconnector(MySQLCompiler): 

59 pass 

60 

61 

62class MySQLDialect_mariadbconnector(MySQLDialect): 

63 driver = "mariadbconnector" 

64 supports_statement_cache = True 

65 

66 # set this to True at the module level to prevent the driver from running 

67 # against a backend that server detects as MySQL. currently this appears to 

68 # be unnecessary as MariaDB client libraries have always worked against 

69 # MySQL databases. However, if this changes at some point, this can be 

70 # adjusted, but PLEASE ADD A TEST in test/dialect/mysql/test_dialect.py if 

71 # this change is made at some point to ensure the correct exception 

72 # is raised at the correct point when running the driver against 

73 # a MySQL backend. 

74 # is_mariadb = True 

75 

76 supports_unicode_statements = True 

77 encoding = "utf8mb4" 

78 convert_unicode = True 

79 supports_sane_rowcount = True 

80 supports_sane_multi_rowcount = True 

81 supports_native_decimal = True 

82 default_paramstyle = "qmark" 

83 execution_ctx_cls = MySQLExecutionContext_mariadbconnector 

84 statement_compiler = MySQLCompiler_mariadbconnector 

85 

86 supports_server_side_cursors = True 

87 

88 @util.memoized_property 

89 def _dbapi_version(self): 

90 if self.dbapi and hasattr(self.dbapi, "__version__"): 

91 return tuple( 

92 [ 

93 int(x) 

94 for x in re.findall( 

95 r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__ 

96 ) 

97 ] 

98 ) 

99 else: 

100 return (99, 99, 99) 

101 

102 def __init__(self, **kwargs): 

103 super(MySQLDialect_mariadbconnector, self).__init__(**kwargs) 

104 self.paramstyle = "qmark" 

105 if self.dbapi is not None: 

106 if self._dbapi_version < mariadb_cpy_minimum_version: 

107 raise NotImplementedError( 

108 "The minimum required version for MariaDB " 

109 "Connector/Python is %s" 

110 % ".".join(str(x) for x in mariadb_cpy_minimum_version) 

111 ) 

112 

113 @classmethod 

114 def dbapi(cls): 

115 return __import__("mariadb") 

116 

117 def is_disconnect(self, e, connection, cursor): 

118 if super(MySQLDialect_mariadbconnector, self).is_disconnect( 

119 e, connection, cursor 

120 ): 

121 return True 

122 elif isinstance(e, self.dbapi.Error): 

123 str_e = str(e).lower() 

124 return "not connected" in str_e or "isn't valid" in str_e 

125 else: 

126 return False 

127 

128 def create_connect_args(self, url): 

129 opts = url.translate_connect_args() 

130 

131 int_params = [ 

132 "connect_timeout", 

133 "read_timeout", 

134 "write_timeout", 

135 "client_flag", 

136 "port", 

137 "pool_size", 

138 ] 

139 bool_params = [ 

140 "local_infile", 

141 "ssl_verify_cert", 

142 "ssl", 

143 "pool_reset_connection", 

144 ] 

145 

146 for key in int_params: 

147 util.coerce_kw_type(opts, key, int) 

148 for key in bool_params: 

149 util.coerce_kw_type(opts, key, bool) 

150 

151 # FOUND_ROWS must be set in CLIENT_FLAGS to enable 

152 # supports_sane_rowcount. 

153 client_flag = opts.get("client_flag", 0) 

154 if self.dbapi is not None: 

155 try: 

156 CLIENT_FLAGS = __import__( 

157 self.dbapi.__name__ + ".constants.CLIENT" 

158 ).constants.CLIENT 

159 client_flag |= CLIENT_FLAGS.FOUND_ROWS 

160 except (AttributeError, ImportError): 

161 self.supports_sane_rowcount = False 

162 opts["client_flag"] = client_flag 

163 return [[], opts] 

164 

165 def _extract_error_code(self, exception): 

166 try: 

167 rc = exception.errno 

168 except: 

169 rc = -1 

170 return rc 

171 

172 def _detect_charset(self, connection): 

173 return "utf8mb4" 

174 

175 _isolation_lookup = set( 

176 [ 

177 "SERIALIZABLE", 

178 "READ UNCOMMITTED", 

179 "READ COMMITTED", 

180 "REPEATABLE READ", 

181 "AUTOCOMMIT", 

182 ] 

183 ) 

184 

185 def _set_isolation_level(self, connection, level): 

186 if level == "AUTOCOMMIT": 

187 connection.autocommit = True 

188 else: 

189 connection.autocommit = False 

190 super(MySQLDialect_mariadbconnector, self)._set_isolation_level( 

191 connection, level 

192 ) 

193 

194 def do_begin_twophase(self, connection, xid): 

195 connection.execute( 

196 sql.text("XA BEGIN :xid").bindparams( 

197 sql.bindparam("xid", xid, literal_execute=True) 

198 ) 

199 ) 

200 

201 def do_prepare_twophase(self, connection, xid): 

202 connection.execute( 

203 sql.text("XA END :xid").bindparams( 

204 sql.bindparam("xid", xid, literal_execute=True) 

205 ) 

206 ) 

207 connection.execute( 

208 sql.text("XA PREPARE :xid").bindparams( 

209 sql.bindparam("xid", xid, literal_execute=True) 

210 ) 

211 ) 

212 

213 def do_rollback_twophase( 

214 self, connection, xid, is_prepared=True, recover=False 

215 ): 

216 if not is_prepared: 

217 connection.execute( 

218 sql.text("XA END :xid").bindparams( 

219 sql.bindparam("xid", xid, literal_execute=True) 

220 ) 

221 ) 

222 connection.execute( 

223 sql.text("XA ROLLBACK :xid").bindparams( 

224 sql.bindparam("xid", xid, literal_execute=True) 

225 ) 

226 ) 

227 

228 def do_commit_twophase( 

229 self, connection, xid, is_prepared=True, recover=False 

230 ): 

231 if not is_prepared: 

232 self.do_prepare_twophase(connection, xid) 

233 connection.execute( 

234 sql.text("XA COMMIT :xid").bindparams( 

235 sql.bindparam("xid", xid, literal_execute=True) 

236 ) 

237 ) 

238 

239 

240dialect = MySQLDialect_mariadbconnector