Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/connectors/aioodbc.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

95 statements  

1# connectors/aioodbc.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9from __future__ import annotations 

10 

11from typing import TYPE_CHECKING 

12 

13from .asyncio import AsyncAdapt_dbapi_connection 

14from .asyncio import AsyncAdapt_dbapi_cursor 

15from .asyncio import AsyncAdapt_dbapi_ss_cursor 

16from .asyncio import AsyncAdaptFallback_dbapi_connection 

17from .pyodbc import PyODBCConnector 

18from .. import pool 

19from .. import util 

20from ..util.concurrency import await_fallback 

21from ..util.concurrency import await_only 

22 

23if TYPE_CHECKING: 

24 from ..engine.interfaces import ConnectArgsType 

25 from ..engine.url import URL 

26 

27 

class AsyncAdapt_aioodbc_cursor(AsyncAdapt_dbapi_cursor):
    """Cursor adapter that reaches into the wrapped cursor's ``_impl``.

    ``setinputsizes()`` and the ``fast_executemany`` attribute are both
    served directly from ``self._cursor._impl`` instead of going through
    the wrapping cursor object.
    """

    __slots__ = ()

    def setinputsizes(self, *inputsizes):
        """Call ``setinputsizes()`` on the ``_impl`` object and return
        its result.

        The call is made directly, without awaiting anything; see
        https://github.com/aio-libs/aioodbc/issues/451

        Per the original note, the way this is supposed to work is::

            return self.await_(self._cursor.setinputsizes(*inputsizes))
        """
        impl = self._cursor._impl
        return impl.setinputsizes(*inputsizes)

    @property
    def fast_executemany(self):
        """Value of ``fast_executemany`` on the ``_impl`` object."""
        impl = self._cursor._impl
        return impl.fast_executemany

    @fast_executemany.setter
    def fast_executemany(self, value):
        # written straight onto the ``_impl`` object, mirroring the getter
        impl = self._cursor._impl
        impl.fast_executemany = value

45 

46 

class AsyncAdapt_aioodbc_ss_cursor(
    AsyncAdapt_aioodbc_cursor, AsyncAdapt_dbapi_ss_cursor
):
    """Server-side cursor variant of :class:`AsyncAdapt_aioodbc_cursor`.

    Adds no behavior of its own; it only combines the aioodbc cursor
    adapter with ``AsyncAdapt_dbapi_ss_cursor``.
    """

    __slots__ = ()

51 

52 

class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection):
    """Connection adapter around an aioodbc connection.

    ``cursor()``, ``rollback()``, ``commit()`` and ``close()`` all check
    ``self._connection.closed`` first: per the notes in this class,
    aioodbc sets ``connection=None`` when closed and would otherwise
    just fail with ``AttributeError``.
    """

    _cursor_cls = AsyncAdapt_aioodbc_cursor
    _ss_cursor_cls = AsyncAdapt_aioodbc_ss_cursor
    __slots__ = ()

    @property
    def autocommit(self):
        """Autocommit flag as reported by the wrapped connection."""
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        # https://github.com/aio-libs/aioodbc/issues/448
        # Assigning ``self._connection.autocommit = value`` is not used;
        # the flag is set on the inner ``_conn`` object instead.
        inner = self._connection._conn
        inner.autocommit = value

    def ping(self, reconnect):
        """Run the wrapped connection's ``ping(reconnect)`` through
        ``self.await_`` and return the result."""
        return self.await_(self._connection.ping(reconnect))

    def add_output_converter(self, *arg, **kw):
        """Pass all arguments to the wrapped ``add_output_converter()``.

        The wrapped call's return value is not propagated.
        """
        self._connection.add_output_converter(*arg, **kw)

    def character_set_name(self):
        """Return the wrapped connection's ``character_set_name()``."""
        return self._connection.character_set_name()

    def cursor(self, server_side=False):
        """Return a cursor, or raise ``ProgrammingError`` when closed.

        aioodbc sets connection=None when closed and just fails with
        AttributeError here.  The same ProgrammingError + message that
        pyodbc uses is raised instead, so it triggers is_disconnect()
        as well.
        """
        if not self._connection.closed:
            return super().cursor(server_side=server_side)

        raise self.dbapi.ProgrammingError(
            "Attempt to use a closed connection."
        )

    def rollback(self):
        """Roll back, unless the connection is already closed (no-op)."""
        # aioodbc sets connection=None when closed and just fails with
        # AttributeError here.
        if self._connection.closed:
            return
        super().rollback()

    def commit(self):
        """Commit, unless the connection is already closed (no-op)."""
        # aioodbc sets connection=None when closed and just fails with
        # AttributeError here.
        if self._connection.closed:
            return
        super().commit()

    def close(self):
        """Close, unless the connection is already closed (no-op)."""
        # aioodbc sets connection=None when closed and just fails with
        # AttributeError here.
        if self._connection.closed:
            return
        super().close()

105 

106 

class AsyncAdaptFallback_aioodbc_connection(
    AsyncAdaptFallback_dbapi_connection, AsyncAdapt_aioodbc_connection
):
    """Fallback flavor of :class:`AsyncAdapt_aioodbc_connection`.

    Adds no behavior of its own; it only places
    ``AsyncAdaptFallback_dbapi_connection`` ahead of the aioodbc
    connection adapter in the base-class order.
    """

    __slots__ = ()

111 

112 

class AsyncAdapt_aioodbc_dbapi:
    """DBAPI-style facade combining the ``aioodbc`` and ``pyodbc`` modules.

    ``paramstyle``, ``version`` and a fixed set of exception classes and
    constants are copied from ``pyodbc``; ``connect()`` obtains a
    connection from ``aioodbc`` (or a caller-supplied creator) and wraps
    it in one of the adapter connection classes.
    """

    # Names copied verbatim from the pyodbc module onto each instance.
    # Every name appears exactly once.
    _pyodbc_attribute_names = (
        "Warning",
        "Error",
        "InterfaceError",
        "DataError",
        "DatabaseError",
        "OperationalError",
        "IntegrityError",
        "ProgrammingError",
        "InternalError",
        "NotSupportedError",
        "SQL_DRIVER_NAME",
        "NUMBER",
        "STRING",
        "DATETIME",
        "BINARY",
        "Binary",
        "BinaryNull",
        "SQL_VARCHAR",
        "SQL_WVARCHAR",
        "SQL_DECIMAL",
    )

    def __init__(self, aioodbc, pyodbc):
        """Store both modules and mirror the pyodbc attributes.

        :param aioodbc: the imported ``aioodbc`` module; its ``connect``
         is the default connection creator used by :meth:`connect`.
        :param pyodbc: the imported ``pyodbc`` module; source of
         ``paramstyle``, ``version`` and the mirrored attributes.
        """
        self.aioodbc = aioodbc
        self.pyodbc = pyodbc
        self.paramstyle = pyodbc.paramstyle
        self._init_dbapi_attributes()
        self.Cursor = AsyncAdapt_dbapi_cursor
        self.version = pyodbc.version

    def _init_dbapi_attributes(self):
        """Copy each name in ``_pyodbc_attribute_names`` from
        ``self.pyodbc`` onto this object.

        Raises ``AttributeError`` if ``self.pyodbc`` lacks one of them.
        """
        source = self.pyodbc
        for name in self._pyodbc_attribute_names:
            setattr(self, name, getattr(source, name))

    def connect(self, *arg, **kw):
        """Create a connection and return it wrapped in an adapter.

        Two keyword arguments are consumed here and not forwarded:

        * ``async_fallback`` - when truthy per ``util.asbool()``, the
          creator's result goes through ``await_fallback()`` and is
          wrapped in ``AsyncAdaptFallback_aioodbc_connection``; otherwise
          ``await_only()`` and ``AsyncAdapt_aioodbc_connection`` are used.
        * ``async_creator_fn`` - callable used in place of
          ``self.aioodbc.connect``.

        All remaining positional and keyword arguments are passed to the
        creator callable.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop("async_creator_fn", self.aioodbc.connect)

        if util.asbool(async_fallback):
            return AsyncAdaptFallback_aioodbc_connection(
                self,
                await_fallback(creator_fn(*arg, **kw)),
            )
        else:
            return AsyncAdapt_aioodbc_connection(
                self,
                await_only(creator_fn(*arg, **kw)),
            )

162 

163 

class aiodbcConnector(PyODBCConnector):
    """Connector built on ``PyODBCConnector`` that uses the
    ``AsyncAdapt_aioodbc_dbapi`` facade as its DBAPI."""

    is_async = True
    supports_statement_cache = True

    supports_server_side_cursors = True

    @classmethod
    def import_dbapi(cls):
        """Import ``aioodbc`` and ``pyodbc`` and return them wrapped in
        an ``AsyncAdapt_aioodbc_dbapi``."""
        aioodbc_module = __import__("aioodbc")
        pyodbc_module = __import__("pyodbc")
        return AsyncAdapt_aioodbc_dbapi(aioodbc_module, pyodbc_module)

    def create_connect_args(self, url: URL) -> ConnectArgsType:
        """Build connect arguments as keywords only.

        Takes the ``(args, kwargs)`` pair produced by the parent class;
        if the first positional argument is present and truthy it is
        moved into ``kwargs["dsn"]``.  The positional part of the result
        is always an empty tuple.
        """
        positional, keywords = super().create_connect_args(url)

        dsn = positional[0] if positional else None
        if dsn:
            keywords["dsn"] = dsn

        return (), keywords

    @classmethod
    def get_pool_class(cls, url):
        """Choose the pool class from the URL's ``async_fallback`` query
        value, interpreted with ``util.asbool()`` (default ``False``)."""
        use_fallback = util.asbool(url.query.get("async_fallback", False))

        return (
            pool.FallbackAsyncAdaptedQueuePool
            if use_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def get_driver_connection(self, connection):
        """Return the ``_connection`` attribute of ``connection``."""
        return connection._connection