1# engine/cursor.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define cursor-specific result set constructs including
9:class:`.CursorResult`."""
10
11
12from __future__ import annotations
13
14import collections
15import operator
16import typing
17from typing import Any
18from typing import cast
19from typing import ClassVar
20from typing import Deque
21from typing import Dict
22from typing import Final
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Literal
27from typing import Mapping
28from typing import NoReturn
29from typing import Optional
30from typing import Sequence
31from typing import Tuple
32from typing import TYPE_CHECKING
33from typing import Union
34
35from .result import IteratorResult
36from .result import MergedResult
37from .result import Result
38from .result import ResultMetaData
39from .result import SimpleResultMetaData
40from .result import tuplegetter
41from .row import Row
42from .. import exc
43from .. import util
44from ..sql import elements
45from ..sql import sqltypes
46from ..sql import util as sql_util
47from ..sql.base import _generative
48from ..sql.compiler import ResultColumnsEntry
49from ..sql.compiler import RM_NAME
50from ..sql.compiler import RM_OBJECTS
51from ..sql.compiler import RM_RENDERED_NAME
52from ..sql.compiler import RM_TYPE
53from ..sql.type_api import TypeEngine
54from ..util.typing import Self
55from ..util.typing import TupleAny
56from ..util.typing import TypeVarTuple
57from ..util.typing import Unpack
58
59
60if typing.TYPE_CHECKING:
61 from .base import Connection
62 from .default import DefaultExecutionContext
63 from .interfaces import _DBAPICursorDescription
64 from .interfaces import _MutableCoreSingleExecuteParams
65 from .interfaces import CoreExecuteOptionsParameter
66 from .interfaces import DBAPICursor
67 from .interfaces import DBAPIType
68 from .interfaces import Dialect
69 from .interfaces import ExecutionContext
70 from .result import _KeyIndexType
71 from .result import _KeyMapRecType
72 from .result import _KeyMapType
73 from .result import _KeyType
74 from .result import _ProcessorsType
75 from .result import _TupleGetterType
76 from ..sql.schema import Column
77 from ..sql.type_api import _ResultProcessorType
78
79
80_Ts = TypeVarTuple("_Ts")
81
82
83# metadata entry tuple indexes.
84# using raw tuple is faster than namedtuple.
85# these match up to the positions in
86# _CursorKeyMapRecType
87MD_INDEX: Final[Literal[0]] = 0
88"""integer index in cursor.description
89
90"""
91
92MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
93"""integer index in compiled._result_columns"""
94
95MD_OBJECTS: Final[Literal[2]] = 2
96"""other string keys and ColumnElement obj that can match.
97
98This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
99
100"""
101
102MD_LOOKUP_KEY: Final[Literal[3]] = 3
103"""string key we usually expect for key-based lookup
104
105this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
106"""
107
108
109MD_RENDERED_NAME: Final[Literal[4]] = 4
110"""name that is usually in cursor.description
111
112this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
113"""
114
115
116MD_PROCESSOR: Final[Literal[5]] = 5
117"""callable to process a result value into a row"""
118
119MD_UNTRANSLATED: Final[Literal[6]] = 6
120"""raw name from cursor.description"""
121
122
123_CursorKeyMapRecType = Tuple[
124 Optional[int], # MD_INDEX, None means the record is ambiguously named
125 int, # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
126 TupleAny, # MD_OBJECTS
127 str, # MD_LOOKUP_KEY
128 str, # MD_RENDERED_NAME
129 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
130 Optional[str], # MD_UNTRANSLATED
131]
132
133_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
134
135# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
136# not None
137_NonAmbigCursorKeyMapRecType = Tuple[
138 int,
139 int,
140 List[Any],
141 str,
142 str,
143 Optional["_ResultProcessorType[Any]"],
144 str,
145]
146
147_MergeColTuple = Tuple[
148 int,
149 Optional[int],
150 str,
151 TypeEngine[Any],
152 "DBAPIType",
153 Optional[TupleAny],
154 Optional[str],
155]
156
157
158class CursorResultMetaData(ResultMetaData):
159 """Result metadata for DBAPI cursors."""
160
161 __slots__ = (
162 "_keymap",
163 "_processors",
164 "_keys",
165 "_keymap_by_result_column_idx",
166 "_tuplefilter",
167 "_translated_indexes",
168 "_safe_for_cache",
169 "_unpickled",
170 "_key_to_index",
171 # don't need _unique_filters support here for now. Can be added
172 # if a need arises.
173 )
174
175 _keymap: _CursorKeyMapType
176 _processors: _ProcessorsType
177 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
178 _unpickled: bool
179 _safe_for_cache: bool
180 _translated_indexes: Optional[List[int]]
181
182 returns_rows: ClassVar[bool] = True
183
184 def _has_key(self, key: Any) -> bool:
185 return key in self._keymap
186
187 def _for_freeze(self) -> ResultMetaData:
188 return SimpleResultMetaData(
189 self._keys,
190 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
191 )
192
193 def _make_new_metadata(
194 self,
195 *,
196 unpickled: bool,
197 processors: _ProcessorsType,
198 keys: Sequence[str],
199 keymap: _KeyMapType,
200 tuplefilter: Optional[_TupleGetterType],
201 translated_indexes: Optional[List[int]],
202 safe_for_cache: bool,
203 keymap_by_result_column_idx: Any,
204 ) -> Self:
205 new_obj = self.__class__.__new__(self.__class__)
206 new_obj._unpickled = unpickled
207 new_obj._processors = processors
208 new_obj._keys = keys
209 new_obj._keymap = keymap
210 new_obj._tuplefilter = tuplefilter
211 new_obj._translated_indexes = translated_indexes
212 new_obj._safe_for_cache = safe_for_cache
213 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
214 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
215 return new_obj
216
217 def _remove_processors_and_tuple_filter(self) -> Self:
218 if self._tuplefilter:
219 proc = self._tuplefilter(self._processors)
220 else:
221 proc = self._processors
222 return self._make_new_metadata(
223 unpickled=self._unpickled,
224 processors=[None] * len(proc),
225 tuplefilter=None,
226 translated_indexes=None,
227 keymap={
228 key: value[0:5] + (None,) + value[6:]
229 for key, value in self._keymap.items()
230 },
231 keys=self._keys,
232 safe_for_cache=self._safe_for_cache,
233 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
234 )
235
236 def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
237 keymap = dict(self._keymap)
238 offset = len(self._keys)
239
240 for key, value in other._keymap.items():
241 # int index should be None for ambiguous key
242 if value[MD_INDEX] is not None and key not in keymap:
243 md_index = value[MD_INDEX] + offset
244 md_object = value[MD_RESULT_MAP_INDEX] + offset
245 else:
246 md_index = None
247 md_object = -1
248 keymap[key] = (md_index, md_object, *value[2:])
249
250 self_tf = self._tuplefilter
251 other_tf = other._tuplefilter
252
253 proc: List[Any] = []
254 for pp, tf in [
255 (self._processors, self_tf),
256 (other._processors, other_tf),
257 ]:
258 proc.extend(pp if tf is None else tf(pp))
259
260 new_keys = [*self._keys, *other._keys]
261 assert len(proc) == len(new_keys)
262
263 return self._make_new_metadata(
264 unpickled=self._unpickled,
265 processors=proc,
266 tuplefilter=None,
267 translated_indexes=None,
268 keys=new_keys,
269 keymap=keymap,
270 safe_for_cache=self._safe_for_cache,
271 keymap_by_result_column_idx={
272 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
273 for metadata_entry in keymap.values()
274 },
275 )
276
277 def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
278 recs = list(self._metadata_for_keys(keys))
279
280 indexes = [rec[MD_INDEX] for rec in recs]
281 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
282
283 if self._translated_indexes:
284 indexes = [self._translated_indexes[idx] for idx in indexes]
285 tup = tuplegetter(*indexes)
286 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
287
288 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
289 # TODO: need unit test for:
290 # result = connection.execute("raw sql, no columns").scalars()
291 # without the "or ()" it's failing because MD_OBJECTS is None
292 keymap.update(
293 (e, new_rec)
294 for new_rec in new_recs
295 for e in new_rec[MD_OBJECTS] or ()
296 )
297
298 return self._make_new_metadata(
299 unpickled=self._unpickled,
300 processors=self._processors,
301 keys=new_keys,
302 tuplefilter=tup,
303 translated_indexes=indexes,
304 keymap=keymap, # type: ignore[arg-type]
305 safe_for_cache=self._safe_for_cache,
306 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
307 )
308
309 def _adapt_to_context(self, context: ExecutionContext) -> Self:
310 """When using a cached Compiled construct that has a _result_map,
311 for a new statement that used the cached Compiled, we need to ensure
312 the keymap has the Column objects from our new statement as keys.
313 So here we rewrite keymap with new entries for the new columns
314 as matched to those of the cached statement.
315
316 """
317
318 if not context.compiled or not context.compiled._result_columns:
319 return self
320
321 compiled_statement = context.compiled.statement
322 invoked_statement = context.invoked_statement
323
324 if TYPE_CHECKING:
325 assert isinstance(invoked_statement, elements.ClauseElement)
326
327 if compiled_statement is invoked_statement:
328 return self
329
330 assert invoked_statement is not None
331
332 # this is the most common path for Core statements when
333 # caching is used. In ORM use, this codepath is not really used
334 # as the _result_disable_adapt_to_context execution option is
335 # set by the ORM.
336
337 # make a copy and add the columns from the invoked statement
338 # to the result map.
339
340 keymap_by_position = self._keymap_by_result_column_idx
341
342 if keymap_by_position is None:
343 # first retrieval from cache, this map will not be set up yet,
344 # initialize lazily
345 keymap_by_position = self._keymap_by_result_column_idx = {
346 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
347 for metadata_entry in self._keymap.values()
348 }
349
350 return self._make_new_metadata(
351 keymap=self._keymap
352 | {
353 new: keymap_by_position[idx]
354 for idx, new in enumerate(
355 invoked_statement._all_selected_columns
356 )
357 if idx in keymap_by_position
358 },
359 unpickled=self._unpickled,
360 processors=self._processors,
361 tuplefilter=self._tuplefilter,
362 translated_indexes=None,
363 keys=self._keys,
364 safe_for_cache=self._safe_for_cache,
365 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
366 )
367
368 def __init__(
369 self,
370 parent: CursorResult[Unpack[TupleAny]],
371 cursor_description: _DBAPICursorDescription,
372 *,
373 driver_column_names: bool = False,
374 num_sentinel_cols: int = 0,
375 ):
376 context = parent.context
377 if num_sentinel_cols > 0:
378 # this is slightly faster than letting tuplegetter use the indexes
379 self._tuplefilter = tuplefilter = operator.itemgetter(
380 slice(-num_sentinel_cols)
381 )
382 cursor_description = tuplefilter(cursor_description)
383 else:
384 self._tuplefilter = tuplefilter = None
385 self._translated_indexes = None
386 self._safe_for_cache = self._unpickled = False
387
388 if context.result_column_struct:
389 (
390 result_columns,
391 cols_are_ordered,
392 textual_ordered,
393 ad_hoc_textual,
394 loose_column_name_matching,
395 ) = context.result_column_struct
396 if tuplefilter is not None:
397 result_columns = tuplefilter(result_columns)
398 num_ctx_cols = len(result_columns)
399 else:
400 result_columns = cols_are_ordered = ( # type: ignore
401 num_ctx_cols
402 ) = ad_hoc_textual = loose_column_name_matching = (
403 textual_ordered
404 ) = False
405
406 # merge cursor.description with the column info
407 # present in the compiled structure, if any
408 raw = self._merge_cursor_description(
409 context,
410 cursor_description,
411 result_columns,
412 num_ctx_cols,
413 cols_are_ordered,
414 textual_ordered,
415 ad_hoc_textual,
416 loose_column_name_matching,
417 driver_column_names,
418 )
419
420 # processors in key order which are used when building up
421 # a row
422 self._processors = [
423 metadata_entry[MD_PROCESSOR] for metadata_entry in raw
424 ]
425 if num_sentinel_cols > 0:
426 # add the number of sentinel columns since these are passed
427 # to the tuplefilters before being used
428 self._processors.extend([None] * num_sentinel_cols)
429
430 # this is used when using this ResultMetaData in a Core-only cache
431 # retrieval context. it's initialized on first cache retrieval
432 # when the _result_disable_adapt_to_context execution option
433 # (which the ORM generally sets) is not set.
434 self._keymap_by_result_column_idx = None
435
436 # for compiled SQL constructs, copy additional lookup keys into
437 # the key lookup map, such as Column objects, labels,
438 # column keys and other names
439 if num_ctx_cols:
440 # keymap by primary string...
441 by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
442 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
443 for metadata_entry in raw
444 }
445
446 if len(by_key) != num_ctx_cols:
447 # if by-primary-string dictionary smaller than
448 # number of columns, assume we have dupes; (this check
449 # is also in place if string dictionary is bigger, as
450 # can occur when '*' was used as one of the compiled columns,
451 # which may or may not be suggestive of dupes), rewrite
452 # dupe records with "None" for index which results in
453 # ambiguous column exception when accessed.
454 #
455 # this is considered to be the less common case as it is not
456 # common to have dupe column keys in a SELECT statement.
457 #
458 # new in 1.4: get the complete set of all possible keys,
459 # strings, objects, whatever, that are dupes across two
460 # different records, first.
461 index_by_key: Dict[Any, Any] = {}
462 dupes = set()
463 for metadata_entry in raw:
464 for key in (metadata_entry[MD_RENDERED_NAME],) + (
465 metadata_entry[MD_OBJECTS] or ()
466 ):
467 idx = metadata_entry[MD_INDEX]
468 # if this key has been associated with more than one
469 # positional index, it's a dupe
470 if index_by_key.setdefault(key, idx) != idx:
471 dupes.add(key)
472
473 # then put everything we have into the keymap excluding only
474 # those keys that are dupes.
475 self._keymap = {
476 obj_elem: metadata_entry
477 for metadata_entry in raw
478 if metadata_entry[MD_OBJECTS]
479 for obj_elem in metadata_entry[MD_OBJECTS]
480 if obj_elem not in dupes
481 }
482
483 # then for the dupe keys, put the "ambiguous column"
484 # record into by_key.
485 by_key.update(
486 {
487 key: (None, -1, (), key, key, None, None)
488 for key in dupes
489 }
490 )
491
492 else:
493 # no dupes - copy secondary elements from compiled
494 # columns into self._keymap. this is the most common
495 # codepath for Core / ORM statement executions before the
496 # result metadata is cached
497 self._keymap = {
498 obj_elem: metadata_entry
499 for metadata_entry in raw
500 if metadata_entry[MD_OBJECTS]
501 for obj_elem in metadata_entry[MD_OBJECTS]
502 }
503 # update keymap with primary string names taking
504 # precedence
505 self._keymap.update(by_key)
506 else:
507 # no compiled objects to map, just create keymap by primary string
508 self._keymap = {
509 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
510 for metadata_entry in raw
511 }
512
513 # update keymap with "translated" names.
514 # the "translated" name thing has a long history:
515 # 1. originally, it was used to fix an issue in very old SQLite
516 # versions prior to 3.10.0. This code is still there in the
517 # sqlite dialect.
518 # 2. Next, the pyhive third party dialect started using this hook
519 # for some driver related issue on their end.
520 # 3. Most recently, the "driver_column_names" execution option has
521 # taken advantage of this hook to get raw DBAPI col names in the
522 # result keys without disrupting the usual merge process.
523
524 if driver_column_names or (
525 not num_ctx_cols and context._translate_colname
526 ):
527 self._keymap.update(
528 {
529 metadata_entry[MD_UNTRANSLATED]: self._keymap[
530 metadata_entry[MD_LOOKUP_KEY]
531 ]
532 for metadata_entry in raw
533 if metadata_entry[MD_UNTRANSLATED]
534 }
535 )
536
537 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
538
539 def _merge_cursor_description(
540 self,
541 context: DefaultExecutionContext,
542 cursor_description: _DBAPICursorDescription,
543 result_columns: Sequence[ResultColumnsEntry],
544 num_ctx_cols: int,
545 cols_are_ordered: bool,
546 textual_ordered: bool,
547 ad_hoc_textual: bool,
548 loose_column_name_matching: bool,
549 driver_column_names: bool,
550 ) -> List[_CursorKeyMapRecType]:
551 """Merge a cursor.description with compiled result column information.
552
553 There are at least four separate strategies used here, selected
554 depending on the type of SQL construct used to start with.
555
556 The most common case is that of the compiled SQL expression construct,
557 which generated the column names present in the raw SQL string and
558 which has the identical number of columns as were reported by
559 cursor.description. In this case, we assume a 1-1 positional mapping
560 between the entries in cursor.description and the compiled object.
561 This is also the most performant case as we disregard extracting /
562 decoding the column names present in cursor.description since we
563 already have the desired name we generated in the compiled SQL
564 construct.
565
566 The next common case is that of the completely raw string SQL,
567 such as passed to connection.execute(). In this case we have no
568 compiled construct to work with, so we extract and decode the
569 names from cursor.description and index those as the primary
570 result row target keys.
571
572 The remaining fairly common case is that of the textual SQL
573 that includes at least partial column information; this is when
574 we use a :class:`_expression.TextualSelect` construct.
575 This construct may have
576 unordered or ordered column information. In the ordered case, we
577 merge the cursor.description and the compiled construct's information
578 positionally, and warn if there are additional description names
579 present, however we still decode the names in cursor.description
580 as we don't have a guarantee that the names in the columns match
581 on these. In the unordered case, we match names in cursor.description
582 to that of the compiled construct based on name matching.
583 In both of these cases, the cursor.description names and the column
584 expression objects and names are indexed as result row target keys.
585
586 The final case is much less common, where we have a compiled
587 non-textual SQL expression construct, but the number of columns
588 in cursor.description doesn't match what's in the compiled
589 construct. We make the guess here that there might be textual
590 column expressions in the compiled construct that themselves include
591 a comma in them causing them to split. We do the same name-matching
592 as with textual non-ordered columns.
593
594 The name-matched system of merging is the same as that used by
595 SQLAlchemy for all cases up through the 0.9 series. Positional
596 matching for compiled SQL expressions was introduced in 1.0 as a
597 major performance feature, and positional matching for textual
598 :class:`_expression.TextualSelect` objects in 1.1.
599 As name matching is no longer
600 a common case, it was acceptable to factor it into smaller generator-
601 oriented methods that are easier to understand, but incur slightly
602 more performance overhead.
603
604 """
605
606 if (
607 num_ctx_cols
608 and cols_are_ordered
609 and not textual_ordered
610 and num_ctx_cols == len(cursor_description)
611 and not driver_column_names
612 ):
613 self._keys = [elem[0] for elem in result_columns]
614 # pure positional 1-1 case; doesn't need to read
615 # the names from cursor.description
616
617 # most common case for Core and ORM
618
619 # this metadata is safe to
620 # cache because we are guaranteed
621 # to have the columns in the same order for new executions
622 self._safe_for_cache = True
623
624 return [
625 (
626 idx,
627 idx,
628 rmap_entry[RM_OBJECTS],
629 rmap_entry[RM_NAME],
630 rmap_entry[RM_RENDERED_NAME],
631 context.get_result_processor(
632 rmap_entry[RM_TYPE],
633 rmap_entry[RM_RENDERED_NAME],
634 cursor_description[idx][1],
635 ),
636 None,
637 )
638 for idx, rmap_entry in enumerate(result_columns)
639 ]
640 else:
641 # name-based or text-positional cases, where we need
642 # to read cursor.description names
643
644 if textual_ordered or (
645 ad_hoc_textual and len(cursor_description) == num_ctx_cols
646 ):
647 self._safe_for_cache = not driver_column_names
648 # textual positional case
649 raw_iterator = self._merge_textual_cols_by_position(
650 context,
651 cursor_description,
652 result_columns,
653 driver_column_names,
654 )
655 elif num_ctx_cols:
656 # compiled SQL with a mismatch of description cols
657 # vs. compiled cols, or textual w/ unordered columns
658 # the order of columns can change if the query is
659 # against a "select *", so not safe to cache
660 self._safe_for_cache = False
661 raw_iterator = self._merge_cols_by_name(
662 context,
663 cursor_description,
664 result_columns,
665 loose_column_name_matching,
666 driver_column_names,
667 )
668 else:
669 # no compiled SQL, just a raw string, order of columns
670 # can change for "select *"
671 self._safe_for_cache = False
672 raw_iterator = self._merge_cols_by_none(
673 context, cursor_description, driver_column_names
674 )
675
676 return [
677 (
678 idx,
679 ridx,
680 obj,
681 cursor_colname,
682 cursor_colname,
683 context.get_result_processor(
684 mapped_type, cursor_colname, coltype
685 ),
686 untranslated,
687 ) # type: ignore[misc]
688 for (
689 idx,
690 ridx,
691 cursor_colname,
692 mapped_type,
693 coltype,
694 obj,
695 untranslated,
696 ) in raw_iterator
697 ]
698
699 def _colnames_from_description(
700 self,
701 context: DefaultExecutionContext,
702 cursor_description: _DBAPICursorDescription,
703 driver_column_names: bool,
704 ) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]:
705 """Extract column names and data types from a cursor.description.
706
707 Applies unicode decoding, column translation, "normalization",
708 and case sensitivity rules to the names based on the dialect.
709
710 """
711 dialect = context.dialect
712 translate_colname = context._translate_colname
713 normalize_name = (
714 dialect.normalize_name if dialect.requires_name_normalize else None
715 )
716
717 untranslated = None
718
719 for idx, rec in enumerate(cursor_description):
720 colname = unnormalized = rec[0]
721 coltype = rec[1]
722
723 if translate_colname:
724 # a None here for "untranslated" means "the dialect did not
725 # change the column name and the untranslated case can be
726 # ignored". otherwise "untranslated" is expected to be the
727 # original, unchanged colname (e.g. is == to "unnormalized")
728 colname, untranslated = translate_colname(colname)
729
730 assert untranslated is None or untranslated == unnormalized
731
732 if normalize_name:
733 colname = normalize_name(colname)
734
735 if driver_column_names:
736 yield idx, colname, unnormalized, unnormalized, coltype
737
738 else:
739 yield idx, colname, unnormalized, untranslated, coltype
740
741 def _merge_textual_cols_by_position(
742 self,
743 context: DefaultExecutionContext,
744 cursor_description: _DBAPICursorDescription,
745 result_columns: Sequence[ResultColumnsEntry],
746 driver_column_names: bool,
747 ) -> Iterator[_MergeColTuple]:
748 num_ctx_cols = len(result_columns)
749
750 if num_ctx_cols > len(cursor_description):
751 util.warn(
752 "Number of columns in textual SQL (%d) is "
753 "smaller than number of columns requested (%d)"
754 % (num_ctx_cols, len(cursor_description))
755 )
756 seen = set()
757
758 self._keys = []
759
760 uses_denormalize = context.dialect.requires_name_normalize
761 for (
762 idx,
763 colname,
764 unnormalized,
765 untranslated,
766 coltype,
767 ) in self._colnames_from_description(
768 context, cursor_description, driver_column_names
769 ):
770 if idx < num_ctx_cols:
771 ctx_rec = result_columns[idx]
772 obj = ctx_rec[RM_OBJECTS]
773 ridx = idx
774 mapped_type = ctx_rec[RM_TYPE]
775 if obj[0] in seen:
776 raise exc.InvalidRequestError(
777 "Duplicate column expression requested "
778 "in textual SQL: %r" % obj[0]
779 )
780 seen.add(obj[0])
781
782 # special check for all uppercase unnormalized name;
783 # use the unnormalized name as the key.
784 # see #10788
785 # if these names don't match, then we still honor the
786 # cursor.description name as the key and not what the
787 # Column has, see
788 # test_resultset.py::PositionalTextTest::test_via_column
789 if (
790 uses_denormalize
791 and unnormalized == ctx_rec[RM_RENDERED_NAME]
792 ):
793 result_name = unnormalized
794 else:
795 result_name = colname
796 else:
797 mapped_type = sqltypes.NULLTYPE
798 obj = None
799 ridx = None
800
801 result_name = colname
802
803 if driver_column_names:
804 assert untranslated is not None
805 self._keys.append(untranslated)
806 else:
807 self._keys.append(result_name)
808
809 yield (
810 idx,
811 ridx,
812 result_name,
813 mapped_type,
814 coltype,
815 obj,
816 untranslated,
817 )
818
819 def _merge_cols_by_name(
820 self,
821 context: DefaultExecutionContext,
822 cursor_description: _DBAPICursorDescription,
823 result_columns: Sequence[ResultColumnsEntry],
824 loose_column_name_matching: bool,
825 driver_column_names: bool,
826 ) -> Iterator[_MergeColTuple]:
827 match_map = self._create_description_match_map(
828 result_columns, loose_column_name_matching
829 )
830 mapped_type: TypeEngine[Any]
831
832 self._keys = []
833
834 for (
835 idx,
836 colname,
837 unnormalized,
838 untranslated,
839 coltype,
840 ) in self._colnames_from_description(
841 context, cursor_description, driver_column_names
842 ):
843 try:
844 ctx_rec = match_map[colname]
845 except KeyError:
846 mapped_type = sqltypes.NULLTYPE
847 obj = None
848 result_columns_idx = None
849 else:
850 obj = ctx_rec[1]
851 mapped_type = ctx_rec[2]
852 result_columns_idx = ctx_rec[3]
853
854 if driver_column_names:
855 assert untranslated is not None
856 self._keys.append(untranslated)
857 else:
858 self._keys.append(colname)
859 yield (
860 idx,
861 result_columns_idx,
862 colname,
863 mapped_type,
864 coltype,
865 obj,
866 untranslated,
867 )
868
869 @classmethod
870 def _create_description_match_map(
871 cls,
872 result_columns: Sequence[ResultColumnsEntry],
873 loose_column_name_matching: bool = False,
874 ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
875 """when matching cursor.description to a set of names that are present
876 in a Compiled object, as is the case with TextualSelect, get all the
877 names we expect might match those in cursor.description.
878 """
879
880 d: Dict[
881 Union[str, object],
882 Tuple[str, TupleAny, TypeEngine[Any], int],
883 ] = {}
884 for ridx, elem in enumerate(result_columns):
885 key = elem[RM_RENDERED_NAME]
886
887 if key in d:
888 # conflicting keyname - just add the column-linked objects
889 # to the existing record. if there is a duplicate column
890 # name in the cursor description, this will allow all of those
891 # objects to raise an ambiguous column error
892 e_name, e_obj, e_type, e_ridx = d[key]
893 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
894 else:
895 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
896
897 if loose_column_name_matching:
898 # when using a textual statement with an unordered set
899 # of columns that line up, we are expecting the user
900 # to be using label names in the SQL that match to the column
901 # expressions. Enable more liberal matching for this case;
902 # duplicate keys that are ambiguous will be fixed later.
903 for r_key in elem[RM_OBJECTS]:
904 d.setdefault(
905 r_key,
906 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
907 )
908 return d
909
910 def _merge_cols_by_none(
911 self,
912 context: DefaultExecutionContext,
913 cursor_description: _DBAPICursorDescription,
914 driver_column_names: bool,
915 ) -> Iterator[_MergeColTuple]:
916 self._keys = []
917
918 for (
919 idx,
920 colname,
921 unnormalized,
922 untranslated,
923 coltype,
924 ) in self._colnames_from_description(
925 context, cursor_description, driver_column_names
926 ):
927
928 if driver_column_names:
929 assert untranslated is not None
930 self._keys.append(untranslated)
931 else:
932 self._keys.append(colname)
933
934 yield (
935 idx,
936 None,
937 colname,
938 sqltypes.NULLTYPE,
939 coltype,
940 None,
941 untranslated,
942 )
943
944 if not TYPE_CHECKING:
945
946 def _key_fallback(
947 self, key: Any, err: Optional[Exception], raiseerr: bool = True
948 ) -> Optional[NoReturn]:
949 if raiseerr:
950 if self._unpickled and isinstance(key, elements.ColumnElement):
951 raise exc.NoSuchColumnError(
952 "Row was unpickled; lookup by ColumnElement "
953 "is unsupported"
954 ) from err
955 else:
956 raise exc.NoSuchColumnError(
957 "Could not locate column in row for column '%s'"
958 % util.string_or_unprintable(key)
959 ) from err
960 else:
961 return None
962
963 def _raise_for_ambiguous_column_name(
964 self, rec: _KeyMapRecType
965 ) -> NoReturn:
966 raise exc.InvalidRequestError(
967 "Ambiguous column name '%s' in "
968 "result set column descriptions" % rec[MD_LOOKUP_KEY]
969 )
970
971 def _index_for_key(
972 self, key: _KeyIndexType, raiseerr: bool = True
973 ) -> Optional[int]:
974 # TODO: can consider pre-loading ints and negative ints
975 # into _keymap - also no coverage here
976 if isinstance(key, int):
977 key = self._keys[key]
978
979 try:
980 rec = self._keymap[key]
981 except KeyError as ke:
982 x = self._key_fallback(key, ke, raiseerr)
983 assert x is None
984 return None
985
986 index = rec[0]
987
988 if index is None:
989 self._raise_for_ambiguous_column_name(rec)
990 return index
991
992 def _indexes_for_keys(
993 self, keys: Sequence[_KeyIndexType]
994 ) -> Sequence[int]:
995 try:
996 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501
997 except KeyError as ke:
998 # ensure it raises
999 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
1000
1001 def _metadata_for_keys(
1002 self, keys: Sequence[_KeyIndexType]
1003 ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
1004 for key in keys:
1005 if isinstance(key, int):
1006 key = self._keys[key]
1007
1008 try:
1009 rec = self._keymap[key]
1010 except KeyError as ke:
1011 # ensure it raises
1012 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
1013
1014 index = rec[MD_INDEX]
1015
1016 if index is None:
1017 self._raise_for_ambiguous_column_name(rec)
1018
1019 yield cast(_NonAmbigCursorKeyMapRecType, rec)
1020
1021 def __getstate__(self) -> Dict[str, Any]:
1022 # TODO: consider serializing this as SimpleResultMetaData
1023 return {
1024 "_keymap": {
1025 key: (
1026 rec[MD_INDEX],
1027 rec[MD_RESULT_MAP_INDEX],
1028 [],
1029 key,
1030 rec[MD_RENDERED_NAME],
1031 None,
1032 None,
1033 )
1034 for key, rec in self._keymap.items()
1035 if isinstance(key, (str, int))
1036 },
1037 "_keys": self._keys,
1038 "_translated_indexes": self._translated_indexes,
1039 }
1040
1041 def __setstate__(self, state: Dict[str, Any]) -> None:
1042 self._processors = [None for _ in range(len(state["_keys"]))]
1043 self._keymap = state["_keymap"]
1044 self._keymap_by_result_column_idx = None
1045 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
1046 self._keys = state["_keys"]
1047 self._unpickled = True
1048 if state["_translated_indexes"]:
1049 translated_indexes: List[Any]
1050 self._translated_indexes = translated_indexes = state[
1051 "_translated_indexes"
1052 ]
1053 self._tuplefilter = tuplegetter(*translated_indexes)
1054 else:
1055 self._translated_indexes = self._tuplefilter = None
1056
1057
1058class ResultFetchStrategy:
1059 """Define a fetching strategy for a result object.
1060
1061
1062 .. versionadded:: 1.4
1063
1064 """
1065
1066 __slots__ = ()
1067
1068 alternate_cursor_description: Optional[_DBAPICursorDescription] = None
1069
1070 def soft_close(
1071 self,
1072 result: CursorResult[Unpack[TupleAny]],
1073 dbapi_cursor: Optional[DBAPICursor],
1074 ) -> None:
1075 raise NotImplementedError()
1076
1077 def hard_close(
1078 self,
1079 result: CursorResult[Unpack[TupleAny]],
1080 dbapi_cursor: Optional[DBAPICursor],
1081 ) -> None:
1082 raise NotImplementedError()
1083
1084 def yield_per(
1085 self,
1086 result: CursorResult[Unpack[TupleAny]],
1087 dbapi_cursor: DBAPICursor,
1088 num: int,
1089 ) -> None:
1090 return
1091
1092 def fetchone(
1093 self,
1094 result: CursorResult[Unpack[TupleAny]],
1095 dbapi_cursor: DBAPICursor,
1096 hard_close: bool = False,
1097 ) -> Any:
1098 raise NotImplementedError()
1099
1100 def fetchmany(
1101 self,
1102 result: CursorResult[Unpack[TupleAny]],
1103 dbapi_cursor: DBAPICursor,
1104 size: Optional[int] = None,
1105 ) -> Any:
1106 raise NotImplementedError()
1107
1108 def fetchall(
1109 self,
1110 result: CursorResult[Unpack[TupleAny]],
1111 dbapi_cursor: DBAPICursor,
1112 ) -> Any:
1113 raise NotImplementedError()
1114
1115 def handle_exception(
1116 self,
1117 result: CursorResult[Unpack[TupleAny]],
1118 dbapi_cursor: Optional[DBAPICursor],
1119 err: BaseException,
1120 ) -> NoReturn:
1121 raise err
1122
1123
1124class NoCursorFetchStrategy(ResultFetchStrategy):
1125 """Cursor strategy for a result that has no open cursor.
1126
1127 There are two varieties of this strategy, one for DQL and one for
1128 DML (and also DDL), each of which represent a result that had a cursor
1129 but no longer has one.
1130
1131 """
1132
1133 __slots__ = ()
1134
1135 def soft_close(
1136 self,
1137 result: CursorResult[Unpack[TupleAny]],
1138 dbapi_cursor: Optional[DBAPICursor],
1139 ) -> None:
1140 pass
1141
1142 def hard_close(
1143 self,
1144 result: CursorResult[Unpack[TupleAny]],
1145 dbapi_cursor: Optional[DBAPICursor],
1146 ) -> None:
1147 pass
1148
1149 def fetchone(
1150 self,
1151 result: CursorResult[Unpack[TupleAny]],
1152 dbapi_cursor: DBAPICursor,
1153 hard_close: bool = False,
1154 ) -> Any:
1155 return self._non_result(result, None)
1156
1157 def fetchmany(
1158 self,
1159 result: CursorResult[Unpack[TupleAny]],
1160 dbapi_cursor: DBAPICursor,
1161 size: Optional[int] = None,
1162 ) -> Any:
1163 return self._non_result(result, [])
1164
1165 def fetchall(
1166 self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor
1167 ) -> Any:
1168 return self._non_result(result, [])
1169
1170 def _non_result(
1171 self,
1172 result: CursorResult[Unpack[TupleAny]],
1173 default: Any,
1174 err: Optional[BaseException] = None,
1175 ) -> Any:
1176 raise NotImplementedError()
1177
1178
1179class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
1180 """Cursor strategy for a DQL result that has no open cursor.
1181
1182 This is a result set that can return rows, i.e. for a SELECT, or for an
1183 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
1184 where the cursor is closed and no rows remain available. The owning result
1185 object may or may not be "hard closed", which determines if the fetch
1186 methods send empty results or raise for closed result.
1187
1188 """
1189
1190 __slots__ = ()
1191
1192 def _non_result(
1193 self,
1194 result: CursorResult[Unpack[TupleAny]],
1195 default: Any,
1196 err: Optional[BaseException] = None,
1197 ) -> Any:
1198 if result.closed:
1199 raise exc.ResourceClosedError(
1200 "This result object is closed."
1201 ) from err
1202 else:
1203 return default
1204
1205
1206_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
1207
1208
1209class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
1210 """Cursor strategy for a DML result that has no open cursor.
1211
1212 This is a result set that does not return rows, i.e. for an INSERT,
1213 UPDATE, DELETE that does not include RETURNING.
1214
1215 """
1216
1217 __slots__ = ()
1218
1219 def _non_result(
1220 self,
1221 result: CursorResult[Unpack[TupleAny]],
1222 default: Any,
1223 err: Optional[BaseException] = None,
1224 ) -> Any:
1225 # we only expect to have a _NoResultMetaData() here right now.
1226 assert not result._metadata.returns_rows
1227 result._metadata._we_dont_return_rows(err) # type: ignore[union-attr]
1228
1229
1230_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
1231
1232
1233class CursorFetchStrategy(ResultFetchStrategy):
1234 """Call fetch methods from a DBAPI cursor.
1235
1236 Alternate versions of this class may instead buffer the rows from
1237 cursors or not use cursors at all.
1238
1239 """
1240
1241 __slots__ = ()
1242
1243 def soft_close(
1244 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1245 ) -> None:
1246 result.cursor_strategy = _NO_CURSOR_DQL
1247
1248 def hard_close(
1249 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1250 ) -> None:
1251 result.cursor_strategy = _NO_CURSOR_DQL
1252
1253 def handle_exception(
1254 self,
1255 result: CursorResult[Any],
1256 dbapi_cursor: Optional[DBAPICursor],
1257 err: BaseException,
1258 ) -> NoReturn:
1259 result.connection._handle_dbapi_exception(
1260 err, None, None, dbapi_cursor, result.context
1261 )
1262
1263 def yield_per(
1264 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1265 ) -> None:
1266 result.cursor_strategy = BufferedRowCursorFetchStrategy(
1267 dbapi_cursor,
1268 {"max_row_buffer": num},
1269 initial_buffer=collections.deque(),
1270 growth_factor=0,
1271 )
1272
1273 def fetchone(
1274 self,
1275 result: CursorResult[Any],
1276 dbapi_cursor: DBAPICursor,
1277 hard_close: bool = False,
1278 ) -> Any:
1279 try:
1280 row = dbapi_cursor.fetchone()
1281 if row is None:
1282 result._soft_close(hard=hard_close)
1283 return row
1284 except BaseException as e:
1285 self.handle_exception(result, dbapi_cursor, e)
1286
1287 def fetchmany(
1288 self,
1289 result: CursorResult[Any],
1290 dbapi_cursor: DBAPICursor,
1291 size: Optional[int] = None,
1292 ) -> Any:
1293 try:
1294 if size is None:
1295 l = dbapi_cursor.fetchmany()
1296 else:
1297 l = dbapi_cursor.fetchmany(size)
1298
1299 if not l:
1300 result._soft_close()
1301 return l
1302 except BaseException as e:
1303 self.handle_exception(result, dbapi_cursor, e)
1304
1305 def fetchall(
1306 self,
1307 result: CursorResult[Any],
1308 dbapi_cursor: DBAPICursor,
1309 ) -> Any:
1310 try:
1311 rows = dbapi_cursor.fetchall()
1312 result._soft_close()
1313 return rows
1314 except BaseException as e:
1315 self.handle_exception(result, dbapi_cursor, e)
1316
1317
1318_DEFAULT_FETCH = CursorFetchStrategy()
1319
1320
1321class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
1322 """A cursor fetch strategy with row buffering behavior.
1323
1324 This strategy buffers the contents of a selection of rows
1325 before ``fetchone()`` is called. This is to allow the results of
1326 ``cursor.description`` to be available immediately, when
1327 interfacing with a DB-API that requires rows to be consumed before
1328 this information is available (currently psycopg2, when used with
1329 server-side cursors).
1330
1331 The pre-fetching behavior fetches only one row initially, and then
1332 grows its buffer size by a fixed amount with each successive need
1333 for additional rows up the ``max_row_buffer`` size, which defaults
1334 to 1000::
1335
1336 with psycopg2_engine.connect() as conn:
1337
1338 result = conn.execution_options(
1339 stream_results=True, max_row_buffer=50
1340 ).execute(text("select * from table"))
1341
1342 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
1343
1344 .. seealso::
1345
1346 :ref:`psycopg2_execution_options`
1347 """
1348
1349 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
1350
1351 def __init__(
1352 self,
1353 dbapi_cursor: DBAPICursor,
1354 execution_options: CoreExecuteOptionsParameter,
1355 growth_factor: int = 5,
1356 initial_buffer: Optional[Deque[Any]] = None,
1357 ) -> None:
1358 self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
1359
1360 if initial_buffer is not None:
1361 self._rowbuffer = initial_buffer
1362 else:
1363 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
1364 self._growth_factor = growth_factor
1365
1366 if growth_factor:
1367 self._bufsize = min(self._max_row_buffer, self._growth_factor)
1368 else:
1369 self._bufsize = self._max_row_buffer
1370
1371 @classmethod
1372 def create(
1373 cls, result: CursorResult[Any]
1374 ) -> BufferedRowCursorFetchStrategy:
1375 return BufferedRowCursorFetchStrategy(
1376 result.cursor,
1377 result.context.execution_options,
1378 )
1379
1380 def _buffer_rows(
1381 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1382 ) -> None:
1383 """this is currently used only by fetchone()."""
1384
1385 size = self._bufsize
1386 try:
1387 if size < 1:
1388 new_rows = dbapi_cursor.fetchall()
1389 else:
1390 new_rows = dbapi_cursor.fetchmany(size)
1391 except BaseException as e:
1392 self.handle_exception(result, dbapi_cursor, e)
1393
1394 if not new_rows:
1395 return
1396 self._rowbuffer = collections.deque(new_rows)
1397 if self._growth_factor and size < self._max_row_buffer:
1398 self._bufsize = min(
1399 self._max_row_buffer, size * self._growth_factor
1400 )
1401
1402 def yield_per(
1403 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1404 ) -> None:
1405 self._growth_factor = 0
1406 self._max_row_buffer = self._bufsize = num
1407
1408 def soft_close(
1409 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1410 ) -> None:
1411 self._rowbuffer.clear()
1412 super().soft_close(result, dbapi_cursor)
1413
1414 def hard_close(
1415 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1416 ) -> None:
1417 self._rowbuffer.clear()
1418 super().hard_close(result, dbapi_cursor)
1419
1420 def fetchone(
1421 self,
1422 result: CursorResult[Any],
1423 dbapi_cursor: DBAPICursor,
1424 hard_close: bool = False,
1425 ) -> Any:
1426 if not self._rowbuffer:
1427 self._buffer_rows(result, dbapi_cursor)
1428 if not self._rowbuffer:
1429 try:
1430 result._soft_close(hard=hard_close)
1431 except BaseException as e:
1432 self.handle_exception(result, dbapi_cursor, e)
1433 return None
1434 return self._rowbuffer.popleft()
1435
1436 def fetchmany(
1437 self,
1438 result: CursorResult[Any],
1439 dbapi_cursor: DBAPICursor,
1440 size: Optional[int] = None,
1441 ) -> Any:
1442 if size is None:
1443 return self.fetchall(result, dbapi_cursor)
1444
1445 rb = self._rowbuffer
1446 lb = len(rb)
1447 close = False
1448 if size > lb:
1449 try:
1450 new = dbapi_cursor.fetchmany(size - lb)
1451 except BaseException as e:
1452 self.handle_exception(result, dbapi_cursor, e)
1453 else:
1454 if not new:
1455 # defer closing since it may clear the row buffer
1456 close = True
1457 else:
1458 rb.extend(new)
1459
1460 res = [rb.popleft() for _ in range(min(size, len(rb)))]
1461 if close:
1462 result._soft_close()
1463 return res
1464
1465 def fetchall(
1466 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1467 ) -> Any:
1468 try:
1469 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
1470 self._rowbuffer.clear()
1471 result._soft_close()
1472 return ret
1473 except BaseException as e:
1474 self.handle_exception(result, dbapi_cursor, e)
1475
1476
1477class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
1478 """A cursor strategy that buffers rows fully upon creation.
1479
1480 Used for operations where a result is to be delivered
1481 after the database conversation can not be continued,
1482 such as MSSQL INSERT...OUTPUT after an autocommit.
1483
1484 """
1485
1486 __slots__ = ("_rowbuffer", "alternate_cursor_description")
1487
1488 def __init__(
1489 self,
1490 dbapi_cursor: Optional[DBAPICursor],
1491 alternate_description: Optional[_DBAPICursorDescription] = None,
1492 initial_buffer: Optional[Iterable[Any]] = None,
1493 ):
1494 self.alternate_cursor_description = alternate_description
1495 if initial_buffer is not None:
1496 self._rowbuffer = collections.deque(initial_buffer)
1497 else:
1498 assert dbapi_cursor is not None
1499 self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
1500
1501 def yield_per(
1502 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1503 ) -> Any:
1504 pass
1505
1506 def soft_close(
1507 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1508 ) -> None:
1509 self._rowbuffer.clear()
1510 super().soft_close(result, dbapi_cursor)
1511
1512 def hard_close(
1513 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1514 ) -> None:
1515 self._rowbuffer.clear()
1516 super().hard_close(result, dbapi_cursor)
1517
1518 def fetchone(
1519 self,
1520 result: CursorResult[Any],
1521 dbapi_cursor: DBAPICursor,
1522 hard_close: bool = False,
1523 ) -> Any:
1524 if self._rowbuffer:
1525 return self._rowbuffer.popleft()
1526 else:
1527 result._soft_close(hard=hard_close)
1528 return None
1529
1530 def fetchmany(
1531 self,
1532 result: CursorResult[Any],
1533 dbapi_cursor: DBAPICursor,
1534 size: Optional[int] = None,
1535 ) -> Any:
1536 if size is None:
1537 return self.fetchall(result, dbapi_cursor)
1538
1539 rb = self._rowbuffer
1540 rows = [rb.popleft() for _ in range(min(size, len(rb)))]
1541 if not rows:
1542 result._soft_close()
1543 return rows
1544
1545 def fetchall(
1546 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1547 ) -> Any:
1548 ret = self._rowbuffer
1549 self._rowbuffer = collections.deque()
1550 result._soft_close()
1551 return ret
1552
1553
1554class _NoResultMetaData(ResultMetaData):
1555 __slots__ = ()
1556
1557 returns_rows = False
1558
1559 def _we_dont_return_rows(
1560 self, err: Optional[BaseException] = None
1561 ) -> NoReturn:
1562 raise exc.ResourceClosedError(
1563 "This result object does not return rows. "
1564 "It has been closed automatically."
1565 ) from err
1566
1567 def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
1568 self._we_dont_return_rows()
1569
1570 def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1571 self._we_dont_return_rows()
1572
1573 def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1574 self._we_dont_return_rows()
1575
1576 @property
1577 def _keymap(self) -> NoReturn: # type: ignore[override]
1578 self._we_dont_return_rows()
1579
1580 @property
1581 def _key_to_index(self) -> NoReturn: # type: ignore[override]
1582 self._we_dont_return_rows()
1583
1584 @property
1585 def _processors(self) -> NoReturn: # type: ignore[override]
1586 self._we_dont_return_rows()
1587
1588 @property
1589 def keys(self) -> NoReturn:
1590 self._we_dont_return_rows()
1591
1592
1593_NO_RESULT_METADATA = _NoResultMetaData()
1594
1595
1596def null_dml_result() -> IteratorResult[Any]:
1597 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
1598 it._soft_close()
1599 return it
1600
1601
1602class CursorResult(Result[Unpack[_Ts]]):
1603 """A Result that is representing state from a DBAPI cursor.
1604
1605 .. versionchanged:: 1.4 The :class:`.CursorResult``
1606 class replaces the previous :class:`.ResultProxy` interface.
1607 This classes are based on the :class:`.Result` calling API
1608 which provides an updated usage model and calling facade for
1609 SQLAlchemy Core and SQLAlchemy ORM.
1610
1611 Returns database rows via the :class:`.Row` class, which provides
1612 additional API features and behaviors on top of the raw data returned by
1613 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
1614 method, other kinds of objects may also be returned.
1615
1616 .. seealso::
1617
1618 :ref:`tutorial_selecting_data` - introductory material for accessing
1619 :class:`_engine.CursorResult` and :class:`.Row` objects.
1620
1621 """
1622
1623 __slots__ = (
1624 "context",
1625 "dialect",
1626 "cursor",
1627 "cursor_strategy",
1628 "_echo",
1629 "connection",
1630 )
1631
1632 _metadata: Union[CursorResultMetaData, _NoResultMetaData]
1633 _no_result_metadata = _NO_RESULT_METADATA
1634 _soft_closed: bool = False
1635 closed: bool = False
1636 _is_cursor = True
1637
1638 context: DefaultExecutionContext
1639 dialect: Dialect
1640 cursor_strategy: ResultFetchStrategy
1641 connection: Connection
1642
1643 def __init__(
1644 self,
1645 context: DefaultExecutionContext,
1646 cursor_strategy: ResultFetchStrategy,
1647 cursor_description: Optional[_DBAPICursorDescription],
1648 ):
1649 self.context = context
1650 self.dialect = context.dialect
1651 self.cursor = context.cursor
1652 self.cursor_strategy = cursor_strategy
1653 self.connection = context.root_connection
1654 self._echo = echo = (
1655 self.connection._echo and context.engine._should_log_debug()
1656 )
1657
1658 if cursor_description is not None:
1659 self._init_metadata(context, cursor_description)
1660
1661 if echo:
1662 log = self.context.connection._log_debug
1663
1664 def _log_row(row: Any) -> Any:
1665 log("Row %r", sql_util._repr_row(row))
1666 return row
1667
1668 self._row_logging_fn = _log_row
1669
1670 # call Result._row_getter to set up the row factory
1671 self._row_getter
1672
1673 else:
1674 assert context._num_sentinel_cols == 0
1675 self._metadata = self._no_result_metadata
1676
1677 def _init_metadata(
1678 self,
1679 context: DefaultExecutionContext,
1680 cursor_description: _DBAPICursorDescription,
1681 ) -> CursorResultMetaData:
1682 driver_column_names = context.execution_options.get(
1683 "driver_column_names", False
1684 )
1685 if context.compiled:
1686 compiled = context.compiled
1687
1688 metadata: CursorResultMetaData
1689
1690 if driver_column_names:
1691 # TODO: test this case
1692 metadata = CursorResultMetaData(
1693 self,
1694 cursor_description,
1695 driver_column_names=True,
1696 num_sentinel_cols=context._num_sentinel_cols,
1697 )
1698 assert not metadata._safe_for_cache
1699 elif compiled._cached_metadata:
1700 metadata = compiled._cached_metadata
1701 else:
1702 metadata = CursorResultMetaData(
1703 self,
1704 cursor_description,
1705 # the number of sentinel columns is stored on the context
1706 # but it's a characteristic of the compiled object
1707 # so it's ok to apply it to a cacheable metadata.
1708 num_sentinel_cols=context._num_sentinel_cols,
1709 )
1710 if metadata._safe_for_cache:
1711 compiled._cached_metadata = metadata
1712
1713 # result rewrite/ adapt step. this is to suit the case
1714 # when we are invoked against a cached Compiled object, we want
1715 # to rewrite the ResultMetaData to reflect the Column objects
1716 # that are in our current SQL statement object, not the one
1717 # that is associated with the cached Compiled object.
1718 # the Compiled object may also tell us to not
1719 # actually do this step; this is to support the ORM where
1720 # it is to produce a new Result object in any case, and will
1721 # be using the cached Column objects against this database result
1722 # so we don't want to rewrite them.
1723 #
1724 # Basically this step suits the use case where the end user
1725 # is using Core SQL expressions and is accessing columns in the
1726 # result row using row._mapping[table.c.column].
1727 if (
1728 not context.execution_options.get(
1729 "_result_disable_adapt_to_context", False
1730 )
1731 and compiled._result_columns
1732 and context.cache_hit is context.dialect.CACHE_HIT
1733 and compiled.statement is not context.invoked_statement # type: ignore[comparison-overlap] # noqa: E501
1734 ):
1735 metadata = metadata._adapt_to_context(context)
1736
1737 self._metadata = metadata
1738
1739 else:
1740 self._metadata = metadata = CursorResultMetaData(
1741 self,
1742 cursor_description,
1743 driver_column_names=driver_column_names,
1744 )
1745 if self._echo:
1746 context.connection._log_debug(
1747 "Col %r", tuple(x[0] for x in cursor_description)
1748 )
1749 return metadata
1750
1751 def _soft_close(self, hard: bool = False) -> None:
1752 """Soft close this :class:`_engine.CursorResult`.
1753
1754 This releases all DBAPI cursor resources, but leaves the
1755 CursorResult "open" from a semantic perspective, meaning the
1756 fetchXXX() methods will continue to return empty results.
1757
1758 This method is called automatically when:
1759
1760 * all result rows are exhausted using the fetchXXX() methods.
1761 * cursor.description is None.
1762
1763 This method is **not public**, but is documented in order to clarify
1764 the "autoclose" process used.
1765
1766 .. seealso::
1767
1768 :meth:`_engine.CursorResult.close`
1769
1770
1771 """
1772
1773 if (not hard and self._soft_closed) or (hard and self.closed):
1774 return
1775
1776 if hard:
1777 self.closed = True
1778 self.cursor_strategy.hard_close(self, self.cursor)
1779 else:
1780 self.cursor_strategy.soft_close(self, self.cursor)
1781
1782 if not self._soft_closed:
1783 cursor = self.cursor
1784 self.cursor = None # type: ignore
1785 self.connection._safe_close_cursor(cursor)
1786 self._soft_closed = True
1787
1788 @property
1789 def inserted_primary_key_rows(self) -> List[Optional[Any]]:
1790 """Return the value of
1791 :attr:`_engine.CursorResult.inserted_primary_key`
1792 as a row contained within a list; some dialects may support a
1793 multiple row form as well.
1794
1795 .. note:: As indicated below, in current SQLAlchemy versions this
1796 accessor is only useful beyond what's already supplied by
1797 :attr:`_engine.CursorResult.inserted_primary_key` when using the
1798 :ref:`postgresql_psycopg2` dialect. Future versions hope to
1799 generalize this feature to more dialects.
1800
1801 This accessor is added to support dialects that offer the feature
1802 that is currently implemented by the :ref:`psycopg2_executemany_mode`
1803 feature, currently **only the psycopg2 dialect**, which provides
1804 for many rows to be INSERTed at once while still retaining the
1805 behavior of being able to return server-generated primary key values.
1806
1807 * **When using the psycopg2 dialect, or other dialects that may support
1808 "fast executemany" style inserts in upcoming releases** : When
1809 invoking an INSERT statement while passing a list of rows as the
1810 second argument to :meth:`_engine.Connection.execute`, this accessor
1811 will then provide a list of rows, where each row contains the primary
1812 key value for each row that was INSERTed.
1813
1814 * **When using all other dialects / backends that don't yet support
1815 this feature**: This accessor is only useful for **single row INSERT
1816 statements**, and returns the same information as that of the
1817 :attr:`_engine.CursorResult.inserted_primary_key` within a
1818 single-element list. When an INSERT statement is executed in
1819 conjunction with a list of rows to be INSERTed, the list will contain
1820 one row per row inserted in the statement, however it will contain
1821 ``None`` for any server-generated values.
1822
1823 Future releases of SQLAlchemy will further generalize the
1824 "fast execution helper" feature of psycopg2 to suit other dialects,
1825 thus allowing this accessor to be of more general use.
1826
1827 .. versionadded:: 1.4
1828
1829 .. seealso::
1830
1831 :attr:`_engine.CursorResult.inserted_primary_key`
1832
1833 """
1834 if not self.context.compiled:
1835 raise exc.InvalidRequestError(
1836 "Statement is not a compiled expression construct."
1837 )
1838 elif not self.context.isinsert:
1839 raise exc.InvalidRequestError(
1840 "Statement is not an insert() expression construct."
1841 )
1842 elif self.context._is_explicit_returning:
1843 raise exc.InvalidRequestError(
1844 "Can't call inserted_primary_key "
1845 "when returning() "
1846 "is used."
1847 )
1848 return self.context.inserted_primary_key_rows # type: ignore[no-any-return] # noqa: E501
1849
1850 @property
1851 def inserted_primary_key(self) -> Optional[Any]:
1852 """Return the primary key for the row just inserted.
1853
1854 The return value is a :class:`_result.Row` object representing
1855 a named tuple of primary key values in the order in which the
1856 primary key columns are configured in the source
1857 :class:`_schema.Table`.
1858
1859 .. versionchanged:: 1.4.8 - the
1860 :attr:`_engine.CursorResult.inserted_primary_key`
1861 value is now a named tuple via the :class:`_result.Row` class,
1862 rather than a plain tuple.
1863
1864 This accessor only applies to single row :func:`_expression.insert`
1865 constructs which did not explicitly specify
1866 :meth:`_expression.Insert.returning`. Support for multirow inserts,
1867 while not yet available for most backends, would be accessed using
1868 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
1869
1870 Note that primary key columns which specify a server_default clause, or
1871 otherwise do not qualify as "autoincrement" columns (see the notes at
1872 :class:`_schema.Column`), and were generated using the database-side
1873 default, will appear in this list as ``None`` unless the backend
1874 supports "returning" and the insert statement executed with the
1875 "implicit returning" enabled.
1876
1877 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1878 statement is not a compiled expression construct
1879 or is not an insert() construct.
1880
1881 """
1882
1883 if self.context.executemany:
1884 raise exc.InvalidRequestError(
1885 "This statement was an executemany call; if primary key "
1886 "returning is supported, please "
1887 "use .inserted_primary_key_rows."
1888 )
1889
1890 ikp = self.inserted_primary_key_rows
1891 if ikp:
1892 return ikp[0]
1893 else:
1894 return None
1895
1896 def last_updated_params(
1897 self,
1898 ) -> Union[
1899 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1900 ]:
1901 """Return the collection of updated parameters from this
1902 execution.
1903
1904 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1905 statement is not a compiled expression construct
1906 or is not an update() construct.
1907
1908 """
1909 if not self.context.compiled:
1910 raise exc.InvalidRequestError(
1911 "Statement is not a compiled expression construct."
1912 )
1913 elif not self.context.isupdate:
1914 raise exc.InvalidRequestError(
1915 "Statement is not an update() expression construct."
1916 )
1917 elif self.context.executemany:
1918 return self.context.compiled_parameters
1919 else:
1920 return self.context.compiled_parameters[0]
1921
1922 def last_inserted_params(
1923 self,
1924 ) -> Union[
1925 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1926 ]:
1927 """Return the collection of inserted parameters from this
1928 execution.
1929
1930 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1931 statement is not a compiled expression construct
1932 or is not an insert() construct.
1933
1934 """
1935 if not self.context.compiled:
1936 raise exc.InvalidRequestError(
1937 "Statement is not a compiled expression construct."
1938 )
1939 elif not self.context.isinsert:
1940 raise exc.InvalidRequestError(
1941 "Statement is not an insert() expression construct."
1942 )
1943 elif self.context.executemany:
1944 return self.context.compiled_parameters
1945 else:
1946 return self.context.compiled_parameters[0]
1947
1948 @property
1949 def returned_defaults_rows(
1950 self,
1951 ) -> Optional[Sequence[Row[Unpack[TupleAny]]]]:
1952 """Return a list of rows each containing the values of default
1953 columns that were fetched using
1954 the :meth:`.ValuesBase.return_defaults` feature.
1955
1956 The return value is a list of :class:`.Row` objects.
1957
1958 .. versionadded:: 1.4
1959
1960 """
1961 return self.context.returned_default_rows
1962
1963 def splice_horizontally(self, other: CursorResult[Any]) -> Self:
1964 """Return a new :class:`.CursorResult` that "horizontally splices"
1965 together the rows of this :class:`.CursorResult` with that of another
1966 :class:`.CursorResult`.
1967
1968 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1969 not intended for general use.
1970
1971 "horizontally splices" means that for each row in the first and second
1972 result sets, a new row that concatenates the two rows together is
1973 produced, which then becomes the new row. The incoming
1974 :class:`.CursorResult` must have the identical number of rows. It is
1975 typically expected that the two result sets come from the same sort
1976 order as well, as the result rows are spliced together based on their
1977 position in the result.
1978
1979 The expected use case here is so that multiple INSERT..RETURNING
1980 statements (which definitely need to be sorted) against different
1981 tables can produce a single result that looks like a JOIN of those two
1982 tables.
1983
1984 E.g.::
1985
1986 r1 = connection.execute(
1987 users.insert().returning(
1988 users.c.user_name, users.c.user_id, sort_by_parameter_order=True
1989 ),
1990 user_values,
1991 )
1992
1993 r2 = connection.execute(
1994 addresses.insert().returning(
1995 addresses.c.address_id,
1996 addresses.c.address,
1997 addresses.c.user_id,
1998 sort_by_parameter_order=True,
1999 ),
2000 address_values,
2001 )
2002
2003 rows = r1.splice_horizontally(r2).all()
2004 assert rows == [
2005 ("john", 1, 1, "foo@bar.com", 1),
2006 ("jack", 2, 2, "bar@bat.com", 2),
2007 ]
2008
2009 .. versionadded:: 2.0
2010
2011 .. seealso::
2012
2013 :meth:`.CursorResult.splice_vertically`
2014
2015
2016 """ # noqa: E501
2017
2018 clone = self._generate()
2019 assert clone is self # just to note
2020 assert isinstance(other._metadata, CursorResultMetaData)
2021 assert isinstance(self._metadata, CursorResultMetaData)
2022 self_tf = self._metadata._tuplefilter
2023 other_tf = other._metadata._tuplefilter
2024 clone._metadata = self._metadata._splice_horizontally(other._metadata)
2025
2026 total_rows = [
2027 tuple(r1 if self_tf is None else self_tf(r1))
2028 + tuple(r2 if other_tf is None else other_tf(r2))
2029 for r1, r2 in zip(
2030 list(self._raw_row_iterator()),
2031 list(other._raw_row_iterator()),
2032 )
2033 ]
2034
2035 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2036 None,
2037 initial_buffer=total_rows,
2038 )
2039 clone._reset_memoizations()
2040 return clone
2041
2042 def splice_vertically(self, other: CursorResult[Any]) -> Self:
2043 """Return a new :class:`.CursorResult` that "vertically splices",
2044 i.e. "extends", the rows of this :class:`.CursorResult` with that of
2045 another :class:`.CursorResult`.
2046
2047 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
2048 not intended for general use.
2049
2050 "vertically splices" means the rows of the given result are appended to
2051 the rows of this cursor result. The incoming :class:`.CursorResult`
2052 must have rows that represent the identical list of columns in the
2053 identical order as they are in this :class:`.CursorResult`.
2054
2055 .. versionadded:: 2.0
2056
2057 .. seealso::
2058
2059 :meth:`.CursorResult.splice_horizontally`
2060
2061 """
2062 clone = self._generate()
2063 total_rows = list(self._raw_row_iterator()) + list(
2064 other._raw_row_iterator()
2065 )
2066
2067 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2068 None,
2069 initial_buffer=total_rows,
2070 )
2071 clone._reset_memoizations()
2072 return clone
2073
2074 def _rewind(self, rows: Any) -> Self:
2075 """rewind this result back to the given rowset.
2076
2077 this is used internally for the case where an :class:`.Insert`
2078 construct combines the use of
2079 :meth:`.Insert.return_defaults` along with the
2080 "supplemental columns" feature.
2081
2082 NOTE: this method has not effect then an unique filter is applied
2083 to the result, meaning that no row will be returned.
2084
2085 """
2086
2087 if self._echo:
2088 self.context.connection._log_debug(
2089 "CursorResult rewound %d row(s)", len(rows)
2090 )
2091
2092 # the rows given are expected to be Row objects, so we
2093 # have to clear out processors which have already run on these
2094 # rows
2095 self._metadata = cast(
2096 CursorResultMetaData, self._metadata
2097 )._remove_processors_and_tuple_filter()
2098
2099 self.cursor_strategy = FullyBufferedCursorFetchStrategy(
2100 None,
2101 # TODO: if these are Row objects, can we save on not having to
2102 # re-make new Row objects out of them a second time? is that
2103 # what's actually happening right now? maybe look into this
2104 initial_buffer=rows,
2105 )
2106 self._reset_memoizations()
2107 return self
2108
2109 @property
2110 def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]:
2111 """Return the values of default columns that were fetched using
2112 the :meth:`.ValuesBase.return_defaults` feature.
2113
2114 The value is an instance of :class:`.Row`, or ``None``
2115 if :meth:`.ValuesBase.return_defaults` was not used or if the
2116 backend does not support RETURNING.
2117
2118 .. seealso::
2119
2120 :meth:`.ValuesBase.return_defaults`
2121
2122 """
2123
2124 if self.context.executemany:
2125 raise exc.InvalidRequestError(
2126 "This statement was an executemany call; if return defaults "
2127 "is supported, please use .returned_defaults_rows."
2128 )
2129
2130 rows = self.context.returned_default_rows
2131 if rows:
2132 return rows[0]
2133 else:
2134 return None
2135
2136 def lastrow_has_defaults(self) -> bool:
2137 """Return ``lastrow_has_defaults()`` from the underlying
2138 :class:`.ExecutionContext`.
2139
2140 See :class:`.ExecutionContext` for details.
2141
2142 """
2143
2144 return self.context.lastrow_has_defaults()
2145
2146 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2147 """Return ``postfetch_cols()`` from the underlying
2148 :class:`.ExecutionContext`.
2149
2150 See :class:`.ExecutionContext` for details.
2151
2152 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2153 statement is not a compiled expression construct
2154 or is not an insert() or update() construct.
2155
2156 """
2157
2158 if not self.context.compiled:
2159 raise exc.InvalidRequestError(
2160 "Statement is not a compiled expression construct."
2161 )
2162 elif not self.context.isinsert and not self.context.isupdate:
2163 raise exc.InvalidRequestError(
2164 "Statement is not an insert() or update() "
2165 "expression construct."
2166 )
2167 return self.context.postfetch_cols
2168
2169 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2170 """Return ``prefetch_cols()`` from the underlying
2171 :class:`.ExecutionContext`.
2172
2173 See :class:`.ExecutionContext` for details.
2174
2175 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2176 statement is not a compiled expression construct
2177 or is not an insert() or update() construct.
2178
2179 """
2180
2181 if not self.context.compiled:
2182 raise exc.InvalidRequestError(
2183 "Statement is not a compiled expression construct."
2184 )
2185 elif not self.context.isinsert and not self.context.isupdate:
2186 raise exc.InvalidRequestError(
2187 "Statement is not an insert() or update() "
2188 "expression construct."
2189 )
2190 return self.context.prefetch_cols
2191
2192 def supports_sane_rowcount(self) -> bool:
2193 """Return ``supports_sane_rowcount`` from the dialect.
2194
2195 See :attr:`_engine.CursorResult.rowcount` for background.
2196
2197 """
2198
2199 return self.dialect.supports_sane_rowcount
2200
2201 def supports_sane_multi_rowcount(self) -> bool:
2202 """Return ``supports_sane_multi_rowcount`` from the dialect.
2203
2204 See :attr:`_engine.CursorResult.rowcount` for background.
2205
2206 """
2207
2208 return self.dialect.supports_sane_multi_rowcount
2209
2210 @util.memoized_property
2211 def rowcount(self) -> int:
2212 """Return the 'rowcount' for this result.
2213
2214 The primary purpose of 'rowcount' is to report the number of rows
2215 matched by the WHERE criterion of an UPDATE or DELETE statement
2216 executed once (i.e. for a single parameter set), which may then be
2217 compared to the number of rows expected to be updated or deleted as a
2218 means of asserting data integrity.
2219
2220 This attribute is transferred from the ``cursor.rowcount`` attribute
2221 of the DBAPI before the cursor is closed, to support DBAPIs that
2222 don't make this value available after cursor close. Some DBAPIs may
2223 offer meaningful values for other kinds of statements, such as INSERT
2224 and SELECT statements as well. In order to retrieve ``cursor.rowcount``
2225 for these statements, set the
2226 :paramref:`.Connection.execution_options.preserve_rowcount`
2227 execution option to True, which will cause the ``cursor.rowcount``
2228 value to be unconditionally memoized before any results are returned
2229 or the cursor is closed, regardless of statement type.
2230
2231 For cases where the DBAPI does not support rowcount for a particular
2232 kind of statement and/or execution, the returned value will be ``-1``,
2233 which is delivered directly from the DBAPI and is part of :pep:`249`.
2234 All DBAPIs should support rowcount for single-parameter-set
2235 UPDATE and DELETE statements, however.
2236
2237 .. note::
2238
2239 Notes regarding :attr:`_engine.CursorResult.rowcount`:
2240
2241
2242 * This attribute returns the number of rows *matched*,
2243 which is not necessarily the same as the number of rows
2244 that were actually *modified*. For example, an UPDATE statement
2245 may have no net change on a given row if the SET values
2246 given are the same as those present in the row already.
2247 Such a row would be matched but not modified.
2248 On backends that feature both styles, such as MySQL,
2249 rowcount is configured to return the match
2250 count in all cases.
2251
2252 * :attr:`_engine.CursorResult.rowcount` in the default case is
2253 *only* useful in conjunction with an UPDATE or DELETE statement,
2254 and only with a single set of parameters. For other kinds of
2255 statements, SQLAlchemy will not attempt to pre-memoize the value
2256 unless the
2257 :paramref:`.Connection.execution_options.preserve_rowcount`
2258 execution option is used. Note that contrary to :pep:`249`, many
2259 DBAPIs do not support rowcount values for statements that are not
2260 UPDATE or DELETE, particularly when rows are being returned which
2261 are not fully pre-buffered. DBAPIs that dont support rowcount
2262 for a particular kind of statement should return the value ``-1``
2263 for such statements.
2264
2265 * :attr:`_engine.CursorResult.rowcount` may not be meaningful
2266 when executing a single statement with multiple parameter sets
2267 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
2268 values across multiple parameter sets and will return ``-1``
2269 when accessed.
2270
2271 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
2272 a correct population of :attr:`_engine.CursorResult.rowcount`
2273 when the :paramref:`.Connection.execution_options.preserve_rowcount`
2274 execution option is set to True.
2275
2276 * Statements that use RETURNING may not support rowcount, returning
2277 a ``-1`` value instead.
2278
2279 .. seealso::
2280
2281 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
2282
2283 :paramref:`.Connection.execution_options.preserve_rowcount`
2284
2285 """ # noqa: E501
2286 try:
2287 return self.context.rowcount
2288 except BaseException as e:
2289 self.cursor_strategy.handle_exception(self, self.cursor, e)
2290 raise # not called
2291
2292 @property
2293 def lastrowid(self) -> int:
2294 """Return the 'lastrowid' accessor on the DBAPI cursor.
2295
2296 This is a DBAPI specific method and is only functional
2297 for those backends which support it, for statements
2298 where it is appropriate. It's behavior is not
2299 consistent across backends.
2300
2301 Usage of this method is normally unnecessary when
2302 using insert() expression constructs; the
2303 :attr:`~CursorResult.inserted_primary_key` attribute provides a
2304 tuple of primary key values for a newly inserted row,
2305 regardless of database backend.
2306
2307 """
2308 try:
2309 return self.context.get_lastrowid()
2310 except BaseException as e:
2311 self.cursor_strategy.handle_exception(self, self.cursor, e)
2312
2313 @property
2314 def returns_rows(self) -> bool:
2315 """True if this :class:`_engine.CursorResult` returns zero or more
2316 rows.
2317
2318 I.e. if it is legal to call the methods
2319 :meth:`_engine.CursorResult.fetchone`,
2320 :meth:`_engine.CursorResult.fetchmany`
2321 :meth:`_engine.CursorResult.fetchall`.
2322
2323 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
2324 always be synonymous with whether or not the DBAPI cursor had a
2325 ``.description`` attribute, indicating the presence of result columns,
2326 noting that a cursor that returns zero rows still has a
2327 ``.description`` if a row-returning statement was emitted.
2328
2329 This attribute should be True for all results that are against
2330 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
2331 that use RETURNING. For INSERT/UPDATE/DELETE statements that were
2332 not using RETURNING, the value will usually be False, however
2333 there are some dialect-specific exceptions to this, such as when
2334 using the MSSQL / pyodbc dialect a SELECT is emitted inline in
2335 order to retrieve an inserted primary key value.
2336
2337
2338 """
2339 return self._metadata.returns_rows
2340
2341 @property
2342 def is_insert(self) -> bool:
2343 """True if this :class:`_engine.CursorResult` is the result
2344 of a executing an expression language compiled
2345 :func:`_expression.insert` construct.
2346
2347 When True, this implies that the
2348 :attr:`inserted_primary_key` attribute is accessible,
2349 assuming the statement did not include
2350 a user defined "returning" construct.
2351
2352 """
2353 return self.context.isinsert
2354
2355 def _fetchiter_impl(self) -> Iterator[Any]:
2356 fetchone = self.cursor_strategy.fetchone
2357
2358 while True:
2359 row = fetchone(self, self.cursor)
2360 if row is None:
2361 break
2362 yield row
2363
2364 def _fetchone_impl(self, hard_close: bool = False) -> Any:
2365 return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
2366
2367 def _fetchall_impl(self) -> Any:
2368 return self.cursor_strategy.fetchall(self, self.cursor)
2369
2370 def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
2371 return self.cursor_strategy.fetchmany(self, self.cursor, size)
2372
2373 def _raw_row_iterator(self) -> Any:
2374 return self._fetchiter_impl()
2375
2376 def merge(
2377 self, *others: Result[Unpack[TupleAny]]
2378 ) -> MergedResult[Unpack[TupleAny]]:
2379 merged_result = super().merge(*others)
2380 if self.context._has_rowcount:
2381 merged_result.rowcount = sum(
2382 cast("CursorResult[Any]", result).rowcount
2383 for result in (self,) + others
2384 )
2385 return merged_result
2386
2387 def close(self) -> None:
2388 """Close this :class:`_engine.CursorResult`.
2389
2390 This closes out the underlying DBAPI cursor corresponding to the
2391 statement execution, if one is still present. Note that the DBAPI
2392 cursor is automatically released when the :class:`_engine.CursorResult`
2393 exhausts all available rows. :meth:`_engine.CursorResult.close` is
2394 generally an optional method except in the case when discarding a
2395 :class:`_engine.CursorResult` that still has additional rows pending
2396 for fetch.
2397
2398 After this method is called, it is no longer valid to call upon
2399 the fetch methods, which will raise a :class:`.ResourceClosedError`
2400 on subsequent use.
2401
2402 .. seealso::
2403
2404 :ref:`connections_toplevel`
2405
2406 """
2407 self._soft_close(hard=True)
2408
2409 @_generative
2410 def yield_per(self, num: int) -> Self:
2411 self._yield_per = num
2412 self.cursor_strategy.yield_per(self, self.cursor, num)
2413 return self
2414
2415
2416ResultProxy = CursorResult