1# connectors/aioodbc.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9from __future__ import annotations
10
11from typing import TYPE_CHECKING
12
13from .asyncio import AsyncAdapt_dbapi_connection
14from .asyncio import AsyncAdapt_dbapi_cursor
15from .asyncio import AsyncAdapt_dbapi_ss_cursor
16from .asyncio import AsyncAdaptFallback_dbapi_connection
17from .pyodbc import PyODBCConnector
18from .. import pool
19from .. import util
20from ..util.concurrency import await_fallback
21from ..util.concurrency import await_only
22
23if TYPE_CHECKING:
24 from ..engine.interfaces import ConnectArgsType
25 from ..engine.url import URL
26
27
class AsyncAdapt_aioodbc_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter for aioodbc carrying a ``setinputsizes()`` workaround."""

    __slots__ = ()

    def setinputsizes(self, *inputsizes):
        """Forward ``setinputsizes()`` to the ``_impl`` object of the cursor.

        The call is made directly on ``self._cursor._impl`` instead of going
        through the aioodbc cursor method; see
        https://github.com/aio-libs/aioodbc/issues/451

        :param inputsizes: positional arguments passed through unchanged.
        :return: whatever the underlying ``setinputsizes()`` returns.
        """
        # how it's supposed to work (not used because of the issue above):
        # return self.await_(self._cursor.setinputsizes(*inputsizes))
        impl_cursor = self._cursor._impl
        return impl_cursor.setinputsizes(*inputsizes)
37
38
class AsyncAdapt_aioodbc_ss_cursor(
    AsyncAdapt_aioodbc_cursor, AsyncAdapt_dbapi_ss_cursor
):
    """Combine the aioodbc cursor adapter with the generic ss cursor adapter.

    Adds no behavior of its own; everything comes from the two base classes.
    """

    __slots__ = ()
43
44
class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection):
    """Connection adapter for aioodbc.

    Adds an ``autocommit`` property and makes ``cursor()``, ``rollback()``,
    ``commit()`` and ``close()`` check ``self._connection.closed`` first.
    Per the notes in this class, aioodbc sets ``connection=None`` when closed
    and would otherwise just fail with ``AttributeError``.
    """

    __slots__ = ()

    _cursor_cls = AsyncAdapt_aioodbc_cursor
    _ss_cursor_cls = AsyncAdapt_aioodbc_ss_cursor

    @property
    def autocommit(self):
        """Autocommit flag as reported by the wrapped connection."""
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        # The direct form ``self._connection.autocommit = value`` is not
        # used, see https://github.com/aio-libs/aioodbc/issues/448 ; the
        # flag is assigned on the object held in ``_conn`` instead.
        inner_connection = self._connection._conn
        inner_connection.autocommit = value

    def cursor(self, server_side=False):
        """Return a cursor, or raise if the connection is already closed.

        aioodbc sets connection=None when closed and just fails with
        AttributeError. The same ProgrammingError + message that pyodbc
        uses is raised here instead, so it triggers is_disconnect() as well.

        :param server_side: passed through to the parent ``cursor()``.
        :raises ProgrammingError: (from ``self.dbapi``) when the wrapped
         connection reports ``closed``.
        """
        if not self._connection.closed:
            return super().cursor(server_side=server_side)

        raise self.dbapi.ProgrammingError(
            "Attempt to use a closed connection."
        )

    def rollback(self):
        """Roll back via the parent class; a no-op once closed."""
        # aioodbc sets connection=None when closed and just fails with
        # AttributeError, hence the explicit check.
        if self._connection.closed:
            return
        super().rollback()

    def commit(self):
        """Commit via the parent class; a no-op once closed."""
        # aioodbc sets connection=None when closed and just fails with
        # AttributeError, hence the explicit check.
        if self._connection.closed:
            return
        super().commit()

    def close(self):
        """Close via the parent class; a no-op once closed."""
        # aioodbc sets connection=None when closed and just fails with
        # AttributeError, hence the explicit check.
        if self._connection.closed:
            return
        super().close()
88
89
class AsyncAdaptFallback_aioodbc_connection(
    AsyncAdaptFallback_dbapi_connection, AsyncAdapt_aioodbc_connection
):
    """Combine the fallback connection base with the aioodbc adapter.

    Adds no behavior of its own; everything comes from the two base classes.
    """

    __slots__ = ()
94
95
class AsyncAdapt_aioodbc_dbapi:
    """DBAPI-style module facade built from the aioodbc and pyodbc modules.

    Exceptions, type objects, ``paramstyle`` and ``version`` are copied from
    ``pyodbc``; connections are created through ``aioodbc.connect`` unless a
    different creator is supplied to :meth:`connect`.
    """

    def __init__(self, aioodbc, pyodbc):
        """Store both modules and copy the pyodbc attributes onto ``self``.

        :param aioodbc: the imported ``aioodbc`` module.
        :param pyodbc: the imported ``pyodbc`` module.
        """
        self.aioodbc = aioodbc
        self.pyodbc = pyodbc
        self.paramstyle = pyodbc.paramstyle
        self._init_dbapi_attributes()
        self.Cursor = AsyncAdapt_dbapi_cursor
        self.version = pyodbc.version

    def _init_dbapi_attributes(self):
        """Copy exception classes and type constants from ``self.pyodbc``.

        Each name is read from ``self.pyodbc`` exactly once and set under
        the same name on this object.
        """
        for name in (
            "Warning",
            "Error",
            "InterfaceError",
            "DataError",
            "DatabaseError",
            "OperationalError",
            "IntegrityError",
            "ProgrammingError",
            "InternalError",
            "NotSupportedError",
            "NUMBER",
            "STRING",
            "DATETIME",
            "BINARY",
            "Binary",
            "BinaryNull",
            "SQL_VARCHAR",
            "SQL_WVARCHAR",
        ):
            setattr(self, name, getattr(self.pyodbc, name))

    def connect(self, *arg, **kw):
        """Create a connection and wrap it in the matching adapter class.

        Two keyword arguments are consumed here and not passed to the
        creator:

        * ``async_fallback`` (default ``False``): when true per
          ``util.asbool()``, the creator result is awaited with
          ``await_fallback`` and wrapped in
          ``AsyncAdaptFallback_aioodbc_connection``; otherwise
          ``await_only`` and ``AsyncAdapt_aioodbc_connection`` are used.
        * ``async_creator_fn`` (default ``self.aioodbc.connect``): the
          callable invoked with the remaining arguments.

        :return: the adapted connection object.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop("async_creator_fn", self.aioodbc.connect)

        if util.asbool(async_fallback):
            connection_cls = AsyncAdaptFallback_aioodbc_connection
            await_ = await_fallback
        else:
            connection_cls = AsyncAdapt_aioodbc_connection
            await_ = await_only

        return connection_cls(self, await_(creator_fn(*arg, **kw)))
143
144
class aiodbcConnector(PyODBCConnector):
    """Connector that extends the pyodbc connector for use with aioodbc."""

    is_async = True
    supports_statement_cache = True

    supports_server_side_cursors = True

    @classmethod
    def import_dbapi(cls):
        """Import ``aioodbc`` and ``pyodbc`` and wrap them in the adapter."""
        aioodbc_module = __import__("aioodbc")
        pyodbc_module = __import__("pyodbc")
        return AsyncAdapt_aioodbc_dbapi(aioodbc_module, pyodbc_module)

    def create_connect_args(self, url: URL) -> ConnectArgsType:
        """Build connect arguments using keywords only.

        The parent class result is taken as-is, except that a truthy first
        positional argument is moved into the ``"dsn"`` keyword and the
        positional part of the result is always an empty tuple.
        """
        positional, keywords = super().create_connect_args(url)
        dsn = positional[0] if positional else None
        if dsn:
            keywords["dsn"] = dsn

        return (), keywords

    @classmethod
    def get_pool_class(cls, url):
        """Choose the pool class from the ``async_fallback`` URL query value.

        ``FallbackAsyncAdaptedQueuePool`` is returned when the value is true
        per ``util.asbool()``, ``AsyncAdaptedQueuePool`` otherwise.
        """
        use_fallback = util.asbool(url.query.get("async_fallback", False))

        return (
            pool.FallbackAsyncAdaptedQueuePool
            if use_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def get_driver_connection(self, connection):
        """Return the object stored as ``_connection`` on *connection*."""
        # presumably the raw aioodbc connection held by the adapter;
        # confirm against the adapter base class
        return connection._connection