1# engine/cursor.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define cursor-specific result set constructs including
9:class:`.CursorResult`."""
10
11from __future__ import annotations
12
13import collections
14import operator
15import typing
16from typing import Any
17from typing import cast
18from typing import ClassVar
19from typing import Deque
20from typing import Dict
21from typing import Final
22from typing import Iterable
23from typing import Iterator
24from typing import List
25from typing import Literal
26from typing import Mapping
27from typing import NoReturn
28from typing import Optional
29from typing import Sequence
30from typing import Tuple
31from typing import TYPE_CHECKING
32from typing import Union
33
34from .result import IteratorResult
35from .result import MergedResult
36from .result import Result
37from .result import ResultMetaData
38from .result import SimpleResultMetaData
39from .result import tuplegetter
40from .row import Row
41from .. import exc
42from .. import util
43from ..sql import elements
44from ..sql import sqltypes
45from ..sql import util as sql_util
46from ..sql.base import _generative
47from ..sql.compiler import ResultColumnsEntry
48from ..sql.compiler import RM_NAME
49from ..sql.compiler import RM_OBJECTS
50from ..sql.compiler import RM_RENDERED_NAME
51from ..sql.compiler import RM_TYPE
52from ..sql.type_api import TypeEngine
53from ..util.typing import Self
54from ..util.typing import TupleAny
55from ..util.typing import TypeVarTuple
56from ..util.typing import Unpack
57
58if typing.TYPE_CHECKING:
59 from .base import Connection
60 from .default import DefaultExecutionContext
61 from .interfaces import _DBAPICursorDescription
62 from .interfaces import _MutableCoreSingleExecuteParams
63 from .interfaces import CoreExecuteOptionsParameter
64 from .interfaces import DBAPICursor
65 from .interfaces import DBAPIType
66 from .interfaces import Dialect
67 from .interfaces import ExecutionContext
68 from .result import _KeyIndexType
69 from .result import _KeyMapRecType
70 from .result import _KeyMapType
71 from .result import _KeyType
72 from .result import _ProcessorsType
73 from .result import _TupleGetterType
74 from ..sql.schema import Column
75 from ..sql.type_api import _ResultProcessorType
76
77
78_Ts = TypeVarTuple("_Ts")
79
80
81# metadata entry tuple indexes.
82# using raw tuple is faster than namedtuple.
83# these match up to the positions in
84# _CursorKeyMapRecType
85MD_INDEX: Final[Literal[0]] = 0
86"""integer index in cursor.description
87
88"""
89
90MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
91"""integer index in compiled._result_columns"""
92
93MD_OBJECTS: Final[Literal[2]] = 2
94"""other string keys and ColumnElement obj that can match.
95
96This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
97
98"""
99
100MD_LOOKUP_KEY: Final[Literal[3]] = 3
101"""string key we usually expect for key-based lookup
102
103this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
104"""
105
106
107MD_RENDERED_NAME: Final[Literal[4]] = 4
108"""name that is usually in cursor.description
109
110this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
111"""
112
113
114MD_PROCESSOR: Final[Literal[5]] = 5
115"""callable to process a result value into a row"""
116
117MD_UNTRANSLATED: Final[Literal[6]] = 6
118"""raw name from cursor.description"""
119
120
121_CursorKeyMapRecType = Tuple[
122 Optional[int], # MD_INDEX, None means the record is ambiguously named
123 int, # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
124 TupleAny, # MD_OBJECTS
125 str, # MD_LOOKUP_KEY
126 str, # MD_RENDERED_NAME
127 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
128 Optional[str], # MD_UNTRANSLATED
129]
130
131_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
132
133# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
134# not None
135_NonAmbigCursorKeyMapRecType = Tuple[
136 int,
137 int,
138 List[Any],
139 str,
140 str,
141 Optional["_ResultProcessorType[Any]"],
142 str,
143]
144
145_MergeColTuple = Tuple[
146 int,
147 Optional[int],
148 str,
149 TypeEngine[Any],
150 "DBAPIType",
151 Optional[TupleAny],
152 Optional[str],
153]
154
155
156class CursorResultMetaData(ResultMetaData):
157 """Result metadata for DBAPI cursors."""
158
159 __slots__ = (
160 "_keymap",
161 "_processors",
162 "_keys",
163 "_keymap_by_result_column_idx",
164 "_tuplefilter",
165 "_translated_indexes",
166 "_safe_for_cache",
167 "_unpickled",
168 "_key_to_index",
169 # don't need _unique_filters support here for now. Can be added
170 # if a need arises.
171 )
172
173 _keymap: _CursorKeyMapType
174 _processors: _ProcessorsType
175 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
176 _unpickled: bool
177 _safe_for_cache: bool
178 _translated_indexes: Optional[List[int]]
179
180 returns_rows: ClassVar[bool] = True
181
182 def _has_key(self, key: Any) -> bool:
183 return key in self._keymap
184
185 def _for_freeze(self) -> ResultMetaData:
186 return SimpleResultMetaData(
187 self._keys,
188 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
189 )
190
191 def _make_new_metadata(
192 self,
193 *,
194 unpickled: bool,
195 processors: _ProcessorsType,
196 keys: Sequence[str],
197 keymap: _KeyMapType,
198 tuplefilter: Optional[_TupleGetterType],
199 translated_indexes: Optional[List[int]],
200 safe_for_cache: bool,
201 keymap_by_result_column_idx: Any,
202 ) -> Self:
203 new_obj = self.__class__.__new__(self.__class__)
204 new_obj._unpickled = unpickled
205 new_obj._processors = processors
206 new_obj._keys = keys
207 new_obj._keymap = keymap
208 new_obj._tuplefilter = tuplefilter
209 new_obj._translated_indexes = translated_indexes
210 new_obj._safe_for_cache = safe_for_cache
211 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
212 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
213 return new_obj
214
215 def _remove_processors_and_tuple_filter(self) -> Self:
216 if self._tuplefilter:
217 proc = self._tuplefilter(self._processors)
218 else:
219 proc = self._processors
220 return self._make_new_metadata(
221 unpickled=self._unpickled,
222 processors=[None] * len(proc),
223 tuplefilter=None,
224 translated_indexes=None,
225 keymap={
226 key: value[0:5] + (None,) + value[6:]
227 for key, value in self._keymap.items()
228 },
229 keys=self._keys,
230 safe_for_cache=self._safe_for_cache,
231 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
232 )
233
234 def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
235 keymap = dict(self._keymap)
236 offset = len(self._keys)
237
238 for key, value in other._keymap.items():
239 # int index should be None for ambiguous key
240 if value[MD_INDEX] is not None and key not in keymap:
241 md_index = value[MD_INDEX] + offset
242 md_object = value[MD_RESULT_MAP_INDEX] + offset
243 else:
244 md_index = None
245 md_object = -1
246 keymap[key] = (md_index, md_object, *value[2:])
247
248 self_tf = self._tuplefilter
249 other_tf = other._tuplefilter
250
251 proc: List[Any] = []
252 for pp, tf in [
253 (self._processors, self_tf),
254 (other._processors, other_tf),
255 ]:
256 proc.extend(pp if tf is None else tf(pp))
257
258 new_keys = [*self._keys, *other._keys]
259 assert len(proc) == len(new_keys)
260
261 return self._make_new_metadata(
262 unpickled=self._unpickled,
263 processors=proc,
264 tuplefilter=None,
265 translated_indexes=None,
266 keys=new_keys,
267 keymap=keymap,
268 safe_for_cache=self._safe_for_cache,
269 keymap_by_result_column_idx={
270 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
271 for metadata_entry in keymap.values()
272 },
273 )
274
275 def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
276 recs = list(self._metadata_for_keys(keys))
277
278 indexes = [rec[MD_INDEX] for rec in recs]
279 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
280
281 if self._translated_indexes:
282 indexes = [self._translated_indexes[idx] for idx in indexes]
283 tup = tuplegetter(*indexes)
284 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
285
286 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
287 # TODO: need unit test for:
288 # result = connection.execute("raw sql, no columns").scalars()
289 # without the "or ()" it's failing because MD_OBJECTS is None
290 keymap.update(
291 (e, new_rec)
292 for new_rec in new_recs
293 for e in new_rec[MD_OBJECTS] or ()
294 )
295
296 return self._make_new_metadata(
297 unpickled=self._unpickled,
298 processors=self._processors,
299 keys=new_keys,
300 tuplefilter=tup,
301 translated_indexes=indexes,
302 keymap=keymap, # type: ignore[arg-type]
303 safe_for_cache=self._safe_for_cache,
304 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
305 )
306
307 def _adapt_to_context(self, context: ExecutionContext) -> Self:
308 """When using a cached Compiled construct that has a _result_map,
309 for a new statement that used the cached Compiled, we need to ensure
310 the keymap has the Column objects from our new statement as keys.
311 So here we rewrite keymap with new entries for the new columns
312 as matched to those of the cached statement.
313
314 """
315
316 if not context.compiled or not context.compiled._result_columns:
317 return self
318
319 compiled_statement = context.compiled.statement
320 invoked_statement = context.invoked_statement
321
322 if TYPE_CHECKING:
323 assert isinstance(invoked_statement, elements.ClauseElement)
324
325 if compiled_statement is invoked_statement:
326 return self
327
328 assert invoked_statement is not None
329
330 # this is the most common path for Core statements when
331 # caching is used. In ORM use, this codepath is not really used
332 # as the _result_disable_adapt_to_context execution option is
333 # set by the ORM.
334
335 # make a copy and add the columns from the invoked statement
336 # to the result map.
337
338 keymap_by_position = self._keymap_by_result_column_idx
339
340 if keymap_by_position is None:
341 # first retrieval from cache, this map will not be set up yet,
342 # initialize lazily
343 keymap_by_position = self._keymap_by_result_column_idx = {
344 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
345 for metadata_entry in self._keymap.values()
346 }
347
348 return self._make_new_metadata(
349 keymap=self._keymap
350 | {
351 new: keymap_by_position[idx]
352 for idx, new in enumerate(
353 invoked_statement._all_selected_columns
354 )
355 if idx in keymap_by_position
356 },
357 unpickled=self._unpickled,
358 processors=self._processors,
359 tuplefilter=self._tuplefilter,
360 translated_indexes=None,
361 keys=self._keys,
362 safe_for_cache=self._safe_for_cache,
363 keymap_by_result_column_idx=self._keymap_by_result_column_idx,
364 )
365
366 def __init__(
367 self,
368 parent: CursorResult[Unpack[TupleAny]],
369 cursor_description: _DBAPICursorDescription,
370 *,
371 driver_column_names: bool = False,
372 num_sentinel_cols: int = 0,
373 ):
374 context = parent.context
375 if num_sentinel_cols > 0:
376 # this is slightly faster than letting tuplegetter use the indexes
377 self._tuplefilter = tuplefilter = operator.itemgetter(
378 slice(-num_sentinel_cols)
379 )
380 cursor_description = tuplefilter(cursor_description)
381 else:
382 self._tuplefilter = tuplefilter = None
383 self._translated_indexes = None
384 self._safe_for_cache = self._unpickled = False
385
386 if context.result_column_struct:
387 (
388 result_columns,
389 cols_are_ordered,
390 textual_ordered,
391 ad_hoc_textual,
392 loose_column_name_matching,
393 ) = context.result_column_struct
394 if tuplefilter is not None:
395 result_columns = tuplefilter(result_columns)
396 num_ctx_cols = len(result_columns)
397 else:
398 result_columns = cols_are_ordered = ( # type: ignore
399 num_ctx_cols
400 ) = ad_hoc_textual = loose_column_name_matching = (
401 textual_ordered
402 ) = False
403
404 # merge cursor.description with the column info
405 # present in the compiled structure, if any
406 raw = self._merge_cursor_description(
407 context,
408 cursor_description,
409 result_columns,
410 num_ctx_cols,
411 cols_are_ordered,
412 textual_ordered,
413 ad_hoc_textual,
414 loose_column_name_matching,
415 driver_column_names,
416 )
417
418 # processors in key order which are used when building up
419 # a row
420 self._processors = [
421 metadata_entry[MD_PROCESSOR] for metadata_entry in raw
422 ]
423 if num_sentinel_cols > 0:
424 # add the number of sentinel columns since these are passed
425 # to the tuplefilters before being used
426 self._processors.extend([None] * num_sentinel_cols)
427
428 # this is used when using this ResultMetaData in a Core-only cache
429 # retrieval context. it's initialized on first cache retrieval
430 # when the _result_disable_adapt_to_context execution option
431 # (which the ORM generally sets) is not set.
432 self._keymap_by_result_column_idx = None
433
434 # for compiled SQL constructs, copy additional lookup keys into
435 # the key lookup map, such as Column objects, labels,
436 # column keys and other names
437 if num_ctx_cols:
438 # keymap by primary string...
439 by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
440 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
441 for metadata_entry in raw
442 }
443
444 if len(by_key) != num_ctx_cols:
445 # if by-primary-string dictionary smaller than
446 # number of columns, assume we have dupes; (this check
447 # is also in place if string dictionary is bigger, as
448 # can occur when '*' was used as one of the compiled columns,
449 # which may or may not be suggestive of dupes), rewrite
450 # dupe records with "None" for index which results in
451 # ambiguous column exception when accessed.
452 #
453 # this is considered to be the less common case as it is not
454 # common to have dupe column keys in a SELECT statement.
455 #
456 # new in 1.4: get the complete set of all possible keys,
457 # strings, objects, whatever, that are dupes across two
458 # different records, first.
459 index_by_key: Dict[Any, Any] = {}
460 dupes = set()
461 for metadata_entry in raw:
462 for key in (metadata_entry[MD_RENDERED_NAME],) + (
463 metadata_entry[MD_OBJECTS] or ()
464 ):
465 idx = metadata_entry[MD_INDEX]
466 # if this key has been associated with more than one
467 # positional index, it's a dupe
468 if index_by_key.setdefault(key, idx) != idx:
469 dupes.add(key)
470
471 # then put everything we have into the keymap excluding only
472 # those keys that are dupes.
473 self._keymap = {
474 obj_elem: metadata_entry
475 for metadata_entry in raw
476 if metadata_entry[MD_OBJECTS]
477 for obj_elem in metadata_entry[MD_OBJECTS]
478 if obj_elem not in dupes
479 }
480
481 # then for the dupe keys, put the "ambiguous column"
482 # record into by_key.
483 by_key.update(
484 {
485 key: (None, -1, (), key, key, None, None)
486 for key in dupes
487 }
488 )
489
490 else:
491 # no dupes - copy secondary elements from compiled
492 # columns into self._keymap. this is the most common
493 # codepath for Core / ORM statement executions before the
494 # result metadata is cached
495 self._keymap = {
496 obj_elem: metadata_entry
497 for metadata_entry in raw
498 if metadata_entry[MD_OBJECTS]
499 for obj_elem in metadata_entry[MD_OBJECTS]
500 }
501 # update keymap with primary string names taking
502 # precedence
503 self._keymap.update(by_key)
504 else:
505 # no compiled objects to map, just create keymap by primary string
506 self._keymap = {
507 metadata_entry[MD_LOOKUP_KEY]: metadata_entry
508 for metadata_entry in raw
509 }
510
511 # update keymap with "translated" names.
512 # the "translated" name thing has a long history:
513 # 1. originally, it was used to fix an issue in very old SQLite
514 # versions prior to 3.10.0. This code is still there in the
515 # sqlite dialect.
516 # 2. Next, the pyhive third party dialect started using this hook
517 # for some driver related issue on their end.
518 # 3. Most recently, the "driver_column_names" execution option has
519 # taken advantage of this hook to get raw DBAPI col names in the
520 # result keys without disrupting the usual merge process.
521
522 if driver_column_names or (
523 not num_ctx_cols and context._translate_colname
524 ):
525 self._keymap.update(
526 {
527 metadata_entry[MD_UNTRANSLATED]: self._keymap[
528 metadata_entry[MD_LOOKUP_KEY]
529 ]
530 for metadata_entry in raw
531 if metadata_entry[MD_UNTRANSLATED]
532 }
533 )
534
535 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
536
537 def _merge_cursor_description(
538 self,
539 context: DefaultExecutionContext,
540 cursor_description: _DBAPICursorDescription,
541 result_columns: Sequence[ResultColumnsEntry],
542 num_ctx_cols: int,
543 cols_are_ordered: bool,
544 textual_ordered: bool,
545 ad_hoc_textual: bool,
546 loose_column_name_matching: bool,
547 driver_column_names: bool,
548 ) -> List[_CursorKeyMapRecType]:
549 """Merge a cursor.description with compiled result column information.
550
551 There are at least four separate strategies used here, selected
552 depending on the type of SQL construct used to start with.
553
554 The most common case is that of the compiled SQL expression construct,
555 which generated the column names present in the raw SQL string and
556 which has the identical number of columns as were reported by
557 cursor.description. In this case, we assume a 1-1 positional mapping
558 between the entries in cursor.description and the compiled object.
559 This is also the most performant case as we disregard extracting /
560 decoding the column names present in cursor.description since we
561 already have the desired name we generated in the compiled SQL
562 construct.
563
564 The next common case is that of the completely raw string SQL,
565 such as passed to connection.execute(). In this case we have no
566 compiled construct to work with, so we extract and decode the
567 names from cursor.description and index those as the primary
568 result row target keys.
569
570 The remaining fairly common case is that of the textual SQL
571 that includes at least partial column information; this is when
572 we use a :class:`_expression.TextualSelect` construct.
573 This construct may have
574 unordered or ordered column information. In the ordered case, we
575 merge the cursor.description and the compiled construct's information
576 positionally, and warn if there are additional description names
577 present, however we still decode the names in cursor.description
578 as we don't have a guarantee that the names in the columns match
579 on these. In the unordered case, we match names in cursor.description
580 to that of the compiled construct based on name matching.
581 In both of these cases, the cursor.description names and the column
582 expression objects and names are indexed as result row target keys.
583
584 The final case is much less common, where we have a compiled
585 non-textual SQL expression construct, but the number of columns
586 in cursor.description doesn't match what's in the compiled
587 construct. We make the guess here that there might be textual
588 column expressions in the compiled construct that themselves include
589 a comma in them causing them to split. We do the same name-matching
590 as with textual non-ordered columns.
591
592 The name-matched system of merging is the same as that used by
593 SQLAlchemy for all cases up through the 0.9 series. Positional
594 matching for compiled SQL expressions was introduced in 1.0 as a
595 major performance feature, and positional matching for textual
596 :class:`_expression.TextualSelect` objects in 1.1.
597 As name matching is no longer
598 a common case, it was acceptable to factor it into smaller generator-
599 oriented methods that are easier to understand, but incur slightly
600 more performance overhead.
601
602 """
603
604 if (
605 num_ctx_cols
606 and cols_are_ordered
607 and not textual_ordered
608 and num_ctx_cols == len(cursor_description)
609 and not driver_column_names
610 ):
611 self._keys = [elem[0] for elem in result_columns]
612 # pure positional 1-1 case; doesn't need to read
613 # the names from cursor.description
614
615 # most common case for Core and ORM
616
617 # this metadata is safe to
618 # cache because we are guaranteed
619 # to have the columns in the same order for new executions
620 self._safe_for_cache = True
621
622 return [
623 (
624 idx,
625 idx,
626 rmap_entry[RM_OBJECTS],
627 rmap_entry[RM_NAME],
628 rmap_entry[RM_RENDERED_NAME],
629 context.get_result_processor(
630 rmap_entry[RM_TYPE],
631 rmap_entry[RM_RENDERED_NAME],
632 cursor_description[idx][1],
633 ),
634 None,
635 )
636 for idx, rmap_entry in enumerate(result_columns)
637 ]
638 else:
639 # name-based or text-positional cases, where we need
640 # to read cursor.description names
641
642 if textual_ordered or (
643 ad_hoc_textual and len(cursor_description) == num_ctx_cols
644 ):
645 self._safe_for_cache = not driver_column_names
646 # textual positional case
647 raw_iterator = self._merge_textual_cols_by_position(
648 context,
649 cursor_description,
650 result_columns,
651 driver_column_names,
652 )
653 elif num_ctx_cols:
654 # compiled SQL with a mismatch of description cols
655 # vs. compiled cols, or textual w/ unordered columns
656 # the order of columns can change if the query is
657 # against a "select *", so not safe to cache
658 self._safe_for_cache = False
659 raw_iterator = self._merge_cols_by_name(
660 context,
661 cursor_description,
662 result_columns,
663 loose_column_name_matching,
664 driver_column_names,
665 )
666 else:
667 # no compiled SQL, just a raw string, order of columns
668 # can change for "select *"
669 self._safe_for_cache = False
670 raw_iterator = self._merge_cols_by_none(
671 context, cursor_description, driver_column_names
672 )
673
674 return [
675 (
676 idx,
677 ridx,
678 obj,
679 cursor_colname,
680 cursor_colname,
681 context.get_result_processor(
682 mapped_type, cursor_colname, coltype
683 ),
684 untranslated,
685 ) # type: ignore[misc]
686 for (
687 idx,
688 ridx,
689 cursor_colname,
690 mapped_type,
691 coltype,
692 obj,
693 untranslated,
694 ) in raw_iterator
695 ]
696
697 def _colnames_from_description(
698 self,
699 context: DefaultExecutionContext,
700 cursor_description: _DBAPICursorDescription,
701 driver_column_names: bool,
702 ) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]:
703 """Extract column names and data types from a cursor.description.
704
705 Applies unicode decoding, column translation, "normalization",
706 and case sensitivity rules to the names based on the dialect.
707
708 """
709 dialect = context.dialect
710 translate_colname = context._translate_colname
711 normalize_name = (
712 dialect.normalize_name if dialect.requires_name_normalize else None
713 )
714
715 untranslated = None
716
717 for idx, rec in enumerate(cursor_description):
718 colname = unnormalized = rec[0]
719 coltype = rec[1]
720
721 if translate_colname:
722 # a None here for "untranslated" means "the dialect did not
723 # change the column name and the untranslated case can be
724 # ignored". otherwise "untranslated" is expected to be the
725 # original, unchanged colname (e.g. is == to "unnormalized")
726 colname, untranslated = translate_colname(colname)
727
728 assert untranslated is None or untranslated == unnormalized
729
730 if normalize_name:
731 colname = normalize_name(colname)
732
733 if driver_column_names:
734 yield idx, colname, unnormalized, unnormalized, coltype
735
736 else:
737 yield idx, colname, unnormalized, untranslated, coltype
738
739 def _merge_textual_cols_by_position(
740 self,
741 context: DefaultExecutionContext,
742 cursor_description: _DBAPICursorDescription,
743 result_columns: Sequence[ResultColumnsEntry],
744 driver_column_names: bool,
745 ) -> Iterator[_MergeColTuple]:
746 num_ctx_cols = len(result_columns)
747
748 if num_ctx_cols > len(cursor_description):
749 util.warn(
750 "Number of columns in textual SQL (%d) is "
751 "smaller than number of columns requested (%d)"
752 % (num_ctx_cols, len(cursor_description))
753 )
754 seen = set()
755
756 self._keys = []
757
758 uses_denormalize = context.dialect.requires_name_normalize
759 for (
760 idx,
761 colname,
762 unnormalized,
763 untranslated,
764 coltype,
765 ) in self._colnames_from_description(
766 context, cursor_description, driver_column_names
767 ):
768 if idx < num_ctx_cols:
769 ctx_rec = result_columns[idx]
770 obj = ctx_rec[RM_OBJECTS]
771 ridx = idx
772 mapped_type = ctx_rec[RM_TYPE]
773 if obj[0] in seen:
774 raise exc.InvalidRequestError(
775 "Duplicate column expression requested "
776 "in textual SQL: %r" % obj[0]
777 )
778 seen.add(obj[0])
779
780 # special check for all uppercase unnormalized name;
781 # use the unnormalized name as the key.
782 # see #10788
783 # if these names don't match, then we still honor the
784 # cursor.description name as the key and not what the
785 # Column has, see
786 # test_resultset.py::PositionalTextTest::test_via_column
787 if (
788 uses_denormalize
789 and unnormalized == ctx_rec[RM_RENDERED_NAME]
790 ):
791 result_name = unnormalized
792 else:
793 result_name = colname
794 else:
795 mapped_type = sqltypes.NULLTYPE
796 obj = None
797 ridx = None
798
799 result_name = colname
800
801 if driver_column_names:
802 assert untranslated is not None
803 self._keys.append(untranslated)
804 else:
805 self._keys.append(result_name)
806
807 yield (
808 idx,
809 ridx,
810 result_name,
811 mapped_type,
812 coltype,
813 obj,
814 untranslated,
815 )
816
817 def _merge_cols_by_name(
818 self,
819 context: DefaultExecutionContext,
820 cursor_description: _DBAPICursorDescription,
821 result_columns: Sequence[ResultColumnsEntry],
822 loose_column_name_matching: bool,
823 driver_column_names: bool,
824 ) -> Iterator[_MergeColTuple]:
825 match_map = self._create_description_match_map(
826 result_columns, loose_column_name_matching
827 )
828 mapped_type: TypeEngine[Any]
829
830 self._keys = []
831
832 for (
833 idx,
834 colname,
835 unnormalized,
836 untranslated,
837 coltype,
838 ) in self._colnames_from_description(
839 context, cursor_description, driver_column_names
840 ):
841 try:
842 ctx_rec = match_map[colname]
843 except KeyError:
844 mapped_type = sqltypes.NULLTYPE
845 obj = None
846 result_columns_idx = None
847 else:
848 obj = ctx_rec[1]
849 mapped_type = ctx_rec[2]
850 result_columns_idx = ctx_rec[3]
851
852 if driver_column_names:
853 assert untranslated is not None
854 self._keys.append(untranslated)
855 else:
856 self._keys.append(colname)
857 yield (
858 idx,
859 result_columns_idx,
860 colname,
861 mapped_type,
862 coltype,
863 obj,
864 untranslated,
865 )
866
867 @classmethod
868 def _create_description_match_map(
869 cls,
870 result_columns: Sequence[ResultColumnsEntry],
871 loose_column_name_matching: bool = False,
872 ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
873 """when matching cursor.description to a set of names that are present
874 in a Compiled object, as is the case with TextualSelect, get all the
875 names we expect might match those in cursor.description.
876 """
877
878 d: Dict[
879 Union[str, object],
880 Tuple[str, TupleAny, TypeEngine[Any], int],
881 ] = {}
882 for ridx, elem in enumerate(result_columns):
883 key = elem[RM_RENDERED_NAME]
884
885 if key in d:
886 # conflicting keyname - just add the column-linked objects
887 # to the existing record. if there is a duplicate column
888 # name in the cursor description, this will allow all of those
889 # objects to raise an ambiguous column error
890 e_name, e_obj, e_type, e_ridx = d[key]
891 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
892 else:
893 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
894
895 if loose_column_name_matching:
896 # when using a textual statement with an unordered set
897 # of columns that line up, we are expecting the user
898 # to be using label names in the SQL that match to the column
899 # expressions. Enable more liberal matching for this case;
900 # duplicate keys that are ambiguous will be fixed later.
901 for r_key in elem[RM_OBJECTS]:
902 d.setdefault(
903 r_key,
904 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
905 )
906 return d
907
908 def _merge_cols_by_none(
909 self,
910 context: DefaultExecutionContext,
911 cursor_description: _DBAPICursorDescription,
912 driver_column_names: bool,
913 ) -> Iterator[_MergeColTuple]:
914 self._keys = []
915
916 for (
917 idx,
918 colname,
919 unnormalized,
920 untranslated,
921 coltype,
922 ) in self._colnames_from_description(
923 context, cursor_description, driver_column_names
924 ):
925
926 if driver_column_names:
927 assert untranslated is not None
928 self._keys.append(untranslated)
929 else:
930 self._keys.append(colname)
931
932 yield (
933 idx,
934 None,
935 colname,
936 sqltypes.NULLTYPE,
937 coltype,
938 None,
939 untranslated,
940 )
941
942 if not TYPE_CHECKING:
943
944 def _key_fallback(
945 self, key: Any, err: Optional[Exception], raiseerr: bool = True
946 ) -> Optional[NoReturn]:
947 if raiseerr:
948 if self._unpickled and isinstance(key, elements.ColumnElement):
949 raise exc.NoSuchColumnError(
950 "Row was unpickled; lookup by ColumnElement "
951 "is unsupported"
952 ) from err
953 else:
954 raise exc.NoSuchColumnError(
955 "Could not locate column in row for column '%s'"
956 % util.string_or_unprintable(key)
957 ) from err
958 else:
959 return None
960
961 def _raise_for_ambiguous_column_name(
962 self, rec: _KeyMapRecType
963 ) -> NoReturn:
964 raise exc.InvalidRequestError(
965 "Ambiguous column name '%s' in "
966 "result set column descriptions" % rec[MD_LOOKUP_KEY]
967 )
968
969 def _index_for_key(
970 self, key: _KeyIndexType, raiseerr: bool = True
971 ) -> Optional[int]:
972 # TODO: can consider pre-loading ints and negative ints
973 # into _keymap - also no coverage here
974 if isinstance(key, int):
975 key = self._keys[key]
976
977 try:
978 rec = self._keymap[key]
979 except KeyError as ke:
980 x = self._key_fallback(key, ke, raiseerr)
981 assert x is None
982 return None
983
984 index = rec[0]
985
986 if index is None:
987 self._raise_for_ambiguous_column_name(rec)
988 return index
989
990 def _indexes_for_keys(
991 self, keys: Sequence[_KeyIndexType]
992 ) -> Sequence[int]:
993 try:
994 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501
995 except KeyError as ke:
996 # ensure it raises
997 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
998
999 def _metadata_for_keys(
1000 self, keys: Sequence[_KeyIndexType]
1001 ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
1002 for key in keys:
1003 if isinstance(key, int):
1004 key = self._keys[key]
1005
1006 try:
1007 rec = self._keymap[key]
1008 except KeyError as ke:
1009 # ensure it raises
1010 CursorResultMetaData._key_fallback(self, ke.args[0], ke)
1011
1012 index = rec[MD_INDEX]
1013
1014 if index is None:
1015 self._raise_for_ambiguous_column_name(rec)
1016
1017 yield cast(_NonAmbigCursorKeyMapRecType, rec)
1018
1019 def __getstate__(self) -> Dict[str, Any]:
1020 # TODO: consider serializing this as SimpleResultMetaData
1021 return {
1022 "_keymap": {
1023 key: (
1024 rec[MD_INDEX],
1025 rec[MD_RESULT_MAP_INDEX],
1026 [],
1027 key,
1028 rec[MD_RENDERED_NAME],
1029 None,
1030 None,
1031 )
1032 for key, rec in self._keymap.items()
1033 if isinstance(key, (str, int))
1034 },
1035 "_keys": self._keys,
1036 "_translated_indexes": self._translated_indexes,
1037 }
1038
1039 def __setstate__(self, state: Dict[str, Any]) -> None:
1040 self._processors = [None for _ in range(len(state["_keys"]))]
1041 self._keymap = state["_keymap"]
1042 self._keymap_by_result_column_idx = None
1043 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
1044 self._keys = state["_keys"]
1045 self._unpickled = True
1046 if state["_translated_indexes"]:
1047 translated_indexes: List[Any]
1048 self._translated_indexes = translated_indexes = state[
1049 "_translated_indexes"
1050 ]
1051 self._tuplefilter = tuplegetter(*translated_indexes)
1052 else:
1053 self._translated_indexes = self._tuplefilter = None
1054
1055
1056class ResultFetchStrategy:
1057 """Define a fetching strategy for a result object.
1058
1059
1060 .. versionadded:: 1.4
1061
1062 """
1063
1064 __slots__ = ()
1065
1066 alternate_cursor_description: Optional[_DBAPICursorDescription] = None
1067
1068 def soft_close(
1069 self,
1070 result: CursorResult[Unpack[TupleAny]],
1071 dbapi_cursor: Optional[DBAPICursor],
1072 ) -> None:
1073 raise NotImplementedError()
1074
1075 def hard_close(
1076 self,
1077 result: CursorResult[Unpack[TupleAny]],
1078 dbapi_cursor: Optional[DBAPICursor],
1079 ) -> None:
1080 raise NotImplementedError()
1081
1082 def yield_per(
1083 self,
1084 result: CursorResult[Unpack[TupleAny]],
1085 dbapi_cursor: DBAPICursor,
1086 num: int,
1087 ) -> None:
1088 return
1089
1090 def fetchone(
1091 self,
1092 result: CursorResult[Unpack[TupleAny]],
1093 dbapi_cursor: DBAPICursor,
1094 hard_close: bool = False,
1095 ) -> Any:
1096 raise NotImplementedError()
1097
1098 def fetchmany(
1099 self,
1100 result: CursorResult[Unpack[TupleAny]],
1101 dbapi_cursor: DBAPICursor,
1102 size: Optional[int] = None,
1103 ) -> Any:
1104 raise NotImplementedError()
1105
1106 def fetchall(
1107 self,
1108 result: CursorResult[Unpack[TupleAny]],
1109 dbapi_cursor: DBAPICursor,
1110 ) -> Any:
1111 raise NotImplementedError()
1112
1113 def handle_exception(
1114 self,
1115 result: CursorResult[Unpack[TupleAny]],
1116 dbapi_cursor: Optional[DBAPICursor],
1117 err: BaseException,
1118 ) -> NoReturn:
1119 raise err
1120
1121
1122class NoCursorFetchStrategy(ResultFetchStrategy):
1123 """Cursor strategy for a result that has no open cursor.
1124
1125 There are two varieties of this strategy, one for DQL and one for
1126 DML (and also DDL), each of which represent a result that had a cursor
1127 but no longer has one.
1128
1129 """
1130
1131 __slots__ = ()
1132
1133 def soft_close(
1134 self,
1135 result: CursorResult[Unpack[TupleAny]],
1136 dbapi_cursor: Optional[DBAPICursor],
1137 ) -> None:
1138 pass
1139
1140 def hard_close(
1141 self,
1142 result: CursorResult[Unpack[TupleAny]],
1143 dbapi_cursor: Optional[DBAPICursor],
1144 ) -> None:
1145 pass
1146
1147 def fetchone(
1148 self,
1149 result: CursorResult[Unpack[TupleAny]],
1150 dbapi_cursor: DBAPICursor,
1151 hard_close: bool = False,
1152 ) -> Any:
1153 return self._non_result(result, None)
1154
1155 def fetchmany(
1156 self,
1157 result: CursorResult[Unpack[TupleAny]],
1158 dbapi_cursor: DBAPICursor,
1159 size: Optional[int] = None,
1160 ) -> Any:
1161 return self._non_result(result, [])
1162
1163 def fetchall(
1164 self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor
1165 ) -> Any:
1166 return self._non_result(result, [])
1167
1168 def _non_result(
1169 self,
1170 result: CursorResult[Unpack[TupleAny]],
1171 default: Any,
1172 err: Optional[BaseException] = None,
1173 ) -> Any:
1174 raise NotImplementedError()
1175
1176
1177class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
1178 """Cursor strategy for a DQL result that has no open cursor.
1179
1180 This is a result set that can return rows, i.e. for a SELECT, or for an
1181 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
1182 where the cursor is closed and no rows remain available. The owning result
1183 object may or may not be "hard closed", which determines if the fetch
1184 methods send empty results or raise for closed result.
1185
1186 """
1187
1188 __slots__ = ()
1189
1190 def _non_result(
1191 self,
1192 result: CursorResult[Unpack[TupleAny]],
1193 default: Any,
1194 err: Optional[BaseException] = None,
1195 ) -> Any:
1196 if result.closed:
1197 raise exc.ResourceClosedError(
1198 "This result object is closed."
1199 ) from err
1200 else:
1201 return default
1202
1203
1204_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
1205
1206
1207class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
1208 """Cursor strategy for a DML result that has no open cursor.
1209
1210 This is a result set that does not return rows, i.e. for an INSERT,
1211 UPDATE, DELETE that does not include RETURNING.
1212
1213 """
1214
1215 __slots__ = ()
1216
1217 def _non_result(
1218 self,
1219 result: CursorResult[Unpack[TupleAny]],
1220 default: Any,
1221 err: Optional[BaseException] = None,
1222 ) -> Any:
1223 # we only expect to have a _NoResultMetaData() here right now.
1224 assert not result._metadata.returns_rows
1225 result._metadata._we_dont_return_rows(err) # type: ignore[union-attr]
1226
1227
1228_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
1229
1230
1231class CursorFetchStrategy(ResultFetchStrategy):
1232 """Call fetch methods from a DBAPI cursor.
1233
1234 Alternate versions of this class may instead buffer the rows from
1235 cursors or not use cursors at all.
1236
1237 """
1238
1239 __slots__ = ()
1240
1241 def soft_close(
1242 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1243 ) -> None:
1244 result.cursor_strategy = _NO_CURSOR_DQL
1245
1246 def hard_close(
1247 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1248 ) -> None:
1249 result.cursor_strategy = _NO_CURSOR_DQL
1250
1251 def handle_exception(
1252 self,
1253 result: CursorResult[Any],
1254 dbapi_cursor: Optional[DBAPICursor],
1255 err: BaseException,
1256 ) -> NoReturn:
1257 result.connection._handle_dbapi_exception(
1258 err, None, None, dbapi_cursor, result.context
1259 )
1260
1261 def yield_per(
1262 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1263 ) -> None:
1264 result.cursor_strategy = BufferedRowCursorFetchStrategy(
1265 dbapi_cursor,
1266 {"max_row_buffer": num},
1267 initial_buffer=collections.deque(),
1268 growth_factor=0,
1269 )
1270
1271 def fetchone(
1272 self,
1273 result: CursorResult[Any],
1274 dbapi_cursor: DBAPICursor,
1275 hard_close: bool = False,
1276 ) -> Any:
1277 try:
1278 row = dbapi_cursor.fetchone()
1279 if row is None:
1280 result._soft_close(hard=hard_close)
1281 return row
1282 except BaseException as e:
1283 self.handle_exception(result, dbapi_cursor, e)
1284
1285 def fetchmany(
1286 self,
1287 result: CursorResult[Any],
1288 dbapi_cursor: DBAPICursor,
1289 size: Optional[int] = None,
1290 ) -> Any:
1291 try:
1292 if size is None:
1293 l = dbapi_cursor.fetchmany()
1294 else:
1295 l = dbapi_cursor.fetchmany(size)
1296
1297 if not l:
1298 result._soft_close()
1299 return l
1300 except BaseException as e:
1301 self.handle_exception(result, dbapi_cursor, e)
1302
1303 def fetchall(
1304 self,
1305 result: CursorResult[Any],
1306 dbapi_cursor: DBAPICursor,
1307 ) -> Any:
1308 try:
1309 rows = dbapi_cursor.fetchall()
1310 result._soft_close()
1311 return rows
1312 except BaseException as e:
1313 self.handle_exception(result, dbapi_cursor, e)
1314
1315
1316_DEFAULT_FETCH = CursorFetchStrategy()
1317
1318
1319class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
1320 """A cursor fetch strategy with row buffering behavior.
1321
1322 This strategy buffers the contents of a selection of rows
1323 before ``fetchone()`` is called. This is to allow the results of
1324 ``cursor.description`` to be available immediately, when
1325 interfacing with a DB-API that requires rows to be consumed before
1326 this information is available (currently psycopg2, when used with
1327 server-side cursors).
1328
1329 The pre-fetching behavior fetches only one row initially, and then
1330 grows its buffer size by a fixed amount with each successive need
1331 for additional rows up the ``max_row_buffer`` size, which defaults
1332 to 1000::
1333
1334 with psycopg2_engine.connect() as conn:
1335
1336 result = conn.execution_options(
1337 stream_results=True, max_row_buffer=50
1338 ).execute(text("select * from table"))
1339
1340 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
1341
1342 .. seealso::
1343
1344 :ref:`psycopg2_execution_options`
1345 """
1346
1347 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
1348
1349 def __init__(
1350 self,
1351 dbapi_cursor: DBAPICursor,
1352 execution_options: CoreExecuteOptionsParameter,
1353 growth_factor: int = 5,
1354 initial_buffer: Optional[Deque[Any]] = None,
1355 ) -> None:
1356 self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
1357
1358 if initial_buffer is not None:
1359 self._rowbuffer = initial_buffer
1360 else:
1361 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
1362 self._growth_factor = growth_factor
1363
1364 if growth_factor:
1365 self._bufsize = min(self._max_row_buffer, self._growth_factor)
1366 else:
1367 self._bufsize = self._max_row_buffer
1368
1369 @classmethod
1370 def create(
1371 cls, result: CursorResult[Any]
1372 ) -> BufferedRowCursorFetchStrategy:
1373 return BufferedRowCursorFetchStrategy(
1374 result.cursor,
1375 result.context.execution_options,
1376 )
1377
1378 def _buffer_rows(
1379 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1380 ) -> None:
1381 """this is currently used only by fetchone()."""
1382
1383 size = self._bufsize
1384 try:
1385 if size < 1:
1386 new_rows = dbapi_cursor.fetchall()
1387 else:
1388 new_rows = dbapi_cursor.fetchmany(size)
1389 except BaseException as e:
1390 self.handle_exception(result, dbapi_cursor, e)
1391
1392 if not new_rows:
1393 return
1394 self._rowbuffer = collections.deque(new_rows)
1395 if self._growth_factor and size < self._max_row_buffer:
1396 self._bufsize = min(
1397 self._max_row_buffer, size * self._growth_factor
1398 )
1399
1400 def yield_per(
1401 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1402 ) -> None:
1403 self._growth_factor = 0
1404 self._max_row_buffer = self._bufsize = num
1405
1406 def soft_close(
1407 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1408 ) -> None:
1409 self._rowbuffer.clear()
1410 super().soft_close(result, dbapi_cursor)
1411
1412 def hard_close(
1413 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1414 ) -> None:
1415 self._rowbuffer.clear()
1416 super().hard_close(result, dbapi_cursor)
1417
1418 def fetchone(
1419 self,
1420 result: CursorResult[Any],
1421 dbapi_cursor: DBAPICursor,
1422 hard_close: bool = False,
1423 ) -> Any:
1424 if not self._rowbuffer:
1425 self._buffer_rows(result, dbapi_cursor)
1426 if not self._rowbuffer:
1427 try:
1428 result._soft_close(hard=hard_close)
1429 except BaseException as e:
1430 self.handle_exception(result, dbapi_cursor, e)
1431 return None
1432 return self._rowbuffer.popleft()
1433
1434 def fetchmany(
1435 self,
1436 result: CursorResult[Any],
1437 dbapi_cursor: DBAPICursor,
1438 size: Optional[int] = None,
1439 ) -> Any:
1440 if size is None:
1441 return self.fetchall(result, dbapi_cursor)
1442
1443 rb = self._rowbuffer
1444 lb = len(rb)
1445 close = False
1446 if size > lb:
1447 try:
1448 new = dbapi_cursor.fetchmany(size - lb)
1449 except BaseException as e:
1450 self.handle_exception(result, dbapi_cursor, e)
1451 else:
1452 if not new:
1453 # defer closing since it may clear the row buffer
1454 close = True
1455 else:
1456 rb.extend(new)
1457
1458 res = [rb.popleft() for _ in range(min(size, len(rb)))]
1459 if close:
1460 result._soft_close()
1461 return res
1462
1463 def fetchall(
1464 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1465 ) -> Any:
1466 try:
1467 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
1468 self._rowbuffer.clear()
1469 result._soft_close()
1470 return ret
1471 except BaseException as e:
1472 self.handle_exception(result, dbapi_cursor, e)
1473
1474
1475class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
1476 """A cursor strategy that buffers rows fully upon creation.
1477
1478 Used for operations where a result is to be delivered
1479 after the database conversation can not be continued,
1480 such as MSSQL INSERT...OUTPUT after an autocommit.
1481
1482 """
1483
1484 __slots__ = ("_rowbuffer", "alternate_cursor_description")
1485
1486 def __init__(
1487 self,
1488 dbapi_cursor: Optional[DBAPICursor],
1489 alternate_description: Optional[_DBAPICursorDescription] = None,
1490 initial_buffer: Optional[Iterable[Any]] = None,
1491 ):
1492 self.alternate_cursor_description = alternate_description
1493 if initial_buffer is not None:
1494 self._rowbuffer = collections.deque(initial_buffer)
1495 else:
1496 assert dbapi_cursor is not None
1497 self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
1498
1499 def yield_per(
1500 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
1501 ) -> Any:
1502 pass
1503
1504 def soft_close(
1505 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1506 ) -> None:
1507 self._rowbuffer.clear()
1508 super().soft_close(result, dbapi_cursor)
1509
1510 def hard_close(
1511 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
1512 ) -> None:
1513 self._rowbuffer.clear()
1514 super().hard_close(result, dbapi_cursor)
1515
1516 def fetchone(
1517 self,
1518 result: CursorResult[Any],
1519 dbapi_cursor: DBAPICursor,
1520 hard_close: bool = False,
1521 ) -> Any:
1522 if self._rowbuffer:
1523 return self._rowbuffer.popleft()
1524 else:
1525 result._soft_close(hard=hard_close)
1526 return None
1527
1528 def fetchmany(
1529 self,
1530 result: CursorResult[Any],
1531 dbapi_cursor: DBAPICursor,
1532 size: Optional[int] = None,
1533 ) -> Any:
1534 if size is None:
1535 return self.fetchall(result, dbapi_cursor)
1536
1537 rb = self._rowbuffer
1538 rows = [rb.popleft() for _ in range(min(size, len(rb)))]
1539 if not rows:
1540 result._soft_close()
1541 return rows
1542
1543 def fetchall(
1544 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
1545 ) -> Any:
1546 ret = self._rowbuffer
1547 self._rowbuffer = collections.deque()
1548 result._soft_close()
1549 return ret
1550
1551
1552class _NoResultMetaData(ResultMetaData):
1553 __slots__ = ()
1554
1555 returns_rows = False
1556
1557 def _we_dont_return_rows(
1558 self, err: Optional[BaseException] = None
1559 ) -> NoReturn:
1560 raise exc.ResourceClosedError(
1561 "This result object does not return rows. "
1562 "It has been closed automatically."
1563 ) from err
1564
1565 def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
1566 self._we_dont_return_rows()
1567
1568 def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1569 self._we_dont_return_rows()
1570
1571 def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
1572 self._we_dont_return_rows()
1573
1574 @property
1575 def _keymap(self) -> NoReturn: # type: ignore[override]
1576 self._we_dont_return_rows()
1577
1578 @property
1579 def _key_to_index(self) -> NoReturn: # type: ignore[override]
1580 self._we_dont_return_rows()
1581
1582 @property
1583 def _processors(self) -> NoReturn: # type: ignore[override]
1584 self._we_dont_return_rows()
1585
1586 @property
1587 def keys(self) -> NoReturn:
1588 self._we_dont_return_rows()
1589
1590
1591_NO_RESULT_METADATA = _NoResultMetaData()
1592
1593
1594def null_dml_result() -> IteratorResult[Any]:
1595 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
1596 it._soft_close()
1597 return it
1598
1599
1600class CursorResult(Result[Unpack[_Ts]]):
1601 """A Result that is representing state from a DBAPI cursor.
1602
1603 .. versionchanged:: 1.4 The :class:`.CursorResult``
1604 class replaces the previous :class:`.ResultProxy` interface.
1605 This classes are based on the :class:`.Result` calling API
1606 which provides an updated usage model and calling facade for
1607 SQLAlchemy Core and SQLAlchemy ORM.
1608
1609 Returns database rows via the :class:`.Row` class, which provides
1610 additional API features and behaviors on top of the raw data returned by
1611 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
1612 method, other kinds of objects may also be returned.
1613
1614 .. seealso::
1615
1616 :ref:`tutorial_selecting_data` - introductory material for accessing
1617 :class:`_engine.CursorResult` and :class:`.Row` objects.
1618
1619 """
1620
1621 __slots__ = (
1622 "context",
1623 "dialect",
1624 "cursor",
1625 "cursor_strategy",
1626 "_echo",
1627 "connection",
1628 )
1629
1630 _metadata: Union[CursorResultMetaData, _NoResultMetaData]
1631 _no_result_metadata = _NO_RESULT_METADATA
1632 _soft_closed: bool = False
1633 closed: bool = False
1634 _is_cursor = True
1635
1636 context: DefaultExecutionContext
1637 dialect: Dialect
1638 cursor_strategy: ResultFetchStrategy
1639 connection: Connection
1640
1641 def __init__(
1642 self,
1643 context: DefaultExecutionContext,
1644 cursor_strategy: ResultFetchStrategy,
1645 cursor_description: Optional[_DBAPICursorDescription],
1646 ):
1647 self.context = context
1648 self.dialect = context.dialect
1649 self.cursor = context.cursor
1650 self.cursor_strategy = cursor_strategy
1651 self.connection = context.root_connection
1652 self._echo = echo = (
1653 self.connection._echo and context.engine._should_log_debug()
1654 )
1655
1656 if cursor_description is not None:
1657 self._init_metadata(context, cursor_description)
1658
1659 if echo:
1660 log = self.context.connection._log_debug
1661
1662 def _log_row(row: Any) -> Any:
1663 log("Row %r", sql_util._repr_row(row))
1664 return row
1665
1666 self._row_logging_fn = _log_row
1667
1668 # call Result._row_getter to set up the row factory
1669 self._row_getter
1670
1671 else:
1672 assert context._num_sentinel_cols == 0
1673 self._metadata = self._no_result_metadata
1674
1675 def _init_metadata(
1676 self,
1677 context: DefaultExecutionContext,
1678 cursor_description: _DBAPICursorDescription,
1679 ) -> CursorResultMetaData:
1680 driver_column_names = context.execution_options.get(
1681 "driver_column_names", False
1682 )
1683 if context.compiled:
1684 compiled = context.compiled
1685
1686 metadata: CursorResultMetaData
1687
1688 if driver_column_names:
1689 # TODO: test this case
1690 metadata = CursorResultMetaData(
1691 self,
1692 cursor_description,
1693 driver_column_names=True,
1694 num_sentinel_cols=context._num_sentinel_cols,
1695 )
1696 assert not metadata._safe_for_cache
1697 elif compiled._cached_metadata:
1698 metadata = compiled._cached_metadata
1699 else:
1700 metadata = CursorResultMetaData(
1701 self,
1702 cursor_description,
1703 # the number of sentinel columns is stored on the context
1704 # but it's a characteristic of the compiled object
1705 # so it's ok to apply it to a cacheable metadata.
1706 num_sentinel_cols=context._num_sentinel_cols,
1707 )
1708 if metadata._safe_for_cache:
1709 compiled._cached_metadata = metadata
1710
1711 # result rewrite/ adapt step. this is to suit the case
1712 # when we are invoked against a cached Compiled object, we want
1713 # to rewrite the ResultMetaData to reflect the Column objects
1714 # that are in our current SQL statement object, not the one
1715 # that is associated with the cached Compiled object.
1716 # the Compiled object may also tell us to not
1717 # actually do this step; this is to support the ORM where
1718 # it is to produce a new Result object in any case, and will
1719 # be using the cached Column objects against this database result
1720 # so we don't want to rewrite them.
1721 #
1722 # Basically this step suits the use case where the end user
1723 # is using Core SQL expressions and is accessing columns in the
1724 # result row using row._mapping[table.c.column].
1725 if (
1726 not context.execution_options.get(
1727 "_result_disable_adapt_to_context", False
1728 )
1729 and compiled._result_columns
1730 and context.cache_hit is context.dialect.CACHE_HIT
1731 and compiled.statement is not context.invoked_statement # type: ignore[comparison-overlap] # noqa: E501
1732 ):
1733 metadata = metadata._adapt_to_context(context)
1734
1735 self._metadata = metadata
1736
1737 else:
1738 self._metadata = metadata = CursorResultMetaData(
1739 self,
1740 cursor_description,
1741 driver_column_names=driver_column_names,
1742 )
1743 if self._echo:
1744 context.connection._log_debug(
1745 "Col %r", tuple(x[0] for x in cursor_description)
1746 )
1747 return metadata
1748
1749 def _soft_close(self, hard: bool = False) -> None:
1750 """Soft close this :class:`_engine.CursorResult`.
1751
1752 This releases all DBAPI cursor resources, but leaves the
1753 CursorResult "open" from a semantic perspective, meaning the
1754 fetchXXX() methods will continue to return empty results.
1755
1756 This method is called automatically when:
1757
1758 * all result rows are exhausted using the fetchXXX() methods.
1759 * cursor.description is None.
1760
1761 This method is **not public**, but is documented in order to clarify
1762 the "autoclose" process used.
1763
1764 .. seealso::
1765
1766 :meth:`_engine.CursorResult.close`
1767
1768
1769 """
1770
1771 if (not hard and self._soft_closed) or (hard and self.closed):
1772 return
1773
1774 if hard:
1775 self.closed = True
1776 self.cursor_strategy.hard_close(self, self.cursor)
1777 else:
1778 self.cursor_strategy.soft_close(self, self.cursor)
1779
1780 if not self._soft_closed:
1781 cursor = self.cursor
1782 self.cursor = None # type: ignore
1783 self.connection._safe_close_cursor(cursor)
1784 self._soft_closed = True
1785
1786 @property
1787 def inserted_primary_key_rows(self) -> List[Optional[Any]]:
1788 """Return the value of
1789 :attr:`_engine.CursorResult.inserted_primary_key`
1790 as a row contained within a list; some dialects may support a
1791 multiple row form as well.
1792
1793 .. note:: As indicated below, in current SQLAlchemy versions this
1794 accessor is only useful beyond what's already supplied by
1795 :attr:`_engine.CursorResult.inserted_primary_key` when using the
1796 :ref:`postgresql_psycopg2` dialect. Future versions hope to
1797 generalize this feature to more dialects.
1798
1799 This accessor is added to support dialects that offer the feature
1800 that is currently implemented by the :ref:`psycopg2_executemany_mode`
1801 feature, currently **only the psycopg2 dialect**, which provides
1802 for many rows to be INSERTed at once while still retaining the
1803 behavior of being able to return server-generated primary key values.
1804
1805 * **When using the psycopg2 dialect, or other dialects that may support
1806 "fast executemany" style inserts in upcoming releases** : When
1807 invoking an INSERT statement while passing a list of rows as the
1808 second argument to :meth:`_engine.Connection.execute`, this accessor
1809 will then provide a list of rows, where each row contains the primary
1810 key value for each row that was INSERTed.
1811
1812 * **When using all other dialects / backends that don't yet support
1813 this feature**: This accessor is only useful for **single row INSERT
1814 statements**, and returns the same information as that of the
1815 :attr:`_engine.CursorResult.inserted_primary_key` within a
1816 single-element list. When an INSERT statement is executed in
1817 conjunction with a list of rows to be INSERTed, the list will contain
1818 one row per row inserted in the statement, however it will contain
1819 ``None`` for any server-generated values.
1820
1821 Future releases of SQLAlchemy will further generalize the
1822 "fast execution helper" feature of psycopg2 to suit other dialects,
1823 thus allowing this accessor to be of more general use.
1824
1825 .. versionadded:: 1.4
1826
1827 .. seealso::
1828
1829 :attr:`_engine.CursorResult.inserted_primary_key`
1830
1831 """
1832 if not self.context.compiled:
1833 raise exc.InvalidRequestError(
1834 "Statement is not a compiled expression construct."
1835 )
1836 elif not self.context.isinsert:
1837 raise exc.InvalidRequestError(
1838 "Statement is not an insert() expression construct."
1839 )
1840 elif self.context._is_explicit_returning:
1841 raise exc.InvalidRequestError(
1842 "Can't call inserted_primary_key "
1843 "when returning() "
1844 "is used."
1845 )
1846 return self.context.inserted_primary_key_rows # type: ignore[no-any-return] # noqa: E501
1847
1848 @property
1849 def inserted_primary_key(self) -> Optional[Any]:
1850 """Return the primary key for the row just inserted.
1851
1852 The return value is a :class:`_result.Row` object representing
1853 a named tuple of primary key values in the order in which the
1854 primary key columns are configured in the source
1855 :class:`_schema.Table`.
1856
1857 .. versionchanged:: 1.4.8 - the
1858 :attr:`_engine.CursorResult.inserted_primary_key`
1859 value is now a named tuple via the :class:`_result.Row` class,
1860 rather than a plain tuple.
1861
1862 This accessor only applies to single row :func:`_expression.insert`
1863 constructs which did not explicitly specify
1864 :meth:`_expression.Insert.returning`. Support for multirow inserts,
1865 while not yet available for most backends, would be accessed using
1866 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
1867
1868 Note that primary key columns which specify a server_default clause, or
1869 otherwise do not qualify as "autoincrement" columns (see the notes at
1870 :class:`_schema.Column`), and were generated using the database-side
1871 default, will appear in this list as ``None`` unless the backend
1872 supports "returning" and the insert statement executed with the
1873 "implicit returning" enabled.
1874
1875 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1876 statement is not a compiled expression construct
1877 or is not an insert() construct.
1878
1879 """
1880
1881 if self.context.executemany:
1882 raise exc.InvalidRequestError(
1883 "This statement was an executemany call; if primary key "
1884 "returning is supported, please "
1885 "use .inserted_primary_key_rows."
1886 )
1887
1888 ikp = self.inserted_primary_key_rows
1889 if ikp:
1890 return ikp[0]
1891 else:
1892 return None
1893
1894 def last_updated_params(
1895 self,
1896 ) -> Union[
1897 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1898 ]:
1899 """Return the collection of updated parameters from this
1900 execution.
1901
1902 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1903 statement is not a compiled expression construct
1904 or is not an update() construct.
1905
1906 """
1907 if not self.context.compiled:
1908 raise exc.InvalidRequestError(
1909 "Statement is not a compiled expression construct."
1910 )
1911 elif not self.context.isupdate:
1912 raise exc.InvalidRequestError(
1913 "Statement is not an update() expression construct."
1914 )
1915 elif self.context.executemany:
1916 return self.context.compiled_parameters
1917 else:
1918 return self.context.compiled_parameters[0]
1919
1920 def last_inserted_params(
1921 self,
1922 ) -> Union[
1923 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
1924 ]:
1925 """Return the collection of inserted parameters from this
1926 execution.
1927
1928 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1929 statement is not a compiled expression construct
1930 or is not an insert() construct.
1931
1932 """
1933 if not self.context.compiled:
1934 raise exc.InvalidRequestError(
1935 "Statement is not a compiled expression construct."
1936 )
1937 elif not self.context.isinsert:
1938 raise exc.InvalidRequestError(
1939 "Statement is not an insert() expression construct."
1940 )
1941 elif self.context.executemany:
1942 return self.context.compiled_parameters
1943 else:
1944 return self.context.compiled_parameters[0]
1945
1946 @property
1947 def returned_defaults_rows(
1948 self,
1949 ) -> Optional[Sequence[Row[Unpack[TupleAny]]]]:
1950 """Return a list of rows each containing the values of default
1951 columns that were fetched using
1952 the :meth:`.ValuesBase.return_defaults` feature.
1953
1954 The return value is a list of :class:`.Row` objects.
1955
1956 .. versionadded:: 1.4
1957
1958 """
1959 return self.context.returned_default_rows
1960
1961 def splice_horizontally(self, other: CursorResult[Any]) -> Self:
1962 """Return a new :class:`.CursorResult` that "horizontally splices"
1963 together the rows of this :class:`.CursorResult` with that of another
1964 :class:`.CursorResult`.
1965
1966 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1967 not intended for general use.
1968
1969 "horizontally splices" means that for each row in the first and second
1970 result sets, a new row that concatenates the two rows together is
1971 produced, which then becomes the new row. The incoming
1972 :class:`.CursorResult` must have the identical number of rows. It is
1973 typically expected that the two result sets come from the same sort
1974 order as well, as the result rows are spliced together based on their
1975 position in the result.
1976
1977 The expected use case here is so that multiple INSERT..RETURNING
1978 statements (which definitely need to be sorted) against different
1979 tables can produce a single result that looks like a JOIN of those two
1980 tables.
1981
1982 E.g.::
1983
1984 r1 = connection.execute(
1985 users.insert().returning(
1986 users.c.user_name, users.c.user_id, sort_by_parameter_order=True
1987 ),
1988 user_values,
1989 )
1990
1991 r2 = connection.execute(
1992 addresses.insert().returning(
1993 addresses.c.address_id,
1994 addresses.c.address,
1995 addresses.c.user_id,
1996 sort_by_parameter_order=True,
1997 ),
1998 address_values,
1999 )
2000
2001 rows = r1.splice_horizontally(r2).all()
2002 assert rows == [
2003 ("john", 1, 1, "foo@bar.com", 1),
2004 ("jack", 2, 2, "bar@bat.com", 2),
2005 ]
2006
2007 .. versionadded:: 2.0
2008
2009 .. seealso::
2010
2011 :meth:`.CursorResult.splice_vertically`
2012
2013
2014 """ # noqa: E501
2015
2016 clone = self._generate()
2017 assert clone is self # just to note
2018 assert isinstance(other._metadata, CursorResultMetaData)
2019 assert isinstance(self._metadata, CursorResultMetaData)
2020 self_tf = self._metadata._tuplefilter
2021 other_tf = other._metadata._tuplefilter
2022 clone._metadata = self._metadata._splice_horizontally(other._metadata)
2023
2024 total_rows = [
2025 tuple(r1 if self_tf is None else self_tf(r1))
2026 + tuple(r2 if other_tf is None else other_tf(r2))
2027 for r1, r2 in zip(
2028 list(self._raw_row_iterator()),
2029 list(other._raw_row_iterator()),
2030 )
2031 ]
2032
2033 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2034 None,
2035 initial_buffer=total_rows,
2036 )
2037 clone._reset_memoizations()
2038 return clone
2039
2040 def splice_vertically(self, other: CursorResult[Any]) -> Self:
2041 """Return a new :class:`.CursorResult` that "vertically splices",
2042 i.e. "extends", the rows of this :class:`.CursorResult` with that of
2043 another :class:`.CursorResult`.
2044
2045 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
2046 not intended for general use.
2047
2048 "vertically splices" means the rows of the given result are appended to
2049 the rows of this cursor result. The incoming :class:`.CursorResult`
2050 must have rows that represent the identical list of columns in the
2051 identical order as they are in this :class:`.CursorResult`.
2052
2053 .. versionadded:: 2.0
2054
2055 .. seealso::
2056
2057 :meth:`.CursorResult.splice_horizontally`
2058
2059 """
2060 clone = self._generate()
2061 total_rows = list(self._raw_row_iterator()) + list(
2062 other._raw_row_iterator()
2063 )
2064
2065 clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
2066 None,
2067 initial_buffer=total_rows,
2068 )
2069 clone._reset_memoizations()
2070 return clone
2071
2072 def _rewind(self, rows: Any) -> Self:
2073 """rewind this result back to the given rowset.
2074
2075 this is used internally for the case where an :class:`.Insert`
2076 construct combines the use of
2077 :meth:`.Insert.return_defaults` along with the
2078 "supplemental columns" feature.
2079
2080 NOTE: this method has not effect then an unique filter is applied
2081 to the result, meaning that no row will be returned.
2082
2083 """
2084
2085 if self._echo:
2086 self.context.connection._log_debug(
2087 "CursorResult rewound %d row(s)", len(rows)
2088 )
2089
2090 # the rows given are expected to be Row objects, so we
2091 # have to clear out processors which have already run on these
2092 # rows
2093 self._metadata = cast(
2094 CursorResultMetaData, self._metadata
2095 )._remove_processors_and_tuple_filter()
2096
2097 self.cursor_strategy = FullyBufferedCursorFetchStrategy(
2098 None,
2099 # TODO: if these are Row objects, can we save on not having to
2100 # re-make new Row objects out of them a second time? is that
2101 # what's actually happening right now? maybe look into this
2102 initial_buffer=rows,
2103 )
2104 self._reset_memoizations()
2105 return self
2106
2107 @property
2108 def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]:
2109 """Return the values of default columns that were fetched using
2110 the :meth:`.ValuesBase.return_defaults` feature.
2111
2112 The value is an instance of :class:`.Row`, or ``None``
2113 if :meth:`.ValuesBase.return_defaults` was not used or if the
2114 backend does not support RETURNING.
2115
2116 .. seealso::
2117
2118 :meth:`.ValuesBase.return_defaults`
2119
2120 """
2121
2122 if self.context.executemany:
2123 raise exc.InvalidRequestError(
2124 "This statement was an executemany call; if return defaults "
2125 "is supported, please use .returned_defaults_rows."
2126 )
2127
2128 rows = self.context.returned_default_rows
2129 if rows:
2130 return rows[0]
2131 else:
2132 return None
2133
2134 def lastrow_has_defaults(self) -> bool:
2135 """Return ``lastrow_has_defaults()`` from the underlying
2136 :class:`.ExecutionContext`.
2137
2138 See :class:`.ExecutionContext` for details.
2139
2140 """
2141
2142 return self.context.lastrow_has_defaults()
2143
2144 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2145 """Return ``postfetch_cols()`` from the underlying
2146 :class:`.ExecutionContext`.
2147
2148 See :class:`.ExecutionContext` for details.
2149
2150 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2151 statement is not a compiled expression construct
2152 or is not an insert() or update() construct.
2153
2154 """
2155
2156 if not self.context.compiled:
2157 raise exc.InvalidRequestError(
2158 "Statement is not a compiled expression construct."
2159 )
2160 elif not self.context.isinsert and not self.context.isupdate:
2161 raise exc.InvalidRequestError(
2162 "Statement is not an insert() or update() "
2163 "expression construct."
2164 )
2165 return self.context.postfetch_cols
2166
2167 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
2168 """Return ``prefetch_cols()`` from the underlying
2169 :class:`.ExecutionContext`.
2170
2171 See :class:`.ExecutionContext` for details.
2172
2173 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
2174 statement is not a compiled expression construct
2175 or is not an insert() or update() construct.
2176
2177 """
2178
2179 if not self.context.compiled:
2180 raise exc.InvalidRequestError(
2181 "Statement is not a compiled expression construct."
2182 )
2183 elif not self.context.isinsert and not self.context.isupdate:
2184 raise exc.InvalidRequestError(
2185 "Statement is not an insert() or update() "
2186 "expression construct."
2187 )
2188 return self.context.prefetch_cols
2189
2190 def supports_sane_rowcount(self) -> bool:
2191 """Return ``supports_sane_rowcount`` from the dialect.
2192
2193 See :attr:`_engine.CursorResult.rowcount` for background.
2194
2195 """
2196
2197 return self.dialect.supports_sane_rowcount
2198
2199 def supports_sane_multi_rowcount(self) -> bool:
2200 """Return ``supports_sane_multi_rowcount`` from the dialect.
2201
2202 See :attr:`_engine.CursorResult.rowcount` for background.
2203
2204 """
2205
2206 return self.dialect.supports_sane_multi_rowcount
2207
2208 @util.memoized_property
2209 def rowcount(self) -> int:
2210 """Return the 'rowcount' for this result.
2211
2212 The primary purpose of 'rowcount' is to report the number of rows
2213 matched by the WHERE criterion of an UPDATE or DELETE statement
2214 executed once (i.e. for a single parameter set), which may then be
2215 compared to the number of rows expected to be updated or deleted as a
2216 means of asserting data integrity.
2217
2218 This attribute is transferred from the ``cursor.rowcount`` attribute
2219 of the DBAPI before the cursor is closed, to support DBAPIs that
2220 don't make this value available after cursor close. Some DBAPIs may
2221 offer meaningful values for other kinds of statements, such as INSERT
2222 and SELECT statements as well. In order to retrieve ``cursor.rowcount``
2223 for these statements, set the
2224 :paramref:`.Connection.execution_options.preserve_rowcount`
2225 execution option to True, which will cause the ``cursor.rowcount``
2226 value to be unconditionally memoized before any results are returned
2227 or the cursor is closed, regardless of statement type.
2228
2229 For cases where the DBAPI does not support rowcount for a particular
2230 kind of statement and/or execution, the returned value will be ``-1``,
2231 which is delivered directly from the DBAPI and is part of :pep:`249`.
2232 All DBAPIs should support rowcount for single-parameter-set
2233 UPDATE and DELETE statements, however.
2234
2235 .. note::
2236
2237 Notes regarding :attr:`_engine.CursorResult.rowcount`:
2238
2239
2240 * This attribute returns the number of rows *matched*,
2241 which is not necessarily the same as the number of rows
2242 that were actually *modified*. For example, an UPDATE statement
2243 may have no net change on a given row if the SET values
2244 given are the same as those present in the row already.
2245 Such a row would be matched but not modified.
2246 On backends that feature both styles, such as MySQL,
2247 rowcount is configured to return the match
2248 count in all cases.
2249
2250 * :attr:`_engine.CursorResult.rowcount` in the default case is
2251 *only* useful in conjunction with an UPDATE or DELETE statement,
2252 and only with a single set of parameters. For other kinds of
2253 statements, SQLAlchemy will not attempt to pre-memoize the value
2254 unless the
2255 :paramref:`.Connection.execution_options.preserve_rowcount`
2256 execution option is used. Note that contrary to :pep:`249`, many
2257 DBAPIs do not support rowcount values for statements that are not
2258 UPDATE or DELETE, particularly when rows are being returned which
2259 are not fully pre-buffered. DBAPIs that dont support rowcount
2260 for a particular kind of statement should return the value ``-1``
2261 for such statements.
2262
2263 * :attr:`_engine.CursorResult.rowcount` may not be meaningful
2264 when executing a single statement with multiple parameter sets
2265 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
2266 values across multiple parameter sets and will return ``-1``
2267 when accessed.
2268
2269 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
2270 a correct population of :attr:`_engine.CursorResult.rowcount`
2271 when the :paramref:`.Connection.execution_options.preserve_rowcount`
2272 execution option is set to True.
2273
2274 * Statements that use RETURNING may not support rowcount, returning
2275 a ``-1`` value instead.
2276
2277 .. seealso::
2278
2279 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
2280
2281 :paramref:`.Connection.execution_options.preserve_rowcount`
2282
2283 """ # noqa: E501
2284 try:
2285 return self.context.rowcount
2286 except BaseException as e:
2287 self.cursor_strategy.handle_exception(self, self.cursor, e)
2288 raise # not called
2289
2290 @property
2291 def lastrowid(self) -> int:
2292 """Return the 'lastrowid' accessor on the DBAPI cursor.
2293
2294 This is a DBAPI specific method and is only functional
2295 for those backends which support it, for statements
2296 where it is appropriate. It's behavior is not
2297 consistent across backends.
2298
2299 Usage of this method is normally unnecessary when
2300 using insert() expression constructs; the
2301 :attr:`~CursorResult.inserted_primary_key` attribute provides a
2302 tuple of primary key values for a newly inserted row,
2303 regardless of database backend.
2304
2305 """
2306 try:
2307 return self.context.get_lastrowid()
2308 except BaseException as e:
2309 self.cursor_strategy.handle_exception(self, self.cursor, e)
2310
2311 @property
2312 def returns_rows(self) -> bool:
2313 """True if this :class:`_engine.CursorResult` returns zero or more
2314 rows.
2315
2316 I.e. if it is legal to call the methods
2317 :meth:`_engine.CursorResult.fetchone`,
2318 :meth:`_engine.CursorResult.fetchmany`
2319 :meth:`_engine.CursorResult.fetchall`.
2320
2321 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
2322 always be synonymous with whether or not the DBAPI cursor had a
2323 ``.description`` attribute, indicating the presence of result columns,
2324 noting that a cursor that returns zero rows still has a
2325 ``.description`` if a row-returning statement was emitted.
2326
2327 This attribute should be True for all results that are against
2328 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
2329 that use RETURNING. For INSERT/UPDATE/DELETE statements that were
2330 not using RETURNING, the value will usually be False, however
2331 there are some dialect-specific exceptions to this, such as when
2332 using the MSSQL / pyodbc dialect a SELECT is emitted inline in
2333 order to retrieve an inserted primary key value.
2334
2335 .. seealso::
2336
2337 :meth:`.Result.close`
2338
2339 :attr:`.Result.closed`
2340
2341 """
2342 return self._metadata.returns_rows
2343
2344 @property
2345 def is_insert(self) -> bool:
2346 """True if this :class:`_engine.CursorResult` is the result
2347 of a executing an expression language compiled
2348 :func:`_expression.insert` construct.
2349
2350 When True, this implies that the
2351 :attr:`inserted_primary_key` attribute is accessible,
2352 assuming the statement did not include
2353 a user defined "returning" construct.
2354
2355 """
2356 return self.context.isinsert
2357
2358 def _fetchiter_impl(self) -> Iterator[Any]:
2359 fetchone = self.cursor_strategy.fetchone
2360
2361 while True:
2362 row = fetchone(self, self.cursor)
2363 if row is None:
2364 break
2365 yield row
2366
2367 def _fetchone_impl(self, hard_close: bool = False) -> Any:
2368 return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
2369
2370 def _fetchall_impl(self) -> Any:
2371 return self.cursor_strategy.fetchall(self, self.cursor)
2372
2373 def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
2374 return self.cursor_strategy.fetchmany(self, self.cursor, size)
2375
2376 def _raw_row_iterator(self) -> Any:
2377 return self._fetchiter_impl()
2378
2379 def merge(
2380 self, *others: Result[Unpack[TupleAny]]
2381 ) -> MergedResult[Unpack[TupleAny]]:
2382 merged_result = super().merge(*others)
2383 if self.context._has_rowcount:
2384 merged_result.rowcount = sum(
2385 cast("CursorResult[Any]", result).rowcount
2386 for result in (self,) + others
2387 )
2388 return merged_result
2389
2390 def close(self) -> None:
2391 """Close this :class:`_engine.CursorResult`.
2392
2393 This closes out the underlying DBAPI cursor corresponding to the
2394 statement execution, if one is still present. Note that the DBAPI
2395 cursor is automatically released when the :class:`_engine.CursorResult`
2396 exhausts all available rows. :meth:`_engine.CursorResult.close` is
2397 generally an optional method except in the case when discarding a
2398 :class:`_engine.CursorResult` that still has additional rows pending
2399 for fetch.
2400
2401 After this method is called, it is no longer valid to call upon
2402 the fetch methods, which will raise a :class:`.ResourceClosedError`
2403 on subsequent use.
2404
2405 .. seealso::
2406
2407 :ref:`connections_toplevel`
2408
2409 """
2410 self._soft_close(hard=True)
2411
2412 @_generative
2413 def yield_per(self, num: int) -> Self:
2414 self._yield_per = num
2415 self.cursor_strategy.yield_per(self, self.cursor, num)
2416 return self
2417
2418
2419ResultProxy = CursorResult