1# connectors/aioodbc.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9from __future__ import annotations
10
11from typing import TYPE_CHECKING
12
13from .asyncio import AsyncAdapt_dbapi_connection
14from .asyncio import AsyncAdapt_dbapi_cursor
15from .asyncio import AsyncAdapt_dbapi_ss_cursor
16from .asyncio import AsyncAdaptFallback_dbapi_connection
17from .pyodbc import PyODBCConnector
18from .. import pool
19from .. import util
20from ..util.concurrency import await_fallback
21from ..util.concurrency import await_only
22
23
24if TYPE_CHECKING:
25 from ..engine.interfaces import ConnectArgsType
26 from ..engine.url import URL
27
28
29class AsyncAdapt_aioodbc_cursor(AsyncAdapt_dbapi_cursor):
30 __slots__ = ()
31
32 def setinputsizes(self, *inputsizes):
33 # see https://github.com/aio-libs/aioodbc/issues/451
34 return self._cursor._impl.setinputsizes(*inputsizes)
35
36 # how it's supposed to work
37 # return self.await_(self._cursor.setinputsizes(*inputsizes))
38
39 @property
40 def fast_executemany(self):
41 return self._cursor._impl.fast_executemany
42
43 @fast_executemany.setter
44 def fast_executemany(self, value):
45 self._cursor._impl.fast_executemany = value
46
47
48class AsyncAdapt_aioodbc_ss_cursor(
49 AsyncAdapt_aioodbc_cursor, AsyncAdapt_dbapi_ss_cursor
50):
51 __slots__ = ()
52
53
54class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection):
55 _cursor_cls = AsyncAdapt_aioodbc_cursor
56 _ss_cursor_cls = AsyncAdapt_aioodbc_ss_cursor
57 __slots__ = ()
58
59 @property
60 def autocommit(self):
61 return self._connection.autocommit
62
63 @autocommit.setter
64 def autocommit(self, value):
65 # https://github.com/aio-libs/aioodbc/issues/448
66 # self._connection.autocommit = value
67
68 self._connection._conn.autocommit = value
69
70 def ping(self, reconnect):
71 return self.await_(self._connection.ping(reconnect))
72
73 def add_output_converter(self, *arg, **kw):
74 self._connection.add_output_converter(*arg, **kw)
75
76 def character_set_name(self):
77 return self._connection.character_set_name()
78
79 def cursor(self, server_side=False):
80 # aioodbc sets connection=None when closed and just fails with
81 # AttributeError here. Here we use the same ProgrammingError +
82 # message that pyodbc uses, so it triggers is_disconnect() as well.
83 if self._connection.closed:
84 raise self.dbapi.ProgrammingError(
85 "Attempt to use a closed connection."
86 )
87 return super().cursor(server_side=server_side)
88
89 def rollback(self):
90 # aioodbc sets connection=None when closed and just fails with
91 # AttributeError here. should be a no-op
92 if not self._connection.closed:
93 super().rollback()
94
95 def commit(self):
96 # aioodbc sets connection=None when closed and just fails with
97 # AttributeError here. should be a no-op
98 if not self._connection.closed:
99 super().commit()
100
101 def close(self):
102 # aioodbc sets connection=None when closed and just fails with
103 # AttributeError here. should be a no-op
104 if not self._connection.closed:
105 super().close()
106
107
108class AsyncAdaptFallback_aioodbc_connection(
109 AsyncAdaptFallback_dbapi_connection, AsyncAdapt_aioodbc_connection
110):
111 __slots__ = ()
112
113
114class AsyncAdapt_aioodbc_dbapi:
115 def __init__(self, aioodbc, pyodbc):
116 self.aioodbc = aioodbc
117 self.pyodbc = pyodbc
118 self.paramstyle = pyodbc.paramstyle
119 self._init_dbapi_attributes()
120 self.Cursor = AsyncAdapt_dbapi_cursor
121 self.version = pyodbc.version
122
123 def _init_dbapi_attributes(self):
124 for name in (
125 "Warning",
126 "Error",
127 "InterfaceError",
128 "DataError",
129 "DatabaseError",
130 "OperationalError",
131 "InterfaceError",
132 "IntegrityError",
133 "ProgrammingError",
134 "InternalError",
135 "NotSupportedError",
136 "SQL_DRIVER_NAME",
137 "NUMBER",
138 "STRING",
139 "DATETIME",
140 "BINARY",
141 "Binary",
142 "BinaryNull",
143 "SQL_VARCHAR",
144 "SQL_WVARCHAR",
145 "SQL_DECIMAL",
146 ):
147 setattr(self, name, getattr(self.pyodbc, name))
148
149 def connect(self, *arg, **kw):
150 async_fallback = kw.pop("async_fallback", False)
151 creator_fn = kw.pop("async_creator_fn", self.aioodbc.connect)
152
153 if util.asbool(async_fallback):
154 return AsyncAdaptFallback_aioodbc_connection(
155 self,
156 await_fallback(creator_fn(*arg, **kw)),
157 )
158 else:
159 return AsyncAdapt_aioodbc_connection(
160 self,
161 await_only(creator_fn(*arg, **kw)),
162 )
163
164
165class aiodbcConnector(PyODBCConnector):
166 is_async = True
167 supports_statement_cache = True
168
169 supports_server_side_cursors = True
170
171 @classmethod
172 def import_dbapi(cls):
173 return AsyncAdapt_aioodbc_dbapi(
174 __import__("aioodbc"), __import__("pyodbc")
175 )
176
177 def create_connect_args(self, url: URL) -> ConnectArgsType:
178 arg, kw = super().create_connect_args(url)
179 if arg and arg[0]:
180 kw["dsn"] = arg[0]
181
182 return (), kw
183
184 @classmethod
185 def get_pool_class(cls, url):
186 async_fallback = url.query.get("async_fallback", False)
187
188 if util.asbool(async_fallback):
189 return pool.FallbackAsyncAdaptedQueuePool
190 else:
191 return pool.AsyncAdaptedQueuePool
192
193 def get_driver_connection(self, connection):
194 return connection._connection