1# engine/cursor.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define cursor-specific result set constructs including
9:class:`.CursorResult`."""
10
11
12from __future__ import annotations
13
14import collections
15import functools
16import operator
17import typing
18from typing import Any
19from typing import cast
20from typing import ClassVar
21from typing import Deque
22from typing import Dict
23from typing import Final
24from typing import Iterable
25from typing import Iterator
26from typing import List
27from typing import Literal
28from typing import Mapping
29from typing import NoReturn
30from typing import Optional
31from typing import Sequence
32from typing import Tuple
33from typing import TYPE_CHECKING
34from typing import Union
35
36from .result import IteratorResult
37from .result import MergedResult
38from .result import Result
39from .result import ResultMetaData
40from .result import SimpleResultMetaData
41from .result import tuplegetter
42from .row import Row
43from .. import exc
44from .. import util
45from ..sql import elements
46from ..sql import sqltypes
47from ..sql import util as sql_util
48from ..sql.base import _generative
49from ..sql.compiler import ResultColumnsEntry
50from ..sql.compiler import RM_NAME
51from ..sql.compiler import RM_OBJECTS
52from ..sql.compiler import RM_RENDERED_NAME
53from ..sql.compiler import RM_TYPE
54from ..sql.type_api import TypeEngine
55from ..util.typing import Self
56from ..util.typing import TupleAny
57from ..util.typing import TypeVarTuple
58from ..util.typing import Unpack
59
60
61if typing.TYPE_CHECKING:
62 from .base import Connection
63 from .default import DefaultExecutionContext
64 from .interfaces import _DBAPICursorDescription
65 from .interfaces import _MutableCoreSingleExecuteParams
66 from .interfaces import CoreExecuteOptionsParameter
67 from .interfaces import DBAPICursor
68 from .interfaces import DBAPIType
69 from .interfaces import Dialect
70 from .interfaces import ExecutionContext
71 from .result import _KeyIndexType
72 from .result import _KeyMapRecType
73 from .result import _KeyMapType
74 from .result import _KeyType
75 from .result import _ProcessorsType
76 from .result import _TupleGetterType
77 from ..sql.schema import Column
78 from ..sql.type_api import _ResultProcessorType
79
80
81_Ts = TypeVarTuple("_Ts")
82
83
84# metadata entry tuple indexes.
85# using raw tuple is faster than namedtuple.
86# these match up to the positions in
87# _CursorKeyMapRecType
88MD_INDEX: Final[Literal[0]] = 0
89"""integer index in cursor.description
90
91"""
92
93MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
94"""integer index in compiled._result_columns"""
95
96MD_OBJECTS: Final[Literal[2]] = 2
97"""other string keys and ColumnElement obj that can match.
98
99This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
100
101"""
102
103MD_LOOKUP_KEY: Final[Literal[3]] = 3
104"""string key we usually expect for key-based lookup
105
106this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
107"""
108
109
110MD_RENDERED_NAME: Final[Literal[4]] = 4
111"""name that is usually in cursor.description
112
113this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
114"""
115
116
117MD_PROCESSOR: Final[Literal[5]] = 5
118"""callable to process a result value into a row"""
119
120MD_UNTRANSLATED: Final[Literal[6]] = 6
121"""raw name from cursor.description"""
122
123
124_CursorKeyMapRecType = Tuple[
125 Optional[int], # MD_INDEX, None means the record is ambiguously named
126 int, # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
127 TupleAny, # MD_OBJECTS
128 str, # MD_LOOKUP_KEY
129 str, # MD_RENDERED_NAME
130 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
131 Optional[str], # MD_UNTRANSLATED
132]
133
134_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
135
136# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
137# not None
138_NonAmbigCursorKeyMapRecType = Tuple[
139 int,
140 int,
141 List[Any],
142 str,
143 str,
144 Optional["_ResultProcessorType[Any]"],
145 str,
146]
147
148_MergeColTuple = Tuple[
149 int,
150 Optional[int],
151 str,
152 TypeEngine[Any],
153 "DBAPIType",
154 Optional[TupleAny],
155 Optional[str],
156]
157
158
159class CursorResultMetaData(ResultMetaData):
160 """Result metadata for DBAPI cursors."""
161
162 __slots__ = (
163 "_keymap",
164 "_processors",
165 "_keys",
166 "_keymap_by_result_column_idx",
167 "_tuplefilter",
168 "_translated_indexes",
169 "_safe_for_cache",
170 "_unpickled",
171 "_key_to_index",
172 # don't need _unique_filters support here for now. Can be added
173 # if a need arises.
174 )
175
176 _keymap: _CursorKeyMapType
177 _processors: _ProcessorsType
178 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
179 _unpickled: bool
180 _safe_for_cache: bool
181 _translated_indexes: Optional[List[int]]
182
183 returns_rows: ClassVar[bool] = True
184
185 def _has_key(self, key: Any) -> bool:
186 return key in self._keymap
187
188 def _for_freeze(self) -> ResultMetaData:
189 return SimpleResultMetaData(
190 self._keys,
191 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
192 )
193
194 def _make_new_metadata(
195 self,
196 *,
197 unpickled: bool,
198 processors: _ProcessorsType,
199 keys: Sequence[str],
200 keymap: _KeyMapType,
201 tuplefilter: Optional[_TupleGetterType],
202 translated_indexes: Optional[List[int]],
203 safe_for_cache: bool,
204 keymap_by_result_column_idx: Any,
205 ) -> Self:
206 new_obj = self.__class__.__new__(self.__class__)
207 new_obj._unpickled = unpickled
208 new_obj._processors = processors
209 new_obj._keys = keys
210 new_obj._keymap = keymap
211 new_obj._tuplefilter = tuplefilter
212 new_obj._translated_indexes = translated_indexes
213 new_obj._safe_for_cache = safe_for_cache
214 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
215 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
216 return new_obj
217
218 def _remove_processors_and_tuple_filter(self) -> Self:
219 if self._tuplefilter:
220 proc = self._tuplefilter(self._processors)
221 else:
222 proc = self._processors
223 return self._make_new_metadata(
224 unpickled=self._unpickled,
225 processors=[None] * len(proc),
226 tuplefilter=None,
227 translated_indexes=None,
228 keymap={
229 key: value[0:5] + (None,) + value[6:]
230 for key, value in self._keymap.items()
231 },
232 keys=self._keys,
233 safe_for_cache=self._safe_for_cache,
234 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
235 )
236
237 def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
238 keymap = dict(self._keymap)
239 offset = len(self._keys)
240
241 for key, value in other._keymap.items():
242 # int index should be None for ambiguous key
243 if value[MD_INDEX] is not None and key not in keymap:
244 md_index = value[MD_INDEX] + offset
245 md_object = value[MD_RESULT_MAP_INDEX] + offset
246 else:
247 md_index = None
248 md_object = -1
249 keymap[key] = (md_index, md_object, *value[2:])
250
251 self_tf = self._tuplefilter
252 other_tf = other._tuplefilter
253
254 proc: List[Any] = []
255 for pp, tf in [
256 (self._processors, self_tf),
257 (other._processors, other_tf),
258 ]:
259 proc.extend(pp if tf is None else tf(pp))
260
261 new_keys = [*self._keys, *other._keys]
262 assert len(proc) == len(new_keys)
263
264 return self._make_new_metadata(
265 unpickled=self._unpickled,
266 processors=proc,
267 tuplefilter=None,
268 translated_indexes=None,
269 keys=new_keys,
270 keymap=keymap,
271 safe_for_cache=self._safe_for_cache,
272 keymap_by_result_column_idx={
273 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
274 for metadata_entry in keymap.values()
275 },
276 )
277
278 def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
279 recs = list(self._metadata_for_keys(keys))
280
281 indexes = [rec[MD_INDEX] for rec in recs]
282 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
283
284 if self._translated_indexes:
285 indexes = [self._translated_indexes[idx] for idx in indexes]
286 tup = tuplegetter(*indexes)
287 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
288
289 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
290 # TODO: need unit test for:
291 # result = connection.execute("raw sql, no columns").scalars()
292 # without the "or ()" it's failing because MD_OBJECTS is None
293 keymap.update(
294 (e, new_rec)
295 for new_rec in new_recs
296 for e in new_rec[MD_OBJECTS] or ()
297 )
298
299 return self._make_new_metadata(
300 unpickled=self._unpickled,
301 processors=self._processors,
302 keys=new_keys,
303 tuplefilter=tup,
304 translated_indexes=indexes,
305 keymap=keymap, # type: ignore[arg-type]
306 safe_for_cache=self._safe_for_cache,
307 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
308 )
309
310 def _adapt_to_context(self, context: ExecutionContext) -> Self:
311 """When using a cached Compiled construct that has a _result_map,
312 for a new statement that used the cached Compiled, we need to ensure
313 the keymap has the Column objects from our new statement as keys.
314 So here we rewrite keymap with new entries for the new columns
315 as matched to those of the cached statement.
316
317 """
318
319 if not context.compiled or not context.compiled._result_columns:
320 return self
321
322 compiled_statement = context.compiled.statement
323 invoked_statement = context.invoked_statement
324
325 if TYPE_CHECKING:
326 assert isinstance(invoked_statement, elements.ClauseElement)
327
328 if compiled_statement is invoked_statement:
329 return self
330
331 assert invoked_statement is not None
332
333 # this is the most common path for Core statements when
334 # caching is used. In ORM use, this codepath is not really used
335 # as the _result_disable_adapt_to_context execution option is
336 # set by the ORM.
337
338 # make a copy and add the columns from the invoked statement
339 # to the result map.
340
341 keymap_by_position = self._keymap_by_result_column_idx
342
343 if keymap_by_position is None:
344 # first retrival from cache, this map will not be set up yet,
345 # initialize lazily
346 keymap_by_position = self._keymap_by_result_column_idx = {
347 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
348 for metadata_entry in self._keymap.values()
349 }
350
351 return self._make_new_metadata(
352 keymap=self._keymap
353 | {
354 new: keymap_by_position[idx]
355 for idx, new in enumerate(
356 invoked_statement._all_selected_columns
357 )
358 if idx in keymap_by_position
359 },
360 unpickled=self._unpickled,
361 processors=self._processors,
362 tuplefilter=self._tuplefilter,
363 translated_indexes=None,
364 keys=self._keys,
365 safe_for_cache=self._safe_for_cache,
366 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
367 )
368
369 def __init__(
370 self,
371 parent: CursorResult[Unpack[TupleAny]],
372 cursor_description: _DBAPICursorDescription,
373 *,
374 driver_column_names: bool = False,
375 num_sentinel_cols: int = 0,
376 ):
377 context = parent.context
378 if num_sentinel_cols > 0:
379 # this is slightly faster than letting tuplegetter use the indexes
380 self._tuplefilter = tuplefilter = operator.itemgetter(
381 slice(-num_sentinel_cols)
382 )
383 cursor_description = tuplefilter(cursor_description)
384 else:
385 self._tuplefilter = tuplefilter = None
386 self._translated_indexes = None
387 self._safe_for_cache = self._unpickled = False
388
389 if context.result_column_struct:
390 (
391 result_columns,
392 cols_are_ordered,
393 textual_ordered,
394 ad_hoc_textual,
395 loose_column_name_matching,
396 ) = context.result_column_struct
397 if tuplefilter is not None:
398 result_columns = tuplefilter(result_columns)
399 num_ctx_cols = len(result_columns)
400 else:
401 result_columns = cols_are_ordered = ( # type: ignore
402 num_ctx_cols
403 ) = ad_hoc_textual = loose_column_name_matching = (
404 textual_ordered
405 ) = False
406
407 # merge cursor.description with the column info
408 # present in the compiled structure, if any
409 raw = self._merge_cursor_description(
410 context,
411 cursor_description,
412 result_columns,
413 num_ctx_cols,
414 cols_are_ordered,
415 textual_ordered,
416 ad_hoc_textual,
417 loose_column_name_matching,
418 driver_column_names,
419 )
420
421 # processors in key order which are used when building up
422 # a row
423 self._processors = [
424 metadata_entry[MD_PROCESSOR] for metadata_entry in raw
425 ]
426 if num_sentinel_cols > 0:
427 # add the number of sentinel columns since these are passed
428 # to the tuplefilters before being used
429 self._processors.extend([None] * num_sentinel_cols)
430
431 # this is used when using this ResultMetaData in a Core-only cache
432 # retrieval context. it's initialized on first cache retrieval
433 # when the _result_disable_adapt_to_context execution option
434 # (which the ORM generally sets) is not set.
435 self._keymap_by_result_column_idx = None
436
437 # for compiled SQL constructs, copy additional lookup keys into
438 # the key lookup map, such as Column objects, labels,
439 # column keys and other names
440 if num_ctx_cols:
441 # keymap by primary string...
442 by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
443 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
444 for metadata_entry in raw
445 }
446
447 if len(by_key) != num_ctx_cols:
448 # if by-primary-string dictionary smaller than
449 # number of columns, assume we have dupes; (this check
450 # is also in place if string dictionary is bigger, as
451 # can occur when '*' was used as one of the compiled columns,
452 # which may or may not be suggestive of dupes), rewrite
453 # dupe records with "None" for index which results in
454 # ambiguous column exception when accessed.
455 #
456 # this is considered to be the less common case as it is not
457 # common to have dupe column keys in a SELECT statement.
458 #
459 # new in 1.4: get the complete set of all possible keys,
460 # strings, objects, whatever, that are dupes across two
461 # different records, first.
462 index_by_key: Dict[Any, Any] = {}
463 dupes = set()
464 for metadata_entry in raw:
465 for key in (metadata_entry[MD_RENDERED_NAME],) + (
466 metadata_entry[MD_OBJECTS] or ()
467 ):
468 idx = metadata_entry[MD_INDEX]
469 # if this key has been associated with more than one
470 # positional index, it's a dupe
471 if index_by_key.setdefault(key, idx) != idx:
472 dupes.add(key)
473
474 # then put everything we have into the keymap excluding only
475 # those keys that are dupes.
476 self._keymap = {
477 obj_elem: metadata_entry
478 for metadata_entry in raw
479 if metadata_entry[MD_OBJECTS]
480 for obj_elem in metadata_entry[MD_OBJECTS]
481 if obj_elem not in dupes
482 }
483
484 # then for the dupe keys, put the "ambiguous column"
485 # record into by_key.
486 by_key.update(
487 {
488 key: (None, -1, (), key, key, None, None)
489 for key in dupes
490 }
491 )
492
493 else:
494 # no dupes - copy secondary elements from compiled
495 # columns into self._keymap. this is the most common
496 # codepath for Core / ORM statement executions before the
497 # result metadata is cached
498 self._keymap = {
499 obj_elem: metadata_entry
500 for metadata_entry in raw
501 if metadata_entry[MD_OBJECTS]
502 for obj_elem in metadata_entry[MD_OBJECTS]
503 }
504 # update keymap with primary string names taking
505 # precedence
506 self._keymap.update(by_key)
507 else:
508 # no compiled objects to map, just create keymap by primary string
509 self._keymap = {
510 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
511 for metadata_entry in raw
512 }
513
514 # update keymap with "translated" names.
515 # the "translated" name thing has a long history:
516 # 1. originally, it was used to fix an issue in very old SQLite
517 # versions prior to 3.10.0. This code is still there in the
518 # sqlite dialect.
519 # 2. Next, the pyhive third party dialect started using this hook
520 # for some driver related issue on their end.
521 # 3. Most recently, the "driver_column_names" execution option has
522 # taken advantage of this hook to get raw DBAPI col names in the
523 # result keys without disrupting the usual merge process.
524
525 if driver_column_names or (
526 not num_ctx_cols and context._translate_colname
527 ):
528 self._keymap.update(
529 {
530 metadata_entry[MD_UNTRANSLATED]: self._keymap[
531 metadata_entry[MD_LOOKUP_KEY]
532 ]
533 for metadata_entry in raw
534 if metadata_entry[MD_UNTRANSLATED]
535 }
536 )
537
538 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
539
540 def _merge_cursor_description(
541 self,
542 context: DefaultExecutionContext,
543 cursor_description: _DBAPICursorDescription,
544 result_columns: Sequence[ResultColumnsEntry],
545 num_ctx_cols: int,
546 cols_are_ordered: bool,
547 textual_ordered: bool,
548 ad_hoc_textual: bool,
549 loose_column_name_matching: bool,
550 driver_column_names: bool,
551 ) -> List[_CursorKeyMapRecType]:
552 """Merge a cursor.description with compiled result column information.
553
554 There are at least four separate strategies used here, selected
555 depending on the type of SQL construct used to start with.
556
557 The most common case is that of the compiled SQL expression construct,
558 which generated the column names present in the raw SQL string and
559 which has the identical number of columns as were reported by
560 cursor.description. In this case, we assume a 1-1 positional mapping
561 between the entries in cursor.description and the compiled object.
562 This is also the most performant case as we disregard extracting /
563 decoding the column names present in cursor.description since we
564 already have the desired name we generated in the compiled SQL
565 construct.
566
567 The next common case is that of the completely raw string SQL,
568 such as passed to connection.execute(). In this case we have no
569 compiled construct to work with, so we extract and decode the
570 names from cursor.description and index those as the primary
571 result row target keys.
572
573 The remaining fairly common case is that of the textual SQL
574 that includes at least partial column information; this is when
575 we use a :class:`_expression.TextualSelect` construct.
576 This construct may have
577 unordered or ordered column information. In the ordered case, we
578 merge the cursor.description and the compiled construct's information
579 positionally, and warn if there are additional description names
580 present, however we still decode the names in cursor.description
581 as we don't have a guarantee that the names in the columns match
582 on these. In the unordered case, we match names in cursor.description
583 to that of the compiled construct based on name matching.
584 In both of these cases, the cursor.description names and the column
585 expression objects and names are indexed as result row target keys.
586
587 The final case is much less common, where we have a compiled
588 non-textual SQL expression construct, but the number of columns
589 in cursor.description doesn't match what's in the compiled
590 construct. We make the guess here that there might be textual
591 column expressions in the compiled construct that themselves include
592 a comma in them causing them to split. We do the same name-matching
593 as with textual non-ordered columns.
594
595 The name-matched system of merging is the same as that used by
596 SQLAlchemy for all cases up through the 0.9 series. Positional
597 matching for compiled SQL expressions was introduced in 1.0 as a
598 major performance feature, and positional matching for textual
599 :class:`_expression.TextualSelect` objects in 1.1.
600 As name matching is no longer
601 a common case, it was acceptable to factor it into smaller generator-
602 oriented methods that are easier to understand, but incur slightly
603 more performance overhead.
604
605 """
606
607 if (
608 num_ctx_cols
609 and cols_are_ordered
610 and not textual_ordered
611 and num_ctx_cols == len(cursor_description)
612 and not driver_column_names
613 ):
614 self._keys = [elem[0] for elem in result_columns]
615 # pure positional 1-1 case; doesn't need to read
616 # the names from cursor.description
617
618 # most common case for Core and ORM
619
620 # this metadata is safe to
621 # cache because we are guaranteed
622 # to have the columns in the same order for new executions
623 self._safe_for_cache = True
624
625 return [
626 (
627 idx,
628 idx,
629 rmap_entry[RM_OBJECTS],
630 rmap_entry[RM_NAME],
631 rmap_entry[RM_RENDERED_NAME],
632 context.get_result_processor(
633 rmap_entry[RM_TYPE],
634 rmap_entry[RM_RENDERED_NAME],
635 cursor_description[idx][1],
636 ),
637 None,
638 )
639 for idx, rmap_entry in enumerate(result_columns)
640 ]
641 else:
642 # name-based or text-positional cases, where we need
643 # to read cursor.description names
644
645 if textual_ordered or (
646 ad_hoc_textual and len(cursor_description) == num_ctx_cols
647 ):
648 self._safe_for_cache = not driver_column_names
649 # textual positional case
650 raw_iterator = self._merge_textual_cols_by_position(
651 context,
652 cursor_description,
653 result_columns,
654 driver_column_names,
655 )
656 elif num_ctx_cols:
657 # compiled SQL with a mismatch of description cols
658 # vs. compiled cols, or textual w/ unordered columns
659 # the order of columns can change if the query is
660 # against a "select *", so not safe to cache
661 self._safe_for_cache = False
662 raw_iterator = self._merge_cols_by_name(
663 context,
664 cursor_description,
665 result_columns,
666 loose_column_name_matching,
667 driver_column_names,
668 )
669 else:
670 # no compiled SQL, just a raw string, order of columns
671 # can change for "select *"
672 self._safe_for_cache = False
673 raw_iterator = self._merge_cols_by_none(
674 context, cursor_description, driver_column_names
675 )
676
677 return [
678 (
679 idx,
680 ridx,
681 obj,
682 cursor_colname,
683 cursor_colname,
684 context.get_result_processor(
685 mapped_type, cursor_colname, coltype
686 ),
687 untranslated,
688 ) # type: ignore[misc]
689 for (
690 idx,
691 ridx,
692 cursor_colname,
693 mapped_type,
694 coltype,
695 obj,
696 untranslated,
697 ) in raw_iterator
698 ]
699
700 def _colnames_from_description(
701 self,
702 context: DefaultExecutionContext,
703 cursor_description: _DBAPICursorDescription,
704 driver_column_names: bool,
705 ) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]:
706 """Extract column names and data types from a cursor.description.
707
708 Applies unicode decoding, column translation, "normalization",
709 and case sensitivity rules to the names based on the dialect.
710
711 """
712 dialect = context.dialect
713 translate_colname = context._translate_colname
714 normalize_name = (
715 dialect.normalize_name if dialect.requires_name_normalize else None
716 )
717
718 untranslated = None
719
720 for idx, rec in enumerate(cursor_description):
721 colname = unnormalized = rec[0]
722 coltype = rec[1]
723
724 if translate_colname:
725 # a None here for "untranslated" means "the dialect did not
726 # change the column name and the untranslated case can be
727 # ignored". otherwise "untranslated" is expected to be the
728 # original, unchanged colname (e.g. is == to "unnormalized")
729 colname, untranslated = translate_colname(colname)
730
731 assert untranslated is None or untranslated == unnormalized
732
733 if normalize_name:
734 colname = normalize_name(colname)
735
736 if driver_column_names:
737 yield idx, colname, unnormalized, unnormalized, coltype
738
739 else:
740 yield idx, colname, unnormalized, untranslated, coltype
741
742 def _merge_textual_cols_by_position(
743 self,
744 context: DefaultExecutionContext,
745 cursor_description: _DBAPICursorDescription,
746 result_columns: Sequence[ResultColumnsEntry],
747 driver_column_names: bool,
748 ) -> Iterator[_MergeColTuple]:
749 num_ctx_cols = len(result_columns)
750
751 if num_ctx_cols > len(cursor_description):
752 util.warn(
753 "Number of columns in textual SQL (%d) is "
754 "smaller than number of columns requested (%d)"
755 % (num_ctx_cols, len(cursor_description))
756 )
757 seen = set()
758
759 self._keys = []
760
761 uses_denormalize = context.dialect.requires_name_normalize
762 for (
763 idx,
764 colname,
765 unnormalized,
766 untranslated,
767 coltype,
768 ) in self._colnames_from_description(
769 context, cursor_description, driver_column_names
770 ):
771 if idx < num_ctx_cols:
772 ctx_rec = result_columns[idx]
773 obj = ctx_rec[RM_OBJECTS]
774 ridx = idx
775 mapped_type = ctx_rec[RM_TYPE]
776 if obj[0] in seen:
777 raise exc.InvalidRequestError(
778 "Duplicate column expression requested "
779 "in textual SQL: %r" % obj[0]
780 )
781 seen.add(obj[0])
782
783 # special check for all uppercase unnormalized name;
784 # use the unnormalized name as the key.
785 # see #10788
786 # if these names don't match, then we still honor the
787 # cursor.description name as the key and not what the
788 # Column has, see
789 # test_resultset.py::PositionalTextTest::test_via_column
790 if (
791 uses_denormalize
792 and unnormalized == ctx_rec[RM_RENDERED_NAME]
793 ):
794 result_name = unnormalized
795 else:
796 result_name = colname
797 else:
798 mapped_type = sqltypes.NULLTYPE
799 obj = None
800 ridx = None
801
802 result_name = colname
803
804 if driver_column_names:
805 assert untranslated is not None
806 self._keys.append(untranslated)
807 else:
808 self._keys.append(result_name)
809
810 yield (
811 idx,
812 ridx,
813 result_name,
814 mapped_type,
815 coltype,
816 obj,
817 untranslated,
818 )
819
820 def _merge_cols_by_name(
821 self,
822 context: DefaultExecutionContext,
823 cursor_description: _DBAPICursorDescription,
824 result_columns: Sequence[ResultColumnsEntry],
825 loose_column_name_matching: bool,
826 driver_column_names: bool,
827 ) -> Iterator[_MergeColTuple]:
828 match_map = self._create_description_match_map(
829 result_columns, loose_column_name_matching
830 )
831 mapped_type: TypeEngine[Any]
832
833 self._keys = []
834
835 for (
836 idx,
837 colname,
838 unnormalized,
839 untranslated,
840 coltype,
841 ) in self._colnames_from_description(
842 context, cursor_description, driver_column_names
843 ):
844 try:
845 ctx_rec = match_map[colname]
846 except KeyError:
847 mapped_type = sqltypes.NULLTYPE
848 obj = None
849 result_columns_idx = None
850 else:
851 obj = ctx_rec[1]
852 mapped_type = ctx_rec[2]
853 result_columns_idx = ctx_rec[3]
854
855 if driver_column_names:
856 assert untranslated is not None
857 self._keys.append(untranslated)
858 else:
859 self._keys.append(colname)
860 yield (
861 idx,
862 result_columns_idx,
863 colname,
864 mapped_type,
865 coltype,
866 obj,
867 untranslated,
868 )
869
870 @classmethod
871 def _create_description_match_map(
872 cls,
873 result_columns: Sequence[ResultColumnsEntry],
874 loose_column_name_matching: bool = False,
875 ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
876 """when matching cursor.description to a set of names that are present
877 in a Compiled object, as is the case with TextualSelect, get all the
878 names we expect might match those in cursor.description.
879 """
880
881 d: Dict[
882 Union[str, object],
883 Tuple[str, TupleAny, TypeEngine[Any], int],
884 ] = {}
885 for ridx, elem in enumerate(result_columns):
886 key = elem[RM_RENDERED_NAME]
887
888 if key in d:
889 # conflicting keyname - just add the column-linked objects
890 # to the existing record. if there is a duplicate column
891 # name in the cursor description, this will allow all of those
892 # objects to raise an ambiguous column error
893 e_name, e_obj, e_type, e_ridx = d[key]
894 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
895 else:
896 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
897
898 if loose_column_name_matching:
899 # when using a textual statement with an unordered set
900 # of columns that line up, we are expecting the user
901 # to be using label names in the SQL that match to the column
902 # expressions. Enable more liberal matching for this case;
903 # duplicate keys that are ambiguous will be fixed later.
904 for r_key in elem[RM_OBJECTS]:
905 d.setdefault(
906 r_key,
907 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
908 )
909 return d
910
911 def _merge_cols_by_none(
912 self,
913 context: DefaultExecutionContext,
914 cursor_description: _DBAPICursorDescription,
915 driver_column_names: bool,
916 ) -> Iterator[_MergeColTuple]:
917 self._keys = []
918
919 for (
920 idx,
921 colname,
922 unnormalized,
923 untranslated,
924 coltype,
925 ) in self._colnames_from_description(
926 context, cursor_description, driver_column_names
927 ):
928
929 if driver_column_names:
930 assert untranslated is not None
931 self._keys.append(untranslated)
932 else:
933 self._keys.append(colname)
934
935 yield (
936 idx,
937 None,
938 colname,
939 sqltypes.NULLTYPE,
940 coltype,
941 None,
942 untranslated,
943 )
944
945 if not TYPE_CHECKING:
946
947 def _key_fallback(
948 self, key: Any, err: Optional[Exception], raiseerr: bool = True
949 ) -> Optional[NoReturn]:
950 if raiseerr:
951 if self._unpickled and isinstance(key, elements.ColumnElement):
952 raise exc.NoSuchColumnError(
953 "Row was unpickled; lookup by ColumnElement "
954 "is unsupported"
955 ) from err
956 else:
957 raise exc.NoSuchColumnError(
958 "Could not locate column in row for column '%s'"
959 % util.string_or_unprintable(key)
960 ) from err
961 else:
962 return None
963
964 def _raise_for_ambiguous_column_name(
965 self, rec: _KeyMapRecType
966 ) -> NoReturn:
967 raise exc.InvalidRequestError(
968 "Ambiguous column name '%s' in "
969 "result set column descriptions" % rec[MD_LOOKUP_KEY]
970 )
971
972 def _index_for_key(
973 self, key: _KeyIndexType, raiseerr: bool = True
974 ) -> Optional[int]:
975 # TODO: can consider pre-loading ints and negative ints
976 # into _keymap - also no coverage here
977 if isinstance(key, int):
978 key = self._keys[key]
979
980 try:
981 rec = self._keymap[key]
982 except KeyError as ke:
983 x = self._key_fallback(key, ke, raiseerr)
984 assert x is None
985 return None
986
987 index = rec[0]
988
989 if index is None:
990 self._raise_for_ambiguous_column_name(rec)
991 return index
992
993 def _indexes_for_keys(
994 self, keys: Sequence[_KeyIndexType]
995 ) -> Sequence[int]:
996 try:
997 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501
998 except KeyError as ke:
999 # ensure it raises
1000 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
1001
1002 def _metadata_for_keys(
1003 self, keys: Sequence[_KeyIndexType]
1004 ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
1005 for key in keys:
1006 if isinstance(key, int):
1007 key = self._keys[key]
1008
1009 try:
1010 rec = self._keymap[key]
1011 except KeyError as ke:
1012 # ensure it raises
1013 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
1014
1015 index = rec[MD_INDEX]
1016
1017 if index is None:
1018 self._raise_for_ambiguous_column_name(rec)
1019
1020 yield cast(_NonAmbigCursorKeyMapRecType, rec)
1021
1022 def __getstate__(self) -> Dict[str, Any]:
1023 # TODO: consider serializing this as SimpleResultMetaData
1024 return {
1025 "_keymap": {
1026 key: (
1027 rec[MD_INDEX],
1028 rec[MD_RESULT_MAP_INDEX],
1029 [],
1030 key,
1031 rec[MD_RENDERED_NAME],
1032 None,
1033 None,
1034 )
1035 for key, rec in self._keymap.items()
1036 if isinstance(key, (str, int))
1037 },
1038 "_keys": self._keys,
1039 "_translated_indexes": self._translated_indexes,
1040 }
1041
1042 def __setstate__(self, state: Dict[str, Any]) -> None:
1043 self._processors = [None for _ in range(len(state["_keys"]))]
1044 self._keymap = state["_keymap"]
1045 self._keymap_by_result_column_idx = None
1046 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
1047 self._keys = state["_keys"]
1048 self._unpickled = True
1049 if state["_translated_indexes"]:
1050 translated_indexes: List[Any]
1051 self._translated_indexes = translated_indexes = state[
1052 "_translated_indexes"
1053 ]
1054 self._tuplefilter = tuplegetter(*translated_indexes)
1055 else:
1056 self._translated_indexes = self._tuplefilter = None
1057
1058
1059class ResultFetchStrategy:
1060 """Define a fetching strategy for a result object.
1061
1062
1063 .. versionadded:: 1.4
1064
1065 """
1066
1067 __slots__ = ()
1068
1069 alternate_cursor_description: Optional[_DBAPICursorDescription] = None
1070
1071 def soft_close(
1072 self,
1073 result: CursorResult[Unpack[TupleAny]],
1074 dbapi_cursor: Optional[DBAPICursor],
1075 ) -> None:
1076 raise NotImplementedError()
1077
1078 def hard_close(
1079 self,
1080 result: CursorResult[Unpack[TupleAny]],
1081 dbapi_cursor: Optional[DBAPICursor],
1082 ) -> None:
1083 raise NotImplementedError()
1084
1085 def yield_per(
1086 self,
1087 result: CursorResult[Unpack[TupleAny]],
1088 dbapi_cursor: DBAPICursor,
1089 num: int,
1090 ) -> None:
1091 return
1092
1093 def fetchone(
1094 self,
1095 result: CursorResult[Unpack[TupleAny]],
1096 dbapi_cursor: DBAPICursor,
1097 hard_close: bool = False,
1098 ) -> Any:
1099 raise NotImplementedError()
1100
1101 def fetchmany(
1102 self,
1103 result: CursorResult[Unpack[TupleAny]],
1104 dbapi_cursor: DBAPICursor,
1105 size: Optional[int] = None,
1106 ) -> Any:
1107 raise NotImplementedError()
1108
1109 def fetchall(
1110 self,
1111 result: CursorResult[Unpack[TupleAny]],
1112 dbapi_cursor: DBAPICursor,
1113 ) -> Any:
1114 raise NotImplementedError()
1115
1116 def handle_exception(
1117 self,
1118 result: CursorResult[Unpack[TupleAny]],
1119 dbapi_cursor: Optional[DBAPICursor],
1120 err: BaseException,
1121 ) -> NoReturn:
1122 raise err
1123
1124
1125class NoCursorFetchStrategy(ResultFetchStrategy):
1126 """Cursor strategy for a result that has no open cursor.
1127
1128 There are two varieties of this strategy, one for DQL and one for
1129 DML (and also DDL), each of which represent a result that had a cursor
1130 but no longer has one.
1131
1132 """
1133
1134 __slots__ = ()
1135
1136 def soft_close(
1137 self,
1138 result: CursorResult[Unpack[TupleAny]],
1139 dbapi_cursor: Optional[DBAPICursor],
1140 ) -> None:
1141 pass
1142
1143 def hard_close(
1144 self,
1145 result: CursorResult[Unpack[TupleAny]],
1146 dbapi_cursor: Optional[DBAPICursor],
1147 ) -> None:
1148 pass
1149
1150 def fetchone(
1151 self,
1152 result: CursorResult[Unpack[TupleAny]],
1153 dbapi_cursor: DBAPICursor,
1154 hard_close: bool = False,
1155 ) -> Any:
1156 return self._non_result(result, None)
1157
1158 def fetchmany(
1159 self,
1160 result: CursorResult[Unpack[TupleAny]],
1161 dbapi_cursor: DBAPICursor,
1162 size: Optional[int] = None,
1163 ) -> Any:
1164 return self._non_result(result, [])
1165
1166 def fetchall(
1167 self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor
1168 ) -> Any:
1169 return self._non_result(result, [])
1170
1171 def _non_result(
1172 self,
1173 result: CursorResult[Unpack[TupleAny]],
1174 default: Any,
1175 err: Optional[BaseException] = None,
1176 ) -> Any:
1177 raise NotImplementedError()
1178
1179
1180class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
1181 """Cursor strategy for a DQL result that has no open cursor.
1182
1183 This is a result set that can return rows, i.e. for a SELECT, or for an
1184 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
1185 where the cursor is closed and no rows remain available. The owning result
1186 object may or may not be "hard closed", which determines if the fetch
1187 methods send empty results or raise for closed result.
1188
1189 """
1190
1191 __slots__ = ()
1192
1193 def _non_result(
1194 self,
1195 result: CursorResult[Unpack[TupleAny]],
1196 default: Any,
1197 err: Optional[BaseException] = None,
1198 ) -> Any:
1199 if result.closed:
1200 raise exc.ResourceClosedError(
1201 "This result object is closed."
1202 ) from err
1203 else:
1204 return default
1205
1206
1207_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
1208
1209
1210class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
1211 """Cursor strategy for a DML result that has no open cursor.
1212
1213 This is a result set that does not return rows, i.e. for an INSERT,
1214 UPDATE, DELETE that does not include RETURNING.
1215
1216 """
1217
1218 __slots__ = ()
1219
1220 def _non_result(
1221 self,
1222 result: CursorResult[Unpack[TupleAny]],
1223 default: Any,
1224 err: Optional[BaseException] = None,
1225 ) -> Any:
1226 # we only expect to have a _NoResultMetaData() here right now.
1227 assert not result._metadata.returns_rows
1228 result._metadata._we_dont_return_rows(err) # type: ignore[union-attr]
1229
1230
1231_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
1232
1233
1234class CursorFetchStrategy(ResultFetchStrategy):
1235 """Call fetch methods from a DBAPI cursor.
1236
1237 Alternate versions of this class may instead buffer the rows from
1238 cursors or not use cursors at all.
1239
1240 """
1241
1242 __slots__ = ()
1243
1244 def soft_close(
1245 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1246 ) -> None:
1247 result.cursor_strategy = _NO_CURSOR_DQL
1248
1249 def hard_close(
1250 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1251 ) -> None:
1252 result.cursor_strategy = _NO_CURSOR_DQL
1253
1254 def handle_exception(
1255 self,
1256 result: CursorResult[Any],
1257 dbapi_cursor: Optional[DBAPICursor],
1258 err: BaseException,
1259 ) -> NoReturn:
1260 result.connection._handle_dbapi_exception(
1261 err, None, None, dbapi_cursor, result.context
1262 )
1263
1264 def yield_per(
1265 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1266 ) -> None:
1267 result.cursor_strategy = BufferedRowCursorFetchStrategy(
1268 dbapi_cursor,
1269 {"max_row_buffer": num},
1270 initial_buffer=collections.deque(),
1271 growth_factor=0,
1272 )
1273
1274 def fetchone(
1275 self,
1276 result: CursorResult[Any],
1277 dbapi_cursor: DBAPICursor,
1278 hard_close: bool = False,
1279 ) -> Any:
1280 try:
1281 row = dbapi_cursor.fetchone()
1282 if row is None:
1283 result._soft_close(hard=hard_close)
1284 return row
1285 except BaseException as e:
1286 self.handle_exception(result, dbapi_cursor, e)
1287
1288 def fetchmany(
1289 self,
1290 result: CursorResult[Any],
1291 dbapi_cursor: DBAPICursor,
1292 size: Optional[int] = None,
1293 ) -> Any:
1294 try:
1295 if size is None:
1296 l = dbapi_cursor.fetchmany()
1297 else:
1298 l = dbapi_cursor.fetchmany(size)
1299
1300 if not l:
1301 result._soft_close()
1302 return l
1303 except BaseException as e:
1304 self.handle_exception(result, dbapi_cursor, e)
1305
1306 def fetchall(
1307 self,
1308 result: CursorResult[Any],
1309 dbapi_cursor: DBAPICursor,
1310 ) -> Any:
1311 try:
1312 rows = dbapi_cursor.fetchall()
1313 result._soft_close()
1314 return rows
1315 except BaseException as e:
1316 self.handle_exception(result, dbapi_cursor, e)
1317
1318
1319_DEFAULT_FETCH = CursorFetchStrategy()
1320
1321
1322class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
1323 """A cursor fetch strategy with row buffering behavior.
1324
1325 This strategy buffers the contents of a selection of rows
1326 before ``fetchone()`` is called. This is to allow the results of
1327 ``cursor.description`` to be available immediately, when
1328 interfacing with a DB-API that requires rows to be consumed before
1329 this information is available (currently psycopg2, when used with
1330 server-side cursors).
1331
1332 The pre-fetching behavior fetches only one row initially, and then
1333 grows its buffer size by a fixed amount with each successive need
1334 for additional rows up the ``max_row_buffer`` size, which defaults
1335 to 1000::
1336
1337 with psycopg2_engine.connect() as conn:
1338
1339 result = conn.execution_options(
1340 stream_results=True, max_row_buffer=50
1341 ).execute(text("select * from table"))
1342
1343 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
1344
1345 .. seealso::
1346
1347 :ref:`psycopg2_execution_options`
1348 """
1349
1350 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
1351
1352 def __init__(
1353 self,
1354 dbapi_cursor: DBAPICursor,
1355 execution_options: CoreExecuteOptionsParameter,
1356 growth_factor: int = 5,
1357 initial_buffer: Optional[Deque[Any]] = None,
1358 ) -> None:
1359 self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
1360
1361 if initial_buffer is not None:
1362 self._rowbuffer = initial_buffer
1363 else:
1364 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
1365 self._growth_factor = growth_factor
1366
1367 if growth_factor:
1368 self._bufsize = min(self._max_row_buffer, self._growth_factor)
1369 else:
1370 self._bufsize = self._max_row_buffer
1371
1372 @classmethod
1373 def create(
1374 cls, result: CursorResult[Any]
1375 ) -> BufferedRowCursorFetchStrategy:
1376 return BufferedRowCursorFetchStrategy(
1377 result.cursor,
1378 result.context.execution_options,
1379 )
1380
1381 def _buffer_rows(
1382 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1383 ) -> None:
1384 """this is currently used only by fetchone()."""
1385
1386 size = self._bufsize
1387 try:
1388 if size < 1:
1389 new_rows = dbapi_cursor.fetchall()
1390 else:
1391 new_rows = dbapi_cursor.fetchmany(size)
1392 except BaseException as e:
1393 self.handle_exception(result, dbapi_cursor, e)
1394
1395 if not new_rows:
1396 return
1397 self._rowbuffer = collections.deque(new_rows)
1398 if self._growth_factor and size < self._max_row_buffer:
1399 self._bufsize = min(
1400 self._max_row_buffer, size * self._growth_factor
1401 )
1402
1403 def yield_per(
1404 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1405 ) -> None:
1406 self._growth_factor = 0
1407 self._max_row_buffer = self._bufsize = num
1408
1409 def soft_close(
1410 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1411 ) -> None:
1412 self._rowbuffer.clear()
1413 super().soft_close(result, dbapi_cursor)
1414
1415 def hard_close(
1416 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1417 ) -> None:
1418 self._rowbuffer.clear()
1419 super().hard_close(result, dbapi_cursor)
1420
1421 def fetchone(
1422 self,
1423 result: CursorResult[Any],
1424 dbapi_cursor: DBAPICursor,
1425 hard_close: bool = False,
1426 ) -> Any:
1427 if not self._rowbuffer:
1428 self._buffer_rows(result, dbapi_cursor)
1429 if not self._rowbuffer:
1430 try:
1431 result._soft_close(hard=hard_close)
1432 except BaseException as e:
1433 self.handle_exception(result, dbapi_cursor, e)
1434 return None
1435 return self._rowbuffer.popleft()
1436
1437 def fetchmany(
1438 self,
1439 result: CursorResult[Any],
1440 dbapi_cursor: DBAPICursor,
1441 size: Optional[int] = None,
1442 ) -> Any:
1443 if size is None:
1444 return self.fetchall(result, dbapi_cursor)
1445
1446 rb = self._rowbuffer
1447 lb = len(rb)
1448 close = False
1449 if size > lb:
1450 try:
1451 new = dbapi_cursor.fetchmany(size - lb)
1452 except BaseException as e:
1453 self.handle_exception(result, dbapi_cursor, e)
1454 else:
1455 if not new:
1456 # defer closing since it may clear the row buffer
1457 close = True
1458 else:
1459 rb.extend(new)
1460
1461 res = [rb.popleft() for _ in range(min(size, len(rb)))]
1462 if close:
1463 result._soft_close()
1464 return res
1465
1466 def fetchall(
1467 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1468 ) -> Any:
1469 try:
1470 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
1471 self._rowbuffer.clear()
1472 result._soft_close()
1473 return ret
1474 except BaseException as e:
1475 self.handle_exception(result, dbapi_cursor, e)
1476
1477
1478class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
1479 """A cursor strategy that buffers rows fully upon creation.
1480
1481 Used for operations where a result is to be delivered
1482 after the database conversation can not be continued,
1483 such as MSSQL INSERT...OUTPUT after an autocommit.
1484
1485 """
1486
1487 __slots__ = ("_rowbuffer", "alternate_cursor_description")
1488
1489 def __init__(
1490 self,
1491 dbapi_cursor: Optional[DBAPICursor],
1492 alternate_description: Optional[_DBAPICursorDescription] = None,
1493 initial_buffer: Optional[Iterable[Any]] = None,
1494 ):
1495 self.alternate_cursor_description = alternate_description
1496 if initial_buffer is not None:
1497 self._rowbuffer = collections.deque(initial_buffer)
1498 else:
1499 assert dbapi_cursor is not None
1500 self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
1501
1502 def yield_per(
1503 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1504 ) -> Any:
1505 pass
1506
1507 def soft_close(
1508 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1509 ) -> None:
1510 self._rowbuffer.clear()
1511 super().soft_close(result, dbapi_cursor)
1512
1513 def hard_close(
1514 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1515 ) -> None:
1516 self._rowbuffer.clear()
1517 super().hard_close(result, dbapi_cursor)
1518
1519 def fetchone(
1520 self,
1521 result: CursorResult[Any],
1522 dbapi_cursor: DBAPICursor,
1523 hard_close: bool = False,
1524 ) -> Any:
1525 if self._rowbuffer:
1526 return self._rowbuffer.popleft()
1527 else:
1528 result._soft_close(hard=hard_close)
1529 return None
1530
1531 def fetchmany(
1532 self,
1533 result: CursorResult[Any],
1534 dbapi_cursor: DBAPICursor,
1535 size: Optional[int] = None,
1536 ) -> Any:
1537 if size is None:
1538 return self.fetchall(result, dbapi_cursor)
1539
1540 rb = self._rowbuffer
1541 rows = [rb.popleft() for _ in range(min(size, len(rb)))]
1542 if not rows:
1543 result._soft_close()
1544 return rows
1545
1546 def fetchall(
1547 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1548 ) -> Any:
1549 ret = self._rowbuffer
1550 self._rowbuffer = collections.deque()
1551 result._soft_close()
1552 return ret
1553
1554
1555class _NoResultMetaData(ResultMetaData):
1556 __slots__ = ()
1557
1558 returns_rows = False
1559
1560 def _we_dont_return_rows(
1561 self, err: Optional[BaseException] = None
1562 ) -> NoReturn:
1563 raise exc.ResourceClosedError(
1564 "This result object does not return rows. "
1565 "It has been closed automatically."
1566 ) from err
1567
1568 def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
1569 self._we_dont_return_rows()
1570
1571 def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1572 self._we_dont_return_rows()
1573
1574 def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1575 self._we_dont_return_rows()
1576
1577 @property
1578 def _keymap(self) -> NoReturn: # type: ignore[override]
1579 self._we_dont_return_rows()
1580
1581 @property
1582 def _key_to_index(self) -> NoReturn: # type: ignore[override]
1583 self._we_dont_return_rows()
1584
1585 @property
1586 def _processors(self) -> NoReturn: # type: ignore[override]
1587 self._we_dont_return_rows()
1588
1589 @property
1590 def keys(self) -> NoReturn:
1591 self._we_dont_return_rows()
1592
1593
1594_NO_RESULT_METADATA = _NoResultMetaData()
1595
1596
1597def null_dml_result() -> IteratorResult[Any]:
1598 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
1599 it._soft_close()
1600 return it
1601
1602
1603class CursorResult(Result[Unpack[_Ts]]):
1604 """A Result that is representing state from a DBAPI cursor.
1605
1606 .. versionchanged:: 1.4 The :class:`.CursorResult``
1607 class replaces the previous :class:`.ResultProxy` interface.
1608 This classes are based on the :class:`.Result` calling API
1609 which provides an updated usage model and calling facade for
1610 SQLAlchemy Core and SQLAlchemy ORM.
1611
1612 Returns database rows via the :class:`.Row` class, which provides
1613 additional API features and behaviors on top of the raw data returned by
1614 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
1615 method, other kinds of objects may also be returned.
1616
1617 .. seealso::
1618
1619 :ref:`tutorial_selecting_data` - introductory material for accessing
1620 :class:`_engine.CursorResult` and :class:`.Row` objects.
1621
1622 """
1623
1624 __slots__ = (
1625 "context",
1626 "dialect",
1627 "cursor",
1628 "cursor_strategy",
1629 "_echo",
1630 "connection",
1631 )
1632
1633 _metadata: Union[CursorResultMetaData, _NoResultMetaData]
1634 _no_result_metadata = _NO_RESULT_METADATA
1635 _soft_closed: bool = False
1636 closed: bool = False
1637 _is_cursor = True
1638
1639 context: DefaultExecutionContext
1640 dialect: Dialect
1641 cursor_strategy: ResultFetchStrategy
1642 connection: Connection
1643
1644 def __init__(
1645 self,
1646 context: DefaultExecutionContext,
1647 cursor_strategy: ResultFetchStrategy,
1648 cursor_description: Optional[_DBAPICursorDescription],
1649 ):
1650 self.context = context
1651 self.dialect = context.dialect
1652 self.cursor = context.cursor
1653 self.cursor_strategy = cursor_strategy
1654 self.connection = context.root_connection
1655 self._echo = echo = (
1656 self.connection._echo and context.engine._should_log_debug()
1657 )
1658
1659 if cursor_description is not None:
1660 # inline of Result._row_getter(), set up an initial row
1661 # getter assuming no transformations will be called as this
1662 # is the most common case
1663
1664 metadata = self._init_metadata(context, cursor_description)
1665
1666 _make_row: Any
1667 proc = metadata._effective_processors
1668 tf = metadata._tuplefilter
1669 _make_row = functools.partial(
1670 Row,
1671 metadata,
1672 proc if tf is None or proc is None else tf(proc),
1673 metadata._key_to_index,
1674 )
1675 if tf is not None:
1676 _fixed_tf = tf # needed to make mypy happy...
1677
1678 def _sliced_row(raw_data: Any) -> Any:
1679 return _make_row(_fixed_tf(raw_data))
1680
1681 sliced_row = _sliced_row
1682 else:
1683 sliced_row = _make_row
1684
1685 if echo:
1686 log = self.context.connection._log_debug
1687
1688 def _log_row(row: Any) -> Any:
1689 log("Row %r", sql_util._repr_row(row))
1690 return row
1691
1692 self._row_logging_fn = _log_row
1693
1694 def _make_row_2(row: Any) -> Any:
1695 return _log_row(sliced_row(row))
1696
1697 make_row = _make_row_2
1698 else:
1699 make_row = sliced_row # type: ignore[assignment]
1700 self._set_memoized_attribute("_row_getter", make_row)
1701
1702 else:
1703 assert context._num_sentinel_cols == 0
1704 self._metadata = self._no_result_metadata
1705
1706 def _init_metadata(
1707 self,
1708 context: DefaultExecutionContext,
1709 cursor_description: _DBAPICursorDescription,
1710 ) -> CursorResultMetaData:
1711 driver_column_names = context.execution_options.get(
1712 "driver_column_names", False
1713 )
1714 if context.compiled:
1715 compiled = context.compiled
1716
1717 metadata: CursorResultMetaData
1718
1719 if driver_column_names:
1720 # TODO: test this case
1721 metadata = CursorResultMetaData(
1722 self,
1723 cursor_description,
1724 driver_column_names=True,
1725 num_sentinel_cols=context._num_sentinel_cols,
1726 )
1727 assert not metadata._safe_for_cache
1728 elif compiled._cached_metadata:
1729 metadata = compiled._cached_metadata
1730 else:
1731 metadata = CursorResultMetaData(
1732 self,
1733 cursor_description,
1734 # the number of sentinel columns is stored on the context
1735 # but it's a characteristic of the compiled object
1736 # so it's ok to apply it to a cacheable metadata.
1737 num_sentinel_cols=context._num_sentinel_cols,
1738 )
1739 if metadata._safe_for_cache:
1740 compiled._cached_metadata = metadata
1741
1742 # result rewrite/ adapt step. this is to suit the case
1743 # when we are invoked against a cached Compiled object, we want
1744 # to rewrite the ResultMetaData to reflect the Column objects
1745 # that are in our current SQL statement object, not the one
1746 # that is associated with the cached Compiled object.
1747 # the Compiled object may also tell us to not
1748 # actually do this step; this is to support the ORM where
1749 # it is to produce a new Result object in any case, and will
1750 # be using the cached Column objects against this database result
1751 # so we don't want to rewrite them.
1752 #
1753 # Basically this step suits the use case where the end user
1754 # is using Core SQL expressions and is accessing columns in the
1755 # result row using row._mapping[table.c.column].
1756 if (
1757 not context.execution_options.get(
1758 "_result_disable_adapt_to_context", False
1759 )
1760 and compiled._result_columns
1761 and context.cache_hit is context.dialect.CACHE_HIT
1762 and compiled.statement is not context.invoked_statement # type: ignore[comparison-overlap] # noqa: E501
1763 ):
1764 metadata = metadata._adapt_to_context(context)
1765
1766 self._metadata = metadata
1767
1768 else:
1769 self._metadata = metadata = CursorResultMetaData(
1770 self,
1771 cursor_description,
1772 driver_column_names=driver_column_names,
1773 )
1774 if self._echo:
1775 context.connection._log_debug(
1776 "Col %r", tuple(x[0] for x in cursor_description)
1777 )
1778 return metadata
1779
1780 def _soft_close(self, hard: bool = False) -> None:
1781 """Soft close this :class:`_engine.CursorResult`.
1782
1783 This releases all DBAPI cursor resources, but leaves the
1784 CursorResult "open" from a semantic perspective, meaning the
1785 fetchXXX() methods will continue to return empty results.
1786
1787 This method is called automatically when:
1788
1789 * all result rows are exhausted using the fetchXXX() methods.
1790 * cursor.description is None.
1791
1792 This method is **not public**, but is documented in order to clarify
1793 the "autoclose" process used.
1794
1795 .. seealso::
1796
1797 :meth:`_engine.CursorResult.close`
1798
1799
1800 """
1801
1802 if (not hard and self._soft_closed) or (hard and self.closed):
1803 return
1804
1805 if hard:
1806 self.closed = True
1807 self.cursor_strategy.hard_close(self, self.cursor)
1808 else:
1809 self.cursor_strategy.soft_close(self, self.cursor)
1810
1811 if not self._soft_closed:
1812 cursor = self.cursor
1813 self.cursor = None # type: ignore
1814 self.connection._safe_close_cursor(cursor)
1815 self._soft_closed = True
1816
1817 @property
1818 def inserted_primary_key_rows(self) -> List[Optional[Any]]:
1819 """Return the value of
1820 :attr:`_engine.CursorResult.inserted_primary_key`
1821 as a row contained within a list; some dialects may support a
1822 multiple row form as well.
1823
1824 .. note:: As indicated below, in current SQLAlchemy versions this
1825 accessor is only useful beyond what's already supplied by
1826 :attr:`_engine.CursorResult.inserted_primary_key` when using the
1827 :ref:`postgresql_psycopg2` dialect. Future versions hope to
1828 generalize this feature to more dialects.
1829
1830 This accessor is added to support dialects that offer the feature
1831 that is currently implemented by the :ref:`psycopg2_executemany_mode`
1832 feature, currently **only the psycopg2 dialect**, which provides
1833 for many rows to be INSERTed at once while still retaining the
1834 behavior of being able to return server-generated primary key values.
1835
1836 * **When using the psycopg2 dialect, or other dialects that may support
1837 "fast executemany" style inserts in upcoming releases** : When
1838 invoking an INSERT statement while passing a list of rows as the
1839 second argument to :meth:`_engine.Connection.execute`, this accessor
1840 will then provide a list of rows, where each row contains the primary
1841 key value for each row that was INSERTed.
1842
1843 * **When using all other dialects / backends that don't yet support
1844 this feature**: This accessor is only useful for **single row INSERT
1845 statements**, and returns the same information as that of the
1846 :attr:`_engine.CursorResult.inserted_primary_key` within a
1847 single-element list. When an INSERT statement is executed in
1848 conjunction with a list of rows to be INSERTed, the list will contain
1849 one row per row inserted in the statement, however it will contain
1850 ``None`` for any server-generated values.
1851
1852 Future releases of SQLAlchemy will further generalize the
1853 "fast execution helper" feature of psycopg2 to suit other dialects,
1854 thus allowing this accessor to be of more general use.
1855
1856 .. versionadded:: 1.4
1857
1858 .. seealso::
1859
1860 :attr:`_engine.CursorResult.inserted_primary_key`
1861
1862 """
1863 if not self.context.compiled:
1864 raise exc.InvalidRequestError(
1865 "Statement is not a compiled expression construct."
1866 )
1867 elif not self.context.isinsert:
1868 raise exc.InvalidRequestError(
1869 "Statement is not an insert() expression construct."
1870 )
1871 elif self.context._is_explicit_returning:
1872 raise exc.InvalidRequestError(
1873 "Can't call inserted_primary_key "
1874 "when returning() "
1875 "is used."
1876 )
1877 return self.context.inserted_primary_key_rows # type: ignore[no-any-return] # noqa: E501
1878
1879 @property
1880 def inserted_primary_key(self) -> Optional[Any]:
1881 """Return the primary key for the row just inserted.
1882
1883 The return value is a :class:`_result.Row` object representing
1884 a named tuple of primary key values in the order in which the
1885 primary key columns are configured in the source
1886 :class:`_schema.Table`.
1887
1888 .. versionchanged:: 1.4.8 - the
1889 :attr:`_engine.CursorResult.inserted_primary_key`
1890 value is now a named tuple via the :class:`_result.Row` class,
1891 rather than a plain tuple.
1892
1893 This accessor only applies to single row :func:`_expression.insert`
1894 constructs which did not explicitly specify
1895 :meth:`_expression.Insert.returning`. Support for multirow inserts,
1896 while not yet available for most backends, would be accessed using
1897 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
1898
1899 Note that primary key columns which specify a server_default clause, or
1900 otherwise do not qualify as "autoincrement" columns (see the notes at
1901 :class:`_schema.Column`), and were generated using the database-side
1902 default, will appear in this list as ``None`` unless the backend
1903 supports "returning" and the insert statement executed with the
1904 "implicit returning" enabled.
1905
1906 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1907 statement is not a compiled expression construct
1908 or is not an insert() construct.
1909
1910 """
1911
1912 if self.context.executemany:
1913 raise exc.InvalidRequestError(
1914 "This statement was an executemany call; if primary key "
1915 "returning is supported, please "
1916 "use .inserted_primary_key_rows."
1917 )
1918
1919 ikp = self.inserted_primary_key_rows
1920 if ikp:
1921 return ikp[0]
1922 else:
1923 return None
1924
1925 def last_updated_params(
1926 self,
1927 ) -> Union[
1928 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1929 ]:
1930 """Return the collection of updated parameters from this
1931 execution.
1932
1933 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1934 statement is not a compiled expression construct
1935 or is not an update() construct.
1936
1937 """
1938 if not self.context.compiled:
1939 raise exc.InvalidRequestError(
1940 "Statement is not a compiled expression construct."
1941 )
1942 elif not self.context.isupdate:
1943 raise exc.InvalidRequestError(
1944 "Statement is not an update() expression construct."
1945 )
1946 elif self.context.executemany:
1947 return self.context.compiled_parameters
1948 else:
1949 return self.context.compiled_parameters[0]
1950
1951 def last_inserted_params(
1952 self,
1953 ) -> Union[
1954 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1955 ]:
1956 """Return the collection of inserted parameters from this
1957 execution.
1958
1959 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1960 statement is not a compiled expression construct
1961 or is not an insert() construct.
1962
1963 """
1964 if not self.context.compiled:
1965 raise exc.InvalidRequestError(
1966 "Statement is not a compiled expression construct."
1967 )
1968 elif not self.context.isinsert:
1969 raise exc.InvalidRequestError(
1970 "Statement is not an insert() expression construct."
1971 )
1972 elif self.context.executemany:
1973 return self.context.compiled_parameters
1974 else:
1975 return self.context.compiled_parameters[0]
1976
1977 @property
1978 def returned_defaults_rows(
1979 self,
1980 ) -> Optional[Sequence[Row[Unpack[TupleAny]]]]:
1981 """Return a list of rows each containing the values of default
1982 columns that were fetched using
1983 the :meth:`.ValuesBase.return_defaults` feature.
1984
1985 The return value is a list of :class:`.Row` objects.
1986
1987 .. versionadded:: 1.4
1988
1989 """
1990 return self.context.returned_default_rows
1991
1992 def splice_horizontally(self, other: CursorResult[Any]) -> Self:
1993 """Return a new :class:`.CursorResult` that "horizontally splices"
1994 together the rows of this :class:`.CursorResult` with that of another
1995 :class:`.CursorResult`.
1996
1997 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1998 not intended for general use.
1999
2000 "horizontally splices" means that for each row in the first and second
2001 result sets, a new row that concatenates the two rows together is
2002 produced, which then becomes the new row. The incoming
2003 :class:`.CursorResult` must have the identical number of rows. It is
2004 typically expected that the two result sets come from the same sort
2005 order as well, as the result rows are spliced together based on their
2006 position in the result.
2007
2008 The expected use case here is so that multiple INSERT..RETURNING
2009 statements (which definitely need to be sorted) against different
2010 tables can produce a single result that looks like a JOIN of those two
2011 tables.
2012
2013 E.g.::
2014
2015 r1 = connection.execute(
2016 users.insert().returning(
2017 users.c.user_name, users.c.user_id, sort_by_parameter_order=True
2018 ),
2019 user_values,
2020 )
2021
2022 r2 = connection.execute(
2023 addresses.insert().returning(
2024 addresses.c.address_id,
2025 addresses.c.address,
2026 addresses.c.user_id,
2027 sort_by_parameter_order=True,
2028 ),
2029 address_values,
2030 )
2031
2032 rows = r1.splice_horizontally(r2).all()
2033 assert rows == [
2034 ("john", 1, 1, "foo@bar.com", 1),
2035 ("jack", 2, 2, "bar@bat.com", 2),
2036 ]
2037
2038 .. versionadded:: 2.0
2039
2040 .. seealso::
2041
2042 :meth:`.CursorResult.splice_vertically`
2043
2044
2045 """ # noqa: E501
2046
2047 clone = self._generate()
2048 assert clone is self # just to note
2049 assert isinstance(other._metadata, CursorResultMetaData)
2050 assert isinstance(self._metadata, CursorResultMetaData)
2051 self_tf = self._metadata._tuplefilter
2052 other_tf = other._metadata._tuplefilter
2053 clone._metadata = self._metadata._splice_horizontally(other._metadata)
2054
2055 total_rows = [
2056 tuple(r1 if self_tf is None else self_tf(r1))
2057 + tuple(r2 if other_tf is None else other_tf(r2))
2058 for r1, r2 in zip(
2059 list(self._raw_row_iterator()),
2060 list(other._raw_row_iterator()),
2061 )
2062 ]
2063
2064 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2065 None,
2066 initial_buffer=total_rows,
2067 )
2068 clone._reset_memoizations()
2069 return clone
2070
2071 def splice_vertically(self, other: CursorResult[Any]) -> Self:
2072 """Return a new :class:`.CursorResult` that "vertically splices",
2073 i.e. "extends", the rows of this :class:`.CursorResult` with that of
2074 another :class:`.CursorResult`.
2075
2076 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
2077 not intended for general use.
2078
2079 "vertically splices" means the rows of the given result are appended to
2080 the rows of this cursor result. The incoming :class:`.CursorResult`
2081 must have rows that represent the identical list of columns in the
2082 identical order as they are in this :class:`.CursorResult`.
2083
2084 .. versionadded:: 2.0
2085
2086 .. seealso::
2087
2088 :meth:`.CursorResult.splice_horizontally`
2089
2090 """
2091 clone = self._generate()
2092 total_rows = list(self._raw_row_iterator()) + list(
2093 other._raw_row_iterator()
2094 )
2095
2096 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2097 None,
2098 initial_buffer=total_rows,
2099 )
2100 clone._reset_memoizations()
2101 return clone
2102
2103 def _rewind(self, rows: Any) -> Self:
2104 """rewind this result back to the given rowset.
2105
2106 this is used internally for the case where an :class:`.Insert`
2107 construct combines the use of
2108 :meth:`.Insert.return_defaults` along with the
2109 "supplemental columns" feature.
2110
2111 NOTE: this method has not effect then an unique filter is applied
2112 to the result, meaning that no row will be returned.
2113
2114 """
2115
2116 if self._echo:
2117 self.context.connection._log_debug(
2118 "CursorResult rewound %d row(s)", len(rows)
2119 )
2120
2121 # the rows given are expected to be Row objects, so we
2122 # have to clear out processors which have already run on these
2123 # rows
2124 self._metadata = cast(
2125 CursorResultMetaData, self._metadata
2126 )._remove_processors_and_tuple_filter()
2127
2128 self.cursor_strategy = FullyBufferedCursorFetchStrategy(
2129 None,
2130 # TODO: if these are Row objects, can we save on not having to
2131 # re-make new Row objects out of them a second time? is that
2132 # what's actually happening right now? maybe look into this
2133 initial_buffer=rows,
2134 )
2135 self._reset_memoizations()
2136 return self
2137
2138 @property
2139 def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]:
2140 """Return the values of default columns that were fetched using
2141 the :meth:`.ValuesBase.return_defaults` feature.
2142
2143 The value is an instance of :class:`.Row`, or ``None``
2144 if :meth:`.ValuesBase.return_defaults` was not used or if the
2145 backend does not support RETURNING.
2146
2147 .. seealso::
2148
2149 :meth:`.ValuesBase.return_defaults`
2150
2151 """
2152
2153 if self.context.executemany:
2154 raise exc.InvalidRequestError(
2155 "This statement was an executemany call; if return defaults "
2156 "is supported, please use .returned_defaults_rows."
2157 )
2158
2159 rows = self.context.returned_default_rows
2160 if rows:
2161 return rows[0]
2162 else:
2163 return None
2164
2165 def lastrow_has_defaults(self) -> bool:
2166 """Return ``lastrow_has_defaults()`` from the underlying
2167 :class:`.ExecutionContext`.
2168
2169 See :class:`.ExecutionContext` for details.
2170
2171 """
2172
2173 return self.context.lastrow_has_defaults()
2174
2175 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2176 """Return ``postfetch_cols()`` from the underlying
2177 :class:`.ExecutionContext`.
2178
2179 See :class:`.ExecutionContext` for details.
2180
2181 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2182 statement is not a compiled expression construct
2183 or is not an insert() or update() construct.
2184
2185 """
2186
2187 if not self.context.compiled:
2188 raise exc.InvalidRequestError(
2189 "Statement is not a compiled expression construct."
2190 )
2191 elif not self.context.isinsert and not self.context.isupdate:
2192 raise exc.InvalidRequestError(
2193 "Statement is not an insert() or update() "
2194 "expression construct."
2195 )
2196 return self.context.postfetch_cols
2197
2198 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2199 """Return ``prefetch_cols()`` from the underlying
2200 :class:`.ExecutionContext`.
2201
2202 See :class:`.ExecutionContext` for details.
2203
2204 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2205 statement is not a compiled expression construct
2206 or is not an insert() or update() construct.
2207
2208 """
2209
2210 if not self.context.compiled:
2211 raise exc.InvalidRequestError(
2212 "Statement is not a compiled expression construct."
2213 )
2214 elif not self.context.isinsert and not self.context.isupdate:
2215 raise exc.InvalidRequestError(
2216 "Statement is not an insert() or update() "
2217 "expression construct."
2218 )
2219 return self.context.prefetch_cols
2220
2221 def supports_sane_rowcount(self) -> bool:
2222 """Return ``supports_sane_rowcount`` from the dialect.
2223
2224 See :attr:`_engine.CursorResult.rowcount` for background.
2225
2226 """
2227
2228 return self.dialect.supports_sane_rowcount
2229
2230 def supports_sane_multi_rowcount(self) -> bool:
2231 """Return ``supports_sane_multi_rowcount`` from the dialect.
2232
2233 See :attr:`_engine.CursorResult.rowcount` for background.
2234
2235 """
2236
2237 return self.dialect.supports_sane_multi_rowcount
2238
2239 @util.memoized_property
2240 def rowcount(self) -> int:
2241 """Return the 'rowcount' for this result.
2242
2243 The primary purpose of 'rowcount' is to report the number of rows
2244 matched by the WHERE criterion of an UPDATE or DELETE statement
2245 executed once (i.e. for a single parameter set), which may then be
2246 compared to the number of rows expected to be updated or deleted as a
2247 means of asserting data integrity.
2248
2249 This attribute is transferred from the ``cursor.rowcount`` attribute
2250 of the DBAPI before the cursor is closed, to support DBAPIs that
2251 don't make this value available after cursor close. Some DBAPIs may
2252 offer meaningful values for other kinds of statements, such as INSERT
2253 and SELECT statements as well. In order to retrieve ``cursor.rowcount``
2254 for these statements, set the
2255 :paramref:`.Connection.execution_options.preserve_rowcount`
2256 execution option to True, which will cause the ``cursor.rowcount``
2257 value to be unconditionally memoized before any results are returned
2258 or the cursor is closed, regardless of statement type.
2259
2260 For cases where the DBAPI does not support rowcount for a particular
2261 kind of statement and/or execution, the returned value will be ``-1``,
2262 which is delivered directly from the DBAPI and is part of :pep:`249`.
2263 All DBAPIs should support rowcount for single-parameter-set
2264 UPDATE and DELETE statements, however.
2265
2266 .. note::
2267
2268 Notes regarding :attr:`_engine.CursorResult.rowcount`:
2269
2270
2271 * This attribute returns the number of rows *matched*,
2272 which is not necessarily the same as the number of rows
2273 that were actually *modified*. For example, an UPDATE statement
2274 may have no net change on a given row if the SET values
2275 given are the same as those present in the row already.
2276 Such a row would be matched but not modified.
2277 On backends that feature both styles, such as MySQL,
2278 rowcount is configured to return the match
2279 count in all cases.
2280
2281 * :attr:`_engine.CursorResult.rowcount` in the default case is
2282 *only* useful in conjunction with an UPDATE or DELETE statement,
2283 and only with a single set of parameters. For other kinds of
2284 statements, SQLAlchemy will not attempt to pre-memoize the value
2285 unless the
2286 :paramref:`.Connection.execution_options.preserve_rowcount`
2287 execution option is used. Note that contrary to :pep:`249`, many
2288 DBAPIs do not support rowcount values for statements that are not
2289 UPDATE or DELETE, particularly when rows are being returned which
2290 are not fully pre-buffered. DBAPIs that dont support rowcount
2291 for a particular kind of statement should return the value ``-1``
2292 for such statements.
2293
2294 * :attr:`_engine.CursorResult.rowcount` may not be meaningful
2295 when executing a single statement with multiple parameter sets
2296 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
2297 values across multiple parameter sets and will return ``-1``
2298 when accessed.
2299
2300 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
2301 a correct population of :attr:`_engine.CursorResult.rowcount`
2302 when the :paramref:`.Connection.execution_options.preserve_rowcount`
2303 execution option is set to True.
2304
2305 * Statements that use RETURNING may not support rowcount, returning
2306 a ``-1`` value instead.
2307
2308 .. seealso::
2309
2310 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
2311
2312 :paramref:`.Connection.execution_options.preserve_rowcount`
2313
2314 """ # noqa: E501
2315 try:
2316 return self.context.rowcount
2317 except BaseException as e:
2318 self.cursor_strategy.handle_exception(self, self.cursor, e)
2319 raise # not called
2320
2321 @property
2322 def lastrowid(self) -> int:
2323 """Return the 'lastrowid' accessor on the DBAPI cursor.
2324
2325 This is a DBAPI specific method and is only functional
2326 for those backends which support it, for statements
2327 where it is appropriate. It's behavior is not
2328 consistent across backends.
2329
2330 Usage of this method is normally unnecessary when
2331 using insert() expression constructs; the
2332 :attr:`~CursorResult.inserted_primary_key` attribute provides a
2333 tuple of primary key values for a newly inserted row,
2334 regardless of database backend.
2335
2336 """
2337 try:
2338 return self.context.get_lastrowid()
2339 except BaseException as e:
2340 self.cursor_strategy.handle_exception(self, self.cursor, e)
2341
2342 @property
2343 def returns_rows(self) -> bool:
2344 """True if this :class:`_engine.CursorResult` returns zero or more
2345 rows.
2346
2347 I.e. if it is legal to call the methods
2348 :meth:`_engine.CursorResult.fetchone`,
2349 :meth:`_engine.CursorResult.fetchmany`
2350 :meth:`_engine.CursorResult.fetchall`.
2351
2352 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
2353 always be synonymous with whether or not the DBAPI cursor had a
2354 ``.description`` attribute, indicating the presence of result columns,
2355 noting that a cursor that returns zero rows still has a
2356 ``.description`` if a row-returning statement was emitted.
2357
2358 This attribute should be True for all results that are against
2359 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
2360 that use RETURNING. For INSERT/UPDATE/DELETE statements that were
2361 not using RETURNING, the value will usually be False, however
2362 there are some dialect-specific exceptions to this, such as when
2363 using the MSSQL / pyodbc dialect a SELECT is emitted inline in
2364 order to retrieve an inserted primary key value.
2365
2366
2367 """
2368 return self._metadata.returns_rows
2369
2370 @property
2371 def is_insert(self) -> bool:
2372 """True if this :class:`_engine.CursorResult` is the result
2373 of a executing an expression language compiled
2374 :func:`_expression.insert` construct.
2375
2376 When True, this implies that the
2377 :attr:`inserted_primary_key` attribute is accessible,
2378 assuming the statement did not include
2379 a user defined "returning" construct.
2380
2381 """
2382 return self.context.isinsert
2383
2384 def _fetchiter_impl(self) -> Iterator[Any]:
2385 fetchone = self.cursor_strategy.fetchone
2386
2387 while True:
2388 row = fetchone(self, self.cursor)
2389 if row is None:
2390 break
2391 yield row
2392
2393 def _fetchone_impl(self, hard_close: bool = False) -> Any:
2394 return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
2395
2396 def _fetchall_impl(self) -> Any:
2397 return self.cursor_strategy.fetchall(self, self.cursor)
2398
2399 def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
2400 return self.cursor_strategy.fetchmany(self, self.cursor, size)
2401
2402 def _raw_row_iterator(self) -> Any:
2403 return self._fetchiter_impl()
2404
2405 def merge(
2406 self, *others: Result[Unpack[TupleAny]]
2407 ) -> MergedResult[Unpack[TupleAny]]:
2408 merged_result = super().merge(*others)
2409 if self.context._has_rowcount:
2410 merged_result.rowcount = sum(
2411 cast("CursorResult[Any]", result).rowcount
2412 for result in (self,) + others
2413 )
2414 return merged_result
2415
2416 def close(self) -> None:
2417 """Close this :class:`_engine.CursorResult`.
2418
2419 This closes out the underlying DBAPI cursor corresponding to the
2420 statement execution, if one is still present. Note that the DBAPI
2421 cursor is automatically released when the :class:`_engine.CursorResult`
2422 exhausts all available rows. :meth:`_engine.CursorResult.close` is
2423 generally an optional method except in the case when discarding a
2424 :class:`_engine.CursorResult` that still has additional rows pending
2425 for fetch.
2426
2427 After this method is called, it is no longer valid to call upon
2428 the fetch methods, which will raise a :class:`.ResourceClosedError`
2429 on subsequent use.
2430
2431 .. seealso::
2432
2433 :ref:`connections_toplevel`
2434
2435 """
2436 self._soft_close(hard=True)
2437
2438 @_generative
2439 def yield_per(self, num: int) -> Self:
2440 self._yield_per = num
2441 self.cursor_strategy.yield_per(self, self.cursor, num)
2442 return self
2443
2444
2445ResultProxy = CursorResult