1# connectors/pyodbc.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8from __future__ import annotations
9
10import re
11import typing
12from typing import Any
13from typing import Dict
14from typing import List
15from typing import Optional
16from typing import Tuple
17from typing import Union
18from urllib.parse import unquote_plus
19
20from . import Connector
21from .. import ExecutionContext
22from .. import pool
23from .. import util
24from ..engine import ConnectArgsType
25from ..engine import Connection
26from ..engine import interfaces
27from ..engine import URL
28from ..sql.type_api import TypeEngine
29
30if typing.TYPE_CHECKING:
31 from ..engine.interfaces import DBAPIModule
32 from ..engine.interfaces import IsolationLevel
33
34
class PyODBCConnector(Connector):
    """Connector implementation for the ``pyodbc`` DBAPI.

    Provides the pieces that depend on pyodbc itself: importing the
    module, turning a URL into an ODBC connection string, recognising
    closed-connection errors, version parsing, ``setinputsizes()``
    handling and autocommit-aware isolation level support.
    """

    driver = "pyodbc"

    # this is no longer False for pyodbc in general
    supports_sane_rowcount_returning = True
    supports_sane_multi_rowcount = False

    supports_native_decimal = True
    default_paramstyle = "named"

    # when True, do_set_input_sizes() skips the setinputsizes() call for
    # executemany-style executions
    fast_executemany = False

    # for non-DSN connections, this *may* be used to
    # hold the desired driver name
    pyodbc_driver_name: Optional[str] = None

    def __init__(self, use_setinputsizes: bool = False, **kw: Any):
        """Construct the connector.

        :param use_setinputsizes: when true, ``bind_typing`` is set to
         ``BindTyping.SETINPUTSIZES`` on this instance.
        :param kw: forwarded unchanged to the parent constructor.
        """
        super().__init__(**kw)
        if use_setinputsizes:
            self.bind_typing = interfaces.BindTyping.SETINPUTSIZES

    @classmethod
    def import_dbapi(cls) -> DBAPIModule:
        """Import and return the ``pyodbc`` module."""
        return __import__("pyodbc")

    def create_connect_args(self, url: URL) -> ConnectArgsType:
        """Translate *url* into an ODBC connection string plus options.

        The URL's connect arguments (with ``username`` renamed to
        ``user``) are merged with its query string, query values taking
        precedence, and processed as follows:

        * ``ansi``, ``unicode_results`` and ``autocommit`` are removed,
          coerced with ``util.asbool()`` and returned as keyword
          arguments.
        * If ``odbc_connect`` is present, its URL-unquoted value is used
          verbatim as the connection string and every other remaining
          key is ignored.
        * Otherwise each value that contains ``;`` or starts with ``{``
          is wrapped in braces (``}`` doubled), and the string is built
          either as ``dsn=...`` (a ``dsn`` key is present, or a host is
          given without a database) or as
          ``DRIVER=...;Server=...;Database=...``.
        * ``UID`` / ``PWD`` are added when a user is given; otherwise
          ``Authentication=...`` if supplied, else
          ``Trusted_Connection=Yes``.
        * ``odbc_autotranslate`` becomes ``AutoTranslate=...`` and all
          keys still left are appended as ``key=value``.

        :param url: the URL to translate.
        :return: ``((connection_string,), connect_args)``.
        """
        keys = url.translate_connect_args(username="user")
        keys.update(url.query)

        query = url.query

        connect_args: Dict[str, Any] = {}
        connectors: List[str]

        # boolean flags travel as keyword arguments, not inside the
        # connection string
        for param in ("ansi", "unicode_results", "autocommit"):
            if param in keys:
                connect_args[param] = util.asbool(keys.pop(param))

        if "odbc_connect" in keys:
            connectors = [unquote_plus(keys.pop("odbc_connect"))]
        else:

            def check_quote(token: Any) -> Any:
                # brace-quote values that would otherwise be split on
                # ";" or be mistaken for an already-quoted value.  The
                # string form is used for both the test and the
                # replacement so that non-string values (e.g. a tuple
                # from a repeated query key) cannot raise here.
                text = str(token)
                if ";" in text or text.startswith("{"):
                    return "{%s}" % text.replace("}", "}}")
                return token

            keys = {k: check_quote(v) for k, v in keys.items()}

            dsn_connection = "dsn" in keys or (
                "host" in keys and "database" not in keys
            )
            if dsn_connection:
                connectors = [
                    "dsn=%s" % (keys.pop("host", "") or keys.pop("dsn", ""))
                ]
            else:
                port = ""
                # a port given in the query string is left in ``keys``
                # and passed through as a plain keyword below; only a
                # port from the URL itself is folded into "Server="
                if "port" in keys and "port" not in query:
                    port = ",%d" % int(keys.pop("port"))

                connectors = []
                driver = keys.pop("driver", self.pyodbc_driver_name)
                if driver is not None:
                    connectors.append("DRIVER={%s}" % driver)
                elif keys:
                    # note if keys is empty, this is a totally blank URL
                    # and no warning is emitted.  In either case DRIVER
                    # is omitted so the literal text "None" never ends
                    # up in the connection string.
                    util.warn(
                        "No driver name specified; "
                        "this is expected by PyODBC when using "
                        "DSN-less connections"
                    )

                connectors.extend(
                    [
                        "Server=%s%s" % (keys.pop("host", ""), port),
                        "Database=%s" % keys.pop("database", ""),
                    ]
                )

            user = keys.pop("user", None)
            if user:
                connectors.append("UID=%s" % user)
                pwd = keys.pop("password", "")
                if pwd:
                    connectors.append("PWD=%s" % pwd)
            else:
                authentication = keys.pop("authentication", None)
                if authentication:
                    connectors.append("Authentication=%s" % authentication)
                else:
                    connectors.append("Trusted_Connection=Yes")

            # if set to 'Yes', the ODBC layer will try to automagically
            # convert textual data from your database encoding to your
            # client encoding.  This should obviously be set to 'No' if
            # you query a cp1253 encoded database from a latin1 client...
            if "odbc_autotranslate" in keys:
                connectors.append(
                    "AutoTranslate=%s" % keys.pop("odbc_autotranslate")
                )

            # whatever was not consumed above is passed through as-is
            connectors.extend(["%s=%s" % (k, v) for k, v in keys.items()])

        return ((";".join(connectors),), connect_args)

    def is_disconnect(
        self,
        e: Exception,
        connection: Optional[
            Union[pool.PoolProxiedConnection, interfaces.DBAPIConnection]
        ],
        cursor: Optional[interfaces.DBAPICursor],
    ) -> bool:
        """Return True if *e* reports a closed connection.

        Only ``ProgrammingError`` instances of the loaded DBAPI are
        considered, and only when their message contains one of two
        known closed-connection phrases.  ``connection`` and ``cursor``
        are not consulted.
        """
        if not isinstance(e, self.loaded_dbapi.ProgrammingError):
            return False

        message = str(e)
        return (
            "The cursor's connection has been closed." in message
            or "Attempt to use a closed connection." in message
        )

    def _dbapi_version(self) -> interfaces.VersionInfoType:
        """Return the parsed DBAPI version, or ``()`` if none is set."""
        if not self.dbapi:
            return ()
        return self._parse_dbapi_version(self.dbapi.version)

    def _parse_dbapi_version(self, vers: str) -> interfaces.VersionInfoType:
        """Parse a version string into a tuple.

        An optional leading ``py...-`` prefix is skipped, the dotted
        numeric part becomes integers, and an optional ``-word`` suffix
        is appended as a trailing string element.  Returns ``()`` when
        the string does not match.
        """
        m = re.match(r"(?:py.*-)?([\d\.]+)(?:-(\w+))?", vers)
        if not m:
            return ()
        # the pattern also admits trailing or doubled dots; skip the
        # resulting empty segments rather than failing in int("")
        vers_tuple: interfaces.VersionInfoType = tuple(
            int(x) for x in m.group(1).split(".") if x
        )
        if m.group(2):
            vers_tuple += (m.group(2),)
        return vers_tuple

    def _get_server_version_info(
        self, connection: Connection
    ) -> interfaces.VersionInfoType:
        """Return the server version as a tuple of integers.

        The value of ``getinfo(SQL_DBMS_VER)`` is split on ``.`` and
        ``-``; tokens that are not integers are dropped.
        """
        # NOTE: this function is not reliable, particularly when
        # freetds is in use.   Implement database-specific server version
        # queries.
        dbapi_con = connection.connection.dbapi_connection
        version: Tuple[Union[int, str], ...] = ()
        r = re.compile(r"[.\-]")
        for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)):  # type: ignore[union-attr]  # noqa: E501
            try:
                version += (int(n),)
            except ValueError:
                # deliberately skip non-numeric tokens
                pass
        return version

    def do_set_input_sizes(
        self,
        cursor: interfaces.DBAPICursor,
        list_of_tuples: List[Tuple[str, Any, TypeEngine[Any]]],
        context: ExecutionContext,
    ) -> None:
        """Call ``cursor.setinputsizes()`` for the given bind types.

        Each entry's DBAPI type is passed as a 3-tuple; a type that is
        not already a tuple is expanded to ``(dbtype, None, None)``.
        Nothing is done for executemany-style executions when
        ``fast_executemany`` is enabled.
        """
        # the rules for these types seem a little strange, as you can pass
        # non-tuples as well as tuples, however it seems to assume "0"
        # for the subsequent values if you don't pass a tuple which fails
        # for types such as pyodbc.SQL_WLONGVARCHAR, which is the datatype
        # that ticket #5649 is targeting.

        # NOTE: as of #6058, this won't be called if the use_setinputsizes
        # parameter were not passed to the dialect, or if no types were
        # specified in list_of_tuples

        # as of #8177 for 2.0 we assume use_setinputsizes=True and only
        # omit the setinputsizes calls for .executemany() with
        # fast_executemany=True

        if (
            context.execute_style is interfaces.ExecuteStyle.EXECUTEMANY
            and self.fast_executemany
        ):
            return

        cursor.setinputsizes(
            [
                dbtype if isinstance(dbtype, tuple) else (dbtype, None, None)
                for _key, dbtype, _sqltype in list_of_tuples
            ]
        )

    def get_isolation_level_values(
        self, dbapi_conn: interfaces.DBAPIConnection
    ) -> List[IsolationLevel]:
        """Return the parent's isolation levels plus ``"AUTOCOMMIT"``."""
        return [*super().get_isolation_level_values(dbapi_conn), "AUTOCOMMIT"]

    def set_isolation_level(
        self,
        dbapi_connection: interfaces.DBAPIConnection,
        level: IsolationLevel,
    ) -> None:
        """Apply *level* to the connection.

        ``"AUTOCOMMIT"`` only switches the connection's ``autocommit``
        attribute on.  Any other level switches it off first and then
        defers to the parent implementation.
        """
        # adjust for ConnectionFairy being present
        # allows attribute set e.g. "connection.autocommit = True"
        # to work properly

        if level == "AUTOCOMMIT":
            dbapi_connection.autocommit = True
        else:
            dbapi_connection.autocommit = False
            super().set_isolation_level(dbapi_connection, level)

    def detect_autocommit_setting(
        self, dbapi_conn: interfaces.DBAPIConnection
    ) -> bool:
        """Return the connection's ``autocommit`` attribute as a bool."""
        return bool(dbapi_conn.autocommit)