Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

133 statements  

1# connectors/asyncio.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9"""generic asyncio-adapted versions of DBAPI connection and cursor""" 

10 

11from __future__ import annotations 

12 

13import collections 

14 

15from ..engine import AdaptedConnection 

16from ..util.concurrency import asyncio 

17from ..util.concurrency import await_fallback 

18from ..util.concurrency import await_only 

19 

20 

21class AsyncAdapt_dbapi_cursor: 

22 server_side = False 

23 __slots__ = ( 

24 "_adapt_connection", 

25 "_connection", 

26 "await_", 

27 "_cursor", 

28 "_rows", 

29 ) 

30 

31 def __init__(self, adapt_connection): 

32 self._adapt_connection = adapt_connection 

33 self._connection = adapt_connection._connection 

34 self.await_ = adapt_connection.await_ 

35 

36 cursor = self._connection.cursor() 

37 self._cursor = self._aenter_cursor(cursor) 

38 

39 if not self.server_side: 

40 self._rows = collections.deque() 

41 

42 def _aenter_cursor(self, cursor): 

43 return self.await_(cursor.__aenter__()) 

44 

45 @property 

46 def description(self): 

47 return self._cursor.description 

48 

49 @property 

50 def rowcount(self): 

51 return self._cursor.rowcount 

52 

53 @property 

54 def arraysize(self): 

55 return self._cursor.arraysize 

56 

57 @arraysize.setter 

58 def arraysize(self, value): 

59 self._cursor.arraysize = value 

60 

61 @property 

62 def lastrowid(self): 

63 return self._cursor.lastrowid 

64 

65 def close(self): 

66 # note we aren't actually closing the cursor here, 

67 # we are just letting GC do it. see notes in aiomysql dialect 

68 self._rows.clear() 

69 

70 def execute(self, operation, parameters=None): 

71 return self.await_(self._execute_async(operation, parameters)) 

72 

73 def executemany(self, operation, seq_of_parameters): 

74 return self.await_( 

75 self._executemany_async(operation, seq_of_parameters) 

76 ) 

77 

78 async def _execute_async(self, operation, parameters): 

79 async with self._adapt_connection._execute_mutex: 

80 result = await self._cursor.execute(operation, parameters or ()) 

81 

82 if self._cursor.description and not self.server_side: 

83 self._rows = collections.deque(await self._cursor.fetchall()) 

84 return result 

85 

86 async def _executemany_async(self, operation, seq_of_parameters): 

87 async with self._adapt_connection._execute_mutex: 

88 return await self._cursor.executemany(operation, seq_of_parameters) 

89 

90 def nextset(self): 

91 self.await_(self._cursor.nextset()) 

92 if self._cursor.description and not self.server_side: 

93 self._rows = collections.deque( 

94 self.await_(self._cursor.fetchall()) 

95 ) 

96 

97 def setinputsizes(self, *inputsizes): 

98 # NOTE: this is overrridden in aioodbc due to 

99 # see https://github.com/aio-libs/aioodbc/issues/451 

100 # right now 

101 

102 return self.await_(self._cursor.setinputsizes(*inputsizes)) 

103 

104 def __iter__(self): 

105 while self._rows: 

106 yield self._rows.popleft() 

107 

108 def fetchone(self): 

109 if self._rows: 

110 return self._rows.popleft() 

111 else: 

112 return None 

113 

114 def fetchmany(self, size=None): 

115 if size is None: 

116 size = self.arraysize 

117 rr = self._rows 

118 return [rr.popleft() for _ in range(min(size, len(rr)))] 

119 

120 def fetchall(self): 

121 retval = list(self._rows) 

122 self._rows.clear() 

123 return retval 

124 

125 

126class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor): 

127 __slots__ = () 

128 server_side = True 

129 

130 def __init__(self, adapt_connection): 

131 self._adapt_connection = adapt_connection 

132 self._connection = adapt_connection._connection 

133 self.await_ = adapt_connection.await_ 

134 

135 cursor = self._connection.cursor() 

136 

137 self._cursor = self.await_(cursor.__aenter__()) 

138 

139 def close(self): 

140 if self._cursor is not None: 

141 self.await_(self._cursor.close()) 

142 self._cursor = None 

143 

144 def fetchone(self): 

145 return self.await_(self._cursor.fetchone()) 

146 

147 def fetchmany(self, size=None): 

148 return self.await_(self._cursor.fetchmany(size=size)) 

149 

150 def fetchall(self): 

151 return self.await_(self._cursor.fetchall()) 

152 

153 def __iter__(self): 

154 iterator = self._cursor.__aiter__() 

155 while True: 

156 try: 

157 yield self.await_(iterator.__anext__()) 

158 except StopAsyncIteration: 

159 break 

160 

161 

162class AsyncAdapt_dbapi_connection(AdaptedConnection): 

163 _cursor_cls = AsyncAdapt_dbapi_cursor 

164 _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor 

165 

166 await_ = staticmethod(await_only) 

167 __slots__ = ("dbapi", "_execute_mutex") 

168 

169 def __init__(self, dbapi, connection): 

170 self.dbapi = dbapi 

171 self._connection = connection 

172 self._execute_mutex = asyncio.Lock() 

173 

174 def ping(self, reconnect): 

175 return self.await_(self._connection.ping(reconnect)) 

176 

177 def add_output_converter(self, *arg, **kw): 

178 self._connection.add_output_converter(*arg, **kw) 

179 

180 def character_set_name(self): 

181 return self._connection.character_set_name() 

182 

183 @property 

184 def autocommit(self): 

185 return self._connection.autocommit 

186 

187 @autocommit.setter 

188 def autocommit(self, value): 

189 # https://github.com/aio-libs/aioodbc/issues/448 

190 # self._connection.autocommit = value 

191 

192 self._connection._conn.autocommit = value 

193 

194 def cursor(self, server_side=False): 

195 if server_side: 

196 return self._ss_cursor_cls(self) 

197 else: 

198 return self._cursor_cls(self) 

199 

200 def rollback(self): 

201 self.await_(self._connection.rollback()) 

202 

203 def commit(self): 

204 self.await_(self._connection.commit()) 

205 

206 def close(self): 

207 self.await_(self._connection.close()) 

208 

209 

210class AsyncAdaptFallback_dbapi_connection(AsyncAdapt_dbapi_connection): 

211 __slots__ = () 

212 

213 await_ = staticmethod(await_fallback)