1# engine/cursor.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Define cursor-specific result set constructs including
10:class:`.CursorResult`."""
11
12
13from __future__ import annotations
14
15import collections
16import functools
17import operator
18import typing
19from typing import Any
20from typing import cast
21from typing import ClassVar
22from typing import Dict
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Mapping
27from typing import NoReturn
28from typing import Optional
29from typing import Sequence
30from typing import Tuple
31from typing import TYPE_CHECKING
32from typing import TypeVar
33from typing import Union
34
35from .result import IteratorResult
36from .result import MergedResult
37from .result import Result
38from .result import ResultMetaData
39from .result import SimpleResultMetaData
40from .result import tuplegetter
41from .row import Row
42from .. import exc
43from .. import util
44from ..sql import elements
45from ..sql import sqltypes
46from ..sql import util as sql_util
47from ..sql.base import _generative
48from ..sql.compiler import ResultColumnsEntry
49from ..sql.compiler import RM_NAME
50from ..sql.compiler import RM_OBJECTS
51from ..sql.compiler import RM_RENDERED_NAME
52from ..sql.compiler import RM_TYPE
53from ..sql.type_api import TypeEngine
54from ..util import compat
55from ..util.typing import Literal
56from ..util.typing import Self
57
58
59if typing.TYPE_CHECKING:
60 from .base import Connection
61 from .default import DefaultExecutionContext
62 from .interfaces import _DBAPICursorDescription
63 from .interfaces import DBAPICursor
64 from .interfaces import Dialect
65 from .interfaces import ExecutionContext
66 from .result import _KeyIndexType
67 from .result import _KeyMapRecType
68 from .result import _KeyMapType
69 from .result import _KeyType
70 from .result import _ProcessorsType
71 from .result import _TupleGetterType
72 from ..sql.type_api import _ResultProcessorType
73
74
75_T = TypeVar("_T", bound=Any)
76
77
78# metadata entry tuple indexes.
79# using raw tuple is faster than namedtuple.
80# these match up to the positions in
81# _CursorKeyMapRecType
82MD_INDEX: Literal[0] = 0
83"""integer index in cursor.description
84
85"""
86
87MD_RESULT_MAP_INDEX: Literal[1] = 1
88"""integer index in compiled._result_columns"""
89
90MD_OBJECTS: Literal[2] = 2
91"""other string keys and ColumnElement obj that can match.
92
93This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
94
95"""
96
97MD_LOOKUP_KEY: Literal[3] = 3
98"""string key we usually expect for key-based lookup
99
100this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
101"""
102
103
104MD_RENDERED_NAME: Literal[4] = 4
105"""name that is usually in cursor.description
106
107this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
108"""
109
110
111MD_PROCESSOR: Literal[5] = 5
112"""callable to process a result value into a row"""
113
114MD_UNTRANSLATED: Literal[6] = 6
115"""raw name from cursor.description"""
116
117
118_CursorKeyMapRecType = Tuple[
119 Optional[int], # MD_INDEX, None means the record is ambiguously named
120 int, # MD_RESULT_MAP_INDEX
121 List[Any], # MD_OBJECTS
122 str, # MD_LOOKUP_KEY
123 str, # MD_RENDERED_NAME
124 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
125 Optional[str], # MD_UNTRANSLATED
126]
127
128_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
129
130# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
131# not None
132_NonAmbigCursorKeyMapRecType = Tuple[
133 int,
134 int,
135 List[Any],
136 str,
137 str,
138 Optional["_ResultProcessorType[Any]"],
139 str,
140]
141
142
143class CursorResultMetaData(ResultMetaData):
144 """Result metadata for DBAPI cursors."""
145
146 __slots__ = (
147 "_keymap",
148 "_processors",
149 "_keys",
150 "_keymap_by_result_column_idx",
151 "_tuplefilter",
152 "_translated_indexes",
153 "_safe_for_cache",
154 "_unpickled",
155 "_key_to_index",
156 # don't need _unique_filters support here for now. Can be added
157 # if a need arises.
158 )
159
160 _keymap: _CursorKeyMapType
161 _processors: _ProcessorsType
162 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
163 _unpickled: bool
164 _safe_for_cache: bool
165 _translated_indexes: Optional[List[int]]
166
167 returns_rows: ClassVar[bool] = True
168
169 def _has_key(self, key: Any) -> bool:
170 return key in self._keymap
171
172 def _for_freeze(self) -> ResultMetaData:
173 return SimpleResultMetaData(
174 self._keys,
175 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
176 )
177
178 def _make_new_metadata(
179 self,
180 *,
181 unpickled: bool,
182 processors: _ProcessorsType,
183 keys: Sequence[str],
184 keymap: _KeyMapType,
185 tuplefilter: Optional[_TupleGetterType],
186 translated_indexes: Optional[List[int]],
187 safe_for_cache: bool,
188 keymap_by_result_column_idx: Any,
189 ) -> CursorResultMetaData:
190 new_obj = self.__class__.__new__(self.__class__)
191 new_obj._unpickled = unpickled
192 new_obj._processors = processors
193 new_obj._keys = keys
194 new_obj._keymap = keymap
195 new_obj._tuplefilter = tuplefilter
196 new_obj._translated_indexes = translated_indexes
197 new_obj._safe_for_cache = safe_for_cache
198 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
199 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
200 return new_obj
201
202 def _remove_processors(self) -> CursorResultMetaData:
203 assert not self._tuplefilter
204 return self._make_new_metadata(
205 unpickled=self._unpickled,
206 processors=[None] * len(self._processors),
207 tuplefilter=None,
208 translated_indexes=None,
209 keymap={
210 key: value[0:5] + (None,) + value[6:]
211 for key, value in self._keymap.items()
212 },
213 keys=self._keys,
214 safe_for_cache=self._safe_for_cache,
215 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
216 )
217
218 def _splice_horizontally(
219 self, other: CursorResultMetaData
220 ) -> CursorResultMetaData:
221 assert not self._tuplefilter
222
223 keymap = dict(self._keymap)
224 offset = len(self._keys)
225 keymap.update(
226 {
227 key: (
228 # int index should be None for ambiguous key
229 (
230 value[0] + offset
231 if value[0] is not None and key not in keymap
232 else None
233 ),
234 value[1] + offset,
235 *value[2:],
236 )
237 for key, value in other._keymap.items()
238 }
239 )
240 return self._make_new_metadata(
241 unpickled=self._unpickled,
242 processors=self._processors + other._processors, # type: ignore
243 tuplefilter=None,
244 translated_indexes=None,
245 keys=self._keys + other._keys, # type: ignore
246 keymap=keymap,
247 safe_for_cache=self._safe_for_cache,
248 keymap_by_result_column_idx={
249 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
250 for metadata_entry in keymap.values()
251 },
252 )
253
254 def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
255 recs = list(self._metadata_for_keys(keys))
256
257 indexes = [rec[MD_INDEX] for rec in recs]
258 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
259
260 if self._translated_indexes:
261 indexes = [self._translated_indexes[idx] for idx in indexes]
262 tup = tuplegetter(*indexes)
263 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
264
265 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
266 # TODO: need unit test for:
267 # result = connection.execute("raw sql, no columns").scalars()
268 # without the "or ()" it's failing because MD_OBJECTS is None
269 keymap.update(
270 (e, new_rec)
271 for new_rec in new_recs
272 for e in new_rec[MD_OBJECTS] or ()
273 )
274
275 return self._make_new_metadata(
276 unpickled=self._unpickled,
277 processors=self._processors,
278 keys=new_keys,
279 tuplefilter=tup,
280 translated_indexes=indexes,
281 keymap=keymap, # type: ignore[arg-type]
282 safe_for_cache=self._safe_for_cache,
283 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
284 )
285
286 def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
287 """When using a cached Compiled construct that has a _result_map,
288 for a new statement that used the cached Compiled, we need to ensure
289 the keymap has the Column objects from our new statement as keys.
290 So here we rewrite keymap with new entries for the new columns
291 as matched to those of the cached statement.
292
293 """
294
295 if not context.compiled or not context.compiled._result_columns:
296 return self
297
298 compiled_statement = context.compiled.statement
299 invoked_statement = context.invoked_statement
300
301 if TYPE_CHECKING:
302 assert isinstance(invoked_statement, elements.ClauseElement)
303
304 if compiled_statement is invoked_statement:
305 return self
306
307 assert invoked_statement is not None
308
309 # this is the most common path for Core statements when
310 # caching is used. In ORM use, this codepath is not really used
311 # as the _result_disable_adapt_to_context execution option is
312 # set by the ORM.
313
314 # make a copy and add the columns from the invoked statement
315 # to the result map.
316
317 keymap_by_position = self._keymap_by_result_column_idx
318
319 if keymap_by_position is None:
320 # first retrival from cache, this map will not be set up yet,
321 # initialize lazily
322 keymap_by_position = self._keymap_by_result_column_idx = {
323 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
324 for metadata_entry in self._keymap.values()
325 }
326
327 assert not self._tuplefilter
328 return self._make_new_metadata(
329 keymap=compat.dict_union(
330 self._keymap,
331 {
332 new: keymap_by_position[idx]
333 for idx, new in enumerate(
334 invoked_statement._all_selected_columns
335 )
336 if idx in keymap_by_position
337 },
338 ),
339 unpickled=self._unpickled,
340 processors=self._processors,
341 tuplefilter=None,
342 translated_indexes=None,
343 keys=self._keys,
344 safe_for_cache=self._safe_for_cache,
345 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
346 )
347
348 def __init__(
349 self,
350 parent: CursorResult[Any],
351 cursor_description: _DBAPICursorDescription,
352 ):
353 context = parent.context
354 self._tuplefilter = None
355 self._translated_indexes = None
356 self._safe_for_cache = self._unpickled = False
357
358 if context.result_column_struct:
359 (
360 result_columns,
361 cols_are_ordered,
362 textual_ordered,
363 ad_hoc_textual,
364 loose_column_name_matching,
365 ) = context.result_column_struct
366 num_ctx_cols = len(result_columns)
367 else:
368 result_columns = cols_are_ordered = ( # type: ignore
369 num_ctx_cols
370 ) = ad_hoc_textual = loose_column_name_matching = (
371 textual_ordered
372 ) = False
373
374 # merge cursor.description with the column info
375 # present in the compiled structure, if any
376 raw = self._merge_cursor_description(
377 context,
378 cursor_description,
379 result_columns,
380 num_ctx_cols,
381 cols_are_ordered,
382 textual_ordered,
383 ad_hoc_textual,
384 loose_column_name_matching,
385 )
386
387 # processors in key order which are used when building up
388 # a row
389 self._processors = [
390 metadata_entry[MD_PROCESSOR] for metadata_entry in raw
391 ]
392
393 # this is used when using this ResultMetaData in a Core-only cache
394 # retrieval context. it's initialized on first cache retrieval
395 # when the _result_disable_adapt_to_context execution option
396 # (which the ORM generally sets) is not set.
397 self._keymap_by_result_column_idx = None
398
399 # for compiled SQL constructs, copy additional lookup keys into
400 # the key lookup map, such as Column objects, labels,
401 # column keys and other names
402 if num_ctx_cols:
403 # keymap by primary string...
404 by_key = {
405 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
406 for metadata_entry in raw
407 }
408
409 if len(by_key) != num_ctx_cols:
410 # if by-primary-string dictionary smaller than
411 # number of columns, assume we have dupes; (this check
412 # is also in place if string dictionary is bigger, as
413 # can occur when '*' was used as one of the compiled columns,
414 # which may or may not be suggestive of dupes), rewrite
415 # dupe records with "None" for index which results in
416 # ambiguous column exception when accessed.
417 #
418 # this is considered to be the less common case as it is not
419 # common to have dupe column keys in a SELECT statement.
420 #
421 # new in 1.4: get the complete set of all possible keys,
422 # strings, objects, whatever, that are dupes across two
423 # different records, first.
424 index_by_key: Dict[Any, Any] = {}
425 dupes = set()
426 for metadata_entry in raw:
427 for key in (metadata_entry[MD_RENDERED_NAME],) + (
428 metadata_entry[MD_OBJECTS] or ()
429 ):
430 idx = metadata_entry[MD_INDEX]
431 # if this key has been associated with more than one
432 # positional index, it's a dupe
433 if index_by_key.setdefault(key, idx) != idx:
434 dupes.add(key)
435
436 # then put everything we have into the keymap excluding only
437 # those keys that are dupes.
438 self._keymap = {
439 obj_elem: metadata_entry
440 for metadata_entry in raw
441 if metadata_entry[MD_OBJECTS]
442 for obj_elem in metadata_entry[MD_OBJECTS]
443 if obj_elem not in dupes
444 }
445
446 # then for the dupe keys, put the "ambiguous column"
447 # record into by_key.
448 by_key.update(
449 {
450 key: (None, None, [], key, key, None, None)
451 for key in dupes
452 }
453 )
454
455 else:
456 # no dupes - copy secondary elements from compiled
457 # columns into self._keymap. this is the most common
458 # codepath for Core / ORM statement executions before the
459 # result metadata is cached
460 self._keymap = {
461 obj_elem: metadata_entry
462 for metadata_entry in raw
463 if metadata_entry[MD_OBJECTS]
464 for obj_elem in metadata_entry[MD_OBJECTS]
465 }
466 # update keymap with primary string names taking
467 # precedence
468 self._keymap.update(by_key)
469 else:
470 # no compiled objects to map, just create keymap by primary string
471 self._keymap = {
472 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
473 for metadata_entry in raw
474 }
475
476 # update keymap with "translated" names. In SQLAlchemy this is a
477 # sqlite only thing, and in fact impacting only extremely old SQLite
478 # versions unlikely to be present in modern Python versions.
479 # however, the pyhive third party dialect is
480 # also using this hook, which means others still might use it as well.
481 # I dislike having this awkward hook here but as long as we need
482 # to use names in cursor.description in some cases we need to have
483 # some hook to accomplish this.
484 if not num_ctx_cols and context._translate_colname:
485 self._keymap.update(
486 {
487 metadata_entry[MD_UNTRANSLATED]: self._keymap[
488 metadata_entry[MD_LOOKUP_KEY]
489 ]
490 for metadata_entry in raw
491 if metadata_entry[MD_UNTRANSLATED]
492 }
493 )
494
495 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
496
497 def _merge_cursor_description(
498 self,
499 context,
500 cursor_description,
501 result_columns,
502 num_ctx_cols,
503 cols_are_ordered,
504 textual_ordered,
505 ad_hoc_textual,
506 loose_column_name_matching,
507 ):
508 """Merge a cursor.description with compiled result column information.
509
510 There are at least four separate strategies used here, selected
511 depending on the type of SQL construct used to start with.
512
513 The most common case is that of the compiled SQL expression construct,
514 which generated the column names present in the raw SQL string and
515 which has the identical number of columns as were reported by
516 cursor.description. In this case, we assume a 1-1 positional mapping
517 between the entries in cursor.description and the compiled object.
518 This is also the most performant case as we disregard extracting /
519 decoding the column names present in cursor.description since we
520 already have the desired name we generated in the compiled SQL
521 construct.
522
523 The next common case is that of the completely raw string SQL,
524 such as passed to connection.execute(). In this case we have no
525 compiled construct to work with, so we extract and decode the
526 names from cursor.description and index those as the primary
527 result row target keys.
528
529 The remaining fairly common case is that of the textual SQL
530 that includes at least partial column information; this is when
531 we use a :class:`_expression.TextualSelect` construct.
532 This construct may have
533 unordered or ordered column information. In the ordered case, we
534 merge the cursor.description and the compiled construct's information
535 positionally, and warn if there are additional description names
536 present, however we still decode the names in cursor.description
537 as we don't have a guarantee that the names in the columns match
538 on these. In the unordered case, we match names in cursor.description
539 to that of the compiled construct based on name matching.
540 In both of these cases, the cursor.description names and the column
541 expression objects and names are indexed as result row target keys.
542
543 The final case is much less common, where we have a compiled
544 non-textual SQL expression construct, but the number of columns
545 in cursor.description doesn't match what's in the compiled
546 construct. We make the guess here that there might be textual
547 column expressions in the compiled construct that themselves include
548 a comma in them causing them to split. We do the same name-matching
549 as with textual non-ordered columns.
550
551 The name-matched system of merging is the same as that used by
552 SQLAlchemy for all cases up through the 0.9 series. Positional
553 matching for compiled SQL expressions was introduced in 1.0 as a
554 major performance feature, and positional matching for textual
555 :class:`_expression.TextualSelect` objects in 1.1.
556 As name matching is no longer
557 a common case, it was acceptable to factor it into smaller generator-
558 oriented methods that are easier to understand, but incur slightly
559 more performance overhead.
560
561 """
562
563 if (
564 num_ctx_cols
565 and cols_are_ordered
566 and not textual_ordered
567 and num_ctx_cols == len(cursor_description)
568 ):
569 self._keys = [elem[0] for elem in result_columns]
570 # pure positional 1-1 case; doesn't need to read
571 # the names from cursor.description
572
573 # most common case for Core and ORM
574
575 # this metadata is safe to cache because we are guaranteed
576 # to have the columns in the same order for new executions
577 self._safe_for_cache = True
578 return [
579 (
580 idx,
581 idx,
582 rmap_entry[RM_OBJECTS],
583 rmap_entry[RM_NAME],
584 rmap_entry[RM_RENDERED_NAME],
585 context.get_result_processor(
586 rmap_entry[RM_TYPE],
587 rmap_entry[RM_RENDERED_NAME],
588 cursor_description[idx][1],
589 ),
590 None,
591 )
592 for idx, rmap_entry in enumerate(result_columns)
593 ]
594 else:
595 # name-based or text-positional cases, where we need
596 # to read cursor.description names
597
598 if textual_ordered or (
599 ad_hoc_textual and len(cursor_description) == num_ctx_cols
600 ):
601 self._safe_for_cache = True
602 # textual positional case
603 raw_iterator = self._merge_textual_cols_by_position(
604 context, cursor_description, result_columns
605 )
606 elif num_ctx_cols:
607 # compiled SQL with a mismatch of description cols
608 # vs. compiled cols, or textual w/ unordered columns
609 # the order of columns can change if the query is
610 # against a "select *", so not safe to cache
611 self._safe_for_cache = False
612 raw_iterator = self._merge_cols_by_name(
613 context,
614 cursor_description,
615 result_columns,
616 loose_column_name_matching,
617 )
618 else:
619 # no compiled SQL, just a raw string, order of columns
620 # can change for "select *"
621 self._safe_for_cache = False
622 raw_iterator = self._merge_cols_by_none(
623 context, cursor_description
624 )
625
626 return [
627 (
628 idx,
629 ridx,
630 obj,
631 cursor_colname,
632 cursor_colname,
633 context.get_result_processor(
634 mapped_type, cursor_colname, coltype
635 ),
636 untranslated,
637 )
638 for (
639 idx,
640 ridx,
641 cursor_colname,
642 mapped_type,
643 coltype,
644 obj,
645 untranslated,
646 ) in raw_iterator
647 ]
648
649 def _colnames_from_description(self, context, cursor_description):
650 """Extract column names and data types from a cursor.description.
651
652 Applies unicode decoding, column translation, "normalization",
653 and case sensitivity rules to the names based on the dialect.
654
655 """
656
657 dialect = context.dialect
658 translate_colname = context._translate_colname
659 normalize_name = (
660 dialect.normalize_name if dialect.requires_name_normalize else None
661 )
662 untranslated = None
663
664 self._keys = []
665
666 for idx, rec in enumerate(cursor_description):
667 colname = rec[0]
668 coltype = rec[1]
669
670 if translate_colname:
671 colname, untranslated = translate_colname(colname)
672
673 if normalize_name:
674 colname = normalize_name(colname)
675
676 self._keys.append(colname)
677
678 yield idx, colname, untranslated, coltype
679
680 def _merge_textual_cols_by_position(
681 self, context, cursor_description, result_columns
682 ):
683 num_ctx_cols = len(result_columns)
684
685 if num_ctx_cols > len(cursor_description):
686 util.warn(
687 "Number of columns in textual SQL (%d) is "
688 "smaller than number of columns requested (%d)"
689 % (num_ctx_cols, len(cursor_description))
690 )
691 seen = set()
692
693 for (
694 idx,
695 colname,
696 untranslated,
697 coltype,
698 ) in self._colnames_from_description(context, cursor_description):
699 if idx < num_ctx_cols:
700 ctx_rec = result_columns[idx]
701 obj = ctx_rec[RM_OBJECTS]
702 ridx = idx
703 mapped_type = ctx_rec[RM_TYPE]
704 if obj[0] in seen:
705 raise exc.InvalidRequestError(
706 "Duplicate column expression requested "
707 "in textual SQL: %r" % obj[0]
708 )
709 seen.add(obj[0])
710 else:
711 mapped_type = sqltypes.NULLTYPE
712 obj = None
713 ridx = None
714 yield idx, ridx, colname, mapped_type, coltype, obj, untranslated
715
716 def _merge_cols_by_name(
717 self,
718 context,
719 cursor_description,
720 result_columns,
721 loose_column_name_matching,
722 ):
723 match_map = self._create_description_match_map(
724 result_columns, loose_column_name_matching
725 )
726 mapped_type: TypeEngine[Any]
727
728 for (
729 idx,
730 colname,
731 untranslated,
732 coltype,
733 ) in self._colnames_from_description(context, cursor_description):
734 try:
735 ctx_rec = match_map[colname]
736 except KeyError:
737 mapped_type = sqltypes.NULLTYPE
738 obj = None
739 result_columns_idx = None
740 else:
741 obj = ctx_rec[1]
742 mapped_type = ctx_rec[2]
743 result_columns_idx = ctx_rec[3]
744 yield (
745 idx,
746 result_columns_idx,
747 colname,
748 mapped_type,
749 coltype,
750 obj,
751 untranslated,
752 )
753
754 @classmethod
755 def _create_description_match_map(
756 cls,
757 result_columns: List[ResultColumnsEntry],
758 loose_column_name_matching: bool = False,
759 ) -> Dict[
760 Union[str, object], Tuple[str, Tuple[Any, ...], TypeEngine[Any], int]
761 ]:
762 """when matching cursor.description to a set of names that are present
763 in a Compiled object, as is the case with TextualSelect, get all the
764 names we expect might match those in cursor.description.
765 """
766
767 d: Dict[
768 Union[str, object],
769 Tuple[str, Tuple[Any, ...], TypeEngine[Any], int],
770 ] = {}
771 for ridx, elem in enumerate(result_columns):
772 key = elem[RM_RENDERED_NAME]
773 if key in d:
774 # conflicting keyname - just add the column-linked objects
775 # to the existing record. if there is a duplicate column
776 # name in the cursor description, this will allow all of those
777 # objects to raise an ambiguous column error
778 e_name, e_obj, e_type, e_ridx = d[key]
779 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
780 else:
781 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
782
783 if loose_column_name_matching:
784 # when using a textual statement with an unordered set
785 # of columns that line up, we are expecting the user
786 # to be using label names in the SQL that match to the column
787 # expressions. Enable more liberal matching for this case;
788 # duplicate keys that are ambiguous will be fixed later.
789 for r_key in elem[RM_OBJECTS]:
790 d.setdefault(
791 r_key,
792 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
793 )
794 return d
795
796 def _merge_cols_by_none(self, context, cursor_description):
797 for (
798 idx,
799 colname,
800 untranslated,
801 coltype,
802 ) in self._colnames_from_description(context, cursor_description):
803 yield (
804 idx,
805 None,
806 colname,
807 sqltypes.NULLTYPE,
808 coltype,
809 None,
810 untranslated,
811 )
812
813 if not TYPE_CHECKING:
814
815 def _key_fallback(
816 self, key: Any, err: Optional[Exception], raiseerr: bool = True
817 ) -> Optional[NoReturn]:
818 if raiseerr:
819 if self._unpickled and isinstance(key, elements.ColumnElement):
820 raise exc.NoSuchColumnError(
821 "Row was unpickled; lookup by ColumnElement "
822 "is unsupported"
823 ) from err
824 else:
825 raise exc.NoSuchColumnError(
826 "Could not locate column in row for column '%s'"
827 % util.string_or_unprintable(key)
828 ) from err
829 else:
830 return None
831
832 def _raise_for_ambiguous_column_name(self, rec):
833 raise exc.InvalidRequestError(
834 "Ambiguous column name '%s' in "
835 "result set column descriptions" % rec[MD_LOOKUP_KEY]
836 )
837
838 def _index_for_key(self, key: Any, raiseerr: bool = True) -> Optional[int]:
839 # TODO: can consider pre-loading ints and negative ints
840 # into _keymap - also no coverage here
841 if isinstance(key, int):
842 key = self._keys[key]
843
844 try:
845 rec = self._keymap[key]
846 except KeyError as ke:
847 x = self._key_fallback(key, ke, raiseerr)
848 assert x is None
849 return None
850
851 index = rec[0]
852
853 if index is None:
854 self._raise_for_ambiguous_column_name(rec)
855 return index
856
857 def _indexes_for_keys(self, keys):
858 try:
859 return [self._keymap[key][0] for key in keys]
860 except KeyError as ke:
861 # ensure it raises
862 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
863
864 def _metadata_for_keys(
865 self, keys: Sequence[Any]
866 ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
867 for key in keys:
868 if int in key.__class__.__mro__:
869 key = self._keys[key]
870
871 try:
872 rec = self._keymap[key]
873 except KeyError as ke:
874 # ensure it raises
875 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
876
877 index = rec[MD_INDEX]
878
879 if index is None:
880 self._raise_for_ambiguous_column_name(rec)
881
882 yield cast(_NonAmbigCursorKeyMapRecType, rec)
883
884 def __getstate__(self):
885 # TODO: consider serializing this as SimpleResultMetaData
886 return {
887 "_keymap": {
888 key: (
889 rec[MD_INDEX],
890 rec[MD_RESULT_MAP_INDEX],
891 [],
892 key,
893 rec[MD_RENDERED_NAME],
894 None,
895 None,
896 )
897 for key, rec in self._keymap.items()
898 if isinstance(key, (str, int))
899 },
900 "_keys": self._keys,
901 "_translated_indexes": self._translated_indexes,
902 }
903
904 def __setstate__(self, state):
905 self._processors = [None for _ in range(len(state["_keys"]))]
906 self._keymap = state["_keymap"]
907 self._keymap_by_result_column_idx = None
908 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
909 self._keys = state["_keys"]
910 self._unpickled = True
911 if state["_translated_indexes"]:
912 self._translated_indexes = cast(
913 "List[int]", state["_translated_indexes"]
914 )
915 self._tuplefilter = tuplegetter(*self._translated_indexes)
916 else:
917 self._translated_indexes = self._tuplefilter = None
918
919
920class ResultFetchStrategy:
921 """Define a fetching strategy for a result object.
922
923
924 .. versionadded:: 1.4
925
926 """
927
928 __slots__ = ()
929
930 alternate_cursor_description: Optional[_DBAPICursorDescription] = None
931
932 def soft_close(
933 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
934 ) -> None:
935 raise NotImplementedError()
936
937 def hard_close(
938 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
939 ) -> None:
940 raise NotImplementedError()
941
942 def yield_per(
943 self,
944 result: CursorResult[Any],
945 dbapi_cursor: Optional[DBAPICursor],
946 num: int,
947 ) -> None:
948 return
949
950 def fetchone(
951 self,
952 result: CursorResult[Any],
953 dbapi_cursor: DBAPICursor,
954 hard_close: bool = False,
955 ) -> Any:
956 raise NotImplementedError()
957
958 def fetchmany(
959 self,
960 result: CursorResult[Any],
961 dbapi_cursor: DBAPICursor,
962 size: Optional[int] = None,
963 ) -> Any:
964 raise NotImplementedError()
965
966 def fetchall(
967 self,
968 result: CursorResult[Any],
969 dbapi_cursor: DBAPICursor,
970 ) -> Any:
971 raise NotImplementedError()
972
973 def handle_exception(
974 self,
975 result: CursorResult[Any],
976 dbapi_cursor: Optional[DBAPICursor],
977 err: BaseException,
978 ) -> NoReturn:
979 raise err
980
981
982class NoCursorFetchStrategy(ResultFetchStrategy):
983 """Cursor strategy for a result that has no open cursor.
984
985 There are two varieties of this strategy, one for DQL and one for
986 DML (and also DDL), each of which represent a result that had a cursor
987 but no longer has one.
988
989 """
990
991 __slots__ = ()
992
993 def soft_close(self, result, dbapi_cursor):
994 pass
995
996 def hard_close(self, result, dbapi_cursor):
997 pass
998
999 def fetchone(self, result, dbapi_cursor, hard_close=False):
1000 return self._non_result(result, None)
1001
1002 def fetchmany(self, result, dbapi_cursor, size=None):
1003 return self._non_result(result, [])
1004
1005 def fetchall(self, result, dbapi_cursor):
1006 return self._non_result(result, [])
1007
1008 def _non_result(self, result, default, err=None):
1009 raise NotImplementedError()
1010
1011
1012class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
1013 """Cursor strategy for a DQL result that has no open cursor.
1014
1015 This is a result set that can return rows, i.e. for a SELECT, or for an
1016 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
1017 where the cursor is closed and no rows remain available. The owning result
1018 object may or may not be "hard closed", which determines if the fetch
1019 methods send empty results or raise for closed result.
1020
1021 """
1022
1023 __slots__ = ()
1024
1025 def _non_result(self, result, default, err=None):
1026 if result.closed:
1027 raise exc.ResourceClosedError(
1028 "This result object is closed."
1029 ) from err
1030 else:
1031 return default
1032
1033
1034_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
1035
1036
1037class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
1038 """Cursor strategy for a DML result that has no open cursor.
1039
1040 This is a result set that does not return rows, i.e. for an INSERT,
1041 UPDATE, DELETE that does not include RETURNING.
1042
1043 """
1044
1045 __slots__ = ()
1046
1047 def _non_result(self, result, default, err=None):
1048 # we only expect to have a _NoResultMetaData() here right now.
1049 assert not result._metadata.returns_rows
1050 result._metadata._we_dont_return_rows(err)
1051
1052
1053_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
1054
1055
1056class CursorFetchStrategy(ResultFetchStrategy):
1057 """Call fetch methods from a DBAPI cursor.
1058
1059 Alternate versions of this class may instead buffer the rows from
1060 cursors or not use cursors at all.
1061
1062 """
1063
1064 __slots__ = ()
1065
1066 def soft_close(
1067 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1068 ) -> None:
1069 result.cursor_strategy = _NO_CURSOR_DQL
1070
1071 def hard_close(
1072 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1073 ) -> None:
1074 result.cursor_strategy = _NO_CURSOR_DQL
1075
1076 def handle_exception(
1077 self,
1078 result: CursorResult[Any],
1079 dbapi_cursor: Optional[DBAPICursor],
1080 err: BaseException,
1081 ) -> NoReturn:
1082 result.connection._handle_dbapi_exception(
1083 err, None, None, dbapi_cursor, result.context
1084 )
1085
1086 def yield_per(
1087 self,
1088 result: CursorResult[Any],
1089 dbapi_cursor: Optional[DBAPICursor],
1090 num: int,
1091 ) -> None:
1092 result.cursor_strategy = BufferedRowCursorFetchStrategy(
1093 dbapi_cursor,
1094 {"max_row_buffer": num},
1095 initial_buffer=collections.deque(),
1096 growth_factor=0,
1097 )
1098
1099 def fetchone(
1100 self,
1101 result: CursorResult[Any],
1102 dbapi_cursor: DBAPICursor,
1103 hard_close: bool = False,
1104 ) -> Any:
1105 try:
1106 row = dbapi_cursor.fetchone()
1107 if row is None:
1108 result._soft_close(hard=hard_close)
1109 return row
1110 except BaseException as e:
1111 self.handle_exception(result, dbapi_cursor, e)
1112
1113 def fetchmany(
1114 self,
1115 result: CursorResult[Any],
1116 dbapi_cursor: DBAPICursor,
1117 size: Optional[int] = None,
1118 ) -> Any:
1119 try:
1120 if size is None:
1121 l = dbapi_cursor.fetchmany()
1122 else:
1123 l = dbapi_cursor.fetchmany(size)
1124
1125 if not l:
1126 result._soft_close()
1127 return l
1128 except BaseException as e:
1129 self.handle_exception(result, dbapi_cursor, e)
1130
1131 def fetchall(
1132 self,
1133 result: CursorResult[Any],
1134 dbapi_cursor: DBAPICursor,
1135 ) -> Any:
1136 try:
1137 rows = dbapi_cursor.fetchall()
1138 result._soft_close()
1139 return rows
1140 except BaseException as e:
1141 self.handle_exception(result, dbapi_cursor, e)
1142
1143
1144_DEFAULT_FETCH = CursorFetchStrategy()
1145
1146
1147class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
1148 """A cursor fetch strategy with row buffering behavior.
1149
1150 This strategy buffers the contents of a selection of rows
1151 before ``fetchone()`` is called. This is to allow the results of
1152 ``cursor.description`` to be available immediately, when
1153 interfacing with a DB-API that requires rows to be consumed before
1154 this information is available (currently psycopg2, when used with
1155 server-side cursors).
1156
1157 The pre-fetching behavior fetches only one row initially, and then
1158 grows its buffer size by a fixed amount with each successive need
1159 for additional rows up the ``max_row_buffer`` size, which defaults
1160 to 1000::
1161
1162 with psycopg2_engine.connect() as conn:
1163
1164 result = conn.execution_options(
1165 stream_results=True, max_row_buffer=50
1166 ).execute(text("select * from table"))
1167
1168 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
1169
1170 .. seealso::
1171
1172 :ref:`psycopg2_execution_options`
1173 """
1174
1175 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
1176
1177 def __init__(
1178 self,
1179 dbapi_cursor,
1180 execution_options,
1181 growth_factor=5,
1182 initial_buffer=None,
1183 ):
1184 self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
1185
1186 if initial_buffer is not None:
1187 self._rowbuffer = initial_buffer
1188 else:
1189 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
1190 self._growth_factor = growth_factor
1191
1192 if growth_factor:
1193 self._bufsize = min(self._max_row_buffer, self._growth_factor)
1194 else:
1195 self._bufsize = self._max_row_buffer
1196
1197 @classmethod
1198 def create(cls, result):
1199 return BufferedRowCursorFetchStrategy(
1200 result.cursor,
1201 result.context.execution_options,
1202 )
1203
1204 def _buffer_rows(self, result, dbapi_cursor):
1205 """this is currently used only by fetchone()."""
1206
1207 size = self._bufsize
1208 try:
1209 if size < 1:
1210 new_rows = dbapi_cursor.fetchall()
1211 else:
1212 new_rows = dbapi_cursor.fetchmany(size)
1213 except BaseException as e:
1214 self.handle_exception(result, dbapi_cursor, e)
1215
1216 if not new_rows:
1217 return
1218 self._rowbuffer = collections.deque(new_rows)
1219 if self._growth_factor and size < self._max_row_buffer:
1220 self._bufsize = min(
1221 self._max_row_buffer, size * self._growth_factor
1222 )
1223
1224 def yield_per(self, result, dbapi_cursor, num):
1225 self._growth_factor = 0
1226 self._max_row_buffer = self._bufsize = num
1227
1228 def soft_close(self, result, dbapi_cursor):
1229 self._rowbuffer.clear()
1230 super().soft_close(result, dbapi_cursor)
1231
1232 def hard_close(self, result, dbapi_cursor):
1233 self._rowbuffer.clear()
1234 super().hard_close(result, dbapi_cursor)
1235
1236 def fetchone(self, result, dbapi_cursor, hard_close=False):
1237 if not self._rowbuffer:
1238 self._buffer_rows(result, dbapi_cursor)
1239 if not self._rowbuffer:
1240 try:
1241 result._soft_close(hard=hard_close)
1242 except BaseException as e:
1243 self.handle_exception(result, dbapi_cursor, e)
1244 return None
1245 return self._rowbuffer.popleft()
1246
1247 def fetchmany(self, result, dbapi_cursor, size=None):
1248 if size is None:
1249 return self.fetchall(result, dbapi_cursor)
1250
1251 rb = self._rowbuffer
1252 lb = len(rb)
1253 close = False
1254 if size > lb:
1255 try:
1256 new = dbapi_cursor.fetchmany(size - lb)
1257 except BaseException as e:
1258 self.handle_exception(result, dbapi_cursor, e)
1259 else:
1260 if not new:
1261 # defer closing since it may clear the row buffer
1262 close = True
1263 else:
1264 rb.extend(new)
1265
1266 res = [rb.popleft() for _ in range(min(size, len(rb)))]
1267 if close:
1268 result._soft_close()
1269 return res
1270
1271 def fetchall(self, result, dbapi_cursor):
1272 try:
1273 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
1274 self._rowbuffer.clear()
1275 result._soft_close()
1276 return ret
1277 except BaseException as e:
1278 self.handle_exception(result, dbapi_cursor, e)
1279
1280
1281class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
1282 """A cursor strategy that buffers rows fully upon creation.
1283
1284 Used for operations where a result is to be delivered
1285 after the database conversation can not be continued,
1286 such as MSSQL INSERT...OUTPUT after an autocommit.
1287
1288 """
1289
1290 __slots__ = ("_rowbuffer", "alternate_cursor_description")
1291
1292 def __init__(
1293 self,
1294 dbapi_cursor: Optional[DBAPICursor],
1295 alternate_description: Optional[_DBAPICursorDescription] = None,
1296 initial_buffer: Optional[Iterable[Any]] = None,
1297 ):
1298 self.alternate_cursor_description = alternate_description
1299 if initial_buffer is not None:
1300 self._rowbuffer = collections.deque(initial_buffer)
1301 else:
1302 assert dbapi_cursor is not None
1303 self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
1304
1305 def yield_per(self, result, dbapi_cursor, num):
1306 pass
1307
1308 def soft_close(self, result, dbapi_cursor):
1309 self._rowbuffer.clear()
1310 super().soft_close(result, dbapi_cursor)
1311
1312 def hard_close(self, result, dbapi_cursor):
1313 self._rowbuffer.clear()
1314 super().hard_close(result, dbapi_cursor)
1315
1316 def fetchone(self, result, dbapi_cursor, hard_close=False):
1317 if self._rowbuffer:
1318 return self._rowbuffer.popleft()
1319 else:
1320 result._soft_close(hard=hard_close)
1321 return None
1322
1323 def fetchmany(self, result, dbapi_cursor, size=None):
1324 if size is None:
1325 return self.fetchall(result, dbapi_cursor)
1326
1327 rb = self._rowbuffer
1328 rows = [rb.popleft() for _ in range(min(size, len(rb)))]
1329 if not rows:
1330 result._soft_close()
1331 return rows
1332
1333 def fetchall(self, result, dbapi_cursor):
1334 ret = self._rowbuffer
1335 self._rowbuffer = collections.deque()
1336 result._soft_close()
1337 return ret
1338
1339
1340class _NoResultMetaData(ResultMetaData):
1341 __slots__ = ()
1342
1343 returns_rows = False
1344
1345 def _we_dont_return_rows(self, err=None):
1346 raise exc.ResourceClosedError(
1347 "This result object does not return rows. "
1348 "It has been closed automatically."
1349 ) from err
1350
1351 def _index_for_key(self, keys, raiseerr):
1352 self._we_dont_return_rows()
1353
1354 def _metadata_for_keys(self, key):
1355 self._we_dont_return_rows()
1356
1357 def _reduce(self, keys):
1358 self._we_dont_return_rows()
1359
1360 @property
1361 def _keymap(self):
1362 self._we_dont_return_rows()
1363
1364 @property
1365 def _key_to_index(self):
1366 self._we_dont_return_rows()
1367
1368 @property
1369 def _processors(self):
1370 self._we_dont_return_rows()
1371
1372 @property
1373 def keys(self):
1374 self._we_dont_return_rows()
1375
1376
1377_NO_RESULT_METADATA = _NoResultMetaData()
1378
1379
1380def null_dml_result() -> IteratorResult[Any]:
1381 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
1382 it._soft_close()
1383 return it
1384
1385
1386class CursorResult(Result[_T]):
1387 """A Result that is representing state from a DBAPI cursor.
1388
1389 .. versionchanged:: 1.4 The :class:`.CursorResult``
1390 class replaces the previous :class:`.ResultProxy` interface.
1391 This classes are based on the :class:`.Result` calling API
1392 which provides an updated usage model and calling facade for
1393 SQLAlchemy Core and SQLAlchemy ORM.
1394
1395 Returns database rows via the :class:`.Row` class, which provides
1396 additional API features and behaviors on top of the raw data returned by
1397 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
1398 method, other kinds of objects may also be returned.
1399
1400 .. seealso::
1401
1402 :ref:`tutorial_selecting_data` - introductory material for accessing
1403 :class:`_engine.CursorResult` and :class:`.Row` objects.
1404
1405 """
1406
1407 __slots__ = (
1408 "context",
1409 "dialect",
1410 "cursor",
1411 "cursor_strategy",
1412 "_echo",
1413 "connection",
1414 )
1415
1416 _metadata: Union[CursorResultMetaData, _NoResultMetaData]
1417 _no_result_metadata = _NO_RESULT_METADATA
1418 _soft_closed: bool = False
1419 closed: bool = False
1420 _is_cursor = True
1421
1422 context: DefaultExecutionContext
1423 dialect: Dialect
1424 cursor_strategy: ResultFetchStrategy
1425 connection: Connection
1426
1427 def __init__(
1428 self,
1429 context: DefaultExecutionContext,
1430 cursor_strategy: ResultFetchStrategy,
1431 cursor_description: Optional[_DBAPICursorDescription],
1432 ):
1433 self.context = context
1434 self.dialect = context.dialect
1435 self.cursor = context.cursor
1436 self.cursor_strategy = cursor_strategy
1437 self.connection = context.root_connection
1438 self._echo = echo = (
1439 self.connection._echo and context.engine._should_log_debug()
1440 )
1441
1442 if cursor_description is not None:
1443 # inline of Result._row_getter(), set up an initial row
1444 # getter assuming no transformations will be called as this
1445 # is the most common case
1446
1447 metadata = self._init_metadata(context, cursor_description)
1448
1449 _make_row: Any
1450 _make_row = functools.partial(
1451 Row,
1452 metadata,
1453 metadata._effective_processors,
1454 metadata._key_to_index,
1455 )
1456
1457 if context._num_sentinel_cols:
1458 sentinel_filter = operator.itemgetter(
1459 slice(-context._num_sentinel_cols)
1460 )
1461
1462 def _sliced_row(raw_data):
1463 return _make_row(sentinel_filter(raw_data))
1464
1465 sliced_row = _sliced_row
1466 else:
1467 sliced_row = _make_row
1468
1469 if echo:
1470 log = self.context.connection._log_debug
1471
1472 def _log_row(row):
1473 log("Row %r", sql_util._repr_row(row))
1474 return row
1475
1476 self._row_logging_fn = _log_row
1477
1478 def _make_row_2(row):
1479 return _log_row(sliced_row(row))
1480
1481 make_row = _make_row_2
1482 else:
1483 make_row = sliced_row
1484 self._set_memoized_attribute("_row_getter", make_row)
1485
1486 else:
1487 assert context._num_sentinel_cols == 0
1488 self._metadata = self._no_result_metadata
1489
1490 def _init_metadata(self, context, cursor_description):
1491 if context.compiled:
1492 compiled = context.compiled
1493
1494 if compiled._cached_metadata:
1495 metadata = compiled._cached_metadata
1496 else:
1497 metadata = CursorResultMetaData(self, cursor_description)
1498 if metadata._safe_for_cache:
1499 compiled._cached_metadata = metadata
1500
1501 # result rewrite/ adapt step. this is to suit the case
1502 # when we are invoked against a cached Compiled object, we want
1503 # to rewrite the ResultMetaData to reflect the Column objects
1504 # that are in our current SQL statement object, not the one
1505 # that is associated with the cached Compiled object.
1506 # the Compiled object may also tell us to not
1507 # actually do this step; this is to support the ORM where
1508 # it is to produce a new Result object in any case, and will
1509 # be using the cached Column objects against this database result
1510 # so we don't want to rewrite them.
1511 #
1512 # Basically this step suits the use case where the end user
1513 # is using Core SQL expressions and is accessing columns in the
1514 # result row using row._mapping[table.c.column].
1515 if (
1516 not context.execution_options.get(
1517 "_result_disable_adapt_to_context", False
1518 )
1519 and compiled._result_columns
1520 and context.cache_hit is context.dialect.CACHE_HIT
1521 and compiled.statement is not context.invoked_statement
1522 ):
1523 metadata = metadata._adapt_to_context(context)
1524
1525 self._metadata = metadata
1526
1527 else:
1528 self._metadata = metadata = CursorResultMetaData(
1529 self, cursor_description
1530 )
1531 if self._echo:
1532 context.connection._log_debug(
1533 "Col %r", tuple(x[0] for x in cursor_description)
1534 )
1535 return metadata
1536
1537 def _soft_close(self, hard=False):
1538 """Soft close this :class:`_engine.CursorResult`.
1539
1540 This releases all DBAPI cursor resources, but leaves the
1541 CursorResult "open" from a semantic perspective, meaning the
1542 fetchXXX() methods will continue to return empty results.
1543
1544 This method is called automatically when:
1545
1546 * all result rows are exhausted using the fetchXXX() methods.
1547 * cursor.description is None.
1548
1549 This method is **not public**, but is documented in order to clarify
1550 the "autoclose" process used.
1551
1552 .. seealso::
1553
1554 :meth:`_engine.CursorResult.close`
1555
1556
1557 """
1558
1559 if (not hard and self._soft_closed) or (hard and self.closed):
1560 return
1561
1562 if hard:
1563 self.closed = True
1564 self.cursor_strategy.hard_close(self, self.cursor)
1565 else:
1566 self.cursor_strategy.soft_close(self, self.cursor)
1567
1568 if not self._soft_closed:
1569 cursor = self.cursor
1570 self.cursor = None # type: ignore
1571 self.connection._safe_close_cursor(cursor)
1572 self._soft_closed = True
1573
1574 @property
1575 def inserted_primary_key_rows(self):
1576 """Return the value of
1577 :attr:`_engine.CursorResult.inserted_primary_key`
1578 as a row contained within a list; some dialects may support a
1579 multiple row form as well.
1580
1581 .. note:: As indicated below, in current SQLAlchemy versions this
1582 accessor is only useful beyond what's already supplied by
1583 :attr:`_engine.CursorResult.inserted_primary_key` when using the
1584 :ref:`postgresql_psycopg2` dialect. Future versions hope to
1585 generalize this feature to more dialects.
1586
1587 This accessor is added to support dialects that offer the feature
1588 that is currently implemented by the :ref:`psycopg2_executemany_mode`
1589 feature, currently **only the psycopg2 dialect**, which provides
1590 for many rows to be INSERTed at once while still retaining the
1591 behavior of being able to return server-generated primary key values.
1592
1593 * **When using the psycopg2 dialect, or other dialects that may support
1594 "fast executemany" style inserts in upcoming releases** : When
1595 invoking an INSERT statement while passing a list of rows as the
1596 second argument to :meth:`_engine.Connection.execute`, this accessor
1597 will then provide a list of rows, where each row contains the primary
1598 key value for each row that was INSERTed.
1599
1600 * **When using all other dialects / backends that don't yet support
1601 this feature**: This accessor is only useful for **single row INSERT
1602 statements**, and returns the same information as that of the
1603 :attr:`_engine.CursorResult.inserted_primary_key` within a
1604 single-element list. When an INSERT statement is executed in
1605 conjunction with a list of rows to be INSERTed, the list will contain
1606 one row per row inserted in the statement, however it will contain
1607 ``None`` for any server-generated values.
1608
1609 Future releases of SQLAlchemy will further generalize the
1610 "fast execution helper" feature of psycopg2 to suit other dialects,
1611 thus allowing this accessor to be of more general use.
1612
1613 .. versionadded:: 1.4
1614
1615 .. seealso::
1616
1617 :attr:`_engine.CursorResult.inserted_primary_key`
1618
1619 """
1620 if not self.context.compiled:
1621 raise exc.InvalidRequestError(
1622 "Statement is not a compiled expression construct."
1623 )
1624 elif not self.context.isinsert:
1625 raise exc.InvalidRequestError(
1626 "Statement is not an insert() expression construct."
1627 )
1628 elif self.context._is_explicit_returning:
1629 raise exc.InvalidRequestError(
1630 "Can't call inserted_primary_key "
1631 "when returning() "
1632 "is used."
1633 )
1634 return self.context.inserted_primary_key_rows
1635
1636 @property
1637 def inserted_primary_key(self):
1638 """Return the primary key for the row just inserted.
1639
1640 The return value is a :class:`_result.Row` object representing
1641 a named tuple of primary key values in the order in which the
1642 primary key columns are configured in the source
1643 :class:`_schema.Table`.
1644
1645 .. versionchanged:: 1.4.8 - the
1646 :attr:`_engine.CursorResult.inserted_primary_key`
1647 value is now a named tuple via the :class:`_result.Row` class,
1648 rather than a plain tuple.
1649
1650 This accessor only applies to single row :func:`_expression.insert`
1651 constructs which did not explicitly specify
1652 :meth:`_expression.Insert.returning`. Support for multirow inserts,
1653 while not yet available for most backends, would be accessed using
1654 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
1655
1656 Note that primary key columns which specify a server_default clause, or
1657 otherwise do not qualify as "autoincrement" columns (see the notes at
1658 :class:`_schema.Column`), and were generated using the database-side
1659 default, will appear in this list as ``None`` unless the backend
1660 supports "returning" and the insert statement executed with the
1661 "implicit returning" enabled.
1662
1663 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1664 statement is not a compiled expression construct
1665 or is not an insert() construct.
1666
1667 """
1668
1669 if self.context.executemany:
1670 raise exc.InvalidRequestError(
1671 "This statement was an executemany call; if primary key "
1672 "returning is supported, please "
1673 "use .inserted_primary_key_rows."
1674 )
1675
1676 ikp = self.inserted_primary_key_rows
1677 if ikp:
1678 return ikp[0]
1679 else:
1680 return None
1681
1682 def last_updated_params(self):
1683 """Return the collection of updated parameters from this
1684 execution.
1685
1686 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1687 statement is not a compiled expression construct
1688 or is not an update() construct.
1689
1690 """
1691 if not self.context.compiled:
1692 raise exc.InvalidRequestError(
1693 "Statement is not a compiled expression construct."
1694 )
1695 elif not self.context.isupdate:
1696 raise exc.InvalidRequestError(
1697 "Statement is not an update() expression construct."
1698 )
1699 elif self.context.executemany:
1700 return self.context.compiled_parameters
1701 else:
1702 return self.context.compiled_parameters[0]
1703
1704 def last_inserted_params(self):
1705 """Return the collection of inserted parameters from this
1706 execution.
1707
1708 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1709 statement is not a compiled expression construct
1710 or is not an insert() construct.
1711
1712 """
1713 if not self.context.compiled:
1714 raise exc.InvalidRequestError(
1715 "Statement is not a compiled expression construct."
1716 )
1717 elif not self.context.isinsert:
1718 raise exc.InvalidRequestError(
1719 "Statement is not an insert() expression construct."
1720 )
1721 elif self.context.executemany:
1722 return self.context.compiled_parameters
1723 else:
1724 return self.context.compiled_parameters[0]
1725
1726 @property
1727 def returned_defaults_rows(self):
1728 """Return a list of rows each containing the values of default
1729 columns that were fetched using
1730 the :meth:`.ValuesBase.return_defaults` feature.
1731
1732 The return value is a list of :class:`.Row` objects.
1733
1734 .. versionadded:: 1.4
1735
1736 """
1737 return self.context.returned_default_rows
1738
1739 def splice_horizontally(self, other):
1740 """Return a new :class:`.CursorResult` that "horizontally splices"
1741 together the rows of this :class:`.CursorResult` with that of another
1742 :class:`.CursorResult`.
1743
1744 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1745 not intended for general use.
1746
1747 "horizontally splices" means that for each row in the first and second
1748 result sets, a new row that concatenates the two rows together is
1749 produced, which then becomes the new row. The incoming
1750 :class:`.CursorResult` must have the identical number of rows. It is
1751 typically expected that the two result sets come from the same sort
1752 order as well, as the result rows are spliced together based on their
1753 position in the result.
1754
1755 The expected use case here is so that multiple INSERT..RETURNING
1756 statements (which definitely need to be sorted) against different
1757 tables can produce a single result that looks like a JOIN of those two
1758 tables.
1759
1760 E.g.::
1761
1762 r1 = connection.execute(
1763 users.insert().returning(
1764 users.c.user_name, users.c.user_id, sort_by_parameter_order=True
1765 ),
1766 user_values,
1767 )
1768
1769 r2 = connection.execute(
1770 addresses.insert().returning(
1771 addresses.c.address_id,
1772 addresses.c.address,
1773 addresses.c.user_id,
1774 sort_by_parameter_order=True,
1775 ),
1776 address_values,
1777 )
1778
1779 rows = r1.splice_horizontally(r2).all()
1780 assert rows == [
1781 ("john", 1, 1, "foo@bar.com", 1),
1782 ("jack", 2, 2, "bar@bat.com", 2),
1783 ]
1784
1785 .. versionadded:: 2.0
1786
1787 .. seealso::
1788
1789 :meth:`.CursorResult.splice_vertically`
1790
1791
1792 """ # noqa: E501
1793
1794 clone = self._generate()
1795 total_rows = [
1796 tuple(r1) + tuple(r2)
1797 for r1, r2 in zip(
1798 list(self._raw_row_iterator()),
1799 list(other._raw_row_iterator()),
1800 )
1801 ]
1802
1803 clone._metadata = clone._metadata._splice_horizontally(other._metadata)
1804
1805 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
1806 None,
1807 initial_buffer=total_rows,
1808 )
1809 clone._reset_memoizations()
1810 return clone
1811
1812 def splice_vertically(self, other):
1813 """Return a new :class:`.CursorResult` that "vertically splices",
1814 i.e. "extends", the rows of this :class:`.CursorResult` with that of
1815 another :class:`.CursorResult`.
1816
1817 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1818 not intended for general use.
1819
1820 "vertically splices" means the rows of the given result are appended to
1821 the rows of this cursor result. The incoming :class:`.CursorResult`
1822 must have rows that represent the identical list of columns in the
1823 identical order as they are in this :class:`.CursorResult`.
1824
1825 .. versionadded:: 2.0
1826
1827 .. seealso::
1828
1829 :meth:`.CursorResult.splice_horizontally`
1830
1831 """
1832 clone = self._generate()
1833 total_rows = list(self._raw_row_iterator()) + list(
1834 other._raw_row_iterator()
1835 )
1836
1837 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
1838 None,
1839 initial_buffer=total_rows,
1840 )
1841 clone._reset_memoizations()
1842 return clone
1843
1844 def _rewind(self, rows):
1845 """rewind this result back to the given rowset.
1846
1847 this is used internally for the case where an :class:`.Insert`
1848 construct combines the use of
1849 :meth:`.Insert.return_defaults` along with the
1850 "supplemental columns" feature.
1851
1852 """
1853
1854 if self._echo:
1855 self.context.connection._log_debug(
1856 "CursorResult rewound %d row(s)", len(rows)
1857 )
1858
1859 # the rows given are expected to be Row objects, so we
1860 # have to clear out processors which have already run on these
1861 # rows
1862 self._metadata = cast(
1863 CursorResultMetaData, self._metadata
1864 )._remove_processors()
1865
1866 self.cursor_strategy = FullyBufferedCursorFetchStrategy(
1867 None,
1868 # TODO: if these are Row objects, can we save on not having to
1869 # re-make new Row objects out of them a second time? is that
1870 # what's actually happening right now? maybe look into this
1871 initial_buffer=rows,
1872 )
1873 self._reset_memoizations()
1874 return self
1875
1876 @property
1877 def returned_defaults(self):
1878 """Return the values of default columns that were fetched using
1879 the :meth:`.ValuesBase.return_defaults` feature.
1880
1881 The value is an instance of :class:`.Row`, or ``None``
1882 if :meth:`.ValuesBase.return_defaults` was not used or if the
1883 backend does not support RETURNING.
1884
1885 .. seealso::
1886
1887 :meth:`.ValuesBase.return_defaults`
1888
1889 """
1890
1891 if self.context.executemany:
1892 raise exc.InvalidRequestError(
1893 "This statement was an executemany call; if return defaults "
1894 "is supported, please use .returned_defaults_rows."
1895 )
1896
1897 rows = self.context.returned_default_rows
1898 if rows:
1899 return rows[0]
1900 else:
1901 return None
1902
1903 def lastrow_has_defaults(self):
1904 """Return ``lastrow_has_defaults()`` from the underlying
1905 :class:`.ExecutionContext`.
1906
1907 See :class:`.ExecutionContext` for details.
1908
1909 """
1910
1911 return self.context.lastrow_has_defaults()
1912
1913 def postfetch_cols(self):
1914 """Return ``postfetch_cols()`` from the underlying
1915 :class:`.ExecutionContext`.
1916
1917 See :class:`.ExecutionContext` for details.
1918
1919 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1920 statement is not a compiled expression construct
1921 or is not an insert() or update() construct.
1922
1923 """
1924
1925 if not self.context.compiled:
1926 raise exc.InvalidRequestError(
1927 "Statement is not a compiled expression construct."
1928 )
1929 elif not self.context.isinsert and not self.context.isupdate:
1930 raise exc.InvalidRequestError(
1931 "Statement is not an insert() or update() "
1932 "expression construct."
1933 )
1934 return self.context.postfetch_cols
1935
1936 def prefetch_cols(self):
1937 """Return ``prefetch_cols()`` from the underlying
1938 :class:`.ExecutionContext`.
1939
1940 See :class:`.ExecutionContext` for details.
1941
1942 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1943 statement is not a compiled expression construct
1944 or is not an insert() or update() construct.
1945
1946 """
1947
1948 if not self.context.compiled:
1949 raise exc.InvalidRequestError(
1950 "Statement is not a compiled expression construct."
1951 )
1952 elif not self.context.isinsert and not self.context.isupdate:
1953 raise exc.InvalidRequestError(
1954 "Statement is not an insert() or update() "
1955 "expression construct."
1956 )
1957 return self.context.prefetch_cols
1958
1959 def supports_sane_rowcount(self):
1960 """Return ``supports_sane_rowcount`` from the dialect.
1961
1962 See :attr:`_engine.CursorResult.rowcount` for background.
1963
1964 """
1965
1966 return self.dialect.supports_sane_rowcount
1967
1968 def supports_sane_multi_rowcount(self):
1969 """Return ``supports_sane_multi_rowcount`` from the dialect.
1970
1971 See :attr:`_engine.CursorResult.rowcount` for background.
1972
1973 """
1974
1975 return self.dialect.supports_sane_multi_rowcount
1976
1977 @util.memoized_property
1978 def rowcount(self) -> int:
1979 """Return the 'rowcount' for this result.
1980
1981 The primary purpose of 'rowcount' is to report the number of rows
1982 matched by the WHERE criterion of an UPDATE or DELETE statement
1983 executed once (i.e. for a single parameter set), which may then be
1984 compared to the number of rows expected to be updated or deleted as a
1985 means of asserting data integrity.
1986
1987 This attribute is transferred from the ``cursor.rowcount`` attribute
1988 of the DBAPI before the cursor is closed, to support DBAPIs that
1989 don't make this value available after cursor close. Some DBAPIs may
1990 offer meaningful values for other kinds of statements, such as INSERT
1991 and SELECT statements as well. In order to retrieve ``cursor.rowcount``
1992 for these statements, set the
1993 :paramref:`.Connection.execution_options.preserve_rowcount`
1994 execution option to True, which will cause the ``cursor.rowcount``
1995 value to be unconditionally memoized before any results are returned
1996 or the cursor is closed, regardless of statement type.
1997
1998 For cases where the DBAPI does not support rowcount for a particular
1999 kind of statement and/or execution, the returned value will be ``-1``,
2000 which is delivered directly from the DBAPI and is part of :pep:`249`.
2001 All DBAPIs should support rowcount for single-parameter-set
2002 UPDATE and DELETE statements, however.
2003
2004 .. note::
2005
2006 Notes regarding :attr:`_engine.CursorResult.rowcount`:
2007
2008
2009 * This attribute returns the number of rows *matched*,
2010 which is not necessarily the same as the number of rows
2011 that were actually *modified*. For example, an UPDATE statement
2012 may have no net change on a given row if the SET values
2013 given are the same as those present in the row already.
2014 Such a row would be matched but not modified.
2015 On backends that feature both styles, such as MySQL,
2016 rowcount is configured to return the match
2017 count in all cases.
2018
2019 * :attr:`_engine.CursorResult.rowcount` in the default case is
2020 *only* useful in conjunction with an UPDATE or DELETE statement,
2021 and only with a single set of parameters. For other kinds of
2022 statements, SQLAlchemy will not attempt to pre-memoize the value
2023 unless the
2024 :paramref:`.Connection.execution_options.preserve_rowcount`
2025 execution option is used. Note that contrary to :pep:`249`, many
2026 DBAPIs do not support rowcount values for statements that are not
2027 UPDATE or DELETE, particularly when rows are being returned which
2028 are not fully pre-buffered. DBAPIs that dont support rowcount
2029 for a particular kind of statement should return the value ``-1``
2030 for such statements.
2031
2032 * :attr:`_engine.CursorResult.rowcount` may not be meaningful
2033 when executing a single statement with multiple parameter sets
2034 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
2035 values across multiple parameter sets and will return ``-1``
2036 when accessed.
2037
2038 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
2039 a correct population of :attr:`_engine.CursorResult.rowcount`
2040 when the :paramref:`.Connection.execution_options.preserve_rowcount`
2041 execution option is set to True.
2042
2043 * Statements that use RETURNING may not support rowcount, returning
2044 a ``-1`` value instead.
2045
2046 .. seealso::
2047
2048 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
2049
2050 :paramref:`.Connection.execution_options.preserve_rowcount`
2051
2052 """ # noqa: E501
2053 try:
2054 return self.context.rowcount
2055 except BaseException as e:
2056 self.cursor_strategy.handle_exception(self, self.cursor, e)
2057 raise # not called
2058
2059 @property
2060 def lastrowid(self):
2061 """Return the 'lastrowid' accessor on the DBAPI cursor.
2062
2063 This is a DBAPI specific method and is only functional
2064 for those backends which support it, for statements
2065 where it is appropriate. It's behavior is not
2066 consistent across backends.
2067
2068 Usage of this method is normally unnecessary when
2069 using insert() expression constructs; the
2070 :attr:`~CursorResult.inserted_primary_key` attribute provides a
2071 tuple of primary key values for a newly inserted row,
2072 regardless of database backend.
2073
2074 """
2075 try:
2076 return self.context.get_lastrowid()
2077 except BaseException as e:
2078 self.cursor_strategy.handle_exception(self, self.cursor, e)
2079
2080 @property
2081 def returns_rows(self):
2082 """True if this :class:`_engine.CursorResult` returns zero or more
2083 rows.
2084
2085 I.e. if it is legal to call the methods
2086 :meth:`_engine.CursorResult.fetchone`,
2087 :meth:`_engine.CursorResult.fetchmany`
2088 :meth:`_engine.CursorResult.fetchall`.
2089
2090 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
2091 always be synonymous with whether or not the DBAPI cursor had a
2092 ``.description`` attribute, indicating the presence of result columns,
2093 noting that a cursor that returns zero rows still has a
2094 ``.description`` if a row-returning statement was emitted.
2095
2096 This attribute should be True for all results that are against
2097 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
2098 that use RETURNING. For INSERT/UPDATE/DELETE statements that were
2099 not using RETURNING, the value will usually be False, however
2100 there are some dialect-specific exceptions to this, such as when
2101 using the MSSQL / pyodbc dialect a SELECT is emitted inline in
2102 order to retrieve an inserted primary key value.
2103
2104
2105 """
2106 return self._metadata.returns_rows
2107
2108 @property
2109 def is_insert(self):
2110 """True if this :class:`_engine.CursorResult` is the result
2111 of a executing an expression language compiled
2112 :func:`_expression.insert` construct.
2113
2114 When True, this implies that the
2115 :attr:`inserted_primary_key` attribute is accessible,
2116 assuming the statement did not include
2117 a user defined "returning" construct.
2118
2119 """
2120 return self.context.isinsert
2121
2122 def _fetchiter_impl(self):
2123 fetchone = self.cursor_strategy.fetchone
2124
2125 while True:
2126 row = fetchone(self, self.cursor)
2127 if row is None:
2128 break
2129 yield row
2130
2131 def _fetchone_impl(self, hard_close=False):
2132 return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
2133
2134 def _fetchall_impl(self):
2135 return self.cursor_strategy.fetchall(self, self.cursor)
2136
2137 def _fetchmany_impl(self, size=None):
2138 return self.cursor_strategy.fetchmany(self, self.cursor, size)
2139
2140 def _raw_row_iterator(self):
2141 return self._fetchiter_impl()
2142
2143 def merge(self, *others: Result[Any]) -> MergedResult[Any]:
2144 merged_result = super().merge(*others)
2145 if self.context._has_rowcount:
2146 merged_result.rowcount = sum(
2147 cast("CursorResult[Any]", result).rowcount
2148 for result in (self,) + others
2149 )
2150 return merged_result
2151
2152 def close(self) -> Any:
2153 """Close this :class:`_engine.CursorResult`.
2154
2155 This closes out the underlying DBAPI cursor corresponding to the
2156 statement execution, if one is still present. Note that the DBAPI
2157 cursor is automatically released when the :class:`_engine.CursorResult`
2158 exhausts all available rows. :meth:`_engine.CursorResult.close` is
2159 generally an optional method except in the case when discarding a
2160 :class:`_engine.CursorResult` that still has additional rows pending
2161 for fetch.
2162
2163 After this method is called, it is no longer valid to call upon
2164 the fetch methods, which will raise a :class:`.ResourceClosedError`
2165 on subsequent use.
2166
2167 .. seealso::
2168
2169 :ref:`connections_toplevel`
2170
2171 """
2172 self._soft_close(hard=True)
2173
2174 @_generative
2175 def yield_per(self, num: int) -> Self:
2176 self._yield_per = num
2177 self.cursor_strategy.yield_per(self, self.cursor, num)
2178 return self
2179
2180
2181ResultProxy = CursorResult