1# engine/cursor.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define cursor-specific result set constructs including
9:class:`.CursorResult`."""
10
11
12from __future__ import annotations
13
14import collections
15import functools
16import operator
17import typing
18from typing import Any
19from typing import cast
20from typing import ClassVar
21from typing import Deque
22from typing import Dict
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Mapping
27from typing import NoReturn
28from typing import Optional
29from typing import Sequence
30from typing import Tuple
31from typing import TYPE_CHECKING
32from typing import TypeVar
33from typing import Union
34
35from .result import IteratorResult
36from .result import MergedResult
37from .result import Result
38from .result import ResultMetaData
39from .result import SimpleResultMetaData
40from .result import tuplegetter
41from .row import Row
42from .. import exc
43from .. import util
44from ..sql import elements
45from ..sql import sqltypes
46from ..sql import util as sql_util
47from ..sql.base import _generative
48from ..sql.compiler import ResultColumnsEntry
49from ..sql.compiler import RM_NAME
50from ..sql.compiler import RM_OBJECTS
51from ..sql.compiler import RM_RENDERED_NAME
52from ..sql.compiler import RM_TYPE
53from ..sql.type_api import TypeEngine
54from ..util import compat
55from ..util.typing import Final
56from ..util.typing import Literal
57from ..util.typing import Self
58
59
60if typing.TYPE_CHECKING:
61 from .base import Connection
62 from .default import DefaultExecutionContext
63 from .interfaces import _DBAPICursorDescription
64 from .interfaces import _MutableCoreSingleExecuteParams
65 from .interfaces import CoreExecuteOptionsParameter
66 from .interfaces import DBAPICursor
67 from .interfaces import DBAPIType
68 from .interfaces import Dialect
69 from .interfaces import ExecutionContext
70 from .result import _KeyIndexType
71 from .result import _KeyMapRecType
72 from .result import _KeyMapType
73 from .result import _KeyType
74 from .result import _ProcessorsType
75 from .result import _TupleGetterType
76 from ..sql.schema import Column
77 from ..sql.type_api import _ResultProcessorType
78
79
80_T = TypeVar("_T", bound=Any)
81TupleAny = Tuple[Any, ...]
82
83# metadata entry tuple indexes.
84# using raw tuple is faster than namedtuple.
85# these match up to the positions in
86# _CursorKeyMapRecType
87MD_INDEX: Final[Literal[0]] = 0
88"""integer index in cursor.description
89
90"""
91
92MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
93"""integer index in compiled._result_columns"""
94
95MD_OBJECTS: Final[Literal[2]] = 2
96"""other string keys and ColumnElement obj that can match.
97
98This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
99
100"""
101
102MD_LOOKUP_KEY: Final[Literal[3]] = 3
103"""string key we usually expect for key-based lookup
104
105this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
106"""
107
108
109MD_RENDERED_NAME: Final[Literal[4]] = 4
110"""name that is usually in cursor.description
111
112this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
113"""
114
115
116MD_PROCESSOR: Final[Literal[5]] = 5
117"""callable to process a result value into a row"""
118
119MD_UNTRANSLATED: Final[Literal[6]] = 6
120"""raw name from cursor.description"""
121
122
123_CursorKeyMapRecType = Tuple[
124 Optional[int], # MD_INDEX, None means the record is ambiguously named
125 int, # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
126 TupleAny, # MD_OBJECTS
127 str, # MD_LOOKUP_KEY
128 str, # MD_RENDERED_NAME
129 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
130 Optional[str], # MD_UNTRANSLATED
131]
132
133_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
134
135# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
136# not None
137_NonAmbigCursorKeyMapRecType = Tuple[
138 int,
139 int,
140 List[Any],
141 str,
142 str,
143 Optional["_ResultProcessorType[Any]"],
144 str,
145]
146
147_MergeColTuple = Tuple[
148 int,
149 Optional[int],
150 str,
151 TypeEngine[Any],
152 "DBAPIType",
153 Optional[TupleAny],
154 Optional[str],
155]
156
157
158class CursorResultMetaData(ResultMetaData):
159 """Result metadata for DBAPI cursors."""
160
161 __slots__ = (
162 "_keymap",
163 "_processors",
164 "_keys",
165 "_keymap_by_result_column_idx",
166 "_tuplefilter",
167 "_translated_indexes",
168 "_safe_for_cache",
169 "_unpickled",
170 "_key_to_index",
171 # don't need _unique_filters support here for now. Can be added
172 # if a need arises.
173 )
174
175 _keymap: _CursorKeyMapType
176 _processors: _ProcessorsType
177 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
178 _unpickled: bool
179 _safe_for_cache: bool
180 _translated_indexes: Optional[List[int]]
181
182 returns_rows: ClassVar[bool] = True
183
184 def _has_key(self, key: Any) -> bool:
185 return key in self._keymap
186
187 def _for_freeze(self) -> ResultMetaData:
188 return SimpleResultMetaData(
189 self._keys,
190 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
191 )
192
193 def _make_new_metadata(
194 self,
195 *,
196 unpickled: bool,
197 processors: _ProcessorsType,
198 keys: Sequence[str],
199 keymap: _KeyMapType,
200 tuplefilter: Optional[_TupleGetterType],
201 translated_indexes: Optional[List[int]],
202 safe_for_cache: bool,
203 keymap_by_result_column_idx: Any,
204 ) -> CursorResultMetaData:
205 new_obj = self.__class__.__new__(self.__class__)
206 new_obj._unpickled = unpickled
207 new_obj._processors = processors
208 new_obj._keys = keys
209 new_obj._keymap = keymap
210 new_obj._tuplefilter = tuplefilter
211 new_obj._translated_indexes = translated_indexes
212 new_obj._safe_for_cache = safe_for_cache
213 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
214 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
215 return new_obj
216
217 def _remove_processors(self) -> CursorResultMetaData:
218 assert not self._tuplefilter
219 return self._make_new_metadata(
220 unpickled=self._unpickled,
221 processors=[None] * len(self._processors),
222 tuplefilter=None,
223 translated_indexes=None,
224 keymap={
225 key: value[0:5] + (None,) + value[6:]
226 for key, value in self._keymap.items()
227 },
228 keys=self._keys,
229 safe_for_cache=self._safe_for_cache,
230 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
231 )
232
233 def _splice_horizontally(
234 self, other: CursorResultMetaData
235 ) -> CursorResultMetaData:
236 assert not self._tuplefilter
237
238 keymap = dict(self._keymap)
239 offset = len(self._keys)
240
241 for key, value in other._keymap.items():
242 # int index should be None for ambiguous key
243 if value[MD_INDEX] is not None and key not in keymap:
244 md_index = value[MD_INDEX] + offset
245 md_object = value[MD_RESULT_MAP_INDEX] + offset
246 else:
247 md_index = None
248 md_object = -1
249 keymap[key] = (md_index, md_object, *value[2:])
250
251 return self._make_new_metadata(
252 unpickled=self._unpickled,
253 processors=self._processors + other._processors, # type: ignore
254 tuplefilter=None,
255 translated_indexes=None,
256 keys=self._keys + other._keys, # type: ignore
257 keymap=keymap,
258 safe_for_cache=self._safe_for_cache,
259 keymap_by_result_column_idx={
260 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
261 for metadata_entry in keymap.values()
262 },
263 )
264
265 def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
266 recs = list(self._metadata_for_keys(keys))
267
268 indexes = [rec[MD_INDEX] for rec in recs]
269 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
270
271 if self._translated_indexes:
272 indexes = [self._translated_indexes[idx] for idx in indexes]
273 tup = tuplegetter(*indexes)
274 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
275
276 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
277 # TODO: need unit test for:
278 # result = connection.execute("raw sql, no columns").scalars()
279 # without the "or ()" it's failing because MD_OBJECTS is None
280 keymap.update(
281 (e, new_rec)
282 for new_rec in new_recs
283 for e in new_rec[MD_OBJECTS] or ()
284 )
285
286 return self._make_new_metadata(
287 unpickled=self._unpickled,
288 processors=self._processors,
289 keys=new_keys,
290 tuplefilter=tup,
291 translated_indexes=indexes,
292 keymap=keymap, # type: ignore[arg-type]
293 safe_for_cache=self._safe_for_cache,
294 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
295 )
296
297 def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
298 """When using a cached Compiled construct that has a _result_map,
299 for a new statement that used the cached Compiled, we need to ensure
300 the keymap has the Column objects from our new statement as keys.
301 So here we rewrite keymap with new entries for the new columns
302 as matched to those of the cached statement.
303
304 """
305
306 if not context.compiled or not context.compiled._result_columns:
307 return self
308
309 compiled_statement = context.compiled.statement
310 invoked_statement = context.invoked_statement
311
312 if TYPE_CHECKING:
313 assert isinstance(invoked_statement, elements.ClauseElement)
314
315 if compiled_statement is invoked_statement:
316 return self
317
318 assert invoked_statement is not None
319
320 # this is the most common path for Core statements when
321 # caching is used. In ORM use, this codepath is not really used
322 # as the _result_disable_adapt_to_context execution option is
323 # set by the ORM.
324
325 # make a copy and add the columns from the invoked statement
326 # to the result map.
327
328 keymap_by_position = self._keymap_by_result_column_idx
329
330 if keymap_by_position is None:
331 # first retrival from cache, this map will not be set up yet,
332 # initialize lazily
333 keymap_by_position = self._keymap_by_result_column_idx = {
334 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
335 for metadata_entry in self._keymap.values()
336 }
337
338 assert not self._tuplefilter
339 return self._make_new_metadata(
340 keymap=compat.dict_union(
341 self._keymap,
342 {
343 new: keymap_by_position[idx]
344 for idx, new in enumerate(
345 invoked_statement._all_selected_columns
346 )
347 if idx in keymap_by_position
348 },
349 ),
350 unpickled=self._unpickled,
351 processors=self._processors,
352 tuplefilter=None,
353 translated_indexes=None,
354 keys=self._keys,
355 safe_for_cache=self._safe_for_cache,
356 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
357 )
358
359 def __init__(
360 self,
361 parent: CursorResult[Any],
362 cursor_description: _DBAPICursorDescription,
363 ):
364 context = parent.context
365 self._tuplefilter = None
366 self._translated_indexes = None
367 self._safe_for_cache = self._unpickled = False
368
369 if context.result_column_struct:
370 (
371 result_columns,
372 cols_are_ordered,
373 textual_ordered,
374 ad_hoc_textual,
375 loose_column_name_matching,
376 ) = context.result_column_struct
377 num_ctx_cols = len(result_columns)
378 else:
379 result_columns = cols_are_ordered = ( # type: ignore
380 num_ctx_cols
381 ) = ad_hoc_textual = loose_column_name_matching = (
382 textual_ordered
383 ) = False
384
385 # merge cursor.description with the column info
386 # present in the compiled structure, if any
387 raw = self._merge_cursor_description(
388 context,
389 cursor_description,
390 result_columns,
391 num_ctx_cols,
392 cols_are_ordered,
393 textual_ordered,
394 ad_hoc_textual,
395 loose_column_name_matching,
396 )
397
398 # processors in key order which are used when building up
399 # a row
400 self._processors = [
401 metadata_entry[MD_PROCESSOR] for metadata_entry in raw
402 ]
403
404 # this is used when using this ResultMetaData in a Core-only cache
405 # retrieval context. it's initialized on first cache retrieval
406 # when the _result_disable_adapt_to_context execution option
407 # (which the ORM generally sets) is not set.
408 self._keymap_by_result_column_idx = None
409
410 # for compiled SQL constructs, copy additional lookup keys into
411 # the key lookup map, such as Column objects, labels,
412 # column keys and other names
413 if num_ctx_cols:
414 # keymap by primary string...
415 by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
416 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
417 for metadata_entry in raw
418 }
419
420 if len(by_key) != num_ctx_cols:
421 # if by-primary-string dictionary smaller than
422 # number of columns, assume we have dupes; (this check
423 # is also in place if string dictionary is bigger, as
424 # can occur when '*' was used as one of the compiled columns,
425 # which may or may not be suggestive of dupes), rewrite
426 # dupe records with "None" for index which results in
427 # ambiguous column exception when accessed.
428 #
429 # this is considered to be the less common case as it is not
430 # common to have dupe column keys in a SELECT statement.
431 #
432 # new in 1.4: get the complete set of all possible keys,
433 # strings, objects, whatever, that are dupes across two
434 # different records, first.
435 index_by_key: Dict[Any, Any] = {}
436 dupes = set()
437 for metadata_entry in raw:
438 for key in (metadata_entry[MD_RENDERED_NAME],) + (
439 metadata_entry[MD_OBJECTS] or ()
440 ):
441 idx = metadata_entry[MD_INDEX]
442 # if this key has been associated with more than one
443 # positional index, it's a dupe
444 if index_by_key.setdefault(key, idx) != idx:
445 dupes.add(key)
446
447 # then put everything we have into the keymap excluding only
448 # those keys that are dupes.
449 self._keymap = {
450 obj_elem: metadata_entry
451 for metadata_entry in raw
452 if metadata_entry[MD_OBJECTS]
453 for obj_elem in metadata_entry[MD_OBJECTS]
454 if obj_elem not in dupes
455 }
456
457 # then for the dupe keys, put the "ambiguous column"
458 # record into by_key.
459 by_key.update(
460 {
461 key: (None, -1, (), key, key, None, None)
462 for key in dupes
463 }
464 )
465
466 else:
467 # no dupes - copy secondary elements from compiled
468 # columns into self._keymap. this is the most common
469 # codepath for Core / ORM statement executions before the
470 # result metadata is cached
471 self._keymap = {
472 obj_elem: metadata_entry
473 for metadata_entry in raw
474 if metadata_entry[MD_OBJECTS]
475 for obj_elem in metadata_entry[MD_OBJECTS]
476 }
477 # update keymap with primary string names taking
478 # precedence
479 self._keymap.update(by_key)
480 else:
481 # no compiled objects to map, just create keymap by primary string
482 self._keymap = {
483 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
484 for metadata_entry in raw
485 }
486
487 # update keymap with "translated" names. In SQLAlchemy this is a
488 # sqlite only thing, and in fact impacting only extremely old SQLite
489 # versions unlikely to be present in modern Python versions.
490 # however, the pyhive third party dialect is
491 # also using this hook, which means others still might use it as well.
492 # I dislike having this awkward hook here but as long as we need
493 # to use names in cursor.description in some cases we need to have
494 # some hook to accomplish this.
495 if not num_ctx_cols and context._translate_colname:
496 self._keymap.update(
497 {
498 metadata_entry[MD_UNTRANSLATED]: self._keymap[
499 metadata_entry[MD_LOOKUP_KEY]
500 ]
501 for metadata_entry in raw
502 if metadata_entry[MD_UNTRANSLATED]
503 }
504 )
505
506 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
507
508 def _merge_cursor_description(
509 self,
510 context: DefaultExecutionContext,
511 cursor_description: _DBAPICursorDescription,
512 result_columns: Sequence[ResultColumnsEntry],
513 num_ctx_cols: int,
514 cols_are_ordered: bool,
515 textual_ordered: bool,
516 ad_hoc_textual: bool,
517 loose_column_name_matching: bool,
518 ) -> List[_CursorKeyMapRecType]:
519 """Merge a cursor.description with compiled result column information.
520
521 There are at least four separate strategies used here, selected
522 depending on the type of SQL construct used to start with.
523
524 The most common case is that of the compiled SQL expression construct,
525 which generated the column names present in the raw SQL string and
526 which has the identical number of columns as were reported by
527 cursor.description. In this case, we assume a 1-1 positional mapping
528 between the entries in cursor.description and the compiled object.
529 This is also the most performant case as we disregard extracting /
530 decoding the column names present in cursor.description since we
531 already have the desired name we generated in the compiled SQL
532 construct.
533
534 The next common case is that of the completely raw string SQL,
535 such as passed to connection.execute(). In this case we have no
536 compiled construct to work with, so we extract and decode the
537 names from cursor.description and index those as the primary
538 result row target keys.
539
540 The remaining fairly common case is that of the textual SQL
541 that includes at least partial column information; this is when
542 we use a :class:`_expression.TextualSelect` construct.
543 This construct may have
544 unordered or ordered column information. In the ordered case, we
545 merge the cursor.description and the compiled construct's information
546 positionally, and warn if there are additional description names
547 present, however we still decode the names in cursor.description
548 as we don't have a guarantee that the names in the columns match
549 on these. In the unordered case, we match names in cursor.description
550 to that of the compiled construct based on name matching.
551 In both of these cases, the cursor.description names and the column
552 expression objects and names are indexed as result row target keys.
553
554 The final case is much less common, where we have a compiled
555 non-textual SQL expression construct, but the number of columns
556 in cursor.description doesn't match what's in the compiled
557 construct. We make the guess here that there might be textual
558 column expressions in the compiled construct that themselves include
559 a comma in them causing them to split. We do the same name-matching
560 as with textual non-ordered columns.
561
562 The name-matched system of merging is the same as that used by
563 SQLAlchemy for all cases up through the 0.9 series. Positional
564 matching for compiled SQL expressions was introduced in 1.0 as a
565 major performance feature, and positional matching for textual
566 :class:`_expression.TextualSelect` objects in 1.1.
567 As name matching is no longer
568 a common case, it was acceptable to factor it into smaller generator-
569 oriented methods that are easier to understand, but incur slightly
570 more performance overhead.
571
572 """
573
574 if (
575 num_ctx_cols
576 and cols_are_ordered
577 and not textual_ordered
578 and num_ctx_cols == len(cursor_description)
579 ):
580 self._keys = [elem[0] for elem in result_columns]
581 # pure positional 1-1 case; doesn't need to read
582 # the names from cursor.description
583
584 # most common case for Core and ORM
585
586 # this metadata is safe to cache because we are guaranteed
587 # to have the columns in the same order for new executions
588 self._safe_for_cache = True
589 return [
590 (
591 idx,
592 idx,
593 rmap_entry[RM_OBJECTS],
594 rmap_entry[RM_NAME],
595 rmap_entry[RM_RENDERED_NAME],
596 context.get_result_processor(
597 rmap_entry[RM_TYPE],
598 rmap_entry[RM_RENDERED_NAME],
599 cursor_description[idx][1],
600 ),
601 None,
602 )
603 for idx, rmap_entry in enumerate(result_columns)
604 ]
605 else:
606 # name-based or text-positional cases, where we need
607 # to read cursor.description names
608
609 if textual_ordered or (
610 ad_hoc_textual and len(cursor_description) == num_ctx_cols
611 ):
612 self._safe_for_cache = True
613 # textual positional case
614 raw_iterator = self._merge_textual_cols_by_position(
615 context, cursor_description, result_columns
616 )
617 elif num_ctx_cols:
618 # compiled SQL with a mismatch of description cols
619 # vs. compiled cols, or textual w/ unordered columns
620 # the order of columns can change if the query is
621 # against a "select *", so not safe to cache
622 self._safe_for_cache = False
623 raw_iterator = self._merge_cols_by_name(
624 context,
625 cursor_description,
626 result_columns,
627 loose_column_name_matching,
628 )
629 else:
630 # no compiled SQL, just a raw string, order of columns
631 # can change for "select *"
632 self._safe_for_cache = False
633 raw_iterator = self._merge_cols_by_none(
634 context, cursor_description
635 )
636
637 return [
638 (
639 idx,
640 ridx,
641 obj,
642 cursor_colname,
643 cursor_colname,
644 context.get_result_processor(
645 mapped_type, cursor_colname, coltype
646 ),
647 untranslated,
648 ) # type: ignore[misc]
649 for (
650 idx,
651 ridx,
652 cursor_colname,
653 mapped_type,
654 coltype,
655 obj,
656 untranslated,
657 ) in raw_iterator
658 ]
659
660 def _colnames_from_description(
661 self,
662 context: DefaultExecutionContext,
663 cursor_description: _DBAPICursorDescription,
664 ) -> Iterator[Tuple[int, str, Optional[str], DBAPIType]]:
665 """Extract column names and data types from a cursor.description.
666
667 Applies unicode decoding, column translation, "normalization",
668 and case sensitivity rules to the names based on the dialect.
669
670 """
671
672 dialect = context.dialect
673 translate_colname = context._translate_colname
674 normalize_name = (
675 dialect.normalize_name if dialect.requires_name_normalize else None
676 )
677 untranslated = None
678
679 self._keys = []
680
681 for idx, rec in enumerate(cursor_description):
682 colname = rec[0]
683 coltype = rec[1]
684
685 if translate_colname:
686 colname, untranslated = translate_colname(colname)
687
688 if normalize_name:
689 colname = normalize_name(colname)
690
691 self._keys.append(colname)
692
693 yield idx, colname, untranslated, coltype
694
695 def _merge_textual_cols_by_position(
696 self,
697 context: DefaultExecutionContext,
698 cursor_description: _DBAPICursorDescription,
699 result_columns: Sequence[ResultColumnsEntry],
700 ) -> Iterator[_MergeColTuple]:
701 num_ctx_cols = len(result_columns)
702
703 if num_ctx_cols > len(cursor_description):
704 util.warn(
705 "Number of columns in textual SQL (%d) is "
706 "smaller than number of columns requested (%d)"
707 % (num_ctx_cols, len(cursor_description))
708 )
709 seen = set()
710
711 for (
712 idx,
713 colname,
714 untranslated,
715 coltype,
716 ) in self._colnames_from_description(context, cursor_description):
717 if idx < num_ctx_cols:
718 ctx_rec = result_columns[idx]
719 obj = ctx_rec[RM_OBJECTS]
720 ridx = idx
721 mapped_type = ctx_rec[RM_TYPE]
722 if obj[0] in seen:
723 raise exc.InvalidRequestError(
724 "Duplicate column expression requested "
725 "in textual SQL: %r" % obj[0]
726 )
727 seen.add(obj[0])
728 else:
729 mapped_type = sqltypes.NULLTYPE
730 obj = None
731 ridx = None
732 yield idx, ridx, colname, mapped_type, coltype, obj, untranslated
733
734 def _merge_cols_by_name(
735 self,
736 context: DefaultExecutionContext,
737 cursor_description: _DBAPICursorDescription,
738 result_columns: Sequence[ResultColumnsEntry],
739 loose_column_name_matching: bool,
740 ) -> Iterator[_MergeColTuple]:
741 match_map = self._create_description_match_map(
742 result_columns, loose_column_name_matching
743 )
744 mapped_type: TypeEngine[Any]
745
746 for (
747 idx,
748 colname,
749 untranslated,
750 coltype,
751 ) in self._colnames_from_description(context, cursor_description):
752 try:
753 ctx_rec = match_map[colname]
754 except KeyError:
755 mapped_type = sqltypes.NULLTYPE
756 obj = None
757 result_columns_idx = None
758 else:
759 obj = ctx_rec[1]
760 mapped_type = ctx_rec[2]
761 result_columns_idx = ctx_rec[3]
762 yield (
763 idx,
764 result_columns_idx,
765 colname,
766 mapped_type,
767 coltype,
768 obj,
769 untranslated,
770 )
771
772 @classmethod
773 def _create_description_match_map(
774 cls,
775 result_columns: Sequence[ResultColumnsEntry],
776 loose_column_name_matching: bool = False,
777 ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
778 """when matching cursor.description to a set of names that are present
779 in a Compiled object, as is the case with TextualSelect, get all the
780 names we expect might match those in cursor.description.
781 """
782
783 d: Dict[
784 Union[str, object],
785 Tuple[str, TupleAny, TypeEngine[Any], int],
786 ] = {}
787 for ridx, elem in enumerate(result_columns):
788 key = elem[RM_RENDERED_NAME]
789 if key in d:
790 # conflicting keyname - just add the column-linked objects
791 # to the existing record. if there is a duplicate column
792 # name in the cursor description, this will allow all of those
793 # objects to raise an ambiguous column error
794 e_name, e_obj, e_type, e_ridx = d[key]
795 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
796 else:
797 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
798
799 if loose_column_name_matching:
800 # when using a textual statement with an unordered set
801 # of columns that line up, we are expecting the user
802 # to be using label names in the SQL that match to the column
803 # expressions. Enable more liberal matching for this case;
804 # duplicate keys that are ambiguous will be fixed later.
805 for r_key in elem[RM_OBJECTS]:
806 d.setdefault(
807 r_key,
808 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
809 )
810 return d
811
812 def _merge_cols_by_none(
813 self,
814 context: DefaultExecutionContext,
815 cursor_description: _DBAPICursorDescription,
816 ) -> Iterator[_MergeColTuple]:
817 self._keys = []
818
819 for (
820 idx,
821 colname,
822 untranslated,
823 coltype,
824 ) in self._colnames_from_description(context, cursor_description):
825 yield (
826 idx,
827 None,
828 colname,
829 sqltypes.NULLTYPE,
830 coltype,
831 None,
832 untranslated,
833 )
834
835 if not TYPE_CHECKING:
836
837 def _key_fallback(
838 self, key: Any, err: Optional[Exception], raiseerr: bool = True
839 ) -> Optional[NoReturn]:
840 if raiseerr:
841 if self._unpickled and isinstance(key, elements.ColumnElement):
842 raise exc.NoSuchColumnError(
843 "Row was unpickled; lookup by ColumnElement "
844 "is unsupported"
845 ) from err
846 else:
847 raise exc.NoSuchColumnError(
848 "Could not locate column in row for column '%s'"
849 % util.string_or_unprintable(key)
850 ) from err
851 else:
852 return None
853
854 def _raise_for_ambiguous_column_name(
855 self, rec: _KeyMapRecType
856 ) -> NoReturn:
857 raise exc.InvalidRequestError(
858 "Ambiguous column name '%s' in "
859 "result set column descriptions" % rec[MD_LOOKUP_KEY]
860 )
861
862 def _index_for_key(
863 self, key: _KeyIndexType, raiseerr: bool = True
864 ) -> Optional[int]:
865 # TODO: can consider pre-loading ints and negative ints
866 # into _keymap - also no coverage here
867 if isinstance(key, int):
868 key = self._keys[key]
869
870 try:
871 rec = self._keymap[key]
872 except KeyError as ke:
873 x = self._key_fallback(key, ke, raiseerr)
874 assert x is None
875 return None
876
877 index = rec[0]
878
879 if index is None:
880 self._raise_for_ambiguous_column_name(rec)
881 return index
882
883 def _indexes_for_keys(
884 self, keys: Sequence[_KeyIndexType]
885 ) -> Sequence[int]:
886 try:
887 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501
888 except KeyError as ke:
889 # ensure it raises
890 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
891
892 def _metadata_for_keys(
893 self, keys: Sequence[_KeyIndexType]
894 ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
895 for key in keys:
896 if int in key.__class__.__mro__:
897 key = self._keys[key] # type: ignore[index]
898
899 try:
900 rec = self._keymap[key] # type: ignore[index]
901 except KeyError as ke:
902 # ensure it raises
903 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
904
905 index = rec[MD_INDEX]
906
907 if index is None:
908 self._raise_for_ambiguous_column_name(rec)
909
910 yield cast(_NonAmbigCursorKeyMapRecType, rec)
911
912 def __getstate__(self) -> Dict[str, Any]:
913 # TODO: consider serializing this as SimpleResultMetaData
914 return {
915 "_keymap": {
916 key: (
917 rec[MD_INDEX],
918 rec[MD_RESULT_MAP_INDEX],
919 [],
920 key,
921 rec[MD_RENDERED_NAME],
922 None,
923 None,
924 )
925 for key, rec in self._keymap.items()
926 if isinstance(key, (str, int))
927 },
928 "_keys": self._keys,
929 "_translated_indexes": self._translated_indexes,
930 }
931
932 def __setstate__(self, state: Dict[str, Any]) -> None:
933 self._processors = [None for _ in range(len(state["_keys"]))]
934 self._keymap = state["_keymap"]
935 self._keymap_by_result_column_idx = None
936 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
937 self._keys = state["_keys"]
938 self._unpickled = True
939 if state["_translated_indexes"]:
940 self._translated_indexes = cast(
941 "List[int]", state["_translated_indexes"]
942 )
943 self._tuplefilter = tuplegetter(*self._translated_indexes)
944 else:
945 self._translated_indexes = self._tuplefilter = None
946
947
948class ResultFetchStrategy:
949 """Define a fetching strategy for a result object.
950
951
952 .. versionadded:: 1.4
953
954 """
955
956 __slots__ = ()
957
958 alternate_cursor_description: Optional[_DBAPICursorDescription] = None
959
960 def soft_close(
961 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
962 ) -> None:
963 raise NotImplementedError()
964
965 def hard_close(
966 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
967 ) -> None:
968 raise NotImplementedError()
969
970 def yield_per(
971 self,
972 result: CursorResult[Any],
973 dbapi_cursor: DBAPICursor,
974 num: int,
975 ) -> None:
976 return
977
978 def fetchone(
979 self,
980 result: CursorResult[Any],
981 dbapi_cursor: DBAPICursor,
982 hard_close: bool = False,
983 ) -> Any:
984 raise NotImplementedError()
985
986 def fetchmany(
987 self,
988 result: CursorResult[Any],
989 dbapi_cursor: DBAPICursor,
990 size: Optional[int] = None,
991 ) -> Any:
992 raise NotImplementedError()
993
994 def fetchall(
995 self,
996 result: CursorResult[Any],
997 dbapi_cursor: DBAPICursor,
998 ) -> Any:
999 raise NotImplementedError()
1000
1001 def handle_exception(
1002 self,
1003 result: CursorResult[Any],
1004 dbapi_cursor: Optional[DBAPICursor],
1005 err: BaseException,
1006 ) -> NoReturn:
1007 raise err
1008
1009
1010class NoCursorFetchStrategy(ResultFetchStrategy):
1011 """Cursor strategy for a result that has no open cursor.
1012
1013 There are two varieties of this strategy, one for DQL and one for
1014 DML (and also DDL), each of which represent a result that had a cursor
1015 but no longer has one.
1016
1017 """
1018
1019 __slots__ = ()
1020
1021 def soft_close(
1022 self,
1023 result: CursorResult[Any],
1024 dbapi_cursor: Optional[DBAPICursor],
1025 ) -> None:
1026 pass
1027
1028 def hard_close(
1029 self,
1030 result: CursorResult[Any],
1031 dbapi_cursor: Optional[DBAPICursor],
1032 ) -> None:
1033 pass
1034
1035 def fetchone(
1036 self,
1037 result: CursorResult[Any],
1038 dbapi_cursor: DBAPICursor,
1039 hard_close: bool = False,
1040 ) -> Any:
1041 return self._non_result(result, None)
1042
1043 def fetchmany(
1044 self,
1045 result: CursorResult[Any],
1046 dbapi_cursor: DBAPICursor,
1047 size: Optional[int] = None,
1048 ) -> Any:
1049 return self._non_result(result, [])
1050
1051 def fetchall(
1052 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1053 ) -> Any:
1054 return self._non_result(result, [])
1055
1056 def _non_result(
1057 self,
1058 result: CursorResult[Any],
1059 default: Any,
1060 err: Optional[BaseException] = None,
1061 ) -> Any:
1062 raise NotImplementedError()
1063
1064
1065class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
1066 """Cursor strategy for a DQL result that has no open cursor.
1067
1068 This is a result set that can return rows, i.e. for a SELECT, or for an
1069 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
1070 where the cursor is closed and no rows remain available. The owning result
1071 object may or may not be "hard closed", which determines if the fetch
1072 methods send empty results or raise for closed result.
1073
1074 """
1075
1076 __slots__ = ()
1077
1078 def _non_result(
1079 self,
1080 result: CursorResult[Any],
1081 default: Any,
1082 err: Optional[BaseException] = None,
1083 ) -> Any:
1084 if result.closed:
1085 raise exc.ResourceClosedError(
1086 "This result object is closed."
1087 ) from err
1088 else:
1089 return default
1090
1091
1092_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
1093
1094
1095class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
1096 """Cursor strategy for a DML result that has no open cursor.
1097
1098 This is a result set that does not return rows, i.e. for an INSERT,
1099 UPDATE, DELETE that does not include RETURNING.
1100
1101 """
1102
1103 __slots__ = ()
1104
1105 def _non_result(
1106 self,
1107 result: CursorResult[Any],
1108 default: Any,
1109 err: Optional[BaseException] = None,
1110 ) -> Any:
1111 # we only expect to have a _NoResultMetaData() here right now.
1112 assert not result._metadata.returns_rows
1113 result._metadata._we_dont_return_rows(err) # type: ignore[union-attr]
1114
1115
1116_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
1117
1118
1119class CursorFetchStrategy(ResultFetchStrategy):
1120 """Call fetch methods from a DBAPI cursor.
1121
1122 Alternate versions of this class may instead buffer the rows from
1123 cursors or not use cursors at all.
1124
1125 """
1126
1127 __slots__ = ()
1128
1129 def soft_close(
1130 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1131 ) -> None:
1132 result.cursor_strategy = _NO_CURSOR_DQL
1133
1134 def hard_close(
1135 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1136 ) -> None:
1137 result.cursor_strategy = _NO_CURSOR_DQL
1138
1139 def handle_exception(
1140 self,
1141 result: CursorResult[Any],
1142 dbapi_cursor: Optional[DBAPICursor],
1143 err: BaseException,
1144 ) -> NoReturn:
1145 result.connection._handle_dbapi_exception(
1146 err, None, None, dbapi_cursor, result.context
1147 )
1148
1149 def yield_per(
1150 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1151 ) -> None:
1152 result.cursor_strategy = BufferedRowCursorFetchStrategy(
1153 dbapi_cursor,
1154 {"max_row_buffer": num},
1155 initial_buffer=collections.deque(),
1156 growth_factor=0,
1157 )
1158
1159 def fetchone(
1160 self,
1161 result: CursorResult[Any],
1162 dbapi_cursor: DBAPICursor,
1163 hard_close: bool = False,
1164 ) -> Any:
1165 try:
1166 row = dbapi_cursor.fetchone()
1167 if row is None:
1168 result._soft_close(hard=hard_close)
1169 return row
1170 except BaseException as e:
1171 self.handle_exception(result, dbapi_cursor, e)
1172
1173 def fetchmany(
1174 self,
1175 result: CursorResult[Any],
1176 dbapi_cursor: DBAPICursor,
1177 size: Optional[int] = None,
1178 ) -> Any:
1179 try:
1180 if size is None:
1181 l = dbapi_cursor.fetchmany()
1182 else:
1183 l = dbapi_cursor.fetchmany(size)
1184
1185 if not l:
1186 result._soft_close()
1187 return l
1188 except BaseException as e:
1189 self.handle_exception(result, dbapi_cursor, e)
1190
1191 def fetchall(
1192 self,
1193 result: CursorResult[Any],
1194 dbapi_cursor: DBAPICursor,
1195 ) -> Any:
1196 try:
1197 rows = dbapi_cursor.fetchall()
1198 result._soft_close()
1199 return rows
1200 except BaseException as e:
1201 self.handle_exception(result, dbapi_cursor, e)
1202
1203
1204_DEFAULT_FETCH = CursorFetchStrategy()
1205
1206
1207class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
1208 """A cursor fetch strategy with row buffering behavior.
1209
1210 This strategy buffers the contents of a selection of rows
1211 before ``fetchone()`` is called. This is to allow the results of
1212 ``cursor.description`` to be available immediately, when
1213 interfacing with a DB-API that requires rows to be consumed before
1214 this information is available (currently psycopg2, when used with
1215 server-side cursors).
1216
1217 The pre-fetching behavior fetches only one row initially, and then
1218 grows its buffer size by a fixed amount with each successive need
1219 for additional rows up the ``max_row_buffer`` size, which defaults
1220 to 1000::
1221
1222 with psycopg2_engine.connect() as conn:
1223
1224 result = conn.execution_options(
1225 stream_results=True, max_row_buffer=50
1226 ).execute(text("select * from table"))
1227
1228 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
1229
1230 .. seealso::
1231
1232 :ref:`psycopg2_execution_options`
1233 """
1234
1235 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
1236
1237 def __init__(
1238 self,
1239 dbapi_cursor: DBAPICursor,
1240 execution_options: CoreExecuteOptionsParameter,
1241 growth_factor: int = 5,
1242 initial_buffer: Optional[Deque[Any]] = None,
1243 ) -> None:
1244 self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
1245
1246 if initial_buffer is not None:
1247 self._rowbuffer = initial_buffer
1248 else:
1249 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
1250 self._growth_factor = growth_factor
1251
1252 if growth_factor:
1253 self._bufsize = min(self._max_row_buffer, self._growth_factor)
1254 else:
1255 self._bufsize = self._max_row_buffer
1256
1257 @classmethod
1258 def create(
1259 cls, result: CursorResult[Any]
1260 ) -> BufferedRowCursorFetchStrategy:
1261 return BufferedRowCursorFetchStrategy(
1262 result.cursor,
1263 result.context.execution_options,
1264 )
1265
1266 def _buffer_rows(
1267 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1268 ) -> None:
1269 """this is currently used only by fetchone()."""
1270
1271 size = self._bufsize
1272 try:
1273 if size < 1:
1274 new_rows = dbapi_cursor.fetchall()
1275 else:
1276 new_rows = dbapi_cursor.fetchmany(size)
1277 except BaseException as e:
1278 self.handle_exception(result, dbapi_cursor, e)
1279
1280 if not new_rows:
1281 return
1282 self._rowbuffer = collections.deque(new_rows)
1283 if self._growth_factor and size < self._max_row_buffer:
1284 self._bufsize = min(
1285 self._max_row_buffer, size * self._growth_factor
1286 )
1287
1288 def yield_per(
1289 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1290 ) -> None:
1291 self._growth_factor = 0
1292 self._max_row_buffer = self._bufsize = num
1293
1294 def soft_close(
1295 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1296 ) -> None:
1297 self._rowbuffer.clear()
1298 super().soft_close(result, dbapi_cursor)
1299
1300 def hard_close(
1301 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1302 ) -> None:
1303 self._rowbuffer.clear()
1304 super().hard_close(result, dbapi_cursor)
1305
1306 def fetchone(
1307 self,
1308 result: CursorResult[Any],
1309 dbapi_cursor: DBAPICursor,
1310 hard_close: bool = False,
1311 ) -> Any:
1312 if not self._rowbuffer:
1313 self._buffer_rows(result, dbapi_cursor)
1314 if not self._rowbuffer:
1315 try:
1316 result._soft_close(hard=hard_close)
1317 except BaseException as e:
1318 self.handle_exception(result, dbapi_cursor, e)
1319 return None
1320 return self._rowbuffer.popleft()
1321
1322 def fetchmany(
1323 self,
1324 result: CursorResult[Any],
1325 dbapi_cursor: DBAPICursor,
1326 size: Optional[int] = None,
1327 ) -> Any:
1328 if size is None:
1329 return self.fetchall(result, dbapi_cursor)
1330
1331 rb = self._rowbuffer
1332 lb = len(rb)
1333 close = False
1334 if size > lb:
1335 try:
1336 new = dbapi_cursor.fetchmany(size - lb)
1337 except BaseException as e:
1338 self.handle_exception(result, dbapi_cursor, e)
1339 else:
1340 if not new:
1341 # defer closing since it may clear the row buffer
1342 close = True
1343 else:
1344 rb.extend(new)
1345
1346 res = [rb.popleft() for _ in range(min(size, len(rb)))]
1347 if close:
1348 result._soft_close()
1349 return res
1350
1351 def fetchall(
1352 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1353 ) -> Any:
1354 try:
1355 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
1356 self._rowbuffer.clear()
1357 result._soft_close()
1358 return ret
1359 except BaseException as e:
1360 self.handle_exception(result, dbapi_cursor, e)
1361
1362
1363class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
1364 """A cursor strategy that buffers rows fully upon creation.
1365
1366 Used for operations where a result is to be delivered
1367 after the database conversation can not be continued,
1368 such as MSSQL INSERT...OUTPUT after an autocommit.
1369
1370 """
1371
1372 __slots__ = ("_rowbuffer", "alternate_cursor_description")
1373
1374 def __init__(
1375 self,
1376 dbapi_cursor: Optional[DBAPICursor],
1377 alternate_description: Optional[_DBAPICursorDescription] = None,
1378 initial_buffer: Optional[Iterable[Any]] = None,
1379 ):
1380 self.alternate_cursor_description = alternate_description
1381 if initial_buffer is not None:
1382 self._rowbuffer = collections.deque(initial_buffer)
1383 else:
1384 assert dbapi_cursor is not None
1385 self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
1386
1387 def yield_per(
1388 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1389 ) -> Any:
1390 pass
1391
1392 def soft_close(
1393 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1394 ) -> None:
1395 self._rowbuffer.clear()
1396 super().soft_close(result, dbapi_cursor)
1397
1398 def hard_close(
1399 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1400 ) -> None:
1401 self._rowbuffer.clear()
1402 super().hard_close(result, dbapi_cursor)
1403
1404 def fetchone(
1405 self,
1406 result: CursorResult[Any],
1407 dbapi_cursor: DBAPICursor,
1408 hard_close: bool = False,
1409 ) -> Any:
1410 if self._rowbuffer:
1411 return self._rowbuffer.popleft()
1412 else:
1413 result._soft_close(hard=hard_close)
1414 return None
1415
1416 def fetchmany(
1417 self,
1418 result: CursorResult[Any],
1419 dbapi_cursor: DBAPICursor,
1420 size: Optional[int] = None,
1421 ) -> Any:
1422 if size is None:
1423 return self.fetchall(result, dbapi_cursor)
1424
1425 rb = self._rowbuffer
1426 rows = [rb.popleft() for _ in range(min(size, len(rb)))]
1427 if not rows:
1428 result._soft_close()
1429 return rows
1430
1431 def fetchall(
1432 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1433 ) -> Any:
1434 ret = self._rowbuffer
1435 self._rowbuffer = collections.deque()
1436 result._soft_close()
1437 return ret
1438
1439
1440class _NoResultMetaData(ResultMetaData):
1441 __slots__ = ()
1442
1443 returns_rows = False
1444
1445 def _we_dont_return_rows(
1446 self, err: Optional[BaseException] = None
1447 ) -> NoReturn:
1448 raise exc.ResourceClosedError(
1449 "This result object does not return rows. "
1450 "It has been closed automatically."
1451 ) from err
1452
1453 def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
1454 self._we_dont_return_rows()
1455
1456 def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1457 self._we_dont_return_rows()
1458
1459 def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1460 self._we_dont_return_rows()
1461
1462 @property
1463 def _keymap(self) -> NoReturn: # type: ignore[override]
1464 self._we_dont_return_rows()
1465
1466 @property
1467 def _key_to_index(self) -> NoReturn: # type: ignore[override]
1468 self._we_dont_return_rows()
1469
1470 @property
1471 def _processors(self) -> NoReturn: # type: ignore[override]
1472 self._we_dont_return_rows()
1473
1474 @property
1475 def keys(self) -> NoReturn:
1476 self._we_dont_return_rows()
1477
1478
1479_NO_RESULT_METADATA = _NoResultMetaData()
1480
1481
1482def null_dml_result() -> IteratorResult[Any]:
1483 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
1484 it._soft_close()
1485 return it
1486
1487
1488class CursorResult(Result[_T]):
1489 """A Result that is representing state from a DBAPI cursor.
1490
1491 .. versionchanged:: 1.4 The :class:`.CursorResult``
1492 class replaces the previous :class:`.ResultProxy` interface.
1493 This classes are based on the :class:`.Result` calling API
1494 which provides an updated usage model and calling facade for
1495 SQLAlchemy Core and SQLAlchemy ORM.
1496
1497 Returns database rows via the :class:`.Row` class, which provides
1498 additional API features and behaviors on top of the raw data returned by
1499 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
1500 method, other kinds of objects may also be returned.
1501
1502 .. seealso::
1503
1504 :ref:`tutorial_selecting_data` - introductory material for accessing
1505 :class:`_engine.CursorResult` and :class:`.Row` objects.
1506
1507 """
1508
1509 __slots__ = (
1510 "context",
1511 "dialect",
1512 "cursor",
1513 "cursor_strategy",
1514 "_echo",
1515 "connection",
1516 )
1517
1518 _metadata: Union[CursorResultMetaData, _NoResultMetaData]
1519 _no_result_metadata = _NO_RESULT_METADATA
1520 _soft_closed: bool = False
1521 closed: bool = False
1522 _is_cursor = True
1523
1524 context: DefaultExecutionContext
1525 dialect: Dialect
1526 cursor_strategy: ResultFetchStrategy
1527 connection: Connection
1528
1529 def __init__(
1530 self,
1531 context: DefaultExecutionContext,
1532 cursor_strategy: ResultFetchStrategy,
1533 cursor_description: Optional[_DBAPICursorDescription],
1534 ):
1535 self.context = context
1536 self.dialect = context.dialect
1537 self.cursor = context.cursor
1538 self.cursor_strategy = cursor_strategy
1539 self.connection = context.root_connection
1540 self._echo = echo = (
1541 self.connection._echo and context.engine._should_log_debug()
1542 )
1543
1544 if cursor_description is not None:
1545 # inline of Result._row_getter(), set up an initial row
1546 # getter assuming no transformations will be called as this
1547 # is the most common case
1548
1549 metadata = self._init_metadata(context, cursor_description)
1550
1551 _make_row: Any
1552 _make_row = functools.partial(
1553 Row,
1554 metadata,
1555 metadata._effective_processors,
1556 metadata._key_to_index,
1557 )
1558
1559 if context._num_sentinel_cols:
1560 sentinel_filter = operator.itemgetter(
1561 slice(-context._num_sentinel_cols)
1562 )
1563
1564 def _sliced_row(raw_data: Any) -> Any:
1565 return _make_row(sentinel_filter(raw_data))
1566
1567 sliced_row = _sliced_row
1568 else:
1569 sliced_row = _make_row
1570
1571 if echo:
1572 log = self.context.connection._log_debug
1573
1574 def _log_row(row: Any) -> Any:
1575 log("Row %r", sql_util._repr_row(row))
1576 return row
1577
1578 self._row_logging_fn = _log_row
1579
1580 def _make_row_2(row: Any) -> Any:
1581 return _log_row(sliced_row(row))
1582
1583 make_row = _make_row_2
1584 else:
1585 make_row = sliced_row # type: ignore[assignment]
1586 self._set_memoized_attribute("_row_getter", make_row)
1587
1588 else:
1589 assert context._num_sentinel_cols == 0
1590 self._metadata = self._no_result_metadata
1591
1592 def _init_metadata(
1593 self,
1594 context: DefaultExecutionContext,
1595 cursor_description: _DBAPICursorDescription,
1596 ) -> CursorResultMetaData:
1597
1598 if context.compiled:
1599 compiled = context.compiled
1600
1601 if compiled._cached_metadata:
1602 metadata = compiled._cached_metadata
1603 else:
1604 metadata = CursorResultMetaData(self, cursor_description)
1605 if metadata._safe_for_cache:
1606 compiled._cached_metadata = metadata
1607
1608 # result rewrite/ adapt step. this is to suit the case
1609 # when we are invoked against a cached Compiled object, we want
1610 # to rewrite the ResultMetaData to reflect the Column objects
1611 # that are in our current SQL statement object, not the one
1612 # that is associated with the cached Compiled object.
1613 # the Compiled object may also tell us to not
1614 # actually do this step; this is to support the ORM where
1615 # it is to produce a new Result object in any case, and will
1616 # be using the cached Column objects against this database result
1617 # so we don't want to rewrite them.
1618 #
1619 # Basically this step suits the use case where the end user
1620 # is using Core SQL expressions and is accessing columns in the
1621 # result row using row._mapping[table.c.column].
1622 if (
1623 not context.execution_options.get(
1624 "_result_disable_adapt_to_context", False
1625 )
1626 and compiled._result_columns
1627 and context.cache_hit is context.dialect.CACHE_HIT
1628 and compiled.statement is not context.invoked_statement # type: ignore[comparison-overlap] # noqa: E501
1629 ):
1630 metadata = metadata._adapt_to_context(context) # type: ignore[assignment] # noqa: E501
1631
1632 self._metadata = metadata
1633
1634 else:
1635 self._metadata = metadata = CursorResultMetaData(
1636 self, cursor_description
1637 )
1638 if self._echo:
1639 context.connection._log_debug(
1640 "Col %r", tuple(x[0] for x in cursor_description)
1641 )
1642 return metadata
1643
1644 def _soft_close(self, hard: bool = False) -> None:
1645 """Soft close this :class:`_engine.CursorResult`.
1646
1647 This releases all DBAPI cursor resources, but leaves the
1648 CursorResult "open" from a semantic perspective, meaning the
1649 fetchXXX() methods will continue to return empty results.
1650
1651 This method is called automatically when:
1652
1653 * all result rows are exhausted using the fetchXXX() methods.
1654 * cursor.description is None.
1655
1656 This method is **not public**, but is documented in order to clarify
1657 the "autoclose" process used.
1658
1659 .. seealso::
1660
1661 :meth:`_engine.CursorResult.close`
1662
1663
1664 """
1665
1666 if (not hard and self._soft_closed) or (hard and self.closed):
1667 return
1668
1669 if hard:
1670 self.closed = True
1671 self.cursor_strategy.hard_close(self, self.cursor)
1672 else:
1673 self.cursor_strategy.soft_close(self, self.cursor)
1674
1675 if not self._soft_closed:
1676 cursor = self.cursor
1677 self.cursor = None # type: ignore
1678 self.connection._safe_close_cursor(cursor)
1679 self._soft_closed = True
1680
1681 @property
1682 def inserted_primary_key_rows(self) -> List[Optional[Any]]:
1683 """Return the value of
1684 :attr:`_engine.CursorResult.inserted_primary_key`
1685 as a row contained within a list; some dialects may support a
1686 multiple row form as well.
1687
1688 .. note:: As indicated below, in current SQLAlchemy versions this
1689 accessor is only useful beyond what's already supplied by
1690 :attr:`_engine.CursorResult.inserted_primary_key` when using the
1691 :ref:`postgresql_psycopg2` dialect. Future versions hope to
1692 generalize this feature to more dialects.
1693
1694 This accessor is added to support dialects that offer the feature
1695 that is currently implemented by the :ref:`psycopg2_executemany_mode`
1696 feature, currently **only the psycopg2 dialect**, which provides
1697 for many rows to be INSERTed at once while still retaining the
1698 behavior of being able to return server-generated primary key values.
1699
1700 * **When using the psycopg2 dialect, or other dialects that may support
1701 "fast executemany" style inserts in upcoming releases** : When
1702 invoking an INSERT statement while passing a list of rows as the
1703 second argument to :meth:`_engine.Connection.execute`, this accessor
1704 will then provide a list of rows, where each row contains the primary
1705 key value for each row that was INSERTed.
1706
1707 * **When using all other dialects / backends that don't yet support
1708 this feature**: This accessor is only useful for **single row INSERT
1709 statements**, and returns the same information as that of the
1710 :attr:`_engine.CursorResult.inserted_primary_key` within a
1711 single-element list. When an INSERT statement is executed in
1712 conjunction with a list of rows to be INSERTed, the list will contain
1713 one row per row inserted in the statement, however it will contain
1714 ``None`` for any server-generated values.
1715
1716 Future releases of SQLAlchemy will further generalize the
1717 "fast execution helper" feature of psycopg2 to suit other dialects,
1718 thus allowing this accessor to be of more general use.
1719
1720 .. versionadded:: 1.4
1721
1722 .. seealso::
1723
1724 :attr:`_engine.CursorResult.inserted_primary_key`
1725
1726 """
1727 if not self.context.compiled:
1728 raise exc.InvalidRequestError(
1729 "Statement is not a compiled expression construct."
1730 )
1731 elif not self.context.isinsert:
1732 raise exc.InvalidRequestError(
1733 "Statement is not an insert() expression construct."
1734 )
1735 elif self.context._is_explicit_returning:
1736 raise exc.InvalidRequestError(
1737 "Can't call inserted_primary_key "
1738 "when returning() "
1739 "is used."
1740 )
1741 return self.context.inserted_primary_key_rows # type: ignore[no-any-return] # noqa: E501
1742
1743 @property
1744 def inserted_primary_key(self) -> Optional[Any]:
1745 """Return the primary key for the row just inserted.
1746
1747 The return value is a :class:`_result.Row` object representing
1748 a named tuple of primary key values in the order in which the
1749 primary key columns are configured in the source
1750 :class:`_schema.Table`.
1751
1752 .. versionchanged:: 1.4.8 - the
1753 :attr:`_engine.CursorResult.inserted_primary_key`
1754 value is now a named tuple via the :class:`_result.Row` class,
1755 rather than a plain tuple.
1756
1757 This accessor only applies to single row :func:`_expression.insert`
1758 constructs which did not explicitly specify
1759 :meth:`_expression.Insert.returning`. Support for multirow inserts,
1760 while not yet available for most backends, would be accessed using
1761 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
1762
1763 Note that primary key columns which specify a server_default clause, or
1764 otherwise do not qualify as "autoincrement" columns (see the notes at
1765 :class:`_schema.Column`), and were generated using the database-side
1766 default, will appear in this list as ``None`` unless the backend
1767 supports "returning" and the insert statement executed with the
1768 "implicit returning" enabled.
1769
1770 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1771 statement is not a compiled expression construct
1772 or is not an insert() construct.
1773
1774 """
1775
1776 if self.context.executemany:
1777 raise exc.InvalidRequestError(
1778 "This statement was an executemany call; if primary key "
1779 "returning is supported, please "
1780 "use .inserted_primary_key_rows."
1781 )
1782
1783 ikp = self.inserted_primary_key_rows
1784 if ikp:
1785 return ikp[0]
1786 else:
1787 return None
1788
1789 def last_updated_params(
1790 self,
1791 ) -> Union[
1792 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1793 ]:
1794 """Return the collection of updated parameters from this
1795 execution.
1796
1797 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1798 statement is not a compiled expression construct
1799 or is not an update() construct.
1800
1801 """
1802 if not self.context.compiled:
1803 raise exc.InvalidRequestError(
1804 "Statement is not a compiled expression construct."
1805 )
1806 elif not self.context.isupdate:
1807 raise exc.InvalidRequestError(
1808 "Statement is not an update() expression construct."
1809 )
1810 elif self.context.executemany:
1811 return self.context.compiled_parameters
1812 else:
1813 return self.context.compiled_parameters[0]
1814
1815 def last_inserted_params(
1816 self,
1817 ) -> Union[
1818 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1819 ]:
1820 """Return the collection of inserted parameters from this
1821 execution.
1822
1823 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1824 statement is not a compiled expression construct
1825 or is not an insert() construct.
1826
1827 """
1828 if not self.context.compiled:
1829 raise exc.InvalidRequestError(
1830 "Statement is not a compiled expression construct."
1831 )
1832 elif not self.context.isinsert:
1833 raise exc.InvalidRequestError(
1834 "Statement is not an insert() expression construct."
1835 )
1836 elif self.context.executemany:
1837 return self.context.compiled_parameters
1838 else:
1839 return self.context.compiled_parameters[0]
1840
1841 @property
1842 def returned_defaults_rows(
1843 self,
1844 ) -> Optional[Sequence[Row[Any]]]:
1845 """Return a list of rows each containing the values of default
1846 columns that were fetched using
1847 the :meth:`.ValuesBase.return_defaults` feature.
1848
1849 The return value is a list of :class:`.Row` objects.
1850
1851 .. versionadded:: 1.4
1852
1853 """
1854 return self.context.returned_default_rows
1855
1856 def splice_horizontally(self, other: CursorResult[Any]) -> Self:
1857 """Return a new :class:`.CursorResult` that "horizontally splices"
1858 together the rows of this :class:`.CursorResult` with that of another
1859 :class:`.CursorResult`.
1860
1861 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1862 not intended for general use.
1863
1864 "horizontally splices" means that for each row in the first and second
1865 result sets, a new row that concatenates the two rows together is
1866 produced, which then becomes the new row. The incoming
1867 :class:`.CursorResult` must have the identical number of rows. It is
1868 typically expected that the two result sets come from the same sort
1869 order as well, as the result rows are spliced together based on their
1870 position in the result.
1871
1872 The expected use case here is so that multiple INSERT..RETURNING
1873 statements (which definitely need to be sorted) against different
1874 tables can produce a single result that looks like a JOIN of those two
1875 tables.
1876
1877 E.g.::
1878
1879 r1 = connection.execute(
1880 users.insert().returning(
1881 users.c.user_name, users.c.user_id, sort_by_parameter_order=True
1882 ),
1883 user_values,
1884 )
1885
1886 r2 = connection.execute(
1887 addresses.insert().returning(
1888 addresses.c.address_id,
1889 addresses.c.address,
1890 addresses.c.user_id,
1891 sort_by_parameter_order=True,
1892 ),
1893 address_values,
1894 )
1895
1896 rows = r1.splice_horizontally(r2).all()
1897 assert rows == [
1898 ("john", 1, 1, "foo@bar.com", 1),
1899 ("jack", 2, 2, "bar@bat.com", 2),
1900 ]
1901
1902 .. versionadded:: 2.0
1903
1904 .. seealso::
1905
1906 :meth:`.CursorResult.splice_vertically`
1907
1908
1909 """ # noqa: E501
1910
1911 clone = self._generate()
1912 total_rows = [
1913 tuple(r1) + tuple(r2)
1914 for r1, r2 in zip(
1915 list(self._raw_row_iterator()),
1916 list(other._raw_row_iterator()),
1917 )
1918 ]
1919
1920 clone._metadata = clone._metadata._splice_horizontally(other._metadata) # type: ignore[union-attr, arg-type] # noqa: E501
1921
1922 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
1923 None,
1924 initial_buffer=total_rows,
1925 )
1926 clone._reset_memoizations()
1927 return clone
1928
1929 def splice_vertically(self, other: CursorResult[Any]) -> Self:
1930 """Return a new :class:`.CursorResult` that "vertically splices",
1931 i.e. "extends", the rows of this :class:`.CursorResult` with that of
1932 another :class:`.CursorResult`.
1933
1934 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1935 not intended for general use.
1936
1937 "vertically splices" means the rows of the given result are appended to
1938 the rows of this cursor result. The incoming :class:`.CursorResult`
1939 must have rows that represent the identical list of columns in the
1940 identical order as they are in this :class:`.CursorResult`.
1941
1942 .. versionadded:: 2.0
1943
1944 .. seealso::
1945
1946 :meth:`.CursorResult.splice_horizontally`
1947
1948 """
1949 clone = self._generate()
1950 total_rows = list(self._raw_row_iterator()) + list(
1951 other._raw_row_iterator()
1952 )
1953
1954 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
1955 None,
1956 initial_buffer=total_rows,
1957 )
1958 clone._reset_memoizations()
1959 return clone
1960
1961 def _rewind(self, rows: Any) -> Self:
1962 """rewind this result back to the given rowset.
1963
1964 this is used internally for the case where an :class:`.Insert`
1965 construct combines the use of
1966 :meth:`.Insert.return_defaults` along with the
1967 "supplemental columns" feature.
1968
1969 """
1970
1971 if self._echo:
1972 self.context.connection._log_debug(
1973 "CursorResult rewound %d row(s)", len(rows)
1974 )
1975
1976 # the rows given are expected to be Row objects, so we
1977 # have to clear out processors which have already run on these
1978 # rows
1979 self._metadata = cast(
1980 CursorResultMetaData, self._metadata
1981 )._remove_processors()
1982
1983 self.cursor_strategy = FullyBufferedCursorFetchStrategy(
1984 None,
1985 # TODO: if these are Row objects, can we save on not having to
1986 # re-make new Row objects out of them a second time? is that
1987 # what's actually happening right now? maybe look into this
1988 initial_buffer=rows,
1989 )
1990 self._reset_memoizations()
1991 return self
1992
1993 @property
1994 def returned_defaults(self) -> Optional[Row[Any]]:
1995 """Return the values of default columns that were fetched using
1996 the :meth:`.ValuesBase.return_defaults` feature.
1997
1998 The value is an instance of :class:`.Row`, or ``None``
1999 if :meth:`.ValuesBase.return_defaults` was not used or if the
2000 backend does not support RETURNING.
2001
2002 .. seealso::
2003
2004 :meth:`.ValuesBase.return_defaults`
2005
2006 """
2007
2008 if self.context.executemany:
2009 raise exc.InvalidRequestError(
2010 "This statement was an executemany call; if return defaults "
2011 "is supported, please use .returned_defaults_rows."
2012 )
2013
2014 rows = self.context.returned_default_rows
2015 if rows:
2016 return rows[0]
2017 else:
2018 return None
2019
2020 def lastrow_has_defaults(self) -> bool:
2021 """Return ``lastrow_has_defaults()`` from the underlying
2022 :class:`.ExecutionContext`.
2023
2024 See :class:`.ExecutionContext` for details.
2025
2026 """
2027
2028 return self.context.lastrow_has_defaults()
2029
2030 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2031 """Return ``postfetch_cols()`` from the underlying
2032 :class:`.ExecutionContext`.
2033
2034 See :class:`.ExecutionContext` for details.
2035
2036 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2037 statement is not a compiled expression construct
2038 or is not an insert() or update() construct.
2039
2040 """
2041
2042 if not self.context.compiled:
2043 raise exc.InvalidRequestError(
2044 "Statement is not a compiled expression construct."
2045 )
2046 elif not self.context.isinsert and not self.context.isupdate:
2047 raise exc.InvalidRequestError(
2048 "Statement is not an insert() or update() "
2049 "expression construct."
2050 )
2051 return self.context.postfetch_cols
2052
2053 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2054 """Return ``prefetch_cols()`` from the underlying
2055 :class:`.ExecutionContext`.
2056
2057 See :class:`.ExecutionContext` for details.
2058
2059 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2060 statement is not a compiled expression construct
2061 or is not an insert() or update() construct.
2062
2063 """
2064
2065 if not self.context.compiled:
2066 raise exc.InvalidRequestError(
2067 "Statement is not a compiled expression construct."
2068 )
2069 elif not self.context.isinsert and not self.context.isupdate:
2070 raise exc.InvalidRequestError(
2071 "Statement is not an insert() or update() "
2072 "expression construct."
2073 )
2074 return self.context.prefetch_cols
2075
2076 def supports_sane_rowcount(self) -> bool:
2077 """Return ``supports_sane_rowcount`` from the dialect.
2078
2079 See :attr:`_engine.CursorResult.rowcount` for background.
2080
2081 """
2082
2083 return self.dialect.supports_sane_rowcount
2084
2085 def supports_sane_multi_rowcount(self) -> bool:
2086 """Return ``supports_sane_multi_rowcount`` from the dialect.
2087
2088 See :attr:`_engine.CursorResult.rowcount` for background.
2089
2090 """
2091
2092 return self.dialect.supports_sane_multi_rowcount
2093
2094 @util.memoized_property
2095 def rowcount(self) -> int:
2096 """Return the 'rowcount' for this result.
2097
2098 The primary purpose of 'rowcount' is to report the number of rows
2099 matched by the WHERE criterion of an UPDATE or DELETE statement
2100 executed once (i.e. for a single parameter set), which may then be
2101 compared to the number of rows expected to be updated or deleted as a
2102 means of asserting data integrity.
2103
2104 This attribute is transferred from the ``cursor.rowcount`` attribute
2105 of the DBAPI before the cursor is closed, to support DBAPIs that
2106 don't make this value available after cursor close. Some DBAPIs may
2107 offer meaningful values for other kinds of statements, such as INSERT
2108 and SELECT statements as well. In order to retrieve ``cursor.rowcount``
2109 for these statements, set the
2110 :paramref:`.Connection.execution_options.preserve_rowcount`
2111 execution option to True, which will cause the ``cursor.rowcount``
2112 value to be unconditionally memoized before any results are returned
2113 or the cursor is closed, regardless of statement type.
2114
2115 For cases where the DBAPI does not support rowcount for a particular
2116 kind of statement and/or execution, the returned value will be ``-1``,
2117 which is delivered directly from the DBAPI and is part of :pep:`249`.
2118 All DBAPIs should support rowcount for single-parameter-set
2119 UPDATE and DELETE statements, however.
2120
2121 .. note::
2122
2123 Notes regarding :attr:`_engine.CursorResult.rowcount`:
2124
2125
2126 * This attribute returns the number of rows *matched*,
2127 which is not necessarily the same as the number of rows
2128 that were actually *modified*. For example, an UPDATE statement
2129 may have no net change on a given row if the SET values
2130 given are the same as those present in the row already.
2131 Such a row would be matched but not modified.
2132 On backends that feature both styles, such as MySQL,
2133 rowcount is configured to return the match
2134 count in all cases.
2135
2136 * :attr:`_engine.CursorResult.rowcount` in the default case is
2137 *only* useful in conjunction with an UPDATE or DELETE statement,
2138 and only with a single set of parameters. For other kinds of
2139 statements, SQLAlchemy will not attempt to pre-memoize the value
2140 unless the
2141 :paramref:`.Connection.execution_options.preserve_rowcount`
2142 execution option is used. Note that contrary to :pep:`249`, many
2143 DBAPIs do not support rowcount values for statements that are not
2144 UPDATE or DELETE, particularly when rows are being returned which
2145 are not fully pre-buffered. DBAPIs that dont support rowcount
2146 for a particular kind of statement should return the value ``-1``
2147 for such statements.
2148
2149 * :attr:`_engine.CursorResult.rowcount` may not be meaningful
2150 when executing a single statement with multiple parameter sets
2151 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
2152 values across multiple parameter sets and will return ``-1``
2153 when accessed.
2154
2155 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
2156 a correct population of :attr:`_engine.CursorResult.rowcount`
2157 when the :paramref:`.Connection.execution_options.preserve_rowcount`
2158 execution option is set to True.
2159
2160 * Statements that use RETURNING may not support rowcount, returning
2161 a ``-1`` value instead.
2162
2163 .. seealso::
2164
2165 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
2166
2167 :paramref:`.Connection.execution_options.preserve_rowcount`
2168
2169 """ # noqa: E501
2170 try:
2171 return self.context.rowcount
2172 except BaseException as e:
2173 self.cursor_strategy.handle_exception(self, self.cursor, e)
2174 raise # not called
2175
2176 @property
2177 def lastrowid(self) -> int:
2178 """Return the 'lastrowid' accessor on the DBAPI cursor.
2179
2180 This is a DBAPI specific method and is only functional
2181 for those backends which support it, for statements
2182 where it is appropriate. It's behavior is not
2183 consistent across backends.
2184
2185 Usage of this method is normally unnecessary when
2186 using insert() expression constructs; the
2187 :attr:`~CursorResult.inserted_primary_key` attribute provides a
2188 tuple of primary key values for a newly inserted row,
2189 regardless of database backend.
2190
2191 """
2192 try:
2193 return self.context.get_lastrowid()
2194 except BaseException as e:
2195 self.cursor_strategy.handle_exception(self, self.cursor, e)
2196
2197 @property
2198 def returns_rows(self) -> bool:
2199 """True if this :class:`_engine.CursorResult` returns zero or more
2200 rows.
2201
2202 I.e. if it is legal to call the methods
2203 :meth:`_engine.CursorResult.fetchone`,
2204 :meth:`_engine.CursorResult.fetchmany`
2205 :meth:`_engine.CursorResult.fetchall`.
2206
2207 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
2208 always be synonymous with whether or not the DBAPI cursor had a
2209 ``.description`` attribute, indicating the presence of result columns,
2210 noting that a cursor that returns zero rows still has a
2211 ``.description`` if a row-returning statement was emitted.
2212
2213 This attribute should be True for all results that are against
2214 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
2215 that use RETURNING. For INSERT/UPDATE/DELETE statements that were
2216 not using RETURNING, the value will usually be False, however
2217 there are some dialect-specific exceptions to this, such as when
2218 using the MSSQL / pyodbc dialect a SELECT is emitted inline in
2219 order to retrieve an inserted primary key value.
2220
2221
2222 """
2223 return self._metadata.returns_rows
2224
2225 @property
2226 def is_insert(self) -> bool:
2227 """True if this :class:`_engine.CursorResult` is the result
2228 of a executing an expression language compiled
2229 :func:`_expression.insert` construct.
2230
2231 When True, this implies that the
2232 :attr:`inserted_primary_key` attribute is accessible,
2233 assuming the statement did not include
2234 a user defined "returning" construct.
2235
2236 """
2237 return self.context.isinsert
2238
2239 def _fetchiter_impl(self) -> Iterator[Any]:
2240 fetchone = self.cursor_strategy.fetchone
2241
2242 while True:
2243 row = fetchone(self, self.cursor)
2244 if row is None:
2245 break
2246 yield row
2247
2248 def _fetchone_impl(self, hard_close: bool = False) -> Any:
2249 return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
2250
2251 def _fetchall_impl(self) -> Any:
2252 return self.cursor_strategy.fetchall(self, self.cursor)
2253
2254 def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
2255 return self.cursor_strategy.fetchmany(self, self.cursor, size)
2256
2257 def _raw_row_iterator(self) -> Any:
2258 return self._fetchiter_impl()
2259
2260 def merge(self, *others: Result[Any]) -> MergedResult[Any]:
2261 merged_result = super().merge(*others)
2262 if self.context._has_rowcount:
2263 merged_result.rowcount = sum(
2264 cast("CursorResult[Any]", result).rowcount
2265 for result in (self,) + others
2266 )
2267 return merged_result
2268
2269 def close(self) -> None:
2270 """Close this :class:`_engine.CursorResult`.
2271
2272 This closes out the underlying DBAPI cursor corresponding to the
2273 statement execution, if one is still present. Note that the DBAPI
2274 cursor is automatically released when the :class:`_engine.CursorResult`
2275 exhausts all available rows. :meth:`_engine.CursorResult.close` is
2276 generally an optional method except in the case when discarding a
2277 :class:`_engine.CursorResult` that still has additional rows pending
2278 for fetch.
2279
2280 After this method is called, it is no longer valid to call upon
2281 the fetch methods, which will raise a :class:`.ResourceClosedError`
2282 on subsequent use.
2283
2284 .. seealso::
2285
2286 :ref:`connections_toplevel`
2287
2288 """
2289 self._soft_close(hard=True)
2290
2291 @_generative
2292 def yield_per(self, num: int) -> Self:
2293 self._yield_per = num
2294 self.cursor_strategy.yield_per(self, self.cursor, num)
2295 return self
2296
2297
2298ResultProxy = CursorResult