1# engine/cursor.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define cursor-specific result set constructs including
9:class:`.CursorResult`."""
10
11from __future__ import annotations
12
13import collections
14import functools
15import operator
16import typing
17from typing import Any
18from typing import cast
19from typing import ClassVar
20from typing import Deque
21from typing import Dict
22from typing import Iterable
23from typing import Iterator
24from typing import List
25from typing import Mapping
26from typing import NoReturn
27from typing import Optional
28from typing import Sequence
29from typing import Tuple
30from typing import TYPE_CHECKING
31from typing import TypeVar
32from typing import Union
33
34from .result import IteratorResult
35from .result import MergedResult
36from .result import Result
37from .result import ResultMetaData
38from .result import SimpleResultMetaData
39from .result import tuplegetter
40from .row import Row
41from .. import exc
42from .. import util
43from ..sql import elements
44from ..sql import sqltypes
45from ..sql import util as sql_util
46from ..sql.base import _generative
47from ..sql.compiler import ResultColumnsEntry
48from ..sql.compiler import RM_NAME
49from ..sql.compiler import RM_OBJECTS
50from ..sql.compiler import RM_RENDERED_NAME
51from ..sql.compiler import RM_TYPE
52from ..sql.type_api import TypeEngine
53from ..util import compat
54from ..util.typing import Final
55from ..util.typing import Literal
56from ..util.typing import Self
57
58if typing.TYPE_CHECKING:
59 from .base import Connection
60 from .default import DefaultExecutionContext
61 from .interfaces import _DBAPICursorDescription
62 from .interfaces import _MutableCoreSingleExecuteParams
63 from .interfaces import CoreExecuteOptionsParameter
64 from .interfaces import DBAPICursor
65 from .interfaces import DBAPIType
66 from .interfaces import Dialect
67 from .interfaces import ExecutionContext
68 from .result import _KeyIndexType
69 from .result import _KeyMapRecType
70 from .result import _KeyMapType
71 from .result import _KeyType
72 from .result import _ProcessorsType
73 from .result import _TupleGetterType
74 from ..sql.schema import Column
75 from ..sql.type_api import _ResultProcessorType
76
77
78_T = TypeVar("_T", bound=Any)
79TupleAny = Tuple[Any, ...]
80
81# metadata entry tuple indexes.
82# using raw tuple is faster than namedtuple.
83# these match up to the positions in
84# _CursorKeyMapRecType
85MD_INDEX: Final[Literal[0]] = 0
86"""integer index in cursor.description
87
88"""
89
90MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
91"""integer index in compiled._result_columns"""
92
93MD_OBJECTS: Final[Literal[2]] = 2
94"""other string keys and ColumnElement obj that can match.
95
96This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
97
98"""
99
100MD_LOOKUP_KEY: Final[Literal[3]] = 3
101"""string key we usually expect for key-based lookup
102
103this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
104"""
105
106
107MD_RENDERED_NAME: Final[Literal[4]] = 4
108"""name that is usually in cursor.description
109
110this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
111"""
112
113
114MD_PROCESSOR: Final[Literal[5]] = 5
115"""callable to process a result value into a row"""
116
117MD_UNTRANSLATED: Final[Literal[6]] = 6
118"""raw name from cursor.description"""
119
120
121_CursorKeyMapRecType = Tuple[
122 Optional[int], # MD_INDEX, None means the record is ambiguously named
123 int, # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
124 TupleAny, # MD_OBJECTS
125 str, # MD_LOOKUP_KEY
126 str, # MD_RENDERED_NAME
127 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
128 Optional[str], # MD_UNTRANSLATED
129]
130
131_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
132
133# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
134# not None
135_NonAmbigCursorKeyMapRecType = Tuple[
136 int,
137 int,
138 List[Any],
139 str,
140 str,
141 Optional["_ResultProcessorType[Any]"],
142 str,
143]
144
145_MergeColTuple = Tuple[
146 int,
147 Optional[int],
148 str,
149 TypeEngine[Any],
150 "DBAPIType",
151 Optional[TupleAny],
152 Optional[str],
153]
154
155
156class CursorResultMetaData(ResultMetaData):
157 """Result metadata for DBAPI cursors."""
158
159 __slots__ = (
160 "_keymap",
161 "_processors",
162 "_keys",
163 "_keymap_by_result_column_idx",
164 "_tuplefilter",
165 "_translated_indexes",
166 "_safe_for_cache",
167 "_unpickled",
168 "_key_to_index",
169 # don't need _unique_filters support here for now. Can be added
170 # if a need arises.
171 )
172
173 _keymap: _CursorKeyMapType
174 _processors: _ProcessorsType
175 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
176 _unpickled: bool
177 _safe_for_cache: bool
178 _translated_indexes: Optional[List[int]]
179
180 returns_rows: ClassVar[bool] = True
181
182 def _has_key(self, key: Any) -> bool:
183 return key in self._keymap
184
185 def _for_freeze(self) -> ResultMetaData:
186 ambiguous = {
187 rec[MD_LOOKUP_KEY]
188 for rec in self._keymap.values()
189 if rec[MD_INDEX] is None
190 }
191 return SimpleResultMetaData(
192 self._keys,
193 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
194 _ambiguous_keys=frozenset(ambiguous) if ambiguous else None,
195 )
196
197 def _make_new_metadata(
198 self,
199 *,
200 unpickled: bool,
201 processors: _ProcessorsType,
202 keys: Sequence[str],
203 keymap: _KeyMapType,
204 tuplefilter: Optional[_TupleGetterType],
205 translated_indexes: Optional[List[int]],
206 safe_for_cache: bool,
207 keymap_by_result_column_idx: Any,
208 ) -> CursorResultMetaData:
209 new_obj = self.__class__.__new__(self.__class__)
210 new_obj._unpickled = unpickled
211 new_obj._processors = processors
212 new_obj._keys = keys
213 new_obj._keymap = keymap
214 new_obj._tuplefilter = tuplefilter
215 new_obj._translated_indexes = translated_indexes
216 new_obj._safe_for_cache = safe_for_cache
217 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
218 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
219 return new_obj
220
221 def _remove_processors(self) -> CursorResultMetaData:
222 assert not self._tuplefilter
223 return self._make_new_metadata(
224 unpickled=self._unpickled,
225 processors=[None] * len(self._processors),
226 tuplefilter=None,
227 translated_indexes=None,
228 keymap={
229 key: value[0:5] + (None,) + value[6:]
230 for key, value in self._keymap.items()
231 },
232 keys=self._keys,
233 safe_for_cache=self._safe_for_cache,
234 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
235 )
236
237 def _splice_horizontally(
238 self, other: CursorResultMetaData
239 ) -> CursorResultMetaData:
240 assert not self._tuplefilter
241
242 keymap = dict(self._keymap)
243 offset = len(self._keys)
244
245 for key, value in other._keymap.items():
246 # int index should be None for ambiguous key
247 if value[MD_INDEX] is not None and key not in keymap:
248 md_index = value[MD_INDEX] + offset
249 md_object = value[MD_RESULT_MAP_INDEX] + offset
250 else:
251 md_index = None
252 md_object = -1
253 keymap[key] = (md_index, md_object, *value[2:])
254
255 return self._make_new_metadata(
256 unpickled=self._unpickled,
257 processors=self._processors + other._processors, # type: ignore
258 tuplefilter=None,
259 translated_indexes=None,
260 keys=self._keys + other._keys, # type: ignore
261 keymap=keymap,
262 safe_for_cache=self._safe_for_cache,
263 keymap_by_result_column_idx={
264 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
265 for metadata_entry in keymap.values()
266 },
267 )
268
269 def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
270 recs = list(self._metadata_for_keys(keys))
271
272 indexes = [rec[MD_INDEX] for rec in recs]
273 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
274
275 if self._translated_indexes:
276 indexes = [self._translated_indexes[idx] for idx in indexes]
277 tup = tuplegetter(*indexes)
278 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
279
280 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
281 # TODO: need unit test for:
282 # result = connection.execute("raw sql, no columns").scalars()
283 # without the "or ()" it's failing because MD_OBJECTS is None
284 keymap.update(
285 (e, new_rec)
286 for new_rec in new_recs
287 for e in new_rec[MD_OBJECTS] or ()
288 )
289
290 return self._make_new_metadata(
291 unpickled=self._unpickled,
292 processors=self._processors,
293 keys=new_keys,
294 tuplefilter=tup,
295 translated_indexes=indexes,
296 keymap=keymap, # type: ignore[arg-type]
297 safe_for_cache=self._safe_for_cache,
298 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
299 )
300
301 def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
302 """When using a cached Compiled construct that has a _result_map,
303 for a new statement that used the cached Compiled, we need to ensure
304 the keymap has the Column objects from our new statement as keys.
305 So here we rewrite keymap with new entries for the new columns
306 as matched to those of the cached statement.
307
308 """
309
310 if not context.compiled or not context.compiled._result_columns:
311 return self
312
313 compiled_statement = context.compiled.statement
314 invoked_statement = context.invoked_statement
315
316 if TYPE_CHECKING:
317 assert isinstance(invoked_statement, elements.ClauseElement)
318
319 if compiled_statement is invoked_statement:
320 return self
321
322 assert invoked_statement is not None
323
324 # this is the most common path for Core statements when
325 # caching is used. In ORM use, this codepath is not really used
326 # as the _result_disable_adapt_to_context execution option is
327 # set by the ORM.
328
329 # make a copy and add the columns from the invoked statement
330 # to the result map.
331
332 keymap_by_position = self._keymap_by_result_column_idx
333
334 if keymap_by_position is None:
335 # first retrieval from cache, this map will not be set up yet,
336 # initialize lazily
337 keymap_by_position = self._keymap_by_result_column_idx = {
338 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
339 for metadata_entry in self._keymap.values()
340 }
341
342 assert not self._tuplefilter
343 return self._make_new_metadata(
344 keymap=compat.dict_union(
345 self._keymap,
346 {
347 new: keymap_by_position[idx]
348 for idx, new in enumerate(
349 invoked_statement._all_selected_columns
350 )
351 if idx in keymap_by_position
352 },
353 ),
354 unpickled=self._unpickled,
355 processors=self._processors,
356 tuplefilter=None,
357 translated_indexes=None,
358 keys=self._keys,
359 safe_for_cache=self._safe_for_cache,
360 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
361 )
362
363 def __init__(
364 self,
365 parent: CursorResult[Any],
366 cursor_description: _DBAPICursorDescription,
367 ):
368 context = parent.context
369 self._tuplefilter = None
370 self._translated_indexes = None
371 self._safe_for_cache = self._unpickled = False
372
373 if context.result_column_struct:
374 (
375 result_columns,
376 cols_are_ordered,
377 textual_ordered,
378 ad_hoc_textual,
379 loose_column_name_matching,
380 ) = context.result_column_struct
381 num_ctx_cols = len(result_columns)
382 else:
383 result_columns = cols_are_ordered = ( # type: ignore
384 num_ctx_cols
385 ) = ad_hoc_textual = loose_column_name_matching = (
386 textual_ordered
387 ) = False
388
389 # merge cursor.description with the column info
390 # present in the compiled structure, if any
391 raw = self._merge_cursor_description(
392 context,
393 cursor_description,
394 result_columns,
395 num_ctx_cols,
396 cols_are_ordered,
397 textual_ordered,
398 ad_hoc_textual,
399 loose_column_name_matching,
400 )
401
402 # processors in key order which are used when building up
403 # a row
404 self._processors = [
405 metadata_entry[MD_PROCESSOR] for metadata_entry in raw
406 ]
407
408 # this is used when using this ResultMetaData in a Core-only cache
409 # retrieval context. it's initialized on first cache retrieval
410 # when the _result_disable_adapt_to_context execution option
411 # (which the ORM generally sets) is not set.
412 self._keymap_by_result_column_idx = None
413
414 # for compiled SQL constructs, copy additional lookup keys into
415 # the key lookup map, such as Column objects, labels,
416 # column keys and other names
417 if num_ctx_cols:
418 # keymap by primary string...
419 by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
420 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
421 for metadata_entry in raw
422 }
423
424 if len(by_key) != num_ctx_cols:
425 # if by-primary-string dictionary smaller than
426 # number of columns, assume we have dupes; (this check
427 # is also in place if string dictionary is bigger, as
428 # can occur when '*' was used as one of the compiled columns,
429 # which may or may not be suggestive of dupes), rewrite
430 # dupe records with "None" for index which results in
431 # ambiguous column exception when accessed.
432 #
433 # this is considered to be the less common case as it is not
434 # common to have dupe column keys in a SELECT statement.
435 #
436 # new in 1.4: get the complete set of all possible keys,
437 # strings, objects, whatever, that are dupes across two
438 # different records, first.
439 index_by_key: Dict[Any, Any] = {}
440 dupes = set()
441 for metadata_entry in raw:
442 for key in (metadata_entry[MD_RENDERED_NAME],) + (
443 metadata_entry[MD_OBJECTS] or ()
444 ):
445 idx = metadata_entry[MD_INDEX]
446 # if this key has been associated with more than one
447 # positional index, it's a dupe
448 if index_by_key.setdefault(key, idx) != idx:
449 dupes.add(key)
450
451 # then put everything we have into the keymap excluding only
452 # those keys that are dupes.
453 self._keymap = {
454 obj_elem: metadata_entry
455 for metadata_entry in raw
456 if metadata_entry[MD_OBJECTS]
457 for obj_elem in metadata_entry[MD_OBJECTS]
458 if obj_elem not in dupes
459 }
460
461 # then for the dupe keys, put the "ambiguous column"
462 # record into by_key.
463 by_key.update(
464 {
465 key: (None, -1, (), key, key, None, None)
466 for key in dupes
467 }
468 )
469
470 else:
471 # no dupes - copy secondary elements from compiled
472 # columns into self._keymap. this is the most common
473 # codepath for Core / ORM statement executions before the
474 # result metadata is cached
475 self._keymap = {
476 obj_elem: metadata_entry
477 for metadata_entry in raw
478 if metadata_entry[MD_OBJECTS]
479 for obj_elem in metadata_entry[MD_OBJECTS]
480 }
481 # update keymap with primary string names taking
482 # precedence
483 self._keymap.update(by_key)
484 else:
485 # no compiled objects to map, just create keymap by primary string
486 self._keymap = {
487 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
488 for metadata_entry in raw
489 }
490
491 # update keymap with "translated" names. In SQLAlchemy this is a
492 # sqlite only thing, and in fact impacting only extremely old SQLite
493 # versions unlikely to be present in modern Python versions.
494 # however, the pyhive third party dialect is
495 # also using this hook, which means others still might use it as well.
496 # I dislike having this awkward hook here but as long as we need
497 # to use names in cursor.description in some cases we need to have
498 # some hook to accomplish this.
499 if not num_ctx_cols and context._translate_colname:
500 self._keymap.update(
501 {
502 metadata_entry[MD_UNTRANSLATED]: self._keymap[
503 metadata_entry[MD_LOOKUP_KEY]
504 ]
505 for metadata_entry in raw
506 if metadata_entry[MD_UNTRANSLATED]
507 }
508 )
509
510 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
511
512 def _merge_cursor_description(
513 self,
514 context: DefaultExecutionContext,
515 cursor_description: _DBAPICursorDescription,
516 result_columns: Sequence[ResultColumnsEntry],
517 num_ctx_cols: int,
518 cols_are_ordered: bool,
519 textual_ordered: bool,
520 ad_hoc_textual: bool,
521 loose_column_name_matching: bool,
522 ) -> List[_CursorKeyMapRecType]:
523 """Merge a cursor.description with compiled result column information.
524
525 There are at least four separate strategies used here, selected
526 depending on the type of SQL construct used to start with.
527
528 The most common case is that of the compiled SQL expression construct,
529 which generated the column names present in the raw SQL string and
530 which has the identical number of columns as were reported by
531 cursor.description. In this case, we assume a 1-1 positional mapping
532 between the entries in cursor.description and the compiled object.
533 This is also the most performant case as we disregard extracting /
534 decoding the column names present in cursor.description since we
535 already have the desired name we generated in the compiled SQL
536 construct.
537
538 The next common case is that of the completely raw string SQL,
539 such as passed to connection.execute(). In this case we have no
540 compiled construct to work with, so we extract and decode the
541 names from cursor.description and index those as the primary
542 result row target keys.
543
544 The remaining fairly common case is that of the textual SQL
545 that includes at least partial column information; this is when
546 we use a :class:`_expression.TextualSelect` construct.
547 This construct may have
548 unordered or ordered column information. In the ordered case, we
549 merge the cursor.description and the compiled construct's information
550 positionally, and warn if there are additional description names
551 present, however we still decode the names in cursor.description
552 as we don't have a guarantee that the names in the columns match
553 on these. In the unordered case, we match names in cursor.description
554 to that of the compiled construct based on name matching.
555 In both of these cases, the cursor.description names and the column
556 expression objects and names are indexed as result row target keys.
557
558 The final case is much less common, where we have a compiled
559 non-textual SQL expression construct, but the number of columns
560 in cursor.description doesn't match what's in the compiled
561 construct. We make the guess here that there might be textual
562 column expressions in the compiled construct that themselves include
563 a comma in them causing them to split. We do the same name-matching
564 as with textual non-ordered columns.
565
566 The name-matched system of merging is the same as that used by
567 SQLAlchemy for all cases up through the 0.9 series. Positional
568 matching for compiled SQL expressions was introduced in 1.0 as a
569 major performance feature, and positional matching for textual
570 :class:`_expression.TextualSelect` objects in 1.1.
571 As name matching is no longer
572 a common case, it was acceptable to factor it into smaller generator-
573 oriented methods that are easier to understand, but incur slightly
574 more performance overhead.
575
576 """
577
578 if (
579 num_ctx_cols
580 and cols_are_ordered
581 and not textual_ordered
582 and num_ctx_cols == len(cursor_description)
583 ):
584 self._keys = [elem[0] for elem in result_columns]
585 # pure positional 1-1 case; doesn't need to read
586 # the names from cursor.description
587
588 # most common case for Core and ORM
589
590 # this metadata is safe to cache because we are guaranteed
591 # to have the columns in the same order for new executions
592 self._safe_for_cache = True
593 return [
594 (
595 idx,
596 idx,
597 rmap_entry[RM_OBJECTS],
598 rmap_entry[RM_NAME],
599 rmap_entry[RM_RENDERED_NAME],
600 context.get_result_processor(
601 rmap_entry[RM_TYPE],
602 rmap_entry[RM_RENDERED_NAME],
603 cursor_description[idx][1],
604 ),
605 None,
606 )
607 for idx, rmap_entry in enumerate(result_columns)
608 ]
609 else:
610 # name-based or text-positional cases, where we need
611 # to read cursor.description names
612
613 if textual_ordered or (
614 ad_hoc_textual and len(cursor_description) == num_ctx_cols
615 ):
616 self._safe_for_cache = True
617 # textual positional case
618 raw_iterator = self._merge_textual_cols_by_position(
619 context, cursor_description, result_columns
620 )
621 elif num_ctx_cols:
622 # compiled SQL with a mismatch of description cols
623 # vs. compiled cols, or textual w/ unordered columns
624 # the order of columns can change if the query is
625 # against a "select *", so not safe to cache
626 self._safe_for_cache = False
627 raw_iterator = self._merge_cols_by_name(
628 context,
629 cursor_description,
630 result_columns,
631 loose_column_name_matching,
632 )
633 else:
634 # no compiled SQL, just a raw string, order of columns
635 # can change for "select *"
636 self._safe_for_cache = False
637 raw_iterator = self._merge_cols_by_none(
638 context, cursor_description
639 )
640
641 return [
642 (
643 idx,
644 ridx,
645 obj,
646 cursor_colname,
647 cursor_colname,
648 context.get_result_processor(
649 mapped_type, cursor_colname, coltype
650 ),
651 untranslated,
652 ) # type: ignore[misc]
653 for (
654 idx,
655 ridx,
656 cursor_colname,
657 mapped_type,
658 coltype,
659 obj,
660 untranslated,
661 ) in raw_iterator
662 ]
663
664 def _colnames_from_description(
665 self,
666 context: DefaultExecutionContext,
667 cursor_description: _DBAPICursorDescription,
668 ) -> Iterator[Tuple[int, str, Optional[str], DBAPIType]]:
669 """Extract column names and data types from a cursor.description.
670
671 Applies unicode decoding, column translation, "normalization",
672 and case sensitivity rules to the names based on the dialect.
673
674 """
675
676 dialect = context.dialect
677 translate_colname = context._translate_colname
678 normalize_name = (
679 dialect.normalize_name if dialect.requires_name_normalize else None
680 )
681 untranslated = None
682
683 self._keys = []
684
685 for idx, rec in enumerate(cursor_description):
686 colname = rec[0]
687 coltype = rec[1]
688
689 if translate_colname:
690 colname, untranslated = translate_colname(colname)
691
692 if normalize_name:
693 colname = normalize_name(colname)
694
695 self._keys.append(colname)
696
697 yield idx, colname, untranslated, coltype
698
699 def _merge_textual_cols_by_position(
700 self,
701 context: DefaultExecutionContext,
702 cursor_description: _DBAPICursorDescription,
703 result_columns: Sequence[ResultColumnsEntry],
704 ) -> Iterator[_MergeColTuple]:
705 num_ctx_cols = len(result_columns)
706
707 if num_ctx_cols > len(cursor_description):
708 util.warn(
709 "Number of columns in textual SQL (%d) is "
710 "smaller than number of columns requested (%d)"
711 % (num_ctx_cols, len(cursor_description))
712 )
713 seen = set()
714
715 for (
716 idx,
717 colname,
718 untranslated,
719 coltype,
720 ) in self._colnames_from_description(context, cursor_description):
721 if idx < num_ctx_cols:
722 ctx_rec = result_columns[idx]
723 obj = ctx_rec[RM_OBJECTS]
724 ridx = idx
725 mapped_type = ctx_rec[RM_TYPE]
726 if obj[0] in seen:
727 raise exc.InvalidRequestError(
728 "Duplicate column expression requested "
729 "in textual SQL: %r" % obj[0]
730 )
731 seen.add(obj[0])
732 else:
733 mapped_type = sqltypes.NULLTYPE
734 obj = None
735 ridx = None
736 yield idx, ridx, colname, mapped_type, coltype, obj, untranslated
737
738 def _merge_cols_by_name(
739 self,
740 context: DefaultExecutionContext,
741 cursor_description: _DBAPICursorDescription,
742 result_columns: Sequence[ResultColumnsEntry],
743 loose_column_name_matching: bool,
744 ) -> Iterator[_MergeColTuple]:
745 match_map = self._create_description_match_map(
746 result_columns, loose_column_name_matching
747 )
748 mapped_type: TypeEngine[Any]
749
750 for (
751 idx,
752 colname,
753 untranslated,
754 coltype,
755 ) in self._colnames_from_description(context, cursor_description):
756 try:
757 ctx_rec = match_map[colname]
758 except KeyError:
759 mapped_type = sqltypes.NULLTYPE
760 obj = None
761 result_columns_idx = None
762 else:
763 obj = ctx_rec[1]
764 mapped_type = ctx_rec[2]
765 result_columns_idx = ctx_rec[3]
766 yield (
767 idx,
768 result_columns_idx,
769 colname,
770 mapped_type,
771 coltype,
772 obj,
773 untranslated,
774 )
775
776 @classmethod
777 def _create_description_match_map(
778 cls,
779 result_columns: Sequence[ResultColumnsEntry],
780 loose_column_name_matching: bool = False,
781 ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
782 """when matching cursor.description to a set of names that are present
783 in a Compiled object, as is the case with TextualSelect, get all the
784 names we expect might match those in cursor.description.
785 """
786
787 d: Dict[
788 Union[str, object],
789 Tuple[str, TupleAny, TypeEngine[Any], int],
790 ] = {}
791 for ridx, elem in enumerate(result_columns):
792 key = elem[RM_RENDERED_NAME]
793 if key in d:
794 # conflicting keyname - just add the column-linked objects
795 # to the existing record. if there is a duplicate column
796 # name in the cursor description, this will allow all of those
797 # objects to raise an ambiguous column error
798 e_name, e_obj, e_type, e_ridx = d[key]
799 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
800 else:
801 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
802
803 if loose_column_name_matching:
804 # when using a textual statement with an unordered set
805 # of columns that line up, we are expecting the user
806 # to be using label names in the SQL that match to the column
807 # expressions. Enable more liberal matching for this case;
808 # duplicate keys that are ambiguous will be fixed later.
809 for r_key in elem[RM_OBJECTS]:
810 d.setdefault(
811 r_key,
812 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
813 )
814 return d
815
816 def _merge_cols_by_none(
817 self,
818 context: DefaultExecutionContext,
819 cursor_description: _DBAPICursorDescription,
820 ) -> Iterator[_MergeColTuple]:
821 self._keys = []
822
823 for (
824 idx,
825 colname,
826 untranslated,
827 coltype,
828 ) in self._colnames_from_description(context, cursor_description):
829 yield (
830 idx,
831 None,
832 colname,
833 sqltypes.NULLTYPE,
834 coltype,
835 None,
836 untranslated,
837 )
838
839 if not TYPE_CHECKING:
840
841 def _key_fallback(
842 self, key: Any, err: Optional[Exception], raiseerr: bool = True
843 ) -> Optional[NoReturn]:
844 if raiseerr:
845 if self._unpickled and isinstance(key, elements.ColumnElement):
846 raise exc.NoSuchColumnError(
847 "Row was unpickled; lookup by ColumnElement "
848 "is unsupported"
849 ) from err
850 else:
851 raise exc.NoSuchColumnError(
852 "Could not locate column in row for column '%s'"
853 % util.string_or_unprintable(key)
854 ) from err
855 else:
856 return None
857
858 def _raise_for_ambiguous_column_name(
859 self, rec: _KeyMapRecType
860 ) -> NoReturn:
861 raise exc.InvalidRequestError(
862 "Ambiguous column name '%s' in "
863 "result set column descriptions" % rec[MD_LOOKUP_KEY]
864 )
865
866 def _index_for_key(
867 self, key: _KeyIndexType, raiseerr: bool = True
868 ) -> Optional[int]:
869 # TODO: can consider pre-loading ints and negative ints
870 # into _keymap - also no coverage here
871 if isinstance(key, int):
872 key = self._keys[key]
873
874 try:
875 rec = self._keymap[key]
876 except KeyError as ke:
877 x = self._key_fallback(key, ke, raiseerr)
878 assert x is None
879 return None
880
881 index = rec[0]
882
883 if index is None:
884 self._raise_for_ambiguous_column_name(rec)
885 return index
886
887 def _indexes_for_keys(
888 self, keys: Sequence[_KeyIndexType]
889 ) -> Sequence[int]:
890 try:
891 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501
892 except KeyError as ke:
893 # ensure it raises
894 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
895
896 def _metadata_for_keys(
897 self, keys: Sequence[_KeyIndexType]
898 ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
899 for key in keys:
900 if int in key.__class__.__mro__:
901 key = self._keys[key] # type: ignore[index]
902
903 try:
904 rec = self._keymap[key] # type: ignore[index]
905 except KeyError as ke:
906 # ensure it raises
907 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
908
909 index = rec[MD_INDEX]
910
911 if index is None:
912 self._raise_for_ambiguous_column_name(rec)
913
914 yield cast(_NonAmbigCursorKeyMapRecType, rec)
915
916 def __getstate__(self) -> Dict[str, Any]:
917 # TODO: consider serializing this as SimpleResultMetaData
918 return {
919 "_keymap": {
920 key: (
921 rec[MD_INDEX],
922 rec[MD_RESULT_MAP_INDEX],
923 [],
924 key,
925 rec[MD_RENDERED_NAME],
926 None,
927 None,
928 )
929 for key, rec in self._keymap.items()
930 if isinstance(key, (str, int))
931 },
932 "_keys": self._keys,
933 "_translated_indexes": self._translated_indexes,
934 }
935
936 def __setstate__(self, state: Dict[str, Any]) -> None:
937 self._processors = [None for _ in range(len(state["_keys"]))]
938 self._keymap = state["_keymap"]
939 self._keymap_by_result_column_idx = None
940 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
941 self._keys = state["_keys"]
942 self._unpickled = True
943 if state["_translated_indexes"]:
944 self._translated_indexes = cast(
945 "List[int]", state["_translated_indexes"]
946 )
947 self._tuplefilter = tuplegetter(*self._translated_indexes)
948 else:
949 self._translated_indexes = self._tuplefilter = None
950
951
952class ResultFetchStrategy:
953 """Define a fetching strategy for a result object.
954
955
956 .. versionadded:: 1.4
957
958 """
959
960 __slots__ = ()
961
962 alternate_cursor_description: Optional[_DBAPICursorDescription] = None
963
964 def soft_close(
965 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
966 ) -> None:
967 raise NotImplementedError()
968
969 def hard_close(
970 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
971 ) -> None:
972 raise NotImplementedError()
973
974 def yield_per(
975 self,
976 result: CursorResult[Any],
977 dbapi_cursor: DBAPICursor,
978 num: int,
979 ) -> None:
980 return
981
982 def fetchone(
983 self,
984 result: CursorResult[Any],
985 dbapi_cursor: DBAPICursor,
986 hard_close: bool = False,
987 ) -> Any:
988 raise NotImplementedError()
989
990 def fetchmany(
991 self,
992 result: CursorResult[Any],
993 dbapi_cursor: DBAPICursor,
994 size: Optional[int] = None,
995 ) -> Any:
996 raise NotImplementedError()
997
998 def fetchall(
999 self,
1000 result: CursorResult[Any],
1001 dbapi_cursor: DBAPICursor,
1002 ) -> Any:
1003 raise NotImplementedError()
1004
1005 def handle_exception(
1006 self,
1007 result: CursorResult[Any],
1008 dbapi_cursor: Optional[DBAPICursor],
1009 err: BaseException,
1010 ) -> NoReturn:
1011 raise err
1012
1013
1014class NoCursorFetchStrategy(ResultFetchStrategy):
1015 """Cursor strategy for a result that has no open cursor.
1016
1017 There are two varieties of this strategy, one for DQL and one for
1018 DML (and also DDL), each of which represent a result that had a cursor
1019 but no longer has one.
1020
1021 """
1022
1023 __slots__ = ()
1024
1025 def soft_close(
1026 self,
1027 result: CursorResult[Any],
1028 dbapi_cursor: Optional[DBAPICursor],
1029 ) -> None:
1030 pass
1031
1032 def hard_close(
1033 self,
1034 result: CursorResult[Any],
1035 dbapi_cursor: Optional[DBAPICursor],
1036 ) -> None:
1037 pass
1038
1039 def fetchone(
1040 self,
1041 result: CursorResult[Any],
1042 dbapi_cursor: DBAPICursor,
1043 hard_close: bool = False,
1044 ) -> Any:
1045 return self._non_result(result, None)
1046
1047 def fetchmany(
1048 self,
1049 result: CursorResult[Any],
1050 dbapi_cursor: DBAPICursor,
1051 size: Optional[int] = None,
1052 ) -> Any:
1053 return self._non_result(result, [])
1054
1055 def fetchall(
1056 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1057 ) -> Any:
1058 return self._non_result(result, [])
1059
1060 def _non_result(
1061 self,
1062 result: CursorResult[Any],
1063 default: Any,
1064 err: Optional[BaseException] = None,
1065 ) -> Any:
1066 raise NotImplementedError()
1067
1068
1069class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
1070 """Cursor strategy for a DQL result that has no open cursor.
1071
1072 This is a result set that can return rows, i.e. for a SELECT, or for an
1073 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
1074 where the cursor is closed and no rows remain available. The owning result
1075 object may or may not be "hard closed", which determines if the fetch
1076 methods send empty results or raise for closed result.
1077
1078 """
1079
1080 __slots__ = ()
1081
1082 def _non_result(
1083 self,
1084 result: CursorResult[Any],
1085 default: Any,
1086 err: Optional[BaseException] = None,
1087 ) -> Any:
1088 if result.closed:
1089 raise exc.ResourceClosedError(
1090 "This result object is closed."
1091 ) from err
1092 else:
1093 return default
1094
1095
1096_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
1097
1098
1099class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
1100 """Cursor strategy for a DML result that has no open cursor.
1101
1102 This is a result set that does not return rows, i.e. for an INSERT,
1103 UPDATE, DELETE that does not include RETURNING.
1104
1105 """
1106
1107 __slots__ = ()
1108
1109 def _non_result(
1110 self,
1111 result: CursorResult[Any],
1112 default: Any,
1113 err: Optional[BaseException] = None,
1114 ) -> Any:
1115 # we only expect to have a _NoResultMetaData() here right now.
1116 assert not result._metadata.returns_rows
1117 result._metadata._we_dont_return_rows(err) # type: ignore[union-attr]
1118
1119
1120_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
1121
1122
1123class CursorFetchStrategy(ResultFetchStrategy):
1124 """Call fetch methods from a DBAPI cursor.
1125
1126 Alternate versions of this class may instead buffer the rows from
1127 cursors or not use cursors at all.
1128
1129 """
1130
1131 __slots__ = ()
1132
1133 def soft_close(
1134 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1135 ) -> None:
1136 result.cursor_strategy = _NO_CURSOR_DQL
1137
1138 def hard_close(
1139 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1140 ) -> None:
1141 result.cursor_strategy = _NO_CURSOR_DQL
1142
1143 def handle_exception(
1144 self,
1145 result: CursorResult[Any],
1146 dbapi_cursor: Optional[DBAPICursor],
1147 err: BaseException,
1148 ) -> NoReturn:
1149 result.connection._handle_dbapi_exception(
1150 err, None, None, dbapi_cursor, result.context
1151 )
1152
1153 def yield_per(
1154 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1155 ) -> None:
1156 result.cursor_strategy = BufferedRowCursorFetchStrategy(
1157 dbapi_cursor,
1158 {"max_row_buffer": num},
1159 initial_buffer=collections.deque(),
1160 growth_factor=0,
1161 )
1162
1163 def fetchone(
1164 self,
1165 result: CursorResult[Any],
1166 dbapi_cursor: DBAPICursor,
1167 hard_close: bool = False,
1168 ) -> Any:
1169 try:
1170 row = dbapi_cursor.fetchone()
1171 if row is None:
1172 result._soft_close(hard=hard_close)
1173 return row
1174 except BaseException as e:
1175 self.handle_exception(result, dbapi_cursor, e)
1176
1177 def fetchmany(
1178 self,
1179 result: CursorResult[Any],
1180 dbapi_cursor: DBAPICursor,
1181 size: Optional[int] = None,
1182 ) -> Any:
1183 try:
1184 if size is None:
1185 l = dbapi_cursor.fetchmany()
1186 else:
1187 l = dbapi_cursor.fetchmany(size)
1188
1189 if not l:
1190 result._soft_close()
1191 return l
1192 except BaseException as e:
1193 self.handle_exception(result, dbapi_cursor, e)
1194
1195 def fetchall(
1196 self,
1197 result: CursorResult[Any],
1198 dbapi_cursor: DBAPICursor,
1199 ) -> Any:
1200 try:
1201 rows = dbapi_cursor.fetchall()
1202 result._soft_close()
1203 return rows
1204 except BaseException as e:
1205 self.handle_exception(result, dbapi_cursor, e)
1206
1207
1208_DEFAULT_FETCH = CursorFetchStrategy()
1209
1210
1211class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
1212 """A cursor fetch strategy with row buffering behavior.
1213
1214 This strategy buffers the contents of a selection of rows
1215 before ``fetchone()`` is called. This is to allow the results of
1216 ``cursor.description`` to be available immediately, when
1217 interfacing with a DB-API that requires rows to be consumed before
1218 this information is available (currently psycopg2, when used with
1219 server-side cursors).
1220
1221 The pre-fetching behavior fetches only one row initially, and then
1222 grows its buffer size by a fixed amount with each successive need
1223 for additional rows up the ``max_row_buffer`` size, which defaults
1224 to 1000::
1225
1226 with psycopg2_engine.connect() as conn:
1227
1228 result = conn.execution_options(
1229 stream_results=True, max_row_buffer=50
1230 ).execute(text("select * from table"))
1231
1232 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
1233
1234 .. seealso::
1235
1236 :ref:`psycopg2_execution_options`
1237 """
1238
1239 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
1240
1241 def __init__(
1242 self,
1243 dbapi_cursor: DBAPICursor,
1244 execution_options: CoreExecuteOptionsParameter,
1245 growth_factor: int = 5,
1246 initial_buffer: Optional[Deque[Any]] = None,
1247 ) -> None:
1248 self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
1249
1250 if initial_buffer is not None:
1251 self._rowbuffer = initial_buffer
1252 else:
1253 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
1254 self._growth_factor = growth_factor
1255
1256 if growth_factor:
1257 self._bufsize = min(self._max_row_buffer, self._growth_factor)
1258 else:
1259 self._bufsize = self._max_row_buffer
1260
1261 @classmethod
1262 def create(
1263 cls, result: CursorResult[Any]
1264 ) -> BufferedRowCursorFetchStrategy:
1265 return BufferedRowCursorFetchStrategy(
1266 result.cursor,
1267 result.context.execution_options,
1268 )
1269
1270 def _buffer_rows(
1271 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1272 ) -> None:
1273 """this is currently used only by fetchone()."""
1274
1275 size = self._bufsize
1276 try:
1277 if size < 1:
1278 new_rows = dbapi_cursor.fetchall()
1279 else:
1280 new_rows = dbapi_cursor.fetchmany(size)
1281 except BaseException as e:
1282 self.handle_exception(result, dbapi_cursor, e)
1283
1284 if not new_rows:
1285 return
1286 self._rowbuffer = collections.deque(new_rows)
1287 if self._growth_factor and size < self._max_row_buffer:
1288 self._bufsize = min(
1289 self._max_row_buffer, size * self._growth_factor
1290 )
1291
1292 def yield_per(
1293 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1294 ) -> None:
1295 self._growth_factor = 0
1296 self._max_row_buffer = self._bufsize = num
1297
1298 def soft_close(
1299 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1300 ) -> None:
1301 self._rowbuffer.clear()
1302 super().soft_close(result, dbapi_cursor)
1303
1304 def hard_close(
1305 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1306 ) -> None:
1307 self._rowbuffer.clear()
1308 super().hard_close(result, dbapi_cursor)
1309
1310 def fetchone(
1311 self,
1312 result: CursorResult[Any],
1313 dbapi_cursor: DBAPICursor,
1314 hard_close: bool = False,
1315 ) -> Any:
1316 if not self._rowbuffer:
1317 self._buffer_rows(result, dbapi_cursor)
1318 if not self._rowbuffer:
1319 try:
1320 result._soft_close(hard=hard_close)
1321 except BaseException as e:
1322 self.handle_exception(result, dbapi_cursor, e)
1323 return None
1324 return self._rowbuffer.popleft()
1325
1326 def fetchmany(
1327 self,
1328 result: CursorResult[Any],
1329 dbapi_cursor: DBAPICursor,
1330 size: Optional[int] = None,
1331 ) -> Any:
1332 if size is None:
1333 return self.fetchall(result, dbapi_cursor)
1334
1335 rb = self._rowbuffer
1336 lb = len(rb)
1337 close = False
1338 if size > lb:
1339 try:
1340 new = dbapi_cursor.fetchmany(size - lb)
1341 except BaseException as e:
1342 self.handle_exception(result, dbapi_cursor, e)
1343 else:
1344 if not new:
1345 # defer closing since it may clear the row buffer
1346 close = True
1347 else:
1348 rb.extend(new)
1349
1350 res = [rb.popleft() for _ in range(min(size, len(rb)))]
1351 if close:
1352 result._soft_close()
1353 return res
1354
1355 def fetchall(
1356 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1357 ) -> Any:
1358 try:
1359 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
1360 self._rowbuffer.clear()
1361 result._soft_close()
1362 return ret
1363 except BaseException as e:
1364 self.handle_exception(result, dbapi_cursor, e)
1365
1366
1367class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
1368 """A cursor strategy that buffers rows fully upon creation.
1369
1370 Used for operations where a result is to be delivered
1371 after the database conversation can not be continued,
1372 such as MSSQL INSERT...OUTPUT after an autocommit.
1373
1374 """
1375
1376 __slots__ = ("_rowbuffer", "alternate_cursor_description")
1377
1378 def __init__(
1379 self,
1380 dbapi_cursor: Optional[DBAPICursor],
1381 alternate_description: Optional[_DBAPICursorDescription] = None,
1382 initial_buffer: Optional[Iterable[Any]] = None,
1383 ):
1384 self.alternate_cursor_description = alternate_description
1385 if initial_buffer is not None:
1386 self._rowbuffer = collections.deque(initial_buffer)
1387 else:
1388 assert dbapi_cursor is not None
1389 self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
1390
1391 def yield_per(
1392 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1393 ) -> Any:
1394 pass
1395
1396 def soft_close(
1397 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1398 ) -> None:
1399 self._rowbuffer.clear()
1400 super().soft_close(result, dbapi_cursor)
1401
1402 def hard_close(
1403 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1404 ) -> None:
1405 self._rowbuffer.clear()
1406 super().hard_close(result, dbapi_cursor)
1407
1408 def fetchone(
1409 self,
1410 result: CursorResult[Any],
1411 dbapi_cursor: DBAPICursor,
1412 hard_close: bool = False,
1413 ) -> Any:
1414 if self._rowbuffer:
1415 return self._rowbuffer.popleft()
1416 else:
1417 result._soft_close(hard=hard_close)
1418 return None
1419
1420 def fetchmany(
1421 self,
1422 result: CursorResult[Any],
1423 dbapi_cursor: DBAPICursor,
1424 size: Optional[int] = None,
1425 ) -> Any:
1426 if size is None:
1427 return self.fetchall(result, dbapi_cursor)
1428
1429 rb = self._rowbuffer
1430 rows = [rb.popleft() for _ in range(min(size, len(rb)))]
1431 if not rows:
1432 result._soft_close()
1433 return rows
1434
1435 def fetchall(
1436 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1437 ) -> Any:
1438 ret = self._rowbuffer
1439 self._rowbuffer = collections.deque()
1440 result._soft_close()
1441 return ret
1442
1443
1444class _NoResultMetaData(ResultMetaData):
1445 __slots__ = ()
1446
1447 returns_rows = False
1448
1449 def _we_dont_return_rows(
1450 self, err: Optional[BaseException] = None
1451 ) -> NoReturn:
1452 raise exc.ResourceClosedError(
1453 "This result object does not return rows. "
1454 "It has been closed automatically."
1455 ) from err
1456
1457 def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
1458 self._we_dont_return_rows()
1459
1460 def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1461 self._we_dont_return_rows()
1462
1463 def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1464 self._we_dont_return_rows()
1465
1466 @property
1467 def _keymap(self) -> NoReturn: # type: ignore[override]
1468 self._we_dont_return_rows()
1469
1470 @property
1471 def _key_to_index(self) -> NoReturn: # type: ignore[override]
1472 self._we_dont_return_rows()
1473
1474 @property
1475 def _processors(self) -> NoReturn: # type: ignore[override]
1476 self._we_dont_return_rows()
1477
1478 @property
1479 def keys(self) -> NoReturn:
1480 self._we_dont_return_rows()
1481
1482
1483_NO_RESULT_METADATA = _NoResultMetaData()
1484
1485
1486def null_dml_result() -> IteratorResult[Any]:
1487 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
1488 it._soft_close()
1489 return it
1490
1491
1492class CursorResult(Result[_T]):
1493 """A Result that is representing state from a DBAPI cursor.
1494
1495 .. versionchanged:: 1.4 The :class:`.CursorResult``
1496 class replaces the previous :class:`.ResultProxy` interface.
1497 This classes are based on the :class:`.Result` calling API
1498 which provides an updated usage model and calling facade for
1499 SQLAlchemy Core and SQLAlchemy ORM.
1500
1501 Returns database rows via the :class:`.Row` class, which provides
1502 additional API features and behaviors on top of the raw data returned by
1503 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
1504 method, other kinds of objects may also be returned.
1505
1506 .. seealso::
1507
1508 :ref:`tutorial_selecting_data` - introductory material for accessing
1509 :class:`_engine.CursorResult` and :class:`.Row` objects.
1510
1511 """
1512
1513 __slots__ = (
1514 "context",
1515 "dialect",
1516 "cursor",
1517 "cursor_strategy",
1518 "_echo",
1519 "connection",
1520 )
1521
1522 _metadata: Union[CursorResultMetaData, _NoResultMetaData]
1523 _no_result_metadata = _NO_RESULT_METADATA
1524 _soft_closed: bool = False
1525 closed: bool = False
1526 _is_cursor = True
1527
1528 context: DefaultExecutionContext
1529 dialect: Dialect
1530 cursor_strategy: ResultFetchStrategy
1531 connection: Connection
1532
1533 def __init__(
1534 self,
1535 context: DefaultExecutionContext,
1536 cursor_strategy: ResultFetchStrategy,
1537 cursor_description: Optional[_DBAPICursorDescription],
1538 ):
1539 self.context = context
1540 self.dialect = context.dialect
1541 self.cursor = context.cursor
1542 self.cursor_strategy = cursor_strategy
1543 self.connection = context.root_connection
1544 self._echo = echo = (
1545 self.connection._echo and context.engine._should_log_debug()
1546 )
1547
1548 if cursor_description is not None:
1549 # inline of Result._row_getter(), set up an initial row
1550 # getter assuming no transformations will be called as this
1551 # is the most common case
1552
1553 metadata = self._init_metadata(context, cursor_description)
1554
1555 _make_row: Any
1556 _make_row = functools.partial(
1557 Row,
1558 metadata,
1559 metadata._effective_processors,
1560 metadata._key_to_index,
1561 )
1562
1563 if context._num_sentinel_cols:
1564 sentinel_filter = operator.itemgetter(
1565 slice(-context._num_sentinel_cols)
1566 )
1567
1568 def _sliced_row(raw_data: Any) -> Any:
1569 return _make_row(sentinel_filter(raw_data))
1570
1571 sliced_row = _sliced_row
1572 else:
1573 sliced_row = _make_row
1574
1575 if echo:
1576 log = self.context.connection._log_debug
1577
1578 def _log_row(row: Any) -> Any:
1579 log("Row %r", sql_util._repr_row(row))
1580 return row
1581
1582 self._row_logging_fn = _log_row
1583
1584 def _make_row_2(row: Any) -> Any:
1585 return _log_row(sliced_row(row))
1586
1587 make_row = _make_row_2
1588 else:
1589 make_row = sliced_row # type: ignore[assignment]
1590 self._set_memoized_attribute("_row_getter", make_row)
1591
1592 else:
1593 assert context._num_sentinel_cols == 0
1594 self._metadata = self._no_result_metadata
1595
1596 def _init_metadata(
1597 self,
1598 context: DefaultExecutionContext,
1599 cursor_description: _DBAPICursorDescription,
1600 ) -> CursorResultMetaData:
1601
1602 if context.compiled:
1603 compiled = context.compiled
1604
1605 if compiled._cached_metadata:
1606 metadata = compiled._cached_metadata
1607 else:
1608 metadata = CursorResultMetaData(self, cursor_description)
1609 if metadata._safe_for_cache:
1610 compiled._cached_metadata = metadata
1611
1612 # result rewrite/ adapt step. this is to suit the case
1613 # when we are invoked against a cached Compiled object, we want
1614 # to rewrite the ResultMetaData to reflect the Column objects
1615 # that are in our current SQL statement object, not the one
1616 # that is associated with the cached Compiled object.
1617 # the Compiled object may also tell us to not
1618 # actually do this step; this is to support the ORM where
1619 # it is to produce a new Result object in any case, and will
1620 # be using the cached Column objects against this database result
1621 # so we don't want to rewrite them.
1622 #
1623 # Basically this step suits the use case where the end user
1624 # is using Core SQL expressions and is accessing columns in the
1625 # result row using row._mapping[table.c.column].
1626 if (
1627 not context.execution_options.get(
1628 "_result_disable_adapt_to_context", False
1629 )
1630 and compiled._result_columns
1631 and context.cache_hit is context.dialect.CACHE_HIT
1632 and compiled.statement is not context.invoked_statement # type: ignore[comparison-overlap] # noqa: E501
1633 ):
1634 metadata = metadata._adapt_to_context(context) # type: ignore[assignment] # noqa: E501
1635
1636 self._metadata = metadata
1637
1638 else:
1639 self._metadata = metadata = CursorResultMetaData(
1640 self, cursor_description
1641 )
1642 if self._echo:
1643 context.connection._log_debug(
1644 "Col %r", tuple(x[0] for x in cursor_description)
1645 )
1646 return metadata
1647
1648 def _soft_close(self, hard: bool = False) -> None:
1649 """Soft close this :class:`_engine.CursorResult`.
1650
1651 This releases all DBAPI cursor resources, but leaves the
1652 CursorResult "open" from a semantic perspective, meaning the
1653 fetchXXX() methods will continue to return empty results.
1654
1655 This method is called automatically when:
1656
1657 * all result rows are exhausted using the fetchXXX() methods.
1658 * cursor.description is None.
1659
1660 This method is **not public**, but is documented in order to clarify
1661 the "autoclose" process used.
1662
1663 .. seealso::
1664
1665 :meth:`_engine.CursorResult.close`
1666
1667
1668 """
1669
1670 if (not hard and self._soft_closed) or (hard and self.closed):
1671 return
1672
1673 if hard:
1674 self.closed = True
1675 self.cursor_strategy.hard_close(self, self.cursor)
1676 else:
1677 self.cursor_strategy.soft_close(self, self.cursor)
1678
1679 if not self._soft_closed:
1680 cursor = self.cursor
1681 self.cursor = None # type: ignore
1682 self.connection._safe_close_cursor(cursor)
1683 self._soft_closed = True
1684
1685 @property
1686 def inserted_primary_key_rows(self) -> List[Optional[Any]]:
1687 """Return the value of
1688 :attr:`_engine.CursorResult.inserted_primary_key`
1689 as a row contained within a list; some dialects may support a
1690 multiple row form as well.
1691
1692 .. note:: As indicated below, in current SQLAlchemy versions this
1693 accessor is only useful beyond what's already supplied by
1694 :attr:`_engine.CursorResult.inserted_primary_key` when using the
1695 :ref:`postgresql_psycopg2` dialect. Future versions hope to
1696 generalize this feature to more dialects.
1697
1698 This accessor is added to support dialects that offer the feature
1699 that is currently implemented by the :ref:`psycopg2_executemany_mode`
1700 feature, currently **only the psycopg2 dialect**, which provides
1701 for many rows to be INSERTed at once while still retaining the
1702 behavior of being able to return server-generated primary key values.
1703
1704 * **When using the psycopg2 dialect, or other dialects that may support
1705 "fast executemany" style inserts in upcoming releases** : When
1706 invoking an INSERT statement while passing a list of rows as the
1707 second argument to :meth:`_engine.Connection.execute`, this accessor
1708 will then provide a list of rows, where each row contains the primary
1709 key value for each row that was INSERTed.
1710
1711 * **When using all other dialects / backends that don't yet support
1712 this feature**: This accessor is only useful for **single row INSERT
1713 statements**, and returns the same information as that of the
1714 :attr:`_engine.CursorResult.inserted_primary_key` within a
1715 single-element list. When an INSERT statement is executed in
1716 conjunction with a list of rows to be INSERTed, the list will contain
1717 one row per row inserted in the statement, however it will contain
1718 ``None`` for any server-generated values.
1719
1720 Future releases of SQLAlchemy will further generalize the
1721 "fast execution helper" feature of psycopg2 to suit other dialects,
1722 thus allowing this accessor to be of more general use.
1723
1724 .. versionadded:: 1.4
1725
1726 .. seealso::
1727
1728 :attr:`_engine.CursorResult.inserted_primary_key`
1729
1730 """
1731 if not self.context.compiled:
1732 raise exc.InvalidRequestError(
1733 "Statement is not a compiled expression construct."
1734 )
1735 elif not self.context.isinsert:
1736 raise exc.InvalidRequestError(
1737 "Statement is not an insert() expression construct."
1738 )
1739 elif self.context._is_explicit_returning:
1740 raise exc.InvalidRequestError(
1741 "Can't call inserted_primary_key "
1742 "when returning() "
1743 "is used."
1744 )
1745 return self.context.inserted_primary_key_rows # type: ignore[no-any-return] # noqa: E501
1746
1747 @property
1748 def inserted_primary_key(self) -> Optional[Any]:
1749 """Return the primary key for the row just inserted.
1750
1751 The return value is a :class:`_result.Row` object representing
1752 a named tuple of primary key values in the order in which the
1753 primary key columns are configured in the source
1754 :class:`_schema.Table`.
1755
1756 .. versionchanged:: 1.4.8 - the
1757 :attr:`_engine.CursorResult.inserted_primary_key`
1758 value is now a named tuple via the :class:`_result.Row` class,
1759 rather than a plain tuple.
1760
1761 This accessor only applies to single row :func:`_expression.insert`
1762 constructs which did not explicitly specify
1763 :meth:`_expression.Insert.returning`. Support for multirow inserts,
1764 while not yet available for most backends, would be accessed using
1765 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
1766
1767 Note that primary key columns which specify a server_default clause, or
1768 otherwise do not qualify as "autoincrement" columns (see the notes at
1769 :class:`_schema.Column`), and were generated using the database-side
1770 default, will appear in this list as ``None`` unless the backend
1771 supports "returning" and the insert statement executed with the
1772 "implicit returning" enabled.
1773
1774 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1775 statement is not a compiled expression construct
1776 or is not an insert() construct.
1777
1778 """
1779
1780 if self.context.executemany:
1781 raise exc.InvalidRequestError(
1782 "This statement was an executemany call; if primary key "
1783 "returning is supported, please "
1784 "use .inserted_primary_key_rows."
1785 )
1786
1787 ikp = self.inserted_primary_key_rows
1788 if ikp:
1789 return ikp[0]
1790 else:
1791 return None
1792
1793 def last_updated_params(
1794 self,
1795 ) -> Union[
1796 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1797 ]:
1798 """Return the collection of updated parameters from this
1799 execution.
1800
1801 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1802 statement is not a compiled expression construct
1803 or is not an update() construct.
1804
1805 """
1806 if not self.context.compiled:
1807 raise exc.InvalidRequestError(
1808 "Statement is not a compiled expression construct."
1809 )
1810 elif not self.context.isupdate:
1811 raise exc.InvalidRequestError(
1812 "Statement is not an update() expression construct."
1813 )
1814 elif self.context.executemany:
1815 return self.context.compiled_parameters
1816 else:
1817 return self.context.compiled_parameters[0]
1818
1819 def last_inserted_params(
1820 self,
1821 ) -> Union[
1822 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1823 ]:
1824 """Return the collection of inserted parameters from this
1825 execution.
1826
1827 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1828 statement is not a compiled expression construct
1829 or is not an insert() construct.
1830
1831 """
1832 if not self.context.compiled:
1833 raise exc.InvalidRequestError(
1834 "Statement is not a compiled expression construct."
1835 )
1836 elif not self.context.isinsert:
1837 raise exc.InvalidRequestError(
1838 "Statement is not an insert() expression construct."
1839 )
1840 elif self.context.executemany:
1841 return self.context.compiled_parameters
1842 else:
1843 return self.context.compiled_parameters[0]
1844
1845 @property
1846 def returned_defaults_rows(
1847 self,
1848 ) -> Optional[Sequence[Row[Any]]]:
1849 """Return a list of rows each containing the values of default
1850 columns that were fetched using
1851 the :meth:`.ValuesBase.return_defaults` feature.
1852
1853 The return value is a list of :class:`.Row` objects.
1854
1855 .. versionadded:: 1.4
1856
1857 """
1858 return self.context.returned_default_rows
1859
1860 def splice_horizontally(self, other: CursorResult[Any]) -> Self:
1861 """Return a new :class:`.CursorResult` that "horizontally splices"
1862 together the rows of this :class:`.CursorResult` with that of another
1863 :class:`.CursorResult`.
1864
1865 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1866 not intended for general use.
1867
1868 "horizontally splices" means that for each row in the first and second
1869 result sets, a new row that concatenates the two rows together is
1870 produced, which then becomes the new row. The incoming
1871 :class:`.CursorResult` must have the identical number of rows. It is
1872 typically expected that the two result sets come from the same sort
1873 order as well, as the result rows are spliced together based on their
1874 position in the result.
1875
1876 The expected use case here is so that multiple INSERT..RETURNING
1877 statements (which definitely need to be sorted) against different
1878 tables can produce a single result that looks like a JOIN of those two
1879 tables.
1880
1881 E.g.::
1882
1883 r1 = connection.execute(
1884 users.insert().returning(
1885 users.c.user_name, users.c.user_id, sort_by_parameter_order=True
1886 ),
1887 user_values,
1888 )
1889
1890 r2 = connection.execute(
1891 addresses.insert().returning(
1892 addresses.c.address_id,
1893 addresses.c.address,
1894 addresses.c.user_id,
1895 sort_by_parameter_order=True,
1896 ),
1897 address_values,
1898 )
1899
1900 rows = r1.splice_horizontally(r2).all()
1901 assert rows == [
1902 ("john", 1, 1, "foo@bar.com", 1),
1903 ("jack", 2, 2, "bar@bat.com", 2),
1904 ]
1905
1906 .. versionadded:: 2.0
1907
1908 .. seealso::
1909
1910 :meth:`.CursorResult.splice_vertically`
1911
1912
1913 """ # noqa: E501
1914
1915 clone = self._generate()
1916 total_rows = [
1917 tuple(r1) + tuple(r2)
1918 for r1, r2 in zip(
1919 list(self._raw_row_iterator()),
1920 list(other._raw_row_iterator()),
1921 )
1922 ]
1923
1924 clone._metadata = clone._metadata._splice_horizontally(other._metadata) # type: ignore[union-attr, arg-type] # noqa: E501
1925
1926 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
1927 None,
1928 initial_buffer=total_rows,
1929 )
1930 clone._reset_memoizations()
1931 return clone
1932
1933 def splice_vertically(self, other: CursorResult[Any]) -> Self:
1934 """Return a new :class:`.CursorResult` that "vertically splices",
1935 i.e. "extends", the rows of this :class:`.CursorResult` with that of
1936 another :class:`.CursorResult`.
1937
1938 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1939 not intended for general use.
1940
1941 "vertically splices" means the rows of the given result are appended to
1942 the rows of this cursor result. The incoming :class:`.CursorResult`
1943 must have rows that represent the identical list of columns in the
1944 identical order as they are in this :class:`.CursorResult`.
1945
1946 .. versionadded:: 2.0
1947
1948 .. seealso::
1949
1950 :meth:`.CursorResult.splice_horizontally`
1951
1952 """
1953 clone = self._generate()
1954 total_rows = list(self._raw_row_iterator()) + list(
1955 other._raw_row_iterator()
1956 )
1957
1958 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
1959 None,
1960 initial_buffer=total_rows,
1961 )
1962 clone._reset_memoizations()
1963 return clone
1964
1965 def _rewind(self, rows: Any) -> Self:
1966 """rewind this result back to the given rowset.
1967
1968 this is used internally for the case where an :class:`.Insert`
1969 construct combines the use of
1970 :meth:`.Insert.return_defaults` along with the
1971 "supplemental columns" feature.
1972
1973 """
1974
1975 if self._echo:
1976 self.context.connection._log_debug(
1977 "CursorResult rewound %d row(s)", len(rows)
1978 )
1979
1980 # the rows given are expected to be Row objects, so we
1981 # have to clear out processors which have already run on these
1982 # rows
1983 self._metadata = cast(
1984 CursorResultMetaData, self._metadata
1985 )._remove_processors()
1986
1987 self.cursor_strategy = FullyBufferedCursorFetchStrategy(
1988 None,
1989 # TODO: if these are Row objects, can we save on not having to
1990 # re-make new Row objects out of them a second time? is that
1991 # what's actually happening right now? maybe look into this
1992 initial_buffer=rows,
1993 )
1994 self._reset_memoizations()
1995 return self
1996
1997 @property
1998 def returned_defaults(self) -> Optional[Row[Any]]:
1999 """Return the values of default columns that were fetched using
2000 the :meth:`.ValuesBase.return_defaults` feature.
2001
2002 The value is an instance of :class:`.Row`, or ``None``
2003 if :meth:`.ValuesBase.return_defaults` was not used or if the
2004 backend does not support RETURNING.
2005
2006 .. seealso::
2007
2008 :meth:`.ValuesBase.return_defaults`
2009
2010 """
2011
2012 if self.context.executemany:
2013 raise exc.InvalidRequestError(
2014 "This statement was an executemany call; if return defaults "
2015 "is supported, please use .returned_defaults_rows."
2016 )
2017
2018 rows = self.context.returned_default_rows
2019 if rows:
2020 return rows[0]
2021 else:
2022 return None
2023
2024 def lastrow_has_defaults(self) -> bool:
2025 """Return ``lastrow_has_defaults()`` from the underlying
2026 :class:`.ExecutionContext`.
2027
2028 See :class:`.ExecutionContext` for details.
2029
2030 """
2031
2032 return self.context.lastrow_has_defaults()
2033
2034 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2035 """Return ``postfetch_cols()`` from the underlying
2036 :class:`.ExecutionContext`.
2037
2038 See :class:`.ExecutionContext` for details.
2039
2040 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2041 statement is not a compiled expression construct
2042 or is not an insert() or update() construct.
2043
2044 """
2045
2046 if not self.context.compiled:
2047 raise exc.InvalidRequestError(
2048 "Statement is not a compiled expression construct."
2049 )
2050 elif not self.context.isinsert and not self.context.isupdate:
2051 raise exc.InvalidRequestError(
2052 "Statement is not an insert() or update() "
2053 "expression construct."
2054 )
2055 return self.context.postfetch_cols
2056
2057 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2058 """Return ``prefetch_cols()`` from the underlying
2059 :class:`.ExecutionContext`.
2060
2061 See :class:`.ExecutionContext` for details.
2062
2063 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2064 statement is not a compiled expression construct
2065 or is not an insert() or update() construct.
2066
2067 """
2068
2069 if not self.context.compiled:
2070 raise exc.InvalidRequestError(
2071 "Statement is not a compiled expression construct."
2072 )
2073 elif not self.context.isinsert and not self.context.isupdate:
2074 raise exc.InvalidRequestError(
2075 "Statement is not an insert() or update() "
2076 "expression construct."
2077 )
2078 return self.context.prefetch_cols
2079
2080 def supports_sane_rowcount(self) -> bool:
2081 """Return ``supports_sane_rowcount`` from the dialect.
2082
2083 See :attr:`_engine.CursorResult.rowcount` for background.
2084
2085 """
2086
2087 return self.dialect.supports_sane_rowcount
2088
2089 def supports_sane_multi_rowcount(self) -> bool:
2090 """Return ``supports_sane_multi_rowcount`` from the dialect.
2091
2092 See :attr:`_engine.CursorResult.rowcount` for background.
2093
2094 """
2095
2096 return self.dialect.supports_sane_multi_rowcount
2097
2098 @util.memoized_property
2099 def rowcount(self) -> int:
2100 """Return the 'rowcount' for this result.
2101
2102 The primary purpose of 'rowcount' is to report the number of rows
2103 matched by the WHERE criterion of an UPDATE or DELETE statement
2104 executed once (i.e. for a single parameter set), which may then be
2105 compared to the number of rows expected to be updated or deleted as a
2106 means of asserting data integrity.
2107
2108 This attribute is transferred from the ``cursor.rowcount`` attribute
2109 of the DBAPI before the cursor is closed, to support DBAPIs that
2110 don't make this value available after cursor close. Some DBAPIs may
2111 offer meaningful values for other kinds of statements, such as INSERT
2112 and SELECT statements as well. In order to retrieve ``cursor.rowcount``
2113 for these statements, set the
2114 :paramref:`.Connection.execution_options.preserve_rowcount`
2115 execution option to True, which will cause the ``cursor.rowcount``
2116 value to be unconditionally memoized before any results are returned
2117 or the cursor is closed, regardless of statement type.
2118
2119 For cases where the DBAPI does not support rowcount for a particular
2120 kind of statement and/or execution, the returned value will be ``-1``,
2121 which is delivered directly from the DBAPI and is part of :pep:`249`.
2122 All DBAPIs should support rowcount for single-parameter-set
2123 UPDATE and DELETE statements, however.
2124
2125 .. note::
2126
2127 Notes regarding :attr:`_engine.CursorResult.rowcount`:
2128
2129
2130 * This attribute returns the number of rows *matched*,
2131 which is not necessarily the same as the number of rows
2132 that were actually *modified*. For example, an UPDATE statement
2133 may have no net change on a given row if the SET values
2134 given are the same as those present in the row already.
2135 Such a row would be matched but not modified.
2136 On backends that feature both styles, such as MySQL,
2137 rowcount is configured to return the match
2138 count in all cases.
2139
2140 * :attr:`_engine.CursorResult.rowcount` in the default case is
2141 *only* useful in conjunction with an UPDATE or DELETE statement,
2142 and only with a single set of parameters. For other kinds of
2143 statements, SQLAlchemy will not attempt to pre-memoize the value
2144 unless the
2145 :paramref:`.Connection.execution_options.preserve_rowcount`
2146 execution option is used. Note that contrary to :pep:`249`, many
2147 DBAPIs do not support rowcount values for statements that are not
2148 UPDATE or DELETE, particularly when rows are being returned which
2149 are not fully pre-buffered. DBAPIs that dont support rowcount
2150 for a particular kind of statement should return the value ``-1``
2151 for such statements.
2152
2153 * :attr:`_engine.CursorResult.rowcount` may not be meaningful
2154 when executing a single statement with multiple parameter sets
2155 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
2156 values across multiple parameter sets and will return ``-1``
2157 when accessed.
2158
2159 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
2160 a correct population of :attr:`_engine.CursorResult.rowcount`
2161 when the :paramref:`.Connection.execution_options.preserve_rowcount`
2162 execution option is set to True.
2163
2164 * Statements that use RETURNING may not support rowcount, returning
2165 a ``-1`` value instead.
2166
2167 .. seealso::
2168
2169 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
2170
2171 :paramref:`.Connection.execution_options.preserve_rowcount`
2172
2173 """ # noqa: E501
2174 try:
2175 return self.context.rowcount
2176 except BaseException as e:
2177 self.cursor_strategy.handle_exception(self, self.cursor, e)
2178 raise # not called
2179
2180 @property
2181 def lastrowid(self) -> int:
2182 """Return the 'lastrowid' accessor on the DBAPI cursor.
2183
2184 This is a DBAPI specific method and is only functional
2185 for those backends which support it, for statements
2186 where it is appropriate. It's behavior is not
2187 consistent across backends.
2188
2189 Usage of this method is normally unnecessary when
2190 using insert() expression constructs; the
2191 :attr:`~CursorResult.inserted_primary_key` attribute provides a
2192 tuple of primary key values for a newly inserted row,
2193 regardless of database backend.
2194
2195 """
2196 try:
2197 return self.context.get_lastrowid()
2198 except BaseException as e:
2199 self.cursor_strategy.handle_exception(self, self.cursor, e)
2200
2201 @property
2202 def returns_rows(self) -> bool:
2203 """True if this :class:`_engine.CursorResult` returns zero or more
2204 rows.
2205
2206 I.e. if it is legal to call the methods
2207 :meth:`_engine.CursorResult.fetchone`,
2208 :meth:`_engine.CursorResult.fetchmany`
2209 :meth:`_engine.CursorResult.fetchall`.
2210
2211 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
2212 always be synonymous with whether or not the DBAPI cursor had a
2213 ``.description`` attribute, indicating the presence of result columns,
2214 noting that a cursor that returns zero rows still has a
2215 ``.description`` if a row-returning statement was emitted.
2216
2217 This attribute should be True for all results that are against
2218 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
2219 that use RETURNING. For INSERT/UPDATE/DELETE statements that were
2220 not using RETURNING, the value will usually be False, however
2221 there are some dialect-specific exceptions to this, such as when
2222 using the MSSQL / pyodbc dialect a SELECT is emitted inline in
2223 order to retrieve an inserted primary key value.
2224
2225 .. seealso::
2226
2227 :meth:`.Result.close`
2228
2229 :attr:`.Result.closed`
2230
2231 """
2232 return self._metadata.returns_rows
2233
2234 @property
2235 def is_insert(self) -> bool:
2236 """True if this :class:`_engine.CursorResult` is the result
2237 of a executing an expression language compiled
2238 :func:`_expression.insert` construct.
2239
2240 When True, this implies that the
2241 :attr:`inserted_primary_key` attribute is accessible,
2242 assuming the statement did not include
2243 a user defined "returning" construct.
2244
2245 """
2246 return self.context.isinsert
2247
2248 def _fetchiter_impl(self) -> Iterator[Any]:
2249 fetchone = self.cursor_strategy.fetchone
2250
2251 while True:
2252 row = fetchone(self, self.cursor)
2253 if row is None:
2254 break
2255 yield row
2256
2257 def _fetchone_impl(self, hard_close: bool = False) -> Any:
2258 return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
2259
2260 def _fetchall_impl(self) -> Any:
2261 return self.cursor_strategy.fetchall(self, self.cursor)
2262
2263 def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
2264 return self.cursor_strategy.fetchmany(self, self.cursor, size)
2265
2266 def _raw_row_iterator(self) -> Any:
2267 return self._fetchiter_impl()
2268
2269 def merge(self, *others: Result[Any]) -> MergedResult[Any]:
2270 merged_result = super().merge(*others)
2271 if self.context._has_rowcount:
2272 merged_result.rowcount = sum(
2273 cast("CursorResult[Any]", result).rowcount
2274 for result in (self,) + others
2275 )
2276 return merged_result
2277
2278 def close(self) -> None:
2279 """Close this :class:`_engine.CursorResult`.
2280
2281 This closes out the underlying DBAPI cursor corresponding to the
2282 statement execution, if one is still present. Note that the DBAPI
2283 cursor is automatically released when the :class:`_engine.CursorResult`
2284 exhausts all available rows. :meth:`_engine.CursorResult.close` is
2285 generally an optional method except in the case when discarding a
2286 :class:`_engine.CursorResult` that still has additional rows pending
2287 for fetch.
2288
2289 After this method is called, it is no longer valid to call upon
2290 the fetch methods, which will raise a :class:`.ResourceClosedError`
2291 on subsequent use.
2292
2293 .. seealso::
2294
2295 :ref:`connections_toplevel`
2296
2297 """
2298 self._soft_close(hard=True)
2299
2300 @_generative
2301 def yield_per(self, num: int) -> Self:
2302 self._yield_per = num
2303 self.cursor_strategy.yield_per(self, self.cursor, num)
2304 return self
2305
2306
2307ResultProxy = CursorResult