1# connectors/aioodbc.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9from __future__ import annotations
10
11from typing import TYPE_CHECKING
12
13from .asyncio import AsyncAdapt_dbapi_connection
14from .asyncio import AsyncAdapt_dbapi_cursor
15from .asyncio import AsyncAdapt_dbapi_ss_cursor
16from .asyncio import AsyncAdaptFallback_dbapi_connection
17from .pyodbc import PyODBCConnector
18from .. import pool
19from .. import util
20from ..util.concurrency import await_fallback
21from ..util.concurrency import await_only
22
23if TYPE_CHECKING:
24 from ..engine.interfaces import ConnectArgsType
25 from ..engine.url import URL
26
27
class AsyncAdapt_aioodbc_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter for aioodbc.

    ``setinputsizes()`` and ``fast_executemany`` are forwarded to the object
    stored on the wrapped cursor's ``_impl`` attribute instead of going
    through the wrapped cursor's own API.
    """

    __slots__ = ()

    def setinputsizes(self, *inputsizes):
        """Call ``setinputsizes()`` directly on the cursor's ``_impl`` object.

        The way this is supposed to work is
        ``self.await_(self._cursor.setinputsizes(*inputsizes))``; the direct
        call is used instead, see
        https://github.com/aio-libs/aioodbc/issues/451
        """
        impl = self._cursor._impl
        return impl.setinputsizes(*inputsizes)

    @property
    def fast_executemany(self):
        """The ``fast_executemany`` attribute of the cursor's ``_impl``."""
        impl = self._cursor._impl
        return impl.fast_executemany

    @fast_executemany.setter
    def fast_executemany(self, value):
        impl = self._cursor._impl
        impl.fast_executemany = value
45
46
class AsyncAdapt_aioodbc_ss_cursor(
    AsyncAdapt_aioodbc_cursor, AsyncAdapt_dbapi_ss_cursor
):
    """Server-side cursor variant of :class:`AsyncAdapt_aioodbc_cursor`.

    Adds no members of its own; it only combines the aioodbc cursor
    overrides with ``AsyncAdapt_dbapi_ss_cursor``.
    """

    __slots__ = ()
51
52
class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection):
    """Connection adapter for aioodbc.

    aioodbc sets connection=None when closed and then just fails with
    AttributeError, so ``cursor()``, ``rollback()``, ``commit()`` and
    ``close()`` check ``self._connection.closed`` before delegating.
    """

    _cursor_cls = AsyncAdapt_aioodbc_cursor
    _ss_cursor_cls = AsyncAdapt_aioodbc_ss_cursor
    __slots__ = ()

    @property
    def autocommit(self):
        """The ``autocommit`` attribute of the wrapped connection."""
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        # Assigning ``self._connection.autocommit = value`` is not used
        # here; the value is set on the ``_conn`` object instead, see
        # https://github.com/aio-libs/aioodbc/issues/448
        inner = self._connection._conn
        inner.autocommit = value

    def ping(self, reconnect):
        """Run the wrapped connection's ``ping()`` through ``await_``."""
        pending = self._connection.ping(reconnect)
        return self.await_(pending)

    def add_output_converter(self, *arg, **kw):
        """Forward all arguments to the wrapped connection; returns None."""
        self._connection.add_output_converter(*arg, **kw)

    def character_set_name(self):
        """Return ``character_set_name()`` of the wrapped connection."""
        return self._connection.character_set_name()

    def cursor(self, server_side=False):
        """Return a new cursor, or raise if the connection is closed.

        A closed connection raises the DBAPI ``ProgrammingError`` with the
        same message that pyodbc uses, so that it triggers
        ``is_disconnect()`` as well.
        """
        if not self._connection.closed:
            return super().cursor(server_side=server_side)

        raise self.dbapi.ProgrammingError(
            "Attempt to use a closed connection."
        )

    def rollback(self):
        """Roll back; a no-op when the connection is already closed."""
        if self._connection.closed:
            return
        super().rollback()

    def commit(self):
        """Commit; a no-op when the connection is already closed."""
        if self._connection.closed:
            return
        super().commit()

    def close(self):
        """Close; a no-op when the connection is already closed."""
        if self._connection.closed:
            return
        super().close()
105
106
class AsyncAdaptFallback_aioodbc_connection(
    AsyncAdaptFallback_dbapi_connection, AsyncAdapt_aioodbc_connection
):
    """Fallback variant of :class:`AsyncAdapt_aioodbc_connection`.

    Adds no members of its own; it only places
    ``AsyncAdaptFallback_dbapi_connection`` ahead of the aioodbc connection
    adapter in the method resolution order.
    """

    __slots__ = ()
111
112
class AsyncAdapt_aioodbc_dbapi:
    """DBAPI-style namespace object built from ``aioodbc`` and ``pyodbc``.

    Connections are created through ``aioodbc`` (or a caller-supplied
    creator function), while ``paramstyle``, ``version``, the exception
    classes and the type constants are copied from ``pyodbc``.
    """

    def __init__(self, aioodbc, pyodbc):
        """Store both modules and copy the pyodbc attributes onto ``self``.

        :param aioodbc: the imported ``aioodbc`` module.
        :param pyodbc: the imported ``pyodbc`` module.
        """
        self.aioodbc = aioodbc
        self.pyodbc = pyodbc
        self.paramstyle = pyodbc.paramstyle
        self._init_dbapi_attributes()
        self.Cursor = AsyncAdapt_dbapi_cursor
        self.version = pyodbc.version

    def _init_dbapi_attributes(self):
        """Copy exception classes and type constants from ``self.pyodbc``.

        Each name is listed exactly once, so each attribute is looked up
        and assigned a single time.
        """
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
            "SQL_DRIVER_NAME",
            "NUMBER",
            "STRING",
            "DATETIME",
            "BINARY",
            "Binary",
            "BinaryNull",
            "SQL_VARCHAR",
            "SQL_WVARCHAR",
            "SQL_DECIMAL",
        ):
            setattr(self, name, getattr(self.pyodbc, name))

    def connect(self, *arg, **kw):
        """Create a connection and wrap it in an adapted connection object.

        Two keys are removed from ``kw`` before the creator is called:

        * ``async_fallback`` - when ``util.asbool()`` considers it true,
          the result is awaited with ``await_fallback`` and wrapped in
          :class:`AsyncAdaptFallback_aioodbc_connection`; otherwise
          ``await_only`` and :class:`AsyncAdapt_aioodbc_connection` are
          used.
        * ``async_creator_fn`` - callable used in place of
          ``self.aioodbc.connect``.

        All remaining positional and keyword arguments are passed to the
        creator function unchanged.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop("async_creator_fn", self.aioodbc.connect)

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_aioodbc_connection(
                self,
                await_fallback(creator_fn(*arg, **kw)),
            )
        else:
            return AsyncAdapt_aioodbc_connection(
                self,
                await_only(creator_fn(*arg, **kw)),
            )
162
163
class aiodbcConnector(PyODBCConnector):
    """Connector that layers aioodbc on top of :class:`PyODBCConnector`."""

    is_async = True
    supports_statement_cache = True

    supports_server_side_cursors = True

    @classmethod
    def import_dbapi(cls):
        """Import ``aioodbc`` and ``pyodbc`` and wrap them in the adapter."""
        aioodbc_module = __import__("aioodbc")
        pyodbc_module = __import__("pyodbc")
        return AsyncAdapt_aioodbc_dbapi(aioodbc_module, pyodbc_module)

    def create_connect_args(self, url: URL) -> ConnectArgsType:
        """Build connect arguments with the DSN passed by keyword.

        The parent class result is reused; when its first positional
        argument is truthy it is moved into the ``dsn`` keyword, and the
        positional arguments returned are always empty.
        """
        positional, keywords = super().create_connect_args(url)
        dsn = positional[0] if positional else None
        if dsn:
            keywords["dsn"] = dsn

        return (), keywords

    @classmethod
    def get_pool_class(cls, url):
        """Choose the pool class from the URL's ``async_fallback`` query."""
        use_fallback = util.asbool(url.query.get("async_fallback", False))
        return (
            pool.FallbackAsyncAdaptedQueuePool
            if use_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def get_driver_connection(self, connection):
        """Return the ``_connection`` attribute of the given connection."""
        return connection._connection