1# connectors/asyncio.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9"""generic asyncio-adapted versions of DBAPI connection and cursor"""
10
11from __future__ import annotations
12
13import collections
14
15from ..engine import AdaptedConnection
16from ..util.concurrency import asyncio
17from ..util.concurrency import await_fallback
18from ..util.concurrency import await_only
19
20
class AsyncAdapt_dbapi_cursor:
    """Synchronous-style facade over an asyncio driver cursor.

    Coroutines produced by the wrapped cursor are run through the
    ``await_`` callable borrowed from the owning adapted connection.  When
    ``server_side`` is false, the rows of each result set are fetched
    eagerly after execution into a :class:`collections.deque`, and the
    ``fetch*`` methods and iteration drain that buffer without awaiting.
    """

    server_side = False
    __slots__ = (
        "_adapt_connection",
        "_connection",
        "await_",
        "_cursor",
        "_rows",
    )

    def __init__(self, adapt_connection):
        """Open a driver cursor on the connection wrapped by
        ``adapt_connection``.

        :param adapt_connection: the adapted connection that owns this
         cursor; its ``_connection`` and ``await_`` attributes are copied
         here, and its ``_execute_mutex`` is used when executing.
        """
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_

        cursor = self._connection.cursor()
        self._cursor = self._aenter_cursor(cursor)

        # only the buffered (non server-side) flavor keeps rows locally
        if not self.server_side:
            self._rows = collections.deque()

    def _aenter_cursor(self, cursor):
        """Enter ``cursor`` as an async context manager and return the
        result of its ``__aenter__()``."""
        return self.await_(cursor.__aenter__())

    @property
    def description(self):
        """The ``description`` of the wrapped driver cursor."""
        return self._cursor.description

    @property
    def rowcount(self):
        """The ``rowcount`` of the wrapped driver cursor."""
        return self._cursor.rowcount

    @property
    def arraysize(self):
        """The ``arraysize`` of the wrapped driver cursor; also the default
        batch size for :meth:`fetchmany`."""
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    @property
    def lastrowid(self):
        """The ``lastrowid`` of the wrapped driver cursor."""
        return self._cursor.lastrowid

    def close(self):
        """Drop any buffered rows; the driver cursor is not closed here."""
        # note we aren't actually closing the cursor here,
        # we are just letting GC do it.  see notes in aiomysql dialect
        self._rows.clear()

    def execute(self, operation, parameters=None):
        """Execute ``operation`` and return the driver's ``execute()``
        result.

        :param operation: statement passed through to the driver cursor.
        :param parameters: bound parameters; a falsy value is sent to the
         driver as an empty tuple.
        """
        return self.await_(self._execute_async(operation, parameters))

    def executemany(self, operation, seq_of_parameters):
        """Execute ``operation`` once per entry of ``seq_of_parameters`` and
        return the driver's ``executemany()`` result."""
        return self.await_(
            self._executemany_async(operation, seq_of_parameters)
        )

    async def _execute_async(self, operation, parameters):
        """Coroutine behind :meth:`execute`.

        The statement and the eager fetch of its rows both run while the
        connection's ``_execute_mutex`` is held.
        """
        async with self._adapt_connection._execute_mutex:
            result = await self._cursor.execute(operation, parameters or ())

            # buffer the whole result up front so that the fetch methods
            # can serve rows without awaiting
            if self._cursor.description and not self.server_side:
                self._rows = collections.deque(await self._cursor.fetchall())
            return result

    async def _executemany_async(self, operation, seq_of_parameters):
        """Coroutine behind :meth:`executemany`; runs while the connection's
        ``_execute_mutex`` is held."""
        async with self._adapt_connection._execute_mutex:
            return await self._cursor.executemany(operation, seq_of_parameters)

    def nextset(self):
        """Advance the driver cursor to its next result set.

        For a buffered cursor, rows left over from the previous set are
        always discarded, as :pep:`249` specifies for ``nextset()``; the
        buffer is then refilled when the new set has a description.

        :return: the value returned by the driver's ``nextset()``, which
         per :pep:`249` is a true value when another set is available and
         ``None`` otherwise.
        """
        has_next = self.await_(self._cursor.nextset())
        if not self.server_side:
            if self._cursor.description:
                self._rows = collections.deque(
                    self.await_(self._cursor.fetchall())
                )
            else:
                # no rows in the new set; don't leak the previous set's
                self._rows = collections.deque()
        return has_next

    def setinputsizes(self, *inputsizes):
        """Forward ``inputsizes`` to the driver cursor's
        ``setinputsizes()`` and return its awaited result."""
        # NOTE: this is overridden in aioodbc right now; see
        # https://github.com/aio-libs/aioodbc/issues/451

        return self.await_(self._cursor.setinputsizes(*inputsizes))

    def __iter__(self):
        """Yield buffered rows, removing each one from the buffer."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return the next buffered row, or ``None`` when the buffer is
        empty."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Return up to ``size`` buffered rows as a list.

        :param size: maximum number of rows; defaults to ``arraysize``.
        """
        if size is None:
            size = self.arraysize
        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self):
        """Return all remaining buffered rows as a list and empty the
        buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
124
125
class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor):
    """Server-side flavor of the adapted cursor.

    No rows are buffered locally: every fetch and every iteration step
    awaits the wrapped driver cursor, and :meth:`close` closes it.
    """

    __slots__ = ()
    server_side = True

    def __init__(self, adapt_connection):
        """Open and enter a driver cursor on the connection wrapped by
        ``adapt_connection``, copying its ``_connection`` and ``await_``."""
        self._adapt_connection = adapt_connection
        self._connection = adapt_connection._connection
        self.await_ = adapt_connection.await_

        entering = self._connection.cursor().__aenter__()
        self._cursor = self.await_(entering)

    def close(self):
        """Close the driver cursor; further calls do nothing."""
        if self._cursor is None:
            return
        self.await_(self._cursor.close())
        self._cursor = None

    def fetchone(self):
        """Await and return the driver cursor's ``fetchone()``."""
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size=None):
        """Await and return the driver cursor's ``fetchmany(size=size)``."""
        pending = self._cursor.fetchmany(size=size)
        return self.await_(pending)

    def fetchall(self):
        """Await and return the driver cursor's ``fetchall()``."""
        pending = self._cursor.fetchall()
        return self.await_(pending)

    def __iter__(self):
        """Yield rows from the driver cursor's async iterator, awaiting one
        row per step until it raises ``StopAsyncIteration``."""
        async_rows = self._cursor.__aiter__()
        exhausted = False
        while not exhausted:
            try:
                yield self.await_(async_rows.__anext__())
            except StopAsyncIteration:
                exhausted = True
160
161
class AsyncAdapt_dbapi_connection(AdaptedConnection):
    """Synchronous-style facade over an asyncio driver connection.

    Coroutines of the wrapped connection are run through ``await_``.
    Cursors are created from ``_cursor_cls`` / ``_ss_cursor_cls``, which
    subclasses may replace.
    """

    __slots__ = ("dbapi", "_execute_mutex")

    await_ = staticmethod(await_only)

    _cursor_cls = AsyncAdapt_dbapi_cursor
    _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor

    def __init__(self, dbapi, connection):
        """Wrap ``connection``.

        :param dbapi: stored as-is on ``self.dbapi``.
        :param connection: the async driver connection being adapted.
        """
        # lock held by this connection's cursors while they execute
        self._execute_mutex = asyncio.Lock()
        self._connection = connection
        self.dbapi = dbapi

    def ping(self, reconnect):
        """Await the driver connection's ``ping(reconnect)`` and return its
        result."""
        pending = self._connection.ping(reconnect)
        return self.await_(pending)

    def add_output_converter(self, *arg, **kw):
        """Pass all arguments to the driver connection's
        ``add_output_converter()``; returns ``None``."""
        self._connection.add_output_converter(*arg, **kw)

    def character_set_name(self):
        """Return the driver connection's ``character_set_name()``."""
        return self._connection.character_set_name()

    @property
    def autocommit(self):
        """The ``autocommit`` attribute of the driver connection."""
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        # the value is assigned on the driver connection's inner ``_conn``
        # rather than via ``self._connection.autocommit = value``; see
        # https://github.com/aio-libs/aioodbc/issues/448
        self._connection._conn.autocommit = value

    def cursor(self, server_side=False):
        """Return a new adapted cursor bound to this connection.

        :param server_side: when true, build the cursor from
         ``_ss_cursor_cls`` instead of ``_cursor_cls``.
        """
        factory = self._ss_cursor_cls if server_side else self._cursor_cls
        return factory(self)

    def rollback(self):
        """Await the driver connection's ``rollback()``."""
        pending = self._connection.rollback()
        self.await_(pending)

    def commit(self):
        """Await the driver connection's ``commit()``."""
        pending = self._connection.commit()
        self.await_(pending)

    def close(self):
        """Await the driver connection's ``close()``."""
        pending = self._connection.close()
        self.await_(pending)
208
209
class AsyncAdaptFallback_dbapi_connection(AsyncAdapt_dbapi_connection):
    """Variant of :class:`AsyncAdapt_dbapi_connection` whose ``await_`` is
    ``await_fallback`` instead of ``await_only``."""

    await_ = staticmethod(await_fallback)

    __slots__ = ()