Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/connectors/aioodbc.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

95 statements  

1# connectors/aioodbc.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9from __future__ import annotations 

10 

11from typing import TYPE_CHECKING 

12 

13from .asyncio import AsyncAdapt_dbapi_connection 

14from .asyncio import AsyncAdapt_dbapi_cursor 

15from .asyncio import AsyncAdapt_dbapi_ss_cursor 

16from .asyncio import AsyncAdaptFallback_dbapi_connection 

17from .pyodbc import PyODBCConnector 

18from .. import pool 

19from .. import util 

20from ..util.concurrency import await_fallback 

21from ..util.concurrency import await_only 

22 

23 

24if TYPE_CHECKING: 

25 from ..engine.interfaces import ConnectArgsType 

26 from ..engine.url import URL 

27 

28 

29class AsyncAdapt_aioodbc_cursor(AsyncAdapt_dbapi_cursor): 

30 __slots__ = () 

31 

32 def setinputsizes(self, *inputsizes): 

33 # see https://github.com/aio-libs/aioodbc/issues/451 

34 return self._cursor._impl.setinputsizes(*inputsizes) 

35 

36 # how it's supposed to work 

37 # return self.await_(self._cursor.setinputsizes(*inputsizes)) 

38 

39 @property 

40 def fast_executemany(self): 

41 return self._cursor._impl.fast_executemany 

42 

43 @fast_executemany.setter 

44 def fast_executemany(self, value): 

45 self._cursor._impl.fast_executemany = value 

46 

47 

48class AsyncAdapt_aioodbc_ss_cursor( 

49 AsyncAdapt_aioodbc_cursor, AsyncAdapt_dbapi_ss_cursor 

50): 

51 __slots__ = () 

52 

53 

54class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection): 

55 _cursor_cls = AsyncAdapt_aioodbc_cursor 

56 _ss_cursor_cls = AsyncAdapt_aioodbc_ss_cursor 

57 __slots__ = () 

58 

59 @property 

60 def autocommit(self): 

61 return self._connection.autocommit 

62 

63 @autocommit.setter 

64 def autocommit(self, value): 

65 # https://github.com/aio-libs/aioodbc/issues/448 

66 # self._connection.autocommit = value 

67 

68 self._connection._conn.autocommit = value 

69 

70 def ping(self, reconnect): 

71 return self.await_(self._connection.ping(reconnect)) 

72 

73 def add_output_converter(self, *arg, **kw): 

74 self._connection.add_output_converter(*arg, **kw) 

75 

76 def character_set_name(self): 

77 return self._connection.character_set_name() 

78 

79 def cursor(self, server_side=False): 

80 # aioodbc sets connection=None when closed and just fails with 

81 # AttributeError here. Here we use the same ProgrammingError + 

82 # message that pyodbc uses, so it triggers is_disconnect() as well. 

83 if self._connection.closed: 

84 raise self.dbapi.ProgrammingError( 

85 "Attempt to use a closed connection." 

86 ) 

87 return super().cursor(server_side=server_side) 

88 

89 def rollback(self): 

90 # aioodbc sets connection=None when closed and just fails with 

91 # AttributeError here. should be a no-op 

92 if not self._connection.closed: 

93 super().rollback() 

94 

95 def commit(self): 

96 # aioodbc sets connection=None when closed and just fails with 

97 # AttributeError here. should be a no-op 

98 if not self._connection.closed: 

99 super().commit() 

100 

101 def close(self): 

102 # aioodbc sets connection=None when closed and just fails with 

103 # AttributeError here. should be a no-op 

104 if not self._connection.closed: 

105 super().close() 

106 

107 

108class AsyncAdaptFallback_aioodbc_connection( 

109 AsyncAdaptFallback_dbapi_connection, AsyncAdapt_aioodbc_connection 

110): 

111 __slots__ = () 

112 

113 

114class AsyncAdapt_aioodbc_dbapi: 

115 def __init__(self, aioodbc, pyodbc): 

116 self.aioodbc = aioodbc 

117 self.pyodbc = pyodbc 

118 self.paramstyle = pyodbc.paramstyle 

119 self._init_dbapi_attributes() 

120 self.Cursor = AsyncAdapt_dbapi_cursor 

121 self.version = pyodbc.version 

122 

123 def _init_dbapi_attributes(self): 

124 for name in ( 

125 "Warning", 

126 "Error", 

127 "InterfaceError", 

128 "DataError", 

129 "DatabaseError", 

130 "OperationalError", 

131 "InterfaceError", 

132 "IntegrityError", 

133 "ProgrammingError", 

134 "InternalError", 

135 "NotSupportedError", 

136 "SQL_DRIVER_NAME", 

137 "NUMBER", 

138 "STRING", 

139 "DATETIME", 

140 "BINARY", 

141 "Binary", 

142 "BinaryNull", 

143 "SQL_VARCHAR", 

144 "SQL_WVARCHAR", 

145 "SQL_DECIMAL", 

146 ): 

147 setattr(self, name, getattr(self.pyodbc, name)) 

148 

149 def connect(self, *arg, **kw): 

150 async_fallback = kw.pop("async_fallback", False) 

151 creator_fn = kw.pop("async_creator_fn", self.aioodbc.connect) 

152 

153 if util.asbool(async_fallback): 

154 return AsyncAdaptFallback_aioodbc_connection( 

155 self, 

156 await_fallback(creator_fn(*arg, **kw)), 

157 ) 

158 else: 

159 return AsyncAdapt_aioodbc_connection( 

160 self, 

161 await_only(creator_fn(*arg, **kw)), 

162 ) 

163 

164 

165class aiodbcConnector(PyODBCConnector): 

166 is_async = True 

167 supports_statement_cache = True 

168 

169 supports_server_side_cursors = True 

170 

171 @classmethod 

172 def import_dbapi(cls): 

173 return AsyncAdapt_aioodbc_dbapi( 

174 __import__("aioodbc"), __import__("pyodbc") 

175 ) 

176 

177 def create_connect_args(self, url: URL) -> ConnectArgsType: 

178 arg, kw = super().create_connect_args(url) 

179 if arg and arg[0]: 

180 kw["dsn"] = arg[0] 

181 

182 return (), kw 

183 

184 @classmethod 

185 def get_pool_class(cls, url): 

186 async_fallback = url.query.get("async_fallback", False) 

187 

188 if util.asbool(async_fallback): 

189 return pool.FallbackAsyncAdaptedQueuePool 

190 else: 

191 return pool.AsyncAdaptedQueuePool 

192 

193 def get_driver_connection(self, connection): 

194 return connection._connection