Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/connectors/aioodbc.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

89 statements  

1# connectors/aioodbc.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9from __future__ import annotations 

10 

11from typing import TYPE_CHECKING 

12 

13from .asyncio import AsyncAdapt_dbapi_connection 

14from .asyncio import AsyncAdapt_dbapi_cursor 

15from .asyncio import AsyncAdapt_dbapi_ss_cursor 

16from .asyncio import AsyncAdaptFallback_dbapi_connection 

17from .pyodbc import PyODBCConnector 

18from .. import pool 

19from .. import util 

20from ..util.concurrency import await_fallback 

21from ..util.concurrency import await_only 

22 

23 

24if TYPE_CHECKING: 

25 from ..engine.interfaces import ConnectArgsType 

26 from ..engine.url import URL 

27 

28 

29class AsyncAdapt_aioodbc_cursor(AsyncAdapt_dbapi_cursor): 

30 __slots__ = () 

31 

32 def setinputsizes(self, *inputsizes): 

33 # see https://github.com/aio-libs/aioodbc/issues/451 

34 return self._cursor._impl.setinputsizes(*inputsizes) 

35 

36 # how it's supposed to work 

37 # return self.await_(self._cursor.setinputsizes(*inputsizes)) 

38 

39 

40class AsyncAdapt_aioodbc_ss_cursor( 

41 AsyncAdapt_aioodbc_cursor, AsyncAdapt_dbapi_ss_cursor 

42): 

43 __slots__ = () 

44 

45 

46class AsyncAdapt_aioodbc_connection(AsyncAdapt_dbapi_connection): 

47 _cursor_cls = AsyncAdapt_aioodbc_cursor 

48 _ss_cursor_cls = AsyncAdapt_aioodbc_ss_cursor 

49 __slots__ = () 

50 

51 @property 

52 def autocommit(self): 

53 return self._connection.autocommit 

54 

55 @autocommit.setter 

56 def autocommit(self, value): 

57 # https://github.com/aio-libs/aioodbc/issues/448 

58 # self._connection.autocommit = value 

59 

60 self._connection._conn.autocommit = value 

61 

62 def ping(self, reconnect): 

63 return self.await_(self._connection.ping(reconnect)) 

64 

65 def add_output_converter(self, *arg, **kw): 

66 self._connection.add_output_converter(*arg, **kw) 

67 

68 def character_set_name(self): 

69 return self._connection.character_set_name() 

70 

71 def cursor(self, server_side=False): 

72 # aioodbc sets connection=None when closed and just fails with 

73 # AttributeError here. Here we use the same ProgrammingError + 

74 # message that pyodbc uses, so it triggers is_disconnect() as well. 

75 if self._connection.closed: 

76 raise self.dbapi.ProgrammingError( 

77 "Attempt to use a closed connection." 

78 ) 

79 return super().cursor(server_side=server_side) 

80 

81 def rollback(self): 

82 # aioodbc sets connection=None when closed and just fails with 

83 # AttributeError here. should be a no-op 

84 if not self._connection.closed: 

85 super().rollback() 

86 

87 def commit(self): 

88 # aioodbc sets connection=None when closed and just fails with 

89 # AttributeError here. should be a no-op 

90 if not self._connection.closed: 

91 super().commit() 

92 

93 def close(self): 

94 # aioodbc sets connection=None when closed and just fails with 

95 # AttributeError here. should be a no-op 

96 if not self._connection.closed: 

97 super().close() 

98 

99 

100class AsyncAdaptFallback_aioodbc_connection( 

101 AsyncAdaptFallback_dbapi_connection, AsyncAdapt_aioodbc_connection 

102): 

103 __slots__ = () 

104 

105 

106class AsyncAdapt_aioodbc_dbapi: 

107 def __init__(self, aioodbc, pyodbc): 

108 self.aioodbc = aioodbc 

109 self.pyodbc = pyodbc 

110 self.paramstyle = pyodbc.paramstyle 

111 self._init_dbapi_attributes() 

112 self.Cursor = AsyncAdapt_dbapi_cursor 

113 self.version = pyodbc.version 

114 

115 def _init_dbapi_attributes(self): 

116 for name in ( 

117 "Warning", 

118 "Error", 

119 "InterfaceError", 

120 "DataError", 

121 "DatabaseError", 

122 "OperationalError", 

123 "InterfaceError", 

124 "IntegrityError", 

125 "ProgrammingError", 

126 "InternalError", 

127 "NotSupportedError", 

128 "NUMBER", 

129 "STRING", 

130 "DATETIME", 

131 "BINARY", 

132 "Binary", 

133 "BinaryNull", 

134 "SQL_VARCHAR", 

135 "SQL_WVARCHAR", 

136 ): 

137 setattr(self, name, getattr(self.pyodbc, name)) 

138 

139 def connect(self, *arg, **kw): 

140 async_fallback = kw.pop("async_fallback", False) 

141 creator_fn = kw.pop("async_creator_fn", self.aioodbc.connect) 

142 

143 if util.asbool(async_fallback): 

144 return AsyncAdaptFallback_aioodbc_connection( 

145 self, 

146 await_fallback(creator_fn(*arg, **kw)), 

147 ) 

148 else: 

149 return AsyncAdapt_aioodbc_connection( 

150 self, 

151 await_only(creator_fn(*arg, **kw)), 

152 ) 

153 

154 

155class aiodbcConnector(PyODBCConnector): 

156 is_async = True 

157 supports_statement_cache = True 

158 

159 supports_server_side_cursors = True 

160 

161 @classmethod 

162 def import_dbapi(cls): 

163 return AsyncAdapt_aioodbc_dbapi( 

164 __import__("aioodbc"), __import__("pyodbc") 

165 ) 

166 

167 def create_connect_args(self, url: URL) -> ConnectArgsType: 

168 arg, kw = super().create_connect_args(url) 

169 if arg and arg[0]: 

170 kw["dsn"] = arg[0] 

171 

172 return (), kw 

173 

174 @classmethod 

175 def get_pool_class(cls, url): 

176 async_fallback = url.query.get("async_fallback", False) 

177 

178 if util.asbool(async_fallback): 

179 return pool.FallbackAsyncAdaptedQueuePool 

180 else: 

181 return pool.AsyncAdaptedQueuePool 

182 

183 def get_driver_connection(self, connection): 

184 return connection._connection