1# engine/cursor.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define cursor-specific result set constructs including
9:class:`.CursorResult`."""
10
11from __future__ import annotations
12
13import collections
14import operator
15import typing
16from typing import Any
17from typing import cast
18from typing import ClassVar
19from typing import Deque
20from typing import Dict
21from typing import Final
22from typing import Iterable
23from typing import Iterator
24from typing import List
25from typing import Literal
26from typing import Mapping
27from typing import NoReturn
28from typing import Optional
29from typing import Sequence
30from typing import Tuple
31from typing import TYPE_CHECKING
32from typing import Union
33
34from .result import IteratorResult
35from .result import MergedResult
36from .result import Result
37from .result import ResultMetaData
38from .result import SimpleResultMetaData
39from .result import tuplegetter
40from .row import Row
41from .. import exc
42from .. import util
43from ..sql import elements
44from ..sql import sqltypes
45from ..sql import util as sql_util
46from ..sql.base import _generative
47from ..sql.compiler import ResultColumnsEntry
48from ..sql.compiler import RM_NAME
49from ..sql.compiler import RM_OBJECTS
50from ..sql.compiler import RM_RENDERED_NAME
51from ..sql.compiler import RM_TYPE
52from ..sql.type_api import TypeEngine
53from ..util.typing import Self
54from ..util.typing import TupleAny
55from ..util.typing import TypeVarTuple
56from ..util.typing import Unpack
57
58if typing.TYPE_CHECKING:
59 from .base import Connection
60 from .default import DefaultExecutionContext
61 from .interfaces import _DBAPICursorDescription
62 from .interfaces import _MutableCoreSingleExecuteParams
63 from .interfaces import CoreExecuteOptionsParameter
64 from .interfaces import DBAPICursor
65 from .interfaces import DBAPIType
66 from .interfaces import Dialect
67 from .interfaces import ExecutionContext
68 from .result import _KeyIndexType
69 from .result import _KeyMapRecType
70 from .result import _KeyMapType
71 from .result import _KeyType
72 from .result import _ProcessorsType
73 from .result import _TupleGetterType
74 from ..sql.schema import Column
75 from ..sql.type_api import _ResultProcessorType
76
77
78_Ts = TypeVarTuple("_Ts")
79
80
81# metadata entry tuple indexes.
82# using raw tuple is faster than namedtuple.
83# these match up to the positions in
84# _CursorKeyMapRecType
85MD_INDEX: Final[Literal[0]] = 0
86"""integer index in cursor.description
87
88"""
89
90MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
91"""integer index in compiled._result_columns"""
92
93MD_OBJECTS: Final[Literal[2]] = 2
94"""other string keys and ColumnElement obj that can match.
95
96This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
97
98"""
99
100MD_LOOKUP_KEY: Final[Literal[3]] = 3
101"""string key we usually expect for key-based lookup
102
103this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
104"""
105
106
107MD_RENDERED_NAME: Final[Literal[4]] = 4
108"""name that is usually in cursor.description
109
110this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
111"""
112
113
114MD_PROCESSOR: Final[Literal[5]] = 5
115"""callable to process a result value into a row"""
116
117MD_UNTRANSLATED: Final[Literal[6]] = 6
118"""raw name from cursor.description"""
119
120
121_CursorKeyMapRecType = Tuple[
122 Optional[int], # MD_INDEX, None means the record is ambiguously named
123 int, # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
124 TupleAny, # MD_OBJECTS
125 str, # MD_LOOKUP_KEY
126 str, # MD_RENDERED_NAME
127 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
128 Optional[str], # MD_UNTRANSLATED
129]
130
131_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
132
133# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
134# not None
135_NonAmbigCursorKeyMapRecType = Tuple[
136 int,
137 int,
138 List[Any],
139 str,
140 str,
141 Optional["_ResultProcessorType[Any]"],
142 str,
143]
144
145_MergeColTuple = Tuple[
146 int,
147 Optional[int],
148 str,
149 TypeEngine[Any],
150 "DBAPIType",
151 Optional[TupleAny],
152 Optional[str],
153]
154
155
156class CursorResultMetaData(ResultMetaData):
157 """Result metadata for DBAPI cursors."""
158
159 __slots__ = (
160 "_keymap",
161 "_processors",
162 "_keys",
163 "_keymap_by_result_column_idx",
164 "_tuplefilter",
165 "_translated_indexes",
166 "_safe_for_cache",
167 "_unpickled",
168 "_key_to_index",
169 # don't need _create_unique_filters here for now. Can be added
170 # if a need arises.
171 )
172
173 _keymap: _CursorKeyMapType
174 _processors: _ProcessorsType
175 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
176 _unpickled: bool
177 _safe_for_cache: bool
178 _translated_indexes: Optional[List[int]]
179
180 returns_rows: ClassVar[bool] = True
181
182 def _has_key(self, key: Any) -> bool:
183 return key in self._keymap
184
185 def _for_freeze(self) -> ResultMetaData:
186 ambiguous = {
187 rec[MD_LOOKUP_KEY]
188 for rec in self._keymap.values()
189 if rec[MD_INDEX] is None
190 }
191 return SimpleResultMetaData(
192 self._keys,
193 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
194 _ambiguous_keys=frozenset(ambiguous) if ambiguous else None,
195 )
196
197 def _make_new_metadata(
198 self,
199 *,
200 unpickled: bool,
201 processors: _ProcessorsType,
202 keys: Sequence[str],
203 keymap: _KeyMapType,
204 tuplefilter: Optional[_TupleGetterType],
205 translated_indexes: Optional[List[int]],
206 safe_for_cache: bool,
207 keymap_by_result_column_idx: Any,
208 ) -> Self:
209 new_obj = self.__class__.__new__(self.__class__)
210 new_obj._unpickled = unpickled
211 new_obj._processors = processors
212 new_obj._keys = keys
213 new_obj._keymap = keymap
214 new_obj._tuplefilter = tuplefilter
215 new_obj._translated_indexes = translated_indexes
216 new_obj._safe_for_cache = safe_for_cache
217 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
218 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
219 return new_obj
220
221 def _remove_processors_and_tuple_filter(self) -> Self:
222 if self._tuplefilter:
223 proc = self._tuplefilter(self._processors)
224 else:
225 proc = self._processors
226 return self._make_new_metadata(
227 unpickled=self._unpickled,
228 processors=[None] * len(proc),
229 tuplefilter=None,
230 translated_indexes=None,
231 keymap={
232 key: value[0:5] + (None,) + value[6:]
233 for key, value in self._keymap.items()
234 },
235 keys=self._keys,
236 safe_for_cache=self._safe_for_cache,
237 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
238 )
239
240 def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
241 keymap = dict(self._keymap)
242 offset = len(self._keys)
243
244 for key, value in other._keymap.items():
245 # int index should be None for ambiguous key
246 if value[MD_INDEX] is not None and key not in keymap:
247 md_index = value[MD_INDEX] + offset
248 md_object = value[MD_RESULT_MAP_INDEX] + offset
249 else:
250 md_index = None
251 md_object = -1
252 keymap[key] = (md_index, md_object, *value[2:])
253
254 self_tf = self._tuplefilter
255 other_tf = other._tuplefilter
256
257 proc: List[Any] = []
258 for pp, tf in [
259 (self._processors, self_tf),
260 (other._processors, other_tf),
261 ]:
262 proc.extend(pp if tf is None else tf(pp))
263
264 new_keys = [*self._keys, *other._keys]
265 assert len(proc) == len(new_keys)
266
267 return self._make_new_metadata(
268 unpickled=self._unpickled,
269 processors=proc,
270 tuplefilter=None,
271 translated_indexes=None,
272 keys=new_keys,
273 keymap=keymap,
274 safe_for_cache=self._safe_for_cache,
275 keymap_by_result_column_idx={
276 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
277 for metadata_entry in keymap.values()
278 },
279 )
280
281 def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
282 recs = list(self._metadata_for_keys(keys))
283
284 indexes = [rec[MD_INDEX] for rec in recs]
285 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
286
287 if self._translated_indexes:
288 indexes = [self._translated_indexes[idx] for idx in indexes]
289 tup = tuplegetter(*indexes)
290 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
291
292 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
293 # TODO: need unit test for:
294 # result = connection.execute("raw sql, no columns").scalars()
295 # without the "or ()" it's failing because MD_OBJECTS is None
296 keymap.update(
297 (e, new_rec)
298 for new_rec in new_recs
299 for e in new_rec[MD_OBJECTS] or ()
300 )
301
302 return self._make_new_metadata(
303 unpickled=self._unpickled,
304 processors=self._processors,
305 keys=new_keys,
306 tuplefilter=tup,
307 translated_indexes=indexes,
308 keymap=keymap, # type: ignore[arg-type]
309 safe_for_cache=self._safe_for_cache,
310 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
311 )
312
313 def _adapt_to_context(self, context: ExecutionContext) -> Self:
314 """When using a cached Compiled construct that has a _result_map,
315 for a new statement that used the cached Compiled, we need to ensure
316 the keymap has the Column objects from our new statement as keys.
317 So here we rewrite keymap with new entries for the new columns
318 as matched to those of the cached statement.
319
320 """
321
322 if not context.compiled or not context.compiled._result_columns:
323 return self
324
325 compiled_statement = context.compiled.statement
326 invoked_statement = context.invoked_statement
327
328 if TYPE_CHECKING:
329 assert isinstance(invoked_statement, elements.ClauseElement)
330
331 if compiled_statement is invoked_statement:
332 return self
333
334 assert invoked_statement is not None
335
336 # this is the most common path for Core statements when
337 # caching is used. In ORM use, this codepath is not really used
338 # as the _result_disable_adapt_to_context execution option is
339 # set by the ORM.
340
341 # make a copy and add the columns from the invoked statement
342 # to the result map.
343
344 keymap_by_position = self._keymap_by_result_column_idx
345
346 if keymap_by_position is None:
347 # first retrieval from cache, this map will not be set up yet,
348 # initialize lazily
349 keymap_by_position = self._keymap_by_result_column_idx = {
350 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
351 for metadata_entry in self._keymap.values()
352 }
353
354 return self._make_new_metadata(
355 keymap=self._keymap
356 | {
357 new: keymap_by_position[idx]
358 for idx, new in enumerate(
359 invoked_statement._all_selected_columns
360 )
361 if idx in keymap_by_position
362 },
363 unpickled=self._unpickled,
364 processors=self._processors,
365 tuplefilter=self._tuplefilter,
366 translated_indexes=None,
367 keys=self._keys,
368 safe_for_cache=self._safe_for_cache,
369 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
370 )
371
372 def __init__(
373 self,
374 parent: CursorResult[Unpack[TupleAny]],
375 cursor_description: _DBAPICursorDescription,
376 *,
377 driver_column_names: bool = False,
378 num_sentinel_cols: int = 0,
379 ):
380 context = parent.context
381 if num_sentinel_cols > 0:
382 # this is slightly faster than letting tuplegetter use the indexes
383 self._tuplefilter = tuplefilter = operator.itemgetter(
384 slice(-num_sentinel_cols)
385 )
386 cursor_description = tuplefilter(cursor_description)
387 else:
388 self._tuplefilter = tuplefilter = None
389 self._translated_indexes = None
390 self._safe_for_cache = self._unpickled = False
391
392 if context.result_column_struct:
393 (
394 result_columns,
395 cols_are_ordered,
396 textual_ordered,
397 ad_hoc_textual,
398 loose_column_name_matching,
399 ) = context.result_column_struct
400 if tuplefilter is not None:
401 result_columns = tuplefilter(result_columns)
402 num_ctx_cols = len(result_columns)
403 else:
404 result_columns = cols_are_ordered = ( # type: ignore
405 num_ctx_cols
406 ) = ad_hoc_textual = loose_column_name_matching = (
407 textual_ordered
408 ) = False
409
410 # merge cursor.description with the column info
411 # present in the compiled structure, if any
412 raw = self._merge_cursor_description(
413 context,
414 cursor_description,
415 result_columns,
416 num_ctx_cols,
417 cols_are_ordered,
418 textual_ordered,
419 ad_hoc_textual,
420 loose_column_name_matching,
421 driver_column_names,
422 )
423
424 # processors in key order which are used when building up
425 # a row
426 self._processors = [
427 metadata_entry[MD_PROCESSOR] for metadata_entry in raw
428 ]
429 if num_sentinel_cols > 0:
430 # add the number of sentinel columns since these are passed
431 # to the tuplefilters before being used
432 self._processors.extend([None] * num_sentinel_cols)
433
434 # this is used when using this ResultMetaData in a Core-only cache
435 # retrieval context. it's initialized on first cache retrieval
436 # when the _result_disable_adapt_to_context execution option
437 # (which the ORM generally sets) is not set.
438 self._keymap_by_result_column_idx = None
439
440 # for compiled SQL constructs, copy additional lookup keys into
441 # the key lookup map, such as Column objects, labels,
442 # column keys and other names
443 if num_ctx_cols:
444 # keymap by primary string...
445 by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
446 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
447 for metadata_entry in raw
448 }
449
450 if len(by_key) != num_ctx_cols:
451 # if by-primary-string dictionary smaller than
452 # number of columns, assume we have dupes; (this check
453 # is also in place if string dictionary is bigger, as
454 # can occur when '*' was used as one of the compiled columns,
455 # which may or may not be suggestive of dupes), rewrite
456 # dupe records with "None" for index which results in
457 # ambiguous column exception when accessed.
458 #
459 # this is considered to be the less common case as it is not
460 # common to have dupe column keys in a SELECT statement.
461 #
462 # new in 1.4: get the complete set of all possible keys,
463 # strings, objects, whatever, that are dupes across two
464 # different records, first.
465 index_by_key: Dict[Any, Any] = {}
466 dupes = set()
467 for metadata_entry in raw:
468 for key in (metadata_entry[MD_RENDERED_NAME],) + (
469 metadata_entry[MD_OBJECTS] or ()
470 ):
471 idx = metadata_entry[MD_INDEX]
472 # if this key has been associated with more than one
473 # positional index, it's a dupe
474 if index_by_key.setdefault(key, idx) != idx:
475 dupes.add(key)
476
477 # then put everything we have into the keymap excluding only
478 # those keys that are dupes.
479 self._keymap = {
480 obj_elem: metadata_entry
481 for metadata_entry in raw
482 if metadata_entry[MD_OBJECTS]
483 for obj_elem in metadata_entry[MD_OBJECTS]
484 if obj_elem not in dupes
485 }
486
487 # then for the dupe keys, put the "ambiguous column"
488 # record into by_key.
489 by_key.update(
490 {
491 key: (None, -1, (), key, key, None, None)
492 for key in dupes
493 }
494 )
495
496 else:
497 # no dupes - copy secondary elements from compiled
498 # columns into self._keymap. this is the most common
499 # codepath for Core / ORM statement executions before the
500 # result metadata is cached
501 self._keymap = {
502 obj_elem: metadata_entry
503 for metadata_entry in raw
504 if metadata_entry[MD_OBJECTS]
505 for obj_elem in metadata_entry[MD_OBJECTS]
506 }
507 # update keymap with primary string names taking
508 # precedence
509 self._keymap.update(by_key)
510 else:
511 # no compiled objects to map, just create keymap by primary string
512 self._keymap = {
513 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
514 for metadata_entry in raw
515 }
516
517 # update keymap with "translated" names.
518 # the "translated" name thing has a long history:
519 # 1. originally, it was used to fix an issue in very old SQLite
520 # versions prior to 3.10.0. This code is still there in the
521 # sqlite dialect.
522 # 2. Next, the pyhive third party dialect started using this hook
523 # for some driver related issue on their end.
524 # 3. Most recently, the "driver_column_names" execution option has
525 # taken advantage of this hook to get raw DBAPI col names in the
526 # result keys without disrupting the usual merge process.
527
528 if driver_column_names or (
529 not num_ctx_cols and context._translate_colname
530 ):
531 self._keymap.update(
532 {
533 metadata_entry[MD_UNTRANSLATED]: self._keymap[
534 metadata_entry[MD_LOOKUP_KEY]
535 ]
536 for metadata_entry in raw
537 if metadata_entry[MD_UNTRANSLATED]
538 }
539 )
540
541 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
542
543 def _merge_cursor_description(
544 self,
545 context: DefaultExecutionContext,
546 cursor_description: _DBAPICursorDescription,
547 result_columns: Sequence[ResultColumnsEntry],
548 num_ctx_cols: int,
549 cols_are_ordered: bool,
550 textual_ordered: bool,
551 ad_hoc_textual: bool,
552 loose_column_name_matching: bool,
553 driver_column_names: bool,
554 ) -> List[_CursorKeyMapRecType]:
555 """Merge a cursor.description with compiled result column information.
556
557 There are at least four separate strategies used here, selected
558 depending on the type of SQL construct used to start with.
559
560 The most common case is that of the compiled SQL expression construct,
561 which generated the column names present in the raw SQL string and
562 which has the identical number of columns as were reported by
563 cursor.description. In this case, we assume a 1-1 positional mapping
564 between the entries in cursor.description and the compiled object.
565 This is also the most performant case as we disregard extracting /
566 decoding the column names present in cursor.description since we
567 already have the desired name we generated in the compiled SQL
568 construct.
569
570 The next common case is that of the completely raw string SQL,
571 such as passed to connection.execute(). In this case we have no
572 compiled construct to work with, so we extract and decode the
573 names from cursor.description and index those as the primary
574 result row target keys.
575
576 The remaining fairly common case is that of the textual SQL
577 that includes at least partial column information; this is when
578 we use a :class:`_expression.TextualSelect` construct.
579 This construct may have
580 unordered or ordered column information. In the ordered case, we
581 merge the cursor.description and the compiled construct's information
582 positionally, and warn if there are additional description names
583 present, however we still decode the names in cursor.description
584 as we don't have a guarantee that the names in the columns match
585 on these. In the unordered case, we match names in cursor.description
586 to that of the compiled construct based on name matching.
587 In both of these cases, the cursor.description names and the column
588 expression objects and names are indexed as result row target keys.
589
590 The final case is much less common, where we have a compiled
591 non-textual SQL expression construct, but the number of columns
592 in cursor.description doesn't match what's in the compiled
593 construct. We make the guess here that there might be textual
594 column expressions in the compiled construct that themselves include
595 a comma in them causing them to split. We do the same name-matching
596 as with textual non-ordered columns.
597
598 The name-matched system of merging is the same as that used by
599 SQLAlchemy for all cases up through the 0.9 series. Positional
600 matching for compiled SQL expressions was introduced in 1.0 as a
601 major performance feature, and positional matching for textual
602 :class:`_expression.TextualSelect` objects in 1.1.
603 As name matching is no longer
604 a common case, it was acceptable to factor it into smaller generator-
605 oriented methods that are easier to understand, but incur slightly
606 more performance overhead.
607
608 """
609
610 if (
611 num_ctx_cols
612 and cols_are_ordered
613 and not textual_ordered
614 and num_ctx_cols == len(cursor_description)
615 and not driver_column_names
616 ):
617 self._keys = [elem[0] for elem in result_columns]
618 # pure positional 1-1 case; doesn't need to read
619 # the names from cursor.description
620
621 # most common case for Core and ORM
622
623 # this metadata is safe to
624 # cache because we are guaranteed
625 # to have the columns in the same order for new executions
626 self._safe_for_cache = True
627
628 return [
629 (
630 idx,
631 idx,
632 rmap_entry[RM_OBJECTS],
633 rmap_entry[RM_NAME],
634 rmap_entry[RM_RENDERED_NAME],
635 context.get_result_processor(
636 rmap_entry[RM_TYPE],
637 rmap_entry[RM_RENDERED_NAME],
638 cursor_description[idx][1],
639 ),
640 None,
641 )
642 for idx, rmap_entry in enumerate(result_columns)
643 ]
644 else:
645 # name-based or text-positional cases, where we need
646 # to read cursor.description names
647
648 if textual_ordered or (
649 ad_hoc_textual and len(cursor_description) == num_ctx_cols
650 ):
651 self._safe_for_cache = not driver_column_names
652 # textual positional case
653 raw_iterator = self._merge_textual_cols_by_position(
654 context,
655 cursor_description,
656 result_columns,
657 driver_column_names,
658 )
659 elif num_ctx_cols:
660 # compiled SQL with a mismatch of description cols
661 # vs. compiled cols, or textual w/ unordered columns
662 # the order of columns can change if the query is
663 # against a "select *", so not safe to cache
664 self._safe_for_cache = False
665 raw_iterator = self._merge_cols_by_name(
666 context,
667 cursor_description,
668 result_columns,
669 loose_column_name_matching,
670 driver_column_names,
671 )
672 else:
673 # no compiled SQL, just a raw string, order of columns
674 # can change for "select *"
675 self._safe_for_cache = False
676 raw_iterator = self._merge_cols_by_none(
677 context, cursor_description, driver_column_names
678 )
679
680 return [
681 (
682 idx,
683 ridx,
684 obj,
685 cursor_colname,
686 cursor_colname,
687 context.get_result_processor(
688 mapped_type, cursor_colname, coltype
689 ),
690 untranslated,
691 ) # type: ignore[misc]
692 for (
693 idx,
694 ridx,
695 cursor_colname,
696 mapped_type,
697 coltype,
698 obj,
699 untranslated,
700 ) in raw_iterator
701 ]
702
703 def _colnames_from_description(
704 self,
705 context: DefaultExecutionContext,
706 cursor_description: _DBAPICursorDescription,
707 driver_column_names: bool,
708 ) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]:
709 """Extract column names and data types from a cursor.description.
710
711 Applies unicode decoding, column translation, "normalization",
712 and case sensitivity rules to the names based on the dialect.
713
714 """
715 dialect = context.dialect
716 translate_colname = context._translate_colname
717 normalize_name = (
718 dialect.normalize_name if dialect.requires_name_normalize else None
719 )
720
721 untranslated = None
722
723 for idx, rec in enumerate(cursor_description):
724 colname = unnormalized = rec[0]
725 coltype = rec[1]
726
727 if translate_colname:
728 # a None here for "untranslated" means "the dialect did not
729 # change the column name and the untranslated case can be
730 # ignored". otherwise "untranslated" is expected to be the
731 # original, unchanged colname (e.g. is == to "unnormalized")
732 colname, untranslated = translate_colname(colname)
733
734 assert untranslated is None or untranslated == unnormalized
735
736 if normalize_name:
737 colname = normalize_name(colname)
738
739 if driver_column_names:
740 yield idx, colname, unnormalized, unnormalized, coltype
741
742 else:
743 yield idx, colname, unnormalized, untranslated, coltype
744
745 def _merge_textual_cols_by_position(
746 self,
747 context: DefaultExecutionContext,
748 cursor_description: _DBAPICursorDescription,
749 result_columns: Sequence[ResultColumnsEntry],
750 driver_column_names: bool,
751 ) -> Iterator[_MergeColTuple]:
752 num_ctx_cols = len(result_columns)
753
754 if num_ctx_cols > len(cursor_description):
755 util.warn(
756 "Number of columns in textual SQL (%d) is "
757 "smaller than number of columns requested (%d)"
758 % (num_ctx_cols, len(cursor_description))
759 )
760 seen = set()
761
762 self._keys = []
763
764 uses_denormalize = context.dialect.requires_name_normalize
765 for (
766 idx,
767 colname,
768 unnormalized,
769 untranslated,
770 coltype,
771 ) in self._colnames_from_description(
772 context, cursor_description, driver_column_names
773 ):
774 if idx < num_ctx_cols:
775 ctx_rec = result_columns[idx]
776 obj = ctx_rec[RM_OBJECTS]
777 ridx = idx
778 mapped_type = ctx_rec[RM_TYPE]
779 if obj[0] in seen:
780 raise exc.InvalidRequestError(
781 "Duplicate column expression requested "
782 "in textual SQL: %r" % obj[0]
783 )
784 seen.add(obj[0])
785
786 # special check for all uppercase unnormalized name;
787 # use the unnormalized name as the key.
788 # see #10788
789 # if these names don't match, then we still honor the
790 # cursor.description name as the key and not what the
791 # Column has, see
792 # test_resultset.py::PositionalTextTest::test_via_column
793 if (
794 uses_denormalize
795 and unnormalized == ctx_rec[RM_RENDERED_NAME]
796 ):
797 result_name = unnormalized
798 else:
799 result_name = colname
800 else:
801 mapped_type = sqltypes.NULLTYPE
802 obj = None
803 ridx = None
804
805 result_name = colname
806
807 if driver_column_names:
808 assert untranslated is not None
809 self._keys.append(untranslated)
810 else:
811 self._keys.append(result_name)
812
813 yield (
814 idx,
815 ridx,
816 result_name,
817 mapped_type,
818 coltype,
819 obj,
820 untranslated,
821 )
822
823 def _merge_cols_by_name(
824 self,
825 context: DefaultExecutionContext,
826 cursor_description: _DBAPICursorDescription,
827 result_columns: Sequence[ResultColumnsEntry],
828 loose_column_name_matching: bool,
829 driver_column_names: bool,
830 ) -> Iterator[_MergeColTuple]:
831 match_map = self._create_description_match_map(
832 result_columns, loose_column_name_matching
833 )
834 mapped_type: TypeEngine[Any]
835
836 self._keys = []
837
838 for (
839 idx,
840 colname,
841 unnormalized,
842 untranslated,
843 coltype,
844 ) in self._colnames_from_description(
845 context, cursor_description, driver_column_names
846 ):
847 try:
848 ctx_rec = match_map[colname]
849 except KeyError:
850 mapped_type = sqltypes.NULLTYPE
851 obj = None
852 result_columns_idx = None
853 else:
854 obj = ctx_rec[1]
855 mapped_type = ctx_rec[2]
856 result_columns_idx = ctx_rec[3]
857
858 if driver_column_names:
859 assert untranslated is not None
860 self._keys.append(untranslated)
861 else:
862 self._keys.append(colname)
863 yield (
864 idx,
865 result_columns_idx,
866 colname,
867 mapped_type,
868 coltype,
869 obj,
870 untranslated,
871 )
872
873 @classmethod
874 def _create_description_match_map(
875 cls,
876 result_columns: Sequence[ResultColumnsEntry],
877 loose_column_name_matching: bool = False,
878 ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
879 """when matching cursor.description to a set of names that are present
880 in a Compiled object, as is the case with TextualSelect, get all the
881 names we expect might match those in cursor.description.
882 """
883
884 d: Dict[
885 Union[str, object],
886 Tuple[str, TupleAny, TypeEngine[Any], int],
887 ] = {}
888 for ridx, elem in enumerate(result_columns):
889 key = elem[RM_RENDERED_NAME]
890
891 if key in d:
892 # conflicting keyname - just add the column-linked objects
893 # to the existing record. if there is a duplicate column
894 # name in the cursor description, this will allow all of those
895 # objects to raise an ambiguous column error
896 e_name, e_obj, e_type, e_ridx = d[key]
897 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
898 else:
899 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
900
901 if loose_column_name_matching:
902 # when using a textual statement with an unordered set
903 # of columns that line up, we are expecting the user
904 # to be using label names in the SQL that match to the column
905 # expressions. Enable more liberal matching for this case;
906 # duplicate keys that are ambiguous will be fixed later.
907 for r_key in elem[RM_OBJECTS]:
908 d.setdefault(
909 r_key,
910 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
911 )
912 return d
913
914 def _merge_cols_by_none(
915 self,
916 context: DefaultExecutionContext,
917 cursor_description: _DBAPICursorDescription,
918 driver_column_names: bool,
919 ) -> Iterator[_MergeColTuple]:
920 self._keys = []
921
922 for (
923 idx,
924 colname,
925 unnormalized,
926 untranslated,
927 coltype,
928 ) in self._colnames_from_description(
929 context, cursor_description, driver_column_names
930 ):
931
932 if driver_column_names:
933 assert untranslated is not None
934 self._keys.append(untranslated)
935 else:
936 self._keys.append(colname)
937
938 yield (
939 idx,
940 None,
941 colname,
942 sqltypes.NULLTYPE,
943 coltype,
944 None,
945 untranslated,
946 )
947
948 if not TYPE_CHECKING:
949
950 def _key_fallback(
951 self, key: Any, err: Optional[Exception], raiseerr: bool = True
952 ) -> Optional[NoReturn]:
953 if raiseerr:
954 if self._unpickled and isinstance(key, elements.ColumnElement):
955 raise exc.NoSuchColumnError(
956 "Row was unpickled; lookup by ColumnElement "
957 "is unsupported"
958 ) from err
959 else:
960 raise exc.NoSuchColumnError(
961 "Could not locate column in row for column '%s'"
962 % util.string_or_unprintable(key)
963 ) from err
964 else:
965 return None
966
967 def _raise_for_ambiguous_column_name(
968 self, rec: _KeyMapRecType
969 ) -> NoReturn:
970 raise exc.InvalidRequestError(
971 "Ambiguous column name '%s' in "
972 "result set column descriptions" % rec[MD_LOOKUP_KEY]
973 )
974
975 def _index_for_key(
976 self, key: _KeyIndexType, raiseerr: bool = True
977 ) -> Optional[int]:
978 # TODO: can consider pre-loading ints and negative ints
979 # into _keymap - also no coverage here
980 if isinstance(key, int):
981 key = self._keys[key]
982
983 try:
984 rec = self._keymap[key]
985 except KeyError as ke:
986 x = self._key_fallback(key, ke, raiseerr)
987 assert x is None
988 return None
989
990 index = rec[0]
991
992 if index is None:
993 self._raise_for_ambiguous_column_name(rec)
994 return index
995
996 def _indexes_for_keys(
997 self, keys: Sequence[_KeyIndexType]
998 ) -> Sequence[int]:
999 try:
1000 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501
1001 except KeyError as ke:
1002 # ensure it raises
1003 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
1004
1005 def _metadata_for_keys(
1006 self, keys: Sequence[_KeyIndexType]
1007 ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
1008 for key in keys:
1009 if isinstance(key, int):
1010 key = self._keys[key]
1011
1012 try:
1013 rec = self._keymap[key]
1014 except KeyError as ke:
1015 # ensure it raises
1016 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
1017
1018 index = rec[MD_INDEX]
1019
1020 if index is None:
1021 self._raise_for_ambiguous_column_name(rec)
1022
1023 yield cast(_NonAmbigCursorKeyMapRecType, rec)
1024
1025 def __getstate__(self) -> Dict[str, Any]:
1026 # TODO: consider serializing this as SimpleResultMetaData
1027 return {
1028 "_keymap": {
1029 key: (
1030 rec[MD_INDEX],
1031 rec[MD_RESULT_MAP_INDEX],
1032 [],
1033 key,
1034 rec[MD_RENDERED_NAME],
1035 None,
1036 None,
1037 )
1038 for key, rec in self._keymap.items()
1039 if isinstance(key, (str, int))
1040 },
1041 "_keys": self._keys,
1042 "_translated_indexes": self._translated_indexes,
1043 }
1044
1045 def __setstate__(self, state: Dict[str, Any]) -> None:
1046 self._processors = [None for _ in range(len(state["_keys"]))]
1047 self._keymap = state["_keymap"]
1048 self._keymap_by_result_column_idx = None
1049 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
1050 self._keys = state["_keys"]
1051 self._unpickled = True
1052 if state["_translated_indexes"]:
1053 translated_indexes: List[Any]
1054 self._translated_indexes = translated_indexes = state[
1055 "_translated_indexes"
1056 ]
1057 self._tuplefilter = tuplegetter(*translated_indexes)
1058 else:
1059 self._translated_indexes = self._tuplefilter = None
1060
1061
1062class ResultFetchStrategy:
1063 """Define a fetching strategy for a result object.
1064
1065
1066 .. versionadded:: 1.4
1067
1068 """
1069
1070 __slots__ = ()
1071
1072 alternate_cursor_description: Optional[_DBAPICursorDescription] = None
1073
1074 def soft_close(
1075 self,
1076 result: CursorResult[Unpack[TupleAny]],
1077 dbapi_cursor: Optional[DBAPICursor],
1078 ) -> None:
1079 raise NotImplementedError()
1080
1081 def hard_close(
1082 self,
1083 result: CursorResult[Unpack[TupleAny]],
1084 dbapi_cursor: Optional[DBAPICursor],
1085 ) -> None:
1086 raise NotImplementedError()
1087
1088 def yield_per(
1089 self,
1090 result: CursorResult[Unpack[TupleAny]],
1091 dbapi_cursor: DBAPICursor,
1092 num: int,
1093 ) -> None:
1094 return
1095
1096 def fetchone(
1097 self,
1098 result: CursorResult[Unpack[TupleAny]],
1099 dbapi_cursor: DBAPICursor,
1100 hard_close: bool = False,
1101 ) -> Any:
1102 raise NotImplementedError()
1103
1104 def fetchmany(
1105 self,
1106 result: CursorResult[Unpack[TupleAny]],
1107 dbapi_cursor: DBAPICursor,
1108 size: Optional[int] = None,
1109 ) -> Any:
1110 raise NotImplementedError()
1111
1112 def fetchall(
1113 self,
1114 result: CursorResult[Unpack[TupleAny]],
1115 dbapi_cursor: DBAPICursor,
1116 ) -> Any:
1117 raise NotImplementedError()
1118
1119 def handle_exception(
1120 self,
1121 result: CursorResult[Unpack[TupleAny]],
1122 dbapi_cursor: Optional[DBAPICursor],
1123 err: BaseException,
1124 ) -> NoReturn:
1125 raise err
1126
1127
1128class NoCursorFetchStrategy(ResultFetchStrategy):
1129 """Cursor strategy for a result that has no open cursor.
1130
1131 There are two varieties of this strategy, one for DQL and one for
1132 DML (and also DDL), each of which represent a result that had a cursor
1133 but no longer has one.
1134
1135 """
1136
1137 __slots__ = ()
1138
1139 def soft_close(
1140 self,
1141 result: CursorResult[Unpack[TupleAny]],
1142 dbapi_cursor: Optional[DBAPICursor],
1143 ) -> None:
1144 pass
1145
1146 def hard_close(
1147 self,
1148 result: CursorResult[Unpack[TupleAny]],
1149 dbapi_cursor: Optional[DBAPICursor],
1150 ) -> None:
1151 pass
1152
1153 def fetchone(
1154 self,
1155 result: CursorResult[Unpack[TupleAny]],
1156 dbapi_cursor: DBAPICursor,
1157 hard_close: bool = False,
1158 ) -> Any:
1159 return self._non_result(result, None)
1160
1161 def fetchmany(
1162 self,
1163 result: CursorResult[Unpack[TupleAny]],
1164 dbapi_cursor: DBAPICursor,
1165 size: Optional[int] = None,
1166 ) -> Any:
1167 return self._non_result(result, [])
1168
1169 def fetchall(
1170 self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor
1171 ) -> Any:
1172 return self._non_result(result, [])
1173
1174 def _non_result(
1175 self,
1176 result: CursorResult[Unpack[TupleAny]],
1177 default: Any,
1178 err: Optional[BaseException] = None,
1179 ) -> Any:
1180 raise NotImplementedError()
1181
1182
1183class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
1184 """Cursor strategy for a DQL result that has no open cursor.
1185
1186 This is a result set that can return rows, i.e. for a SELECT, or for an
1187 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
1188 where the cursor is closed and no rows remain available. The owning result
1189 object may or may not be "hard closed", which determines if the fetch
1190 methods send empty results or raise for closed result.
1191
1192 """
1193
1194 __slots__ = ()
1195
1196 def _non_result(
1197 self,
1198 result: CursorResult[Unpack[TupleAny]],
1199 default: Any,
1200 err: Optional[BaseException] = None,
1201 ) -> Any:
1202 if result.closed:
1203 raise exc.ResourceClosedError(
1204 "This result object is closed."
1205 ) from err
1206 else:
1207 return default
1208
1209
1210_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
1211
1212
1213class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
1214 """Cursor strategy for a DML result that has no open cursor.
1215
1216 This is a result set that does not return rows, i.e. for an INSERT,
1217 UPDATE, DELETE that does not include RETURNING.
1218
1219 """
1220
1221 __slots__ = ()
1222
1223 def _non_result(
1224 self,
1225 result: CursorResult[Unpack[TupleAny]],
1226 default: Any,
1227 err: Optional[BaseException] = None,
1228 ) -> Any:
1229 # we only expect to have a _NoResultMetaData() here right now.
1230 assert not result._metadata.returns_rows
1231 result._metadata._we_dont_return_rows(err) # type: ignore[union-attr]
1232
1233
1234_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
1235
1236
1237class CursorFetchStrategy(ResultFetchStrategy):
1238 """Call fetch methods from a DBAPI cursor.
1239
1240 Alternate versions of this class may instead buffer the rows from
1241 cursors or not use cursors at all.
1242
1243 """
1244
1245 __slots__ = ()
1246
1247 def soft_close(
1248 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1249 ) -> None:
1250 result.cursor_strategy = _NO_CURSOR_DQL
1251
1252 def hard_close(
1253 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1254 ) -> None:
1255 result.cursor_strategy = _NO_CURSOR_DQL
1256
1257 def handle_exception(
1258 self,
1259 result: CursorResult[Any],
1260 dbapi_cursor: Optional[DBAPICursor],
1261 err: BaseException,
1262 ) -> NoReturn:
1263 result.connection._handle_dbapi_exception(
1264 err, None, None, dbapi_cursor, result.context
1265 )
1266
1267 def yield_per(
1268 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1269 ) -> None:
1270 result.cursor_strategy = BufferedRowCursorFetchStrategy(
1271 dbapi_cursor,
1272 {"max_row_buffer": num},
1273 initial_buffer=collections.deque(),
1274 growth_factor=0,
1275 )
1276
1277 def fetchone(
1278 self,
1279 result: CursorResult[Any],
1280 dbapi_cursor: DBAPICursor,
1281 hard_close: bool = False,
1282 ) -> Any:
1283 try:
1284 row = dbapi_cursor.fetchone()
1285 if row is None:
1286 result._soft_close(hard=hard_close)
1287 return row
1288 except BaseException as e:
1289 self.handle_exception(result, dbapi_cursor, e)
1290
1291 def fetchmany(
1292 self,
1293 result: CursorResult[Any],
1294 dbapi_cursor: DBAPICursor,
1295 size: Optional[int] = None,
1296 ) -> Any:
1297 try:
1298 if size is None:
1299 l = dbapi_cursor.fetchmany()
1300 else:
1301 l = dbapi_cursor.fetchmany(size)
1302
1303 if not l:
1304 result._soft_close()
1305 return l
1306 except BaseException as e:
1307 self.handle_exception(result, dbapi_cursor, e)
1308
1309 def fetchall(
1310 self,
1311 result: CursorResult[Any],
1312 dbapi_cursor: DBAPICursor,
1313 ) -> Any:
1314 try:
1315 rows = dbapi_cursor.fetchall()
1316 result._soft_close()
1317 return rows
1318 except BaseException as e:
1319 self.handle_exception(result, dbapi_cursor, e)
1320
1321
1322_DEFAULT_FETCH = CursorFetchStrategy()
1323
1324
1325class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
1326 """A cursor fetch strategy with row buffering behavior.
1327
1328 This strategy buffers the contents of a selection of rows
1329 before ``fetchone()`` is called. This is to allow the results of
1330 ``cursor.description`` to be available immediately, when
1331 interfacing with a DB-API that requires rows to be consumed before
1332 this information is available (currently psycopg2, when used with
1333 server-side cursors).
1334
1335 The pre-fetching behavior fetches only one row initially, and then
1336 grows its buffer size by a fixed amount with each successive need
1337 for additional rows up the ``max_row_buffer`` size, which defaults
1338 to 1000::
1339
1340 with psycopg2_engine.connect() as conn:
1341
1342 result = conn.execution_options(
1343 stream_results=True, max_row_buffer=50
1344 ).execute(text("select * from table"))
1345
1346 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
1347
1348 .. seealso::
1349
1350 :ref:`psycopg2_execution_options`
1351 """
1352
1353 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
1354
1355 def __init__(
1356 self,
1357 dbapi_cursor: DBAPICursor,
1358 execution_options: CoreExecuteOptionsParameter,
1359 growth_factor: int = 5,
1360 initial_buffer: Optional[Deque[Any]] = None,
1361 ) -> None:
1362 self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
1363
1364 if initial_buffer is not None:
1365 self._rowbuffer = initial_buffer
1366 else:
1367 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
1368 self._growth_factor = growth_factor
1369
1370 if growth_factor:
1371 self._bufsize = min(self._max_row_buffer, self._growth_factor)
1372 else:
1373 self._bufsize = self._max_row_buffer
1374
1375 @classmethod
1376 def create(
1377 cls, result: CursorResult[Any]
1378 ) -> BufferedRowCursorFetchStrategy:
1379 return BufferedRowCursorFetchStrategy(
1380 result.cursor,
1381 result.context.execution_options,
1382 )
1383
1384 def _buffer_rows(
1385 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1386 ) -> None:
1387 """this is currently used only by fetchone()."""
1388
1389 size = self._bufsize
1390 try:
1391 if size < 1:
1392 new_rows = dbapi_cursor.fetchall()
1393 else:
1394 new_rows = dbapi_cursor.fetchmany(size)
1395 except BaseException as e:
1396 self.handle_exception(result, dbapi_cursor, e)
1397
1398 if not new_rows:
1399 return
1400 self._rowbuffer = collections.deque(new_rows)
1401 if self._growth_factor and size < self._max_row_buffer:
1402 self._bufsize = min(
1403 self._max_row_buffer, size * self._growth_factor
1404 )
1405
1406 def yield_per(
1407 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1408 ) -> None:
1409 self._growth_factor = 0
1410 self._max_row_buffer = self._bufsize = num
1411
1412 def soft_close(
1413 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1414 ) -> None:
1415 self._rowbuffer.clear()
1416 super().soft_close(result, dbapi_cursor)
1417
1418 def hard_close(
1419 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1420 ) -> None:
1421 self._rowbuffer.clear()
1422 super().hard_close(result, dbapi_cursor)
1423
1424 def fetchone(
1425 self,
1426 result: CursorResult[Any],
1427 dbapi_cursor: DBAPICursor,
1428 hard_close: bool = False,
1429 ) -> Any:
1430 if not self._rowbuffer:
1431 self._buffer_rows(result, dbapi_cursor)
1432 if not self._rowbuffer:
1433 try:
1434 result._soft_close(hard=hard_close)
1435 except BaseException as e:
1436 self.handle_exception(result, dbapi_cursor, e)
1437 return None
1438 return self._rowbuffer.popleft()
1439
1440 def fetchmany(
1441 self,
1442 result: CursorResult[Any],
1443 dbapi_cursor: DBAPICursor,
1444 size: Optional[int] = None,
1445 ) -> Any:
1446 if size is None:
1447 return self.fetchall(result, dbapi_cursor)
1448
1449 rb = self._rowbuffer
1450 lb = len(rb)
1451 close = False
1452 if size > lb:
1453 try:
1454 new = dbapi_cursor.fetchmany(size - lb)
1455 except BaseException as e:
1456 self.handle_exception(result, dbapi_cursor, e)
1457 else:
1458 if not new:
1459 # defer closing since it may clear the row buffer
1460 close = True
1461 else:
1462 rb.extend(new)
1463
1464 res = [rb.popleft() for _ in range(min(size, len(rb)))]
1465 if close:
1466 result._soft_close()
1467 return res
1468
1469 def fetchall(
1470 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1471 ) -> Any:
1472 try:
1473 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
1474 self._rowbuffer.clear()
1475 result._soft_close()
1476 return ret
1477 except BaseException as e:
1478 self.handle_exception(result, dbapi_cursor, e)
1479
1480
1481class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
1482 """A cursor strategy that buffers rows fully upon creation.
1483
1484 Used for operations where a result is to be delivered
1485 after the database conversation can not be continued,
1486 such as MSSQL INSERT...OUTPUT after an autocommit.
1487
1488 """
1489
1490 __slots__ = ("_rowbuffer", "alternate_cursor_description")
1491
1492 def __init__(
1493 self,
1494 dbapi_cursor: Optional[DBAPICursor],
1495 alternate_description: Optional[_DBAPICursorDescription] = None,
1496 initial_buffer: Optional[Iterable[Any]] = None,
1497 ):
1498 self.alternate_cursor_description = alternate_description
1499 if initial_buffer is not None:
1500 self._rowbuffer = collections.deque(initial_buffer)
1501 else:
1502 assert dbapi_cursor is not None
1503 self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
1504
1505 def yield_per(
1506 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1507 ) -> Any:
1508 pass
1509
1510 def soft_close(
1511 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1512 ) -> None:
1513 self._rowbuffer.clear()
1514 super().soft_close(result, dbapi_cursor)
1515
1516 def hard_close(
1517 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1518 ) -> None:
1519 self._rowbuffer.clear()
1520 super().hard_close(result, dbapi_cursor)
1521
1522 def fetchone(
1523 self,
1524 result: CursorResult[Any],
1525 dbapi_cursor: DBAPICursor,
1526 hard_close: bool = False,
1527 ) -> Any:
1528 if self._rowbuffer:
1529 return self._rowbuffer.popleft()
1530 else:
1531 result._soft_close(hard=hard_close)
1532 return None
1533
1534 def fetchmany(
1535 self,
1536 result: CursorResult[Any],
1537 dbapi_cursor: DBAPICursor,
1538 size: Optional[int] = None,
1539 ) -> Any:
1540 if size is None:
1541 return self.fetchall(result, dbapi_cursor)
1542
1543 rb = self._rowbuffer
1544 rows = [rb.popleft() for _ in range(min(size, len(rb)))]
1545 if not rows:
1546 result._soft_close()
1547 return rows
1548
1549 def fetchall(
1550 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1551 ) -> Any:
1552 ret = self._rowbuffer
1553 self._rowbuffer = collections.deque()
1554 result._soft_close()
1555 return ret
1556
1557
1558class _NoResultMetaData(ResultMetaData):
1559 __slots__ = ()
1560
1561 returns_rows = False
1562
1563 def _we_dont_return_rows(
1564 self, err: Optional[BaseException] = None
1565 ) -> NoReturn:
1566 raise exc.ResourceClosedError(
1567 "This result object does not return rows. "
1568 "It has been closed automatically."
1569 ) from err
1570
1571 def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
1572 self._we_dont_return_rows()
1573
1574 def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1575 self._we_dont_return_rows()
1576
1577 def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1578 self._we_dont_return_rows()
1579
1580 @property
1581 def _keymap(self) -> NoReturn: # type: ignore[override]
1582 self._we_dont_return_rows()
1583
1584 @property
1585 def _key_to_index(self) -> NoReturn: # type: ignore[override]
1586 self._we_dont_return_rows()
1587
1588 @property
1589 def _processors(self) -> NoReturn: # type: ignore[override]
1590 self._we_dont_return_rows()
1591
1592 @property
1593 def keys(self) -> NoReturn:
1594 self._we_dont_return_rows()
1595
1596
1597_NO_RESULT_METADATA = _NoResultMetaData()
1598
1599
1600def null_dml_result() -> IteratorResult[Any]:
1601 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
1602 it._soft_close()
1603 return it
1604
1605
1606class CursorResult(Result[Unpack[_Ts]]):
1607 """A Result that is representing state from a DBAPI cursor.
1608
1609 .. versionchanged:: 1.4 The :class:`.CursorResult``
1610 class replaces the previous :class:`.ResultProxy` interface.
1611 This classes are based on the :class:`.Result` calling API
1612 which provides an updated usage model and calling facade for
1613 SQLAlchemy Core and SQLAlchemy ORM.
1614
1615 Returns database rows via the :class:`.Row` class, which provides
1616 additional API features and behaviors on top of the raw data returned by
1617 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
1618 method, other kinds of objects may also be returned.
1619
1620 .. seealso::
1621
1622 :ref:`tutorial_selecting_data` - introductory material for accessing
1623 :class:`_engine.CursorResult` and :class:`.Row` objects.
1624
1625 """
1626
1627 __slots__ = (
1628 "context",
1629 "dialect",
1630 "cursor",
1631 "cursor_strategy",
1632 "_echo",
1633 "connection",
1634 )
1635
1636 _metadata: Union[CursorResultMetaData, _NoResultMetaData]
1637 _no_result_metadata = _NO_RESULT_METADATA
1638 _soft_closed: bool = False
1639 closed: bool = False
1640 _is_cursor = True
1641
1642 context: DefaultExecutionContext
1643 dialect: Dialect
1644 cursor_strategy: ResultFetchStrategy
1645 connection: Connection
1646
1647 def __init__(
1648 self,
1649 context: DefaultExecutionContext,
1650 cursor_strategy: ResultFetchStrategy,
1651 cursor_description: Optional[_DBAPICursorDescription],
1652 ):
1653 self.context = context
1654 self.dialect = context.dialect
1655 self.cursor = context.cursor
1656 self.cursor_strategy = cursor_strategy
1657 self.connection = context.root_connection
1658 self._echo = echo = (
1659 self.connection._echo and context.engine._should_log_debug()
1660 )
1661
1662 if cursor_description is not None:
1663 self._init_metadata(context, cursor_description)
1664
1665 if echo:
1666 log = self.context.connection._log_debug
1667
1668 def _log_row(row: Any) -> Any:
1669 log("Row %r", sql_util._repr_row(row))
1670 return row
1671
1672 self._row_logging_fn = _log_row
1673
1674 # call Result._row_getter to set up the row factory
1675 self._row_getter
1676
1677 else:
1678 assert context._num_sentinel_cols == 0
1679 self._metadata = self._no_result_metadata
1680
1681 def _init_metadata(
1682 self,
1683 context: DefaultExecutionContext,
1684 cursor_description: _DBAPICursorDescription,
1685 ) -> CursorResultMetaData:
1686 driver_column_names = context.execution_options.get(
1687 "driver_column_names", False
1688 )
1689 if context.compiled:
1690 compiled = context.compiled
1691
1692 metadata: CursorResultMetaData
1693
1694 if driver_column_names:
1695 # TODO: test this case
1696 metadata = CursorResultMetaData(
1697 self,
1698 cursor_description,
1699 driver_column_names=True,
1700 num_sentinel_cols=context._num_sentinel_cols,
1701 )
1702 assert not metadata._safe_for_cache
1703 elif compiled._cached_metadata:
1704 metadata = compiled._cached_metadata
1705 else:
1706 metadata = CursorResultMetaData(
1707 self,
1708 cursor_description,
1709 # the number of sentinel columns is stored on the context
1710 # but it's a characteristic of the compiled object
1711 # so it's ok to apply it to a cacheable metadata.
1712 num_sentinel_cols=context._num_sentinel_cols,
1713 )
1714 if metadata._safe_for_cache:
1715 compiled._cached_metadata = metadata
1716
1717 # result rewrite/ adapt step. this is to suit the case
1718 # when we are invoked against a cached Compiled object, we want
1719 # to rewrite the ResultMetaData to reflect the Column objects
1720 # that are in our current SQL statement object, not the one
1721 # that is associated with the cached Compiled object.
1722 # the Compiled object may also tell us to not
1723 # actually do this step; this is to support the ORM where
1724 # it is to produce a new Result object in any case, and will
1725 # be using the cached Column objects against this database result
1726 # so we don't want to rewrite them.
1727 #
1728 # Basically this step suits the use case where the end user
1729 # is using Core SQL expressions and is accessing columns in the
1730 # result row using row._mapping[table.c.column].
1731 if (
1732 not context.execution_options.get(
1733 "_result_disable_adapt_to_context", False
1734 )
1735 and compiled._result_columns
1736 and context.cache_hit is context.dialect.CACHE_HIT
1737 and compiled.statement is not context.invoked_statement # type: ignore[comparison-overlap] # noqa: E501
1738 ):
1739 metadata = metadata._adapt_to_context(context)
1740
1741 self._metadata = metadata
1742
1743 else:
1744 self._metadata = metadata = CursorResultMetaData(
1745 self,
1746 cursor_description,
1747 driver_column_names=driver_column_names,
1748 )
1749 if self._echo:
1750 context.connection._log_debug(
1751 "Col %r", tuple(x[0] for x in cursor_description)
1752 )
1753 return metadata
1754
1755 def _soft_close(self, hard: bool = False) -> None:
1756 """Soft close this :class:`_engine.CursorResult`.
1757
1758 This releases all DBAPI cursor resources, but leaves the
1759 CursorResult "open" from a semantic perspective, meaning the
1760 fetchXXX() methods will continue to return empty results.
1761
1762 This method is called automatically when:
1763
1764 * all result rows are exhausted using the fetchXXX() methods.
1765 * cursor.description is None.
1766
1767 This method is **not public**, but is documented in order to clarify
1768 the "autoclose" process used.
1769
1770 .. seealso::
1771
1772 :meth:`_engine.CursorResult.close`
1773
1774
1775 """
1776
1777 if (not hard and self._soft_closed) or (hard and self.closed):
1778 return
1779
1780 if hard:
1781 self.closed = True
1782 self.cursor_strategy.hard_close(self, self.cursor)
1783 else:
1784 self.cursor_strategy.soft_close(self, self.cursor)
1785
1786 if not self._soft_closed:
1787 cursor = self.cursor
1788 self.cursor = None # type: ignore
1789 self.connection._safe_close_cursor(cursor)
1790 self._soft_closed = True
1791
1792 @property
1793 def inserted_primary_key_rows(self) -> List[Optional[Any]]:
1794 """Return the value of
1795 :attr:`_engine.CursorResult.inserted_primary_key`
1796 as a row contained within a list; some dialects may support a
1797 multiple row form as well.
1798
1799 .. note:: As indicated below, in current SQLAlchemy versions this
1800 accessor is only useful beyond what's already supplied by
1801 :attr:`_engine.CursorResult.inserted_primary_key` when using the
1802 :ref:`postgresql_psycopg2` dialect. Future versions hope to
1803 generalize this feature to more dialects.
1804
1805 This accessor is added to support dialects that offer the feature
1806 that is currently implemented by the :ref:`psycopg2_executemany_mode`
1807 feature, currently **only the psycopg2 dialect**, which provides
1808 for many rows to be INSERTed at once while still retaining the
1809 behavior of being able to return server-generated primary key values.
1810
1811 * **When using the psycopg2 dialect, or other dialects that may support
1812 "fast executemany" style inserts in upcoming releases** : When
1813 invoking an INSERT statement while passing a list of rows as the
1814 second argument to :meth:`_engine.Connection.execute`, this accessor
1815 will then provide a list of rows, where each row contains the primary
1816 key value for each row that was INSERTed.
1817
1818 * **When using all other dialects / backends that don't yet support
1819 this feature**: This accessor is only useful for **single row INSERT
1820 statements**, and returns the same information as that of the
1821 :attr:`_engine.CursorResult.inserted_primary_key` within a
1822 single-element list. When an INSERT statement is executed in
1823 conjunction with a list of rows to be INSERTed, the list will contain
1824 one row per row inserted in the statement, however it will contain
1825 ``None`` for any server-generated values.
1826
1827 Future releases of SQLAlchemy will further generalize the
1828 "fast execution helper" feature of psycopg2 to suit other dialects,
1829 thus allowing this accessor to be of more general use.
1830
1831 .. versionadded:: 1.4
1832
1833 .. seealso::
1834
1835 :attr:`_engine.CursorResult.inserted_primary_key`
1836
1837 """
1838 if not self.context.compiled:
1839 raise exc.InvalidRequestError(
1840 "Statement is not a compiled expression construct."
1841 )
1842 elif not self.context.isinsert:
1843 raise exc.InvalidRequestError(
1844 "Statement is not an insert() expression construct."
1845 )
1846 elif self.context._is_explicit_returning:
1847 raise exc.InvalidRequestError(
1848 "Can't call inserted_primary_key "
1849 "when returning() "
1850 "is used."
1851 )
1852 return self.context.inserted_primary_key_rows # type: ignore[no-any-return] # noqa: E501
1853
1854 @property
1855 def inserted_primary_key(self) -> Optional[Any]:
1856 """Return the primary key for the row just inserted.
1857
1858 The return value is a :class:`_result.Row` object representing
1859 a named tuple of primary key values in the order in which the
1860 primary key columns are configured in the source
1861 :class:`_schema.Table`.
1862
1863 .. versionchanged:: 1.4.8 - the
1864 :attr:`_engine.CursorResult.inserted_primary_key`
1865 value is now a named tuple via the :class:`_result.Row` class,
1866 rather than a plain tuple.
1867
1868 This accessor only applies to single row :func:`_expression.insert`
1869 constructs which did not explicitly specify
1870 :meth:`_expression.Insert.returning`. Support for multirow inserts,
1871 while not yet available for most backends, would be accessed using
1872 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
1873
1874 Note that primary key columns which specify a server_default clause, or
1875 otherwise do not qualify as "autoincrement" columns (see the notes at
1876 :class:`_schema.Column`), and were generated using the database-side
1877 default, will appear in this list as ``None`` unless the backend
1878 supports "returning" and the insert statement executed with the
1879 "implicit returning" enabled.
1880
1881 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1882 statement is not a compiled expression construct
1883 or is not an insert() construct.
1884
1885 """
1886
1887 if self.context.executemany:
1888 raise exc.InvalidRequestError(
1889 "This statement was an executemany call; if primary key "
1890 "returning is supported, please "
1891 "use .inserted_primary_key_rows."
1892 )
1893
1894 ikp = self.inserted_primary_key_rows
1895 if ikp:
1896 return ikp[0]
1897 else:
1898 return None
1899
1900 def last_updated_params(
1901 self,
1902 ) -> Union[
1903 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1904 ]:
1905 """Return the collection of updated parameters from this
1906 execution.
1907
1908 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1909 statement is not a compiled expression construct
1910 or is not an update() construct.
1911
1912 """
1913 if not self.context.compiled:
1914 raise exc.InvalidRequestError(
1915 "Statement is not a compiled expression construct."
1916 )
1917 elif not self.context.isupdate:
1918 raise exc.InvalidRequestError(
1919 "Statement is not an update() expression construct."
1920 )
1921 elif self.context.executemany:
1922 return self.context.compiled_parameters
1923 else:
1924 return self.context.compiled_parameters[0]
1925
1926 def last_inserted_params(
1927 self,
1928 ) -> Union[
1929 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1930 ]:
1931 """Return the collection of inserted parameters from this
1932 execution.
1933
1934 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1935 statement is not a compiled expression construct
1936 or is not an insert() construct.
1937
1938 """
1939 if not self.context.compiled:
1940 raise exc.InvalidRequestError(
1941 "Statement is not a compiled expression construct."
1942 )
1943 elif not self.context.isinsert:
1944 raise exc.InvalidRequestError(
1945 "Statement is not an insert() expression construct."
1946 )
1947 elif self.context.executemany:
1948 return self.context.compiled_parameters
1949 else:
1950 return self.context.compiled_parameters[0]
1951
1952 @property
1953 def returned_defaults_rows(
1954 self,
1955 ) -> Optional[Sequence[Row[Unpack[TupleAny]]]]:
1956 """Return a list of rows each containing the values of default
1957 columns that were fetched using
1958 the :meth:`.ValuesBase.return_defaults` feature.
1959
1960 The return value is a list of :class:`.Row` objects.
1961
1962 .. versionadded:: 1.4
1963
1964 """
1965 return self.context.returned_default_rows
1966
1967 def splice_horizontally(self, other: CursorResult[Any]) -> Self:
1968 """Return a new :class:`.CursorResult` that "horizontally splices"
1969 together the rows of this :class:`.CursorResult` with that of another
1970 :class:`.CursorResult`.
1971
1972 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1973 not intended for general use.
1974
1975 "horizontally splices" means that for each row in the first and second
1976 result sets, a new row that concatenates the two rows together is
1977 produced, which then becomes the new row. The incoming
1978 :class:`.CursorResult` must have the identical number of rows. It is
1979 typically expected that the two result sets come from the same sort
1980 order as well, as the result rows are spliced together based on their
1981 position in the result.
1982
1983 The expected use case here is so that multiple INSERT..RETURNING
1984 statements (which definitely need to be sorted) against different
1985 tables can produce a single result that looks like a JOIN of those two
1986 tables.
1987
1988 E.g.::
1989
1990 r1 = connection.execute(
1991 users.insert().returning(
1992 users.c.user_name, users.c.user_id, sort_by_parameter_order=True
1993 ),
1994 user_values,
1995 )
1996
1997 r2 = connection.execute(
1998 addresses.insert().returning(
1999 addresses.c.address_id,
2000 addresses.c.address,
2001 addresses.c.user_id,
2002 sort_by_parameter_order=True,
2003 ),
2004 address_values,
2005 )
2006
2007 rows = r1.splice_horizontally(r2).all()
2008 assert rows == [
2009 ("john", 1, 1, "foo@bar.com", 1),
2010 ("jack", 2, 2, "bar@bat.com", 2),
2011 ]
2012
2013 .. versionadded:: 2.0
2014
2015 .. seealso::
2016
2017 :meth:`.CursorResult.splice_vertically`
2018
2019
2020 """ # noqa: E501
2021
2022 clone = self._generate()
2023 assert clone is self # just to note
2024 assert isinstance(other._metadata, CursorResultMetaData)
2025 assert isinstance(self._metadata, CursorResultMetaData)
2026 self_tf = self._metadata._tuplefilter
2027 other_tf = other._metadata._tuplefilter
2028 clone._metadata = self._metadata._splice_horizontally(other._metadata)
2029
2030 total_rows = [
2031 tuple(r1 if self_tf is None else self_tf(r1))
2032 + tuple(r2 if other_tf is None else other_tf(r2))
2033 for r1, r2 in zip(
2034 list(self._raw_row_iterator()),
2035 list(other._raw_row_iterator()),
2036 )
2037 ]
2038
2039 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2040 None,
2041 initial_buffer=total_rows,
2042 )
2043 clone._reset_memoizations()
2044 return clone
2045
2046 def splice_vertically(self, other: CursorResult[Any]) -> Self:
2047 """Return a new :class:`.CursorResult` that "vertically splices",
2048 i.e. "extends", the rows of this :class:`.CursorResult` with that of
2049 another :class:`.CursorResult`.
2050
2051 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
2052 not intended for general use.
2053
2054 "vertically splices" means the rows of the given result are appended to
2055 the rows of this cursor result. The incoming :class:`.CursorResult`
2056 must have rows that represent the identical list of columns in the
2057 identical order as they are in this :class:`.CursorResult`.
2058
2059 .. versionadded:: 2.0
2060
2061 .. seealso::
2062
2063 :meth:`.CursorResult.splice_horizontally`
2064
2065 """
2066 clone = self._generate()
2067 total_rows = list(self._raw_row_iterator()) + list(
2068 other._raw_row_iterator()
2069 )
2070
2071 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2072 None,
2073 initial_buffer=total_rows,
2074 )
2075 clone._reset_memoizations()
2076 return clone
2077
2078 def _rewind(self, rows: Any) -> Self:
2079 """rewind this result back to the given rowset.
2080
2081 this is used internally for the case where an :class:`.Insert`
2082 construct combines the use of
2083 :meth:`.Insert.return_defaults` along with the
2084 "supplemental columns" feature.
2085
2086 NOTE: this method has not effect then an unique filter is applied
2087 to the result, meaning that no row will be returned.
2088
2089 """
2090
2091 if self._echo:
2092 self.context.connection._log_debug(
2093 "CursorResult rewound %d row(s)", len(rows)
2094 )
2095
2096 # the rows given are expected to be Row objects, so we
2097 # have to clear out processors which have already run on these
2098 # rows
2099 self._metadata = cast(
2100 CursorResultMetaData, self._metadata
2101 )._remove_processors_and_tuple_filter()
2102
2103 self.cursor_strategy = FullyBufferedCursorFetchStrategy(
2104 None,
2105 # TODO: if these are Row objects, can we save on not having to
2106 # re-make new Row objects out of them a second time? is that
2107 # what's actually happening right now? maybe look into this
2108 initial_buffer=rows,
2109 )
2110 self._reset_memoizations()
2111 return self
2112
2113 @property
2114 def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]:
2115 """Return the values of default columns that were fetched using
2116 the :meth:`.ValuesBase.return_defaults` feature.
2117
2118 The value is an instance of :class:`.Row`, or ``None``
2119 if :meth:`.ValuesBase.return_defaults` was not used or if the
2120 backend does not support RETURNING.
2121
2122 .. seealso::
2123
2124 :meth:`.ValuesBase.return_defaults`
2125
2126 """
2127
2128 if self.context.executemany:
2129 raise exc.InvalidRequestError(
2130 "This statement was an executemany call; if return defaults "
2131 "is supported, please use .returned_defaults_rows."
2132 )
2133
2134 rows = self.context.returned_default_rows
2135 if rows:
2136 return rows[0]
2137 else:
2138 return None
2139
2140 def lastrow_has_defaults(self) -> bool:
2141 """Return ``lastrow_has_defaults()`` from the underlying
2142 :class:`.ExecutionContext`.
2143
2144 See :class:`.ExecutionContext` for details.
2145
2146 """
2147
2148 return self.context.lastrow_has_defaults()
2149
2150 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2151 """Return ``postfetch_cols()`` from the underlying
2152 :class:`.ExecutionContext`.
2153
2154 See :class:`.ExecutionContext` for details.
2155
2156 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2157 statement is not a compiled expression construct
2158 or is not an insert() or update() construct.
2159
2160 """
2161
2162 if not self.context.compiled:
2163 raise exc.InvalidRequestError(
2164 "Statement is not a compiled expression construct."
2165 )
2166 elif not self.context.isinsert and not self.context.isupdate:
2167 raise exc.InvalidRequestError(
2168 "Statement is not an insert() or update() "
2169 "expression construct."
2170 )
2171 return self.context.postfetch_cols
2172
2173 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2174 """Return ``prefetch_cols()`` from the underlying
2175 :class:`.ExecutionContext`.
2176
2177 See :class:`.ExecutionContext` for details.
2178
2179 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2180 statement is not a compiled expression construct
2181 or is not an insert() or update() construct.
2182
2183 """
2184
2185 if not self.context.compiled:
2186 raise exc.InvalidRequestError(
2187 "Statement is not a compiled expression construct."
2188 )
2189 elif not self.context.isinsert and not self.context.isupdate:
2190 raise exc.InvalidRequestError(
2191 "Statement is not an insert() or update() "
2192 "expression construct."
2193 )
2194 return self.context.prefetch_cols
2195
2196 def supports_sane_rowcount(self) -> bool:
2197 """Return ``supports_sane_rowcount`` from the dialect.
2198
2199 See :attr:`_engine.CursorResult.rowcount` for background.
2200
2201 """
2202
2203 return self.dialect.supports_sane_rowcount
2204
2205 def supports_sane_multi_rowcount(self) -> bool:
2206 """Return ``supports_sane_multi_rowcount`` from the dialect.
2207
2208 See :attr:`_engine.CursorResult.rowcount` for background.
2209
2210 """
2211
2212 return self.dialect.supports_sane_multi_rowcount
2213
2214 @util.memoized_property
2215 def rowcount(self) -> int:
2216 """Return the 'rowcount' for this result.
2217
2218 The primary purpose of 'rowcount' is to report the number of rows
2219 matched by the WHERE criterion of an UPDATE or DELETE statement
2220 executed once (i.e. for a single parameter set), which may then be
2221 compared to the number of rows expected to be updated or deleted as a
2222 means of asserting data integrity.
2223
2224 This attribute is transferred from the ``cursor.rowcount`` attribute
2225 of the DBAPI before the cursor is closed, to support DBAPIs that
2226 don't make this value available after cursor close. Some DBAPIs may
2227 offer meaningful values for other kinds of statements, such as INSERT
2228 and SELECT statements as well. In order to retrieve ``cursor.rowcount``
2229 for these statements, set the
2230 :paramref:`.Connection.execution_options.preserve_rowcount`
2231 execution option to True, which will cause the ``cursor.rowcount``
2232 value to be unconditionally memoized before any results are returned
2233 or the cursor is closed, regardless of statement type.
2234
2235 For cases where the DBAPI does not support rowcount for a particular
2236 kind of statement and/or execution, the returned value will be ``-1``,
2237 which is delivered directly from the DBAPI and is part of :pep:`249`.
2238 All DBAPIs should support rowcount for single-parameter-set
2239 UPDATE and DELETE statements, however.
2240
2241 .. note::
2242
2243 Notes regarding :attr:`_engine.CursorResult.rowcount`:
2244
2245
2246 * This attribute returns the number of rows *matched*,
2247 which is not necessarily the same as the number of rows
2248 that were actually *modified*. For example, an UPDATE statement
2249 may have no net change on a given row if the SET values
2250 given are the same as those present in the row already.
2251 Such a row would be matched but not modified.
2252 On backends that feature both styles, such as MySQL,
2253 rowcount is configured to return the match
2254 count in all cases.
2255
2256 * :attr:`_engine.CursorResult.rowcount` in the default case is
2257 *only* useful in conjunction with an UPDATE or DELETE statement,
2258 and only with a single set of parameters. For other kinds of
2259 statements, SQLAlchemy will not attempt to pre-memoize the value
2260 unless the
2261 :paramref:`.Connection.execution_options.preserve_rowcount`
2262 execution option is used. Note that contrary to :pep:`249`, many
2263 DBAPIs do not support rowcount values for statements that are not
2264 UPDATE or DELETE, particularly when rows are being returned which
2265 are not fully pre-buffered. DBAPIs that dont support rowcount
2266 for a particular kind of statement should return the value ``-1``
2267 for such statements.
2268
2269 * :attr:`_engine.CursorResult.rowcount` may not be meaningful
2270 when executing a single statement with multiple parameter sets
2271 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
2272 values across multiple parameter sets and will return ``-1``
2273 when accessed.
2274
2275 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
2276 a correct population of :attr:`_engine.CursorResult.rowcount`
2277 when the :paramref:`.Connection.execution_options.preserve_rowcount`
2278 execution option is set to True.
2279
2280 * Statements that use RETURNING may not support rowcount, returning
2281 a ``-1`` value instead.
2282
2283 .. seealso::
2284
2285 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
2286
2287 :paramref:`.Connection.execution_options.preserve_rowcount`
2288
2289 """ # noqa: E501
2290 try:
2291 return self.context.rowcount
2292 except BaseException as e:
2293 self.cursor_strategy.handle_exception(self, self.cursor, e)
2294 raise # not called
2295
2296 @property
2297 def lastrowid(self) -> int:
2298 """Return the 'lastrowid' accessor on the DBAPI cursor.
2299
2300 This is a DBAPI specific method and is only functional
2301 for those backends which support it, for statements
2302 where it is appropriate. It's behavior is not
2303 consistent across backends.
2304
2305 Usage of this method is normally unnecessary when
2306 using insert() expression constructs; the
2307 :attr:`~CursorResult.inserted_primary_key` attribute provides a
2308 tuple of primary key values for a newly inserted row,
2309 regardless of database backend.
2310
2311 """
2312 try:
2313 return self.context.get_lastrowid()
2314 except BaseException as e:
2315 self.cursor_strategy.handle_exception(self, self.cursor, e)
2316
2317 @property
2318 def returns_rows(self) -> bool:
2319 """True if this :class:`_engine.CursorResult` returns zero or more
2320 rows.
2321
2322 I.e. if it is legal to call the methods
2323 :meth:`_engine.CursorResult.fetchone`,
2324 :meth:`_engine.CursorResult.fetchmany`
2325 :meth:`_engine.CursorResult.fetchall`.
2326
2327 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
2328 always be synonymous with whether or not the DBAPI cursor had a
2329 ``.description`` attribute, indicating the presence of result columns,
2330 noting that a cursor that returns zero rows still has a
2331 ``.description`` if a row-returning statement was emitted.
2332
2333 This attribute should be True for all results that are against
2334 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
2335 that use RETURNING. For INSERT/UPDATE/DELETE statements that were
2336 not using RETURNING, the value will usually be False, however
2337 there are some dialect-specific exceptions to this, such as when
2338 using the MSSQL / pyodbc dialect a SELECT is emitted inline in
2339 order to retrieve an inserted primary key value.
2340
2341 .. seealso::
2342
2343 :meth:`.Result.close`
2344
2345 :attr:`.Result.closed`
2346
2347 """
2348 return self._metadata.returns_rows
2349
2350 @property
2351 def is_insert(self) -> bool:
2352 """True if this :class:`_engine.CursorResult` is the result
2353 of a executing an expression language compiled
2354 :func:`_expression.insert` construct.
2355
2356 When True, this implies that the
2357 :attr:`inserted_primary_key` attribute is accessible,
2358 assuming the statement did not include
2359 a user defined "returning" construct.
2360
2361 """
2362 return self.context.isinsert
2363
2364 def _fetchiter_impl(self) -> Iterator[Any]:
2365 fetchone = self.cursor_strategy.fetchone
2366
2367 while True:
2368 row = fetchone(self, self.cursor)
2369 if row is None:
2370 break
2371 yield row
2372
2373 def _fetchone_impl(self, hard_close: bool = False) -> Any:
2374 return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
2375
2376 def _fetchall_impl(self) -> Any:
2377 return self.cursor_strategy.fetchall(self, self.cursor)
2378
2379 def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
2380 return self.cursor_strategy.fetchmany(self, self.cursor, size)
2381
2382 def _raw_row_iterator(self) -> Any:
2383 return self._fetchiter_impl()
2384
2385 def merge(
2386 self, *others: Result[Unpack[TupleAny]]
2387 ) -> MergedResult[Unpack[TupleAny]]:
2388 merged_result = super().merge(*others)
2389 if self.context._has_rowcount:
2390 merged_result.rowcount = sum(
2391 cast("CursorResult[Any]", result).rowcount
2392 for result in (self,) + others
2393 )
2394 return merged_result
2395
2396 def close(self) -> None:
2397 """Close this :class:`_engine.CursorResult`.
2398
2399 This closes out the underlying DBAPI cursor corresponding to the
2400 statement execution, if one is still present. Note that the DBAPI
2401 cursor is automatically released when the :class:`_engine.CursorResult`
2402 exhausts all available rows. :meth:`_engine.CursorResult.close` is
2403 generally an optional method except in the case when discarding a
2404 :class:`_engine.CursorResult` that still has additional rows pending
2405 for fetch.
2406
2407 After this method is called, it is no longer valid to call upon
2408 the fetch methods, which will raise a :class:`.ResourceClosedError`
2409 on subsequent use.
2410
2411 .. seealso::
2412
2413 :ref:`connections_toplevel`
2414
2415 """
2416 self._soft_close(hard=True)
2417
2418 @_generative
2419 def yield_per(self, num: int) -> Self:
2420 self._yield_per = num
2421 self.cursor_strategy.yield_per(self, self.cursor, num)
2422 return self
2423
2424
2425ResultProxy = CursorResult