# engine/cursor.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: allow-untyped-defs, allow-untyped-calls

"""Define cursor-specific result set constructs including
:class:`.CursorResult`."""


from __future__ import annotations

import collections
import functools
import operator
import typing
from typing import Any
from typing import cast
from typing import ClassVar
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import NoReturn
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

from .result import IteratorResult
from .result import MergedResult
from .result import Result
from .result import ResultMetaData
from .result import SimpleResultMetaData
from .result import tuplegetter
from .row import Row
from .. import exc
from .. import util
from ..sql import elements
from ..sql import sqltypes
from ..sql import util as sql_util
from ..sql.base import _generative
from ..sql.compiler import ResultColumnsEntry
from ..sql.compiler import RM_NAME
from ..sql.compiler import RM_OBJECTS
from ..sql.compiler import RM_RENDERED_NAME
from ..sql.compiler import RM_TYPE
from ..sql.type_api import TypeEngine
from ..util.typing import Literal
from ..util.typing import Self
from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack


if typing.TYPE_CHECKING:
    from .base import Connection
    from .default import DefaultExecutionContext
    from .interfaces import _DBAPICursorDescription
    from .interfaces import DBAPICursor
    from .interfaces import Dialect
    from .interfaces import ExecutionContext
    from .result import _KeyIndexType
    from .result import _KeyMapRecType
    from .result import _KeyMapType
    from .result import _KeyType
    from .result import _ProcessorsType
    from .result import _TupleGetterType
    from ..sql.type_api import _ResultProcessorType


_Ts = TypeVarTuple("_Ts")


# metadata entry tuple indexes.
# using raw tuple is faster than namedtuple.
# these match up to the positions in
# _CursorKeyMapRecType
MD_INDEX: Literal[0] = 0
"""integer index in cursor.description

"""

MD_RESULT_MAP_INDEX: Literal[1] = 1
"""integer index in compiled._result_columns"""

MD_OBJECTS: Literal[2] = 2
"""other string keys and ColumnElement objects that can match.

This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects

"""

MD_LOOKUP_KEY: Literal[3] = 3
"""string key we usually expect for key-based lookup

this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
"""


MD_RENDERED_NAME: Literal[4] = 4
"""name that is usually in cursor.description

this comes from compiler.RM_RENDERED_NAME / compiler.ResultColumnsEntry.keyname
"""


MD_PROCESSOR: Literal[5] = 5
"""callable to process a result value into a row"""

MD_UNTRANSLATED: Literal[6] = 6
"""raw name from cursor.description"""


_CursorKeyMapRecType = Tuple[
    Optional[int],  # MD_INDEX, None means the record is ambiguously named
    int,  # MD_RESULT_MAP_INDEX
    List[Any],  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    Optional[str],  # MD_UNTRANSLATED
]
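# For illustration only: a hypothetical record for a column "user_id" at
# cursor position 0 might look like
# (0, 0, [users_table.c.user_id, "user_id"], "user_id", "user_id",
#  int_processor, None)
# where "users_table" and "int_processor" are stand-ins for an actual
# Column object and the dialect's result processor (or None) for its type.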

_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]

# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
# not None
_NonAmbigCursorKeyMapRecType = Tuple[
    int,
    int,
    List[Any],
    str,
    str,
    Optional["_ResultProcessorType[Any]"],
    str,
]


class CursorResultMetaData(ResultMetaData):
    """Result metadata for DBAPI cursors."""

    __slots__ = (
        "_keymap",
        "_processors",
        "_keys",
        "_keymap_by_result_column_idx",
        "_tuplefilter",
        "_translated_indexes",
        "_safe_for_cache",
        "_unpickled",
        "_key_to_index",
        # don't need _unique_filters support here for now. Can be added
        # if a need arises.
    )

    _keymap: _CursorKeyMapType
    _processors: _ProcessorsType
    _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
    _unpickled: bool
    _safe_for_cache: bool
    _translated_indexes: Optional[List[int]]

    returns_rows: ClassVar[bool] = True

    def _has_key(self, key: Any) -> bool:
        return key in self._keymap

    def _for_freeze(self) -> ResultMetaData:
        return SimpleResultMetaData(
            self._keys,
            extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
        )

    def _make_new_metadata(
        self,
        *,
        unpickled: bool,
        processors: _ProcessorsType,
        keys: Sequence[str],
        keymap: _KeyMapType,
        tuplefilter: Optional[_TupleGetterType],
        translated_indexes: Optional[List[int]],
        safe_for_cache: bool,
        keymap_by_result_column_idx: Any,
    ) -> Self:
        new_obj = self.__class__.__new__(self.__class__)
        new_obj._unpickled = unpickled
        new_obj._processors = processors
        new_obj._keys = keys
        new_obj._keymap = keymap
        new_obj._tuplefilter = tuplefilter
        new_obj._translated_indexes = translated_indexes
        new_obj._safe_for_cache = safe_for_cache
        new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
        new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
        return new_obj

    def _remove_processors(self) -> Self:
        assert not self._tuplefilter
        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=[None] * len(self._processors),
            tuplefilter=None,
            translated_indexes=None,
            keymap={
                key: value[0:5] + (None,) + value[6:]
                for key, value in self._keymap.items()
            },
            keys=self._keys,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )

    def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
        assert not self._tuplefilter

        keymap = dict(self._keymap)
        offset = len(self._keys)
        keymap.update(
            {
                key: (
                    # int index should be None for ambiguous key
                    (
                        value[0] + offset
                        if value[0] is not None and key not in keymap
                        else None
                    ),
                    value[1] + offset,
                    *value[2:],
                )
                for key, value in other._keymap.items()
            }
        )
        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=self._processors + other._processors,  # type: ignore
            tuplefilter=None,
            translated_indexes=None,
            keys=self._keys + other._keys,  # type: ignore
            keymap=keymap,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx={
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in keymap.values()
            },
        )

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
        recs = list(self._metadata_for_keys(keys))

        indexes = [rec[MD_INDEX] for rec in recs]
        new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]

        if self._translated_indexes:
            indexes = [self._translated_indexes[idx] for idx in indexes]
        tup = tuplegetter(*indexes)
        new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]

        keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
        # TODO: need unit test for:
        # result = connection.execute("raw sql, no columns").scalars()
        # without the "or ()" it's failing because MD_OBJECTS is None
        keymap.update(
            (e, new_rec)
            for new_rec in new_recs
            for e in new_rec[MD_OBJECTS] or ()
        )

        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=self._processors,
            keys=new_keys,
            tuplefilter=tup,
            translated_indexes=indexes,
            keymap=keymap,  # type: ignore[arg-type]
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )

    def _adapt_to_context(self, context: ExecutionContext) -> Self:
        """When using a cached Compiled construct that has a _result_map,
        for a new statement that used the cached Compiled, we need to ensure
        the keymap has the Column objects from our new statement as keys.
        So here we rewrite the keymap with new entries for the new columns
        as matched to those of the cached statement.

        """

        if not context.compiled or not context.compiled._result_columns:
            return self

        compiled_statement = context.compiled.statement
        invoked_statement = context.invoked_statement

        if TYPE_CHECKING:
            assert isinstance(invoked_statement, elements.ClauseElement)

        if compiled_statement is invoked_statement:
            return self

        assert invoked_statement is not None

        # this is the most common path for Core statements when
        # caching is used. In ORM use, this codepath is not really used
        # as the _result_disable_adapt_to_context execution option is
        # set by the ORM.

        # make a copy and add the columns from the invoked statement
        # to the result map.

        keymap_by_position = self._keymap_by_result_column_idx

        if keymap_by_position is None:
            # first retrieval from cache, this map will not be set up yet,
            # initialize lazily
            keymap_by_position = self._keymap_by_result_column_idx = {
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in self._keymap.values()
            }

        assert not self._tuplefilter
        return self._make_new_metadata(
            keymap=self._keymap
            | {
                new: keymap_by_position[idx]
                for idx, new in enumerate(
                    invoked_statement._all_selected_columns
                )
                if idx in keymap_by_position
            },
            unpickled=self._unpickled,
            processors=self._processors,
            tuplefilter=None,
            translated_indexes=None,
            keys=self._keys,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )

    def __init__(
        self,
        parent: CursorResult[Unpack[TupleAny]],
        cursor_description: _DBAPICursorDescription,
        *,
        driver_column_names: bool = False,
    ):
        context = parent.context
        self._tuplefilter = None
        self._translated_indexes = None
        self._safe_for_cache = self._unpickled = False

        if context.result_column_struct:
            (
                result_columns,
                cols_are_ordered,
                textual_ordered,
                ad_hoc_textual,
                loose_column_name_matching,
            ) = context.result_column_struct
            num_ctx_cols = len(result_columns)
        else:
            result_columns = cols_are_ordered = (  # type: ignore
                num_ctx_cols
            ) = ad_hoc_textual = loose_column_name_matching = (
                textual_ordered
            ) = False

        # merge cursor.description with the column info
        # present in the compiled structure, if any
        raw = self._merge_cursor_description(
            context,
            cursor_description,
            result_columns,
            num_ctx_cols,
            cols_are_ordered,
            textual_ordered,
            ad_hoc_textual,
            loose_column_name_matching,
            driver_column_names,
        )

        # processors in key order which are used when building up
        # a row
        self._processors = [
            metadata_entry[MD_PROCESSOR] for metadata_entry in raw
        ]

        # this is used when using this ResultMetaData in a Core-only cache
        # retrieval context. it's initialized on first cache retrieval
        # when the _result_disable_adapt_to_context execution option
        # (which the ORM generally sets) is not set.
        self._keymap_by_result_column_idx = None

        # for compiled SQL constructs, copy additional lookup keys into
        # the key lookup map, such as Column objects, labels,
        # column keys and other names
        if num_ctx_cols:
            # keymap by primary string...
            by_key = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

            if len(by_key) != num_ctx_cols:
                # if the by-primary-string dictionary is smaller than the
                # number of columns, assume we have dupes; (this check
                # is also in place if string dictionary is bigger, as
                # can occur when '*' was used as one of the compiled columns,
                # which may or may not be suggestive of dupes), rewrite
                # dupe records with "None" for index which results in
                # ambiguous column exception when accessed.
                #
                # this is considered to be the less common case as it is not
                # common to have dupe column keys in a SELECT statement.
                #
                # new in 1.4: get the complete set of all possible keys,
                # strings, objects, whatever, that are dupes across two
                # different records, first.
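                # e.g. "SELECT a.id, b.id FROM a JOIN b ..." produces two
                # records that both claim the string key "id"; "id" is then
                # rewritten below as an ambiguous record so that lookups on
                # it raise rather than silently picking one column.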
                index_by_key: Dict[Any, Any] = {}
                dupes = set()
                for metadata_entry in raw:
                    for key in (metadata_entry[MD_RENDERED_NAME],) + (
                        metadata_entry[MD_OBJECTS] or ()
                    ):
                        idx = metadata_entry[MD_INDEX]
                        # if this key has been associated with more than one
                        # positional index, it's a dupe
                        if index_by_key.setdefault(key, idx) != idx:
                            dupes.add(key)

                # then put everything we have into the keymap excluding only
                # those keys that are dupes.
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                    if obj_elem not in dupes
                }

                # then for the dupe keys, put the "ambiguous column"
                # record into by_key.
                by_key.update(
                    {
                        key: (None, None, [], key, key, None, None)
                        for key in dupes
                    }
                )

            else:
                # no dupes - copy secondary elements from compiled
                # columns into self._keymap. this is the most common
                # codepath for Core / ORM statement executions before the
                # result metadata is cached
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                }
            # update keymap with primary string names taking
            # precedence
            self._keymap.update(by_key)
        else:
            # no compiled objects to map, just create keymap by primary string
            self._keymap = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

        # update keymap with "translated" names.
        # the "translated" name thing has a long history:
        # 1. originally, it was used to fix an issue in very old SQLite
        #    versions prior to 3.10.0. This code is still there in the
        #    sqlite dialect.
        # 2. Next, the pyhive third party dialect started using this hook
        #    for some driver related issue on their end.
        # 3. Most recently, the "driver_column_names" execution option has
        #    taken advantage of this hook to get raw DBAPI col names in the
        #    result keys without disrupting the usual merge process.

        if driver_column_names or (
            not num_ctx_cols and context._translate_colname
        ):
            self._keymap.update(
                {
                    metadata_entry[MD_UNTRANSLATED]: self._keymap[
                        metadata_entry[MD_LOOKUP_KEY]
                    ]
                    for metadata_entry in raw
                    if metadata_entry[MD_UNTRANSLATED]
                }
            )

        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)

    def _merge_cursor_description(
        self,
        context,
        cursor_description,
        result_columns,
        num_ctx_cols,
        cols_are_ordered,
        textual_ordered,
        ad_hoc_textual,
        loose_column_name_matching,
        driver_column_names,
    ):
        """Merge a cursor.description with compiled result column information.

        There are at least four separate strategies used here, selected
        depending on the type of SQL construct used to start with.

        The most common case is that of the compiled SQL expression construct,
        which generated the column names present in the raw SQL string and
        which has the identical number of columns as were reported by
        cursor.description. In this case, we assume a 1-1 positional mapping
        between the entries in cursor.description and the compiled object.
        This is also the most performant case as we disregard extracting /
        decoding the column names present in cursor.description since we
        already have the desired name we generated in the compiled SQL
        construct.

        The next common case is that of the completely raw string SQL,
        such as passed to connection.execute(). In this case we have no
        compiled construct to work with, so we extract and decode the
        names from cursor.description and index those as the primary
        result row target keys.

        The remaining fairly common case is that of the textual SQL
        that includes at least partial column information; this is when
        we use a :class:`_expression.TextualSelect` construct.
        This construct may have
        unordered or ordered column information. In the ordered case, we
        merge the cursor.description and the compiled construct's information
        positionally, and warn if there are additional description names
        present; however, we still decode the names in cursor.description
        as we don't have a guarantee that the names in the columns match
        on these. In the unordered case, we match names in cursor.description
        to that of the compiled construct based on name matching.
        In both of these cases, the cursor.description names and the column
        expression objects and names are indexed as result row target keys.

        The final case is much less common, where we have a compiled
        non-textual SQL expression construct, but the number of columns
        in cursor.description doesn't match what's in the compiled
        construct. We make the guess here that there might be textual
        column expressions in the compiled construct that themselves include
        a comma in them causing them to split. We do the same name-matching
        as with textual non-ordered columns.

        The name-matched system of merging is the same as that used by
        SQLAlchemy for all cases up through the 0.9 series. Positional
        matching for compiled SQL expressions was introduced in 1.0 as a
        major performance feature, and positional matching for textual
        :class:`_expression.TextualSelect` objects in 1.1.
        As name matching is no longer
        a common case, it was acceptable to factor it into smaller generator-
        oriented methods that are easier to understand, but incur slightly
        more performance overhead.

        """

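        # as a reminder, a pep-249 cursor.description is a sequence of
        # 7-tuples, e.g. a VARCHAR column "email" might come back as
        # ("email", <type_code>, None, None, None, None, None); only the
        # first two elements (name, type code) are consulted here.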
        if (
            num_ctx_cols
            and cols_are_ordered
            and not textual_ordered
            and num_ctx_cols == len(cursor_description)
            and not driver_column_names
        ):
            self._keys = [elem[0] for elem in result_columns]
            # pure positional 1-1 case; doesn't need to read
            # the names from cursor.description

            # most common case for Core and ORM

            # this metadata is safe to
            # cache because we are guaranteed
            # to have the columns in the same order for new executions
            self._safe_for_cache = True

            return [
                (
                    idx,
                    idx,
                    rmap_entry[RM_OBJECTS],
                    rmap_entry[RM_NAME],
                    rmap_entry[RM_RENDERED_NAME],
                    context.get_result_processor(
                        rmap_entry[RM_TYPE],
                        rmap_entry[RM_RENDERED_NAME],
                        cursor_description[idx][1],
                    ),
                    None,
                )
                for idx, rmap_entry in enumerate(result_columns)
            ]
        else:
            # name-based or text-positional cases, where we need
            # to read cursor.description names

            if textual_ordered or (
                ad_hoc_textual and len(cursor_description) == num_ctx_cols
            ):
                self._safe_for_cache = not driver_column_names
                # textual positional case
                raw_iterator = self._merge_textual_cols_by_position(
                    context,
                    cursor_description,
                    result_columns,
                    driver_column_names,
                )
            elif num_ctx_cols:
                # compiled SQL with a mismatch of description cols
                # vs. compiled cols, or textual w/ unordered columns
                # the order of columns can change if the query is
                # against a "select *", so not safe to cache
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_name(
                    context,
                    cursor_description,
                    result_columns,
                    loose_column_name_matching,
                    driver_column_names,
                )
            else:
                # no compiled SQL, just a raw string, order of columns
                # can change for "select *"
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_none(
                    context, cursor_description, driver_column_names
                )

            return [
                (
                    idx,
                    ridx,
                    obj,
                    cursor_colname,
                    cursor_colname,
                    context.get_result_processor(
                        mapped_type, cursor_colname, coltype
                    ),
                    untranslated,
                )
                for (
                    idx,
                    ridx,
                    cursor_colname,
                    mapped_type,
                    coltype,
                    obj,
                    untranslated,
                ) in raw_iterator
            ]

    def _colnames_from_description(
        self, context, cursor_description, driver_column_names
    ):
        """Extract column names and data types from a cursor.description.

        Applies unicode decoding, column translation, "normalization",
        and case sensitivity rules to the names based on the dialect.

        """
        dialect = context.dialect
        translate_colname = context._translate_colname
        normalize_name = (
            dialect.normalize_name if dialect.requires_name_normalize else None
        )

        untranslated = None

        for idx, rec in enumerate(cursor_description):
            colname = unnormalized = rec[0]
            coltype = rec[1]

            if translate_colname:
                # a None here for "untranslated" means "the dialect did not
                # change the column name and the untranslated case can be
                # ignored". otherwise "untranslated" is expected to be the
                # original, unchanged colname (e.g. is == to "unnormalized")
                colname, untranslated = translate_colname(colname)

                assert untranslated is None or untranslated == unnormalized

            if normalize_name:
                colname = normalize_name(colname)

            if driver_column_names:
                yield idx, colname, unnormalized, unnormalized, coltype

            else:
                yield idx, colname, unnormalized, untranslated, coltype

    def _merge_textual_cols_by_position(
        self, context, cursor_description, result_columns, driver_column_names
    ):
        num_ctx_cols = len(result_columns)

        if num_ctx_cols > len(cursor_description):
            util.warn(
                "Number of columns in textual SQL (%d) is "
                "smaller than number of columns requested (%d)"
                % (num_ctx_cols, len(cursor_description))
            )
        seen = set()

        self._keys = []

        uses_denormalize = context.dialect.requires_name_normalize
        for (
            idx,
            colname,
            unnormalized,
            untranslated,
            coltype,
        ) in self._colnames_from_description(
            context, cursor_description, driver_column_names
        ):
            if idx < num_ctx_cols:
                ctx_rec = result_columns[idx]
                obj = ctx_rec[RM_OBJECTS]
                ridx = idx
                mapped_type = ctx_rec[RM_TYPE]
                if obj[0] in seen:
                    raise exc.InvalidRequestError(
                        "Duplicate column expression requested "
                        "in textual SQL: %r" % obj[0]
                    )
                seen.add(obj[0])

                # special check for all uppercase unnormalized name;
                # use the unnormalized name as the key.
                # see #10788
                # if these names don't match, then we still honor the
                # cursor.description name as the key and not what the
                # Column has, see
                # test_resultset.py::PositionalTextTest::test_via_column
                if (
                    uses_denormalize
                    and unnormalized == ctx_rec[RM_RENDERED_NAME]
                ):
                    result_name = unnormalized
                else:
                    result_name = colname
            else:
                mapped_type = sqltypes.NULLTYPE
                obj = None
                ridx = None

                result_name = colname

            if driver_column_names:
                assert untranslated is not None
                self._keys.append(untranslated)
            else:
                self._keys.append(result_name)

            yield (
                idx,
                ridx,
                result_name,
                mapped_type,
                coltype,
                obj,
                untranslated,
            )

    def _merge_cols_by_name(
        self,
        context,
        cursor_description,
        result_columns,
        loose_column_name_matching,
        driver_column_names,
    ):
        match_map = self._create_description_match_map(
            result_columns, loose_column_name_matching
        )
        mapped_type: TypeEngine[Any]

        self._keys = []

        for (
            idx,
            colname,
            unnormalized,
            untranslated,
            coltype,
        ) in self._colnames_from_description(
            context, cursor_description, driver_column_names
        ):
            try:
                ctx_rec = match_map[colname]
            except KeyError:
                mapped_type = sqltypes.NULLTYPE
                obj = None
                result_columns_idx = None
            else:
                obj = ctx_rec[1]
                mapped_type = ctx_rec[2]
                result_columns_idx = ctx_rec[3]

            if driver_column_names:
                assert untranslated is not None
                self._keys.append(untranslated)
            else:
                self._keys.append(colname)
            yield (
                idx,
                result_columns_idx,
                colname,
                mapped_type,
                coltype,
                obj,
                untranslated,
            )

    @classmethod
    def _create_description_match_map(
        cls,
        result_columns: List[ResultColumnsEntry],
        loose_column_name_matching: bool = False,
    ) -> Dict[
        Union[str, object], Tuple[str, Tuple[Any, ...], TypeEngine[Any], int]
    ]:
        """when matching cursor.description to a set of names that are present
        in a Compiled object, as is the case with TextualSelect, get all the
        names we expect might match those in cursor.description.
        """

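        # the returned dict maps each expected name (and, for loose
        # matching, each column-linked object) to a tuple of
        # (name, objects, type, result_columns index). for illustration,
        # a TextualSelect column "id" of type Integer at position 0 might
        # produce roughly {"id": ("id", (<Column id>, "id"), Integer(), 0)}.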
        d: Dict[
            Union[str, object],
            Tuple[str, Tuple[Any, ...], TypeEngine[Any], int],
        ] = {}
        for ridx, elem in enumerate(result_columns):
            key = elem[RM_RENDERED_NAME]

            if key in d:
                # conflicting keyname - just add the column-linked objects
                # to the existing record. if there is a duplicate column
                # name in the cursor description, this will allow all of those
                # objects to raise an ambiguous column error
                e_name, e_obj, e_type, e_ridx = d[key]
                d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
            else:
                d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)

            if loose_column_name_matching:
                # when using a textual statement with an unordered set
                # of columns that line up, we are expecting the user
                # to be using label names in the SQL that match to the column
                # expressions. Enable more liberal matching for this case;
                # duplicate keys that are ambiguous will be fixed later.
                for r_key in elem[RM_OBJECTS]:
                    d.setdefault(
                        r_key,
                        (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
                    )
        return d

    def _merge_cols_by_none(
        self, context, cursor_description, driver_column_names
    ):
        self._keys = []

        for (
            idx,
            colname,
            unnormalized,
            untranslated,
            coltype,
        ) in self._colnames_from_description(
            context, cursor_description, driver_column_names
        ):

            if driver_column_names:
                assert untranslated is not None
                self._keys.append(untranslated)
            else:
                self._keys.append(colname)

            yield (
                idx,
                None,
                colname,
                sqltypes.NULLTYPE,
                coltype,
                None,
                untranslated,
            )

    if not TYPE_CHECKING:

        def _key_fallback(
            self, key: Any, err: Optional[Exception], raiseerr: bool = True
        ) -> Optional[NoReturn]:
            if raiseerr:
                if self._unpickled and isinstance(key, elements.ColumnElement):
                    raise exc.NoSuchColumnError(
                        "Row was unpickled; lookup by ColumnElement "
                        "is unsupported"
                    ) from err
                else:
                    raise exc.NoSuchColumnError(
                        "Could not locate column in row for column '%s'"
                        % util.string_or_unprintable(key)
                    ) from err
            else:
                return None

    def _raise_for_ambiguous_column_name(self, rec):
        raise exc.InvalidRequestError(
            "Ambiguous column name '%s' in "
            "result set column descriptions" % rec[MD_LOOKUP_KEY]
        )

    def _index_for_key(self, key: Any, raiseerr: bool = True) -> Optional[int]:
        # TODO: can consider pre-loading ints and negative ints
        # into _keymap - also no coverage here
        if isinstance(key, int):
            key = self._keys[key]

        try:
            rec = self._keymap[key]
        except KeyError as ke:
            x = self._key_fallback(key, ke, raiseerr)
            assert x is None
            return None

        index = rec[0]

        if index is None:
            self._raise_for_ambiguous_column_name(rec)
        return index

    def _indexes_for_keys(self, keys):
        try:
            return [self._keymap[key][0] for key in keys]
        except KeyError as ke:
            # ensure it raises
            CursorResultMetaData._key_fallback(self, ke.args[0], ke)

    def _metadata_for_keys(
        self, keys: Sequence[Any]
    ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
        for key in keys:
            if int in key.__class__.__mro__:
                key = self._keys[key]

            try:
                rec = self._keymap[key]
            except KeyError as ke:
                # ensure it raises
                CursorResultMetaData._key_fallback(self, ke.args[0], ke)

            index = rec[MD_INDEX]

            if index is None:
                self._raise_for_ambiguous_column_name(rec)

            yield cast(_NonAmbigCursorKeyMapRecType, rec)

    def __getstate__(self):
        # TODO: consider serializing this as SimpleResultMetaData
        return {
            "_keymap": {
                key: (
                    rec[MD_INDEX],
                    rec[MD_RESULT_MAP_INDEX],
                    [],
                    key,
                    rec[MD_RENDERED_NAME],
                    None,
                    None,
                )
                for key, rec in self._keymap.items()
                if isinstance(key, (str, int))
            },
            "_keys": self._keys,
            "_translated_indexes": self._translated_indexes,
        }

    def __setstate__(self, state):
        self._processors = [None for _ in range(len(state["_keys"]))]
        self._keymap = state["_keymap"]
        self._keymap_by_result_column_idx = None
        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
        self._keys = state["_keys"]
        self._unpickled = True
        if state["_translated_indexes"]:
            self._translated_indexes = cast(
                "List[int]", state["_translated_indexes"]
            )
            self._tuplefilter = tuplegetter(*self._translated_indexes)
        else:
            self._translated_indexes = self._tuplefilter = None


class ResultFetchStrategy:
    """Define a fetching strategy for a result object.


    .. versionadded:: 1.4

    """

    __slots__ = ()

    alternate_cursor_description: Optional[_DBAPICursorDescription] = None

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        raise NotImplementedError()

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        raise NotImplementedError()

    def yield_per(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
        num: int,
    ) -> None:
        return

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        raise NotImplementedError()

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        raise NotImplementedError()

    def fetchall(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        raise NotImplementedError()

    def handle_exception(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        raise err


class NoCursorFetchStrategy(ResultFetchStrategy):
    """Cursor strategy for a result that has no open cursor.

    There are two varieties of this strategy, one for DQL and one for
    DML (and also DDL), each of which represents a result that had a cursor
    but no longer has one.

    """

    __slots__ = ()

    def soft_close(self, result, dbapi_cursor):
        pass

    def hard_close(self, result, dbapi_cursor):
        pass

    def fetchone(self, result, dbapi_cursor, hard_close=False):
        return self._non_result(result, None)

    def fetchmany(self, result, dbapi_cursor, size=None):
        return self._non_result(result, [])

    def fetchall(self, result, dbapi_cursor):
        return self._non_result(result, [])

    def _non_result(self, result, default, err=None):
        raise NotImplementedError()


class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DQL result that has no open cursor.

    This is a result set that can return rows, i.e. for a SELECT, or for an
    INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
    where the cursor is closed and no rows remain available. The owning result
    object may or may not be "hard closed", which determines whether the fetch
    methods return empty results or raise for a closed result.

    """

    __slots__ = ()

    def _non_result(self, result, default, err=None):
        if result.closed:
            raise exc.ResourceClosedError(
                "This result object is closed."
            ) from err
        else:
            return default


_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()


class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DML result that has no open cursor.

    This is a result set that does not return rows, i.e. for an INSERT,
    UPDATE, DELETE that does not include RETURNING.

    """

    __slots__ = ()

    def _non_result(self, result, default, err=None):
        # we only expect to have a _NoResultMetaData() here right now.
        assert not result._metadata.returns_rows
        result._metadata._we_dont_return_rows(err)


_NO_CURSOR_DML = NoCursorDMLFetchStrategy()


class CursorFetchStrategy(ResultFetchStrategy):
    """Call fetch methods from a DBAPI cursor.

    Alternate versions of this class may instead buffer the rows from
    cursors or not use cursors at all.

    """

    __slots__ = ()

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        result.cursor_strategy = _NO_CURSOR_DQL

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        result.cursor_strategy = _NO_CURSOR_DQL

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        result.connection._handle_dbapi_exception(
            err, None, None, dbapi_cursor, result.context
        )

    def yield_per(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        num: int,
    ) -> None:
        result.cursor_strategy = BufferedRowCursorFetchStrategy(
            dbapi_cursor,
            {"max_row_buffer": num},
            initial_buffer=collections.deque(),
            growth_factor=0,
        )

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        try:
            row = dbapi_cursor.fetchone()
            if row is None:
                result._soft_close(hard=hard_close)
            return row
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        try:
            if size is None:
                l = dbapi_cursor.fetchmany()
            else:
                l = dbapi_cursor.fetchmany(size)

            if not l:
                result._soft_close()
            return l
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        try:
            rows = dbapi_cursor.fetchall()
            result._soft_close()
            return rows
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)


_DEFAULT_FETCH = CursorFetchStrategy()


class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
    """A cursor fetch strategy with row buffering behavior.

    This strategy buffers the contents of a selection of rows
    before ``fetchone()`` is called. This is to allow the results of
    ``cursor.description`` to be available immediately, when
    interfacing with a DB-API that requires rows to be consumed before
    this information is available (currently psycopg2, when used with
    server-side cursors).

    The pre-fetching behavior fetches only one row initially, and then
    grows its buffer size by a fixed growth factor with each successive
    need for additional rows up to the ``max_row_buffer`` size, which
    defaults to 1000::

        with psycopg2_engine.connect() as conn:

            result = conn.execution_options(
                stream_results=True, max_row_buffer=50
            ).execute(text("select * from table"))

    .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.

    .. seealso::

        :ref:`psycopg2_execution_options`
    """

    __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")

    def __init__(
        self,
        dbapi_cursor,
        execution_options,
        growth_factor=5,
        initial_buffer=None,
    ):
        self._max_row_buffer = execution_options.get("max_row_buffer", 1000)

        if initial_buffer is not None:
            self._rowbuffer = initial_buffer
        else:
            self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
        self._growth_factor = growth_factor

        if growth_factor:
            self._bufsize = min(self._max_row_buffer, self._growth_factor)
        else:
            self._bufsize = self._max_row_buffer
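        # with the default growth_factor of 5, successive refills in
        # _buffer_rows() fetch 5, 25, 125, ... rows, capped at
        # _max_row_buffer; a growth_factor of 0 pins the buffer size.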

    @classmethod
    def create(cls, result):
        return BufferedRowCursorFetchStrategy(
            result.cursor,
            result.context.execution_options,
        )

    def _buffer_rows(self, result, dbapi_cursor):
        """this is currently used only by fetchone()."""

        size = self._bufsize
        try:
            if size < 1:
                new_rows = dbapi_cursor.fetchall()
            else:
                new_rows = dbapi_cursor.fetchmany(size)
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

        if not new_rows:
            return
        self._rowbuffer = collections.deque(new_rows)
        if self._growth_factor and size < self._max_row_buffer:
            self._bufsize = min(
                self._max_row_buffer, size * self._growth_factor
            )

    def yield_per(self, result, dbapi_cursor, num):
        self._growth_factor = 0
        self._max_row_buffer = self._bufsize = num

    def soft_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(self, result, dbapi_cursor, hard_close=False):
        if not self._rowbuffer:
            self._buffer_rows(result, dbapi_cursor)
            if not self._rowbuffer:
                try:
                    result._soft_close(hard=hard_close)
                except BaseException as e:
                    self.handle_exception(result, dbapi_cursor, e)
                return None
        return self._rowbuffer.popleft()

    def fetchmany(self, result, dbapi_cursor, size=None):
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        rb = self._rowbuffer
        lb = len(rb)
        close = False
        if size > lb:
            try:
                new = dbapi_cursor.fetchmany(size - lb)
            except BaseException as e:
                self.handle_exception(result, dbapi_cursor, e)
            else:
                if not new:
                    # defer closing since it may clear the row buffer
                    close = True
                else:
                    rb.extend(new)

        res = [rb.popleft() for _ in range(min(size, len(rb)))]
        if close:
            result._soft_close()
        return res

    def fetchall(self, result, dbapi_cursor):
        try:
            ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
            self._rowbuffer.clear()
            result._soft_close()
            return ret
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)


class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
    """A cursor strategy that buffers rows fully upon creation.

    Used for operations where a result is to be delivered
    after the database conversation cannot be continued,
    such as MSSQL INSERT...OUTPUT after an autocommit.

    """

    __slots__ = ("_rowbuffer", "alternate_cursor_description")

    def __init__(
        self,
        dbapi_cursor: Optional[DBAPICursor],
        alternate_description: Optional[_DBAPICursorDescription] = None,
        initial_buffer: Optional[Iterable[Any]] = None,
    ):
        self.alternate_cursor_description = alternate_description
        if initial_buffer is not None:
            self._rowbuffer = collections.deque(initial_buffer)
        else:
            assert dbapi_cursor is not None
            self._rowbuffer = collections.deque(dbapi_cursor.fetchall())

    def yield_per(self, result, dbapi_cursor, num):
        pass

    def soft_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(self, result, dbapi_cursor, hard_close=False):
        if self._rowbuffer:
            return self._rowbuffer.popleft()
        else:
            result._soft_close(hard=hard_close)
            return None

    def fetchmany(self, result, dbapi_cursor, size=None):
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        rb = self._rowbuffer
        rows = [rb.popleft() for _ in range(min(size, len(rb)))]
        if not rows:
            result._soft_close()
        return rows

    def fetchall(self, result, dbapi_cursor):
        ret = self._rowbuffer
        self._rowbuffer = collections.deque()
        result._soft_close()
        return ret


class _NoResultMetaData(ResultMetaData):
    __slots__ = ()

    returns_rows = False

    def _we_dont_return_rows(self, err=None):
        raise exc.ResourceClosedError(
            "This result object does not return rows. "
            "It has been closed automatically."
        ) from err

    def _index_for_key(self, keys, raiseerr):
        self._we_dont_return_rows()

    def _metadata_for_keys(self, key):
        self._we_dont_return_rows()

    def _reduce(self, keys):
        self._we_dont_return_rows()

    @property
    def _keymap(self):  # type: ignore[override]
        self._we_dont_return_rows()

    @property
    def _key_to_index(self):  # type: ignore[override]
        self._we_dont_return_rows()

    @property
    def _processors(self):  # type: ignore[override]
        self._we_dont_return_rows()

    @property
    def keys(self):
        self._we_dont_return_rows()


_NO_RESULT_METADATA = _NoResultMetaData()


def null_dml_result() -> IteratorResult[Any]:
    it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
    it._soft_close()
    return it


class CursorResult(Result[Unpack[_Ts]]):
    """A Result that represents state from a DBAPI cursor.

    .. versionchanged:: 1.4 The :class:`.CursorResult`
       class replaces the previous :class:`.ResultProxy` interface.
       This class is based on the :class:`.Result` calling API
       which provides an updated usage model and calling facade for
       SQLAlchemy Core and SQLAlchemy ORM.

    Returns database rows via the :class:`.Row` class, which provides
    additional API features and behaviors on top of the raw data returned by
    the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
    method, other kinds of objects may also be returned.

    .. seealso::

        :ref:`tutorial_selecting_data` - introductory material for accessing
        :class:`_engine.CursorResult` and :class:`.Row` objects.

    """

    __slots__ = (
        "context",
        "dialect",
        "cursor",
        "cursor_strategy",
        "_echo",
        "connection",
    )

    _metadata: Union[CursorResultMetaData, _NoResultMetaData]
    _no_result_metadata = _NO_RESULT_METADATA
    _soft_closed: bool = False
    closed: bool = False
    _is_cursor = True

    context: DefaultExecutionContext
    dialect: Dialect
    cursor_strategy: ResultFetchStrategy
    connection: Connection

    def __init__(
        self,
        context: DefaultExecutionContext,
        cursor_strategy: ResultFetchStrategy,
        cursor_description: Optional[_DBAPICursorDescription],
    ):
        self.context = context
        self.dialect = context.dialect
        self.cursor = context.cursor
        self.cursor_strategy = cursor_strategy
        self.connection = context.root_connection
        self._echo = echo = (
            self.connection._echo and context.engine._should_log_debug()
        )

        if cursor_description is not None:
            # inline of Result._row_getter(), set up an initial row
            # getter assuming no transformations will be called as this
            # is the most common case

            metadata = self._init_metadata(context, cursor_description)

            _make_row: Any
            _make_row = functools.partial(
                Row,
                metadata,
                metadata._effective_processors,
                metadata._key_to_index,
            )

            if context._num_sentinel_cols:
                sentinel_filter = operator.itemgetter(
                    slice(-context._num_sentinel_cols)
                )

                def _sliced_row(raw_data):
                    return _make_row(sentinel_filter(raw_data))

                sliced_row = _sliced_row
            else:
                sliced_row = _make_row

            if echo:
                log = self.context.connection._log_debug

                def _log_row(row):
                    log("Row %r", sql_util._repr_row(row))
                    return row

                self._row_logging_fn = _log_row

                def _make_row_2(row):
                    return _log_row(sliced_row(row))

                make_row = _make_row_2
            else:
                make_row = sliced_row
            self._set_memoized_attribute("_row_getter", make_row)

        else:
            assert context._num_sentinel_cols == 0
            self._metadata = self._no_result_metadata

    def _init_metadata(self, context, cursor_description):
        driver_column_names = context.execution_options.get(
            "driver_column_names", False
        )
        if context.compiled:
            compiled = context.compiled

            metadata: CursorResultMetaData

            if driver_column_names:
                metadata = CursorResultMetaData(
                    self, cursor_description, driver_column_names=True
                )
                assert not metadata._safe_for_cache
            elif compiled._cached_metadata:
                metadata = compiled._cached_metadata
            else:
                metadata = CursorResultMetaData(self, cursor_description)
                if metadata._safe_for_cache:
                    compiled._cached_metadata = metadata

            # result rewrite / adapt step. this is to suit the case
            # when we are invoked against a cached Compiled object; we want
            # to rewrite the ResultMetaData to reflect the Column objects
            # that are in our current SQL statement object, not the one
            # that is associated with the cached Compiled object.
            # the Compiled object may also tell us to not
            # actually do this step; this is to support the ORM, which
            # produces a new Result object in any case and will
            # be using the cached Column objects against this database result
            # so we don't want to rewrite them.
            #
            # Basically this step suits the use case where the end user
            # is using Core SQL expressions and is accessing columns in the
            # result row using row._mapping[table.c.column].
            if (
                not context.execution_options.get(
                    "_result_disable_adapt_to_context", False
                )
                and compiled._result_columns
                and context.cache_hit is context.dialect.CACHE_HIT
                and compiled.statement is not context.invoked_statement
            ):
                metadata = metadata._adapt_to_context(context)

            self._metadata = metadata

        else:
            self._metadata = metadata = CursorResultMetaData(
                self,
                cursor_description,
                driver_column_names=driver_column_names,
            )
        if self._echo:
            context.connection._log_debug(
                "Col %r", tuple(x[0] for x in cursor_description)
            )
        return metadata

    def _soft_close(self, hard=False):
        """Soft close this :class:`_engine.CursorResult`.

        This releases all DBAPI cursor resources, but leaves the
        CursorResult "open" from a semantic perspective, meaning the
        fetchXXX() methods will continue to return empty results.

        This method is called automatically when:

        * all result rows are exhausted using the fetchXXX() methods.
        * cursor.description is None.

        This method is **not public**, but is documented in order to clarify
        the "autoclose" process used.

        .. seealso::

            :meth:`_engine.CursorResult.close`


        """

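        # e.g. once fetchall() has exhausted the rows, the DBAPI cursor is
        # released and a subsequent fetchone() returns None rather than
        # raising; only a hard close makes fetches raise ResourceClosedError.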
        if (not hard and self._soft_closed) or (hard and self.closed):
            return

        if hard:
            self.closed = True
            self.cursor_strategy.hard_close(self, self.cursor)
        else:
            self.cursor_strategy.soft_close(self, self.cursor)

        if not self._soft_closed:
            cursor = self.cursor
            self.cursor = None  # type: ignore
            self.connection._safe_close_cursor(cursor)
            self._soft_closed = True

    @property
    def inserted_primary_key_rows(self):
        """Return the value of
        :attr:`_engine.CursorResult.inserted_primary_key`
        as a row contained within a list; some dialects may support a
        multiple row form as well.

        .. note:: As indicated below, in current SQLAlchemy versions this
           accessor is only useful beyond what's already supplied by
           :attr:`_engine.CursorResult.inserted_primary_key` when using the
           :ref:`postgresql_psycopg2` dialect. Future versions hope to
           generalize this feature to more dialects.

        This accessor is added to support dialects that offer the feature
        that is currently implemented by the :ref:`psycopg2_executemany_mode`
        feature, currently **only the psycopg2 dialect**, which provides
        for many rows to be INSERTed at once while still retaining the
        behavior of being able to return server-generated primary key values.

        * **When using the psycopg2 dialect, or other dialects that may
          support "fast executemany" style inserts in upcoming releases**:
          When invoking an INSERT statement while passing a list of rows as
          the second argument to :meth:`_engine.Connection.execute`, this
          accessor will then provide a list of rows, where each row contains
          the primary key value for each row that was INSERTed.

        * **When using all other dialects / backends that don't yet support
          this feature**: This accessor is only useful for **single row INSERT
          statements**, and returns the same information as that of the
          :attr:`_engine.CursorResult.inserted_primary_key` within a
          single-element list. When an INSERT statement is executed in
          conjunction with a list of rows to be INSERTed, the list will
          contain one row per row inserted in the statement, however it will
          contain ``None`` for any server-generated values.

        Future releases of SQLAlchemy will further generalize the
        "fast execution helper" feature of psycopg2 to suit other dialects,
        thus allowing this accessor to be of more general use.

        .. versionadded:: 1.4

        .. seealso::

            :attr:`_engine.CursorResult.inserted_primary_key`

        """
        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert:
            raise exc.InvalidRequestError(
                "Statement is not an insert() expression construct."
            )
        elif self.context._is_explicit_returning:
            raise exc.InvalidRequestError(
                "Can't call inserted_primary_key "
                "when returning() "
                "is used."
            )
        return self.context.inserted_primary_key_rows

    @property
    def inserted_primary_key(self):
        """Return the primary key for the row just inserted.

        The return value is a :class:`_result.Row` object representing
        a named tuple of primary key values in the order in which the
        primary key columns are configured in the source
        :class:`_schema.Table`.

        .. versionchanged:: 1.4.8 - the
           :attr:`_engine.CursorResult.inserted_primary_key`
           value is now a named tuple via the :class:`_result.Row` class,
           rather than a plain tuple.

        This accessor only applies to single row :func:`_expression.insert`
        constructs which did not explicitly specify
        :meth:`_expression.Insert.returning`. Support for multirow inserts,
        while not yet available for most backends, would be accessed using
        the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.

        Note that primary key columns which specify a server_default clause,
        or otherwise do not qualify as "autoincrement" columns (see the notes
        at :class:`_schema.Column`), and were generated using the
        database-side default, will appear in this list as ``None`` unless
        the backend supports "returning" and the insert statement was
        executed with "implicit returning" enabled.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() construct.

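        For illustration only, assuming a hypothetical ``users`` table with
        an autoincrementing ``id`` primary key column::

            result = connection.execute(
                users.insert(), {"user_name": "some name"}
            )
            result.inserted_primary_key  # e.g. a Row such as (5,)
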
1769 """
1770
1771 if self.context.executemany:
1772 raise exc.InvalidRequestError(
1773 "This statement was an executemany call; if primary key "
1774 "returning is supported, please "
1775 "use .inserted_primary_key_rows."
1776 )
1777
1778 ikp = self.inserted_primary_key_rows
1779 if ikp:
1780 return ikp[0]
1781 else:
1782 return None
1783
1784 def last_updated_params(self):
1785 """Return the collection of updated parameters from this
1786 execution.
1787
1788 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1789 statement is not a compiled expression construct
1790 or is not an update() construct.
1791
1792 """
1793 if not self.context.compiled:
1794 raise exc.InvalidRequestError(
1795 "Statement is not a compiled expression construct."
1796 )
1797 elif not self.context.isupdate:
1798 raise exc.InvalidRequestError(
1799 "Statement is not an update() expression construct."
1800 )
1801 elif self.context.executemany:
1802 return self.context.compiled_parameters
1803 else:
1804 return self.context.compiled_parameters[0]
1805
1806 def last_inserted_params(self):
1807 """Return the collection of inserted parameters from this
1808 execution.
1809
1810 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
1811 statement is not a compiled expression construct
1812 or is not an insert() construct.
1813
1814 """
1815 if not self.context.compiled:
1816 raise exc.InvalidRequestError(
1817 "Statement is not a compiled expression construct."
1818 )
1819 elif not self.context.isinsert:
1820 raise exc.InvalidRequestError(
1821 "Statement is not an insert() expression construct."
1822 )
1823 elif self.context.executemany:
1824 return self.context.compiled_parameters
1825 else:
1826 return self.context.compiled_parameters[0]
1827
1828 @property
1829 def returned_defaults_rows(self):
1830 """Return a list of rows each containing the values of default
1831 columns that were fetched using
1832 the :meth:`.ValuesBase.return_defaults` feature.
1833
1834 The return value is a list of :class:`.Row` objects.
1835
1836 .. versionadded:: 1.4
1837
1838 """
1839 return self.context.returned_default_rows
1840
1841 def splice_horizontally(self, other):
1842 """Return a new :class:`.CursorResult` that "horizontally splices"
1843 together the rows of this :class:`.CursorResult` with that of another
1844 :class:`.CursorResult`.
1845
1846 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
1847 not intended for general use.
1848
1849 "horizontally splices" means that for each row in the first and second
1850 result sets, a new row that concatenates the two rows together is
1851 produced, which then becomes the new row. The incoming
1852 :class:`.CursorResult` must have the identical number of rows. It is
1853 typically expected that the two result sets come from the same sort
1854 order as well, as the result rows are spliced together based on their
1855 position in the result.
1856
1857 The expected use case here is so that multiple INSERT..RETURNING
1858 statements (which definitely need to be sorted) against different
1859 tables can produce a single result that looks like a JOIN of those two
1860 tables.
1861
        E.g.::

            r1 = connection.execute(
                users.insert().returning(
                    users.c.user_name, users.c.user_id, sort_by_parameter_order=True
                ),
                user_values,
            )

            r2 = connection.execute(
                addresses.insert().returning(
                    addresses.c.address_id,
                    addresses.c.address,
                    addresses.c.user_id,
                    sort_by_parameter_order=True,
                ),
                address_values,
            )

            rows = r1.splice_horizontally(r2).all()
            assert rows == [
                ("john", 1, 1, "foo@bar.com", 1),
                ("jack", 2, 2, "bar@bat.com", 2),
            ]

        .. versionadded:: 2.0

        .. seealso::

            :meth:`.CursorResult.splice_vertically`

        """  # noqa: E501

        clone = self._generate()
        total_rows = [
            tuple(r1) + tuple(r2)
            for r1, r2 in zip(
                list(self._raw_row_iterator()),
                list(other._raw_row_iterator()),
            )
        ]

        clone._metadata = clone._metadata._splice_horizontally(other._metadata)

        clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
            None,
            initial_buffer=total_rows,
        )
        clone._reset_memoizations()
        return clone

    def splice_vertically(self, other):
        """Return a new :class:`.CursorResult` that "vertically splices",
        i.e. "extends", the rows of this :class:`.CursorResult` with that of
        another :class:`.CursorResult`.

        .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
           not intended for general use.

        "Vertically splices" means the rows of the given result are appended
        to the rows of this cursor result. The incoming
        :class:`.CursorResult` must have rows that represent the identical
        list of columns in the identical order as they are in this
        :class:`.CursorResult`.

        .. versionadded:: 2.0

        .. seealso::

            :meth:`.CursorResult.splice_horizontally`

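        E.g., a sketch, assuming two selects against a hypothetical
        ``users`` table that return the same columns::

            r1 = connection.execute(users.select().where(users.c.user_id < 10))
            r2 = connection.execute(users.select().where(users.c.user_id >= 10))
            all_rows = r1.splice_vertically(r2).all()
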
        """
        clone = self._generate()
        total_rows = list(self._raw_row_iterator()) + list(
            other._raw_row_iterator()
        )

        clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
            None,
            initial_buffer=total_rows,
        )
        clone._reset_memoizations()
        return clone

    def _rewind(self, rows):
        """rewind this result back to the given rowset.

        this is used internally for the case where an :class:`.Insert`
        construct combines the use of
        :meth:`.Insert.return_defaults` along with the
        "supplemental columns" feature.

        """

        if self._echo:
            self.context.connection._log_debug(
                "CursorResult rewound %d row(s)", len(rows)
            )

        # the rows given are expected to be Row objects, so we
        # have to clear out processors which have already run on these
        # rows
        self._metadata = cast(
            CursorResultMetaData, self._metadata
        )._remove_processors()

        self.cursor_strategy = FullyBufferedCursorFetchStrategy(
            None,
            # TODO: if these are Row objects, can we save on not having to
            # re-make new Row objects out of them a second time? is that
            # what's actually happening right now? maybe look into this
            initial_buffer=rows,
        )
        self._reset_memoizations()
        return self

    @property
    def returned_defaults(self):
        """Return the values of default columns that were fetched using
        the :meth:`.ValuesBase.return_defaults` feature.

        The value is an instance of :class:`.Row`, or ``None``
        if :meth:`.ValuesBase.return_defaults` was not used or if the
        backend does not support RETURNING.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the statement
        was an executemany call; use
        :attr:`.CursorResult.returned_defaults_rows` in that case.

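        E.g., a sketch, assuming a hypothetical ``users`` table with a
        server-generated ``created_at`` column::

            result = connection.execute(
                users.insert().return_defaults(), {"user_name": "some name"}
            )
            defaults = result.returned_defaults  # Row with e.g. created_at
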
        .. seealso::

            :meth:`.ValuesBase.return_defaults`

        """

        if self.context.executemany:
            raise exc.InvalidRequestError(
                "This statement was an executemany call; if return defaults "
                "is supported, please use .returned_defaults_rows."
            )

        rows = self.context.returned_default_rows
        if rows:
            return rows[0]
        else:
            return None

    def lastrow_has_defaults(self):
        """Return ``lastrow_has_defaults()`` from the underlying
        :class:`.ExecutionContext`.

        See :class:`.ExecutionContext` for details.

        """

        return self.context.lastrow_has_defaults()

    def postfetch_cols(self):
        """Return ``postfetch_cols()`` from the underlying
        :class:`.ExecutionContext`.

        See :class:`.ExecutionContext` for details.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() or update() construct.

        """

        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert and not self.context.isupdate:
            raise exc.InvalidRequestError(
                "Statement is not an insert() or update() "
                "expression construct."
            )
        return self.context.postfetch_cols

    def prefetch_cols(self):
        """Return ``prefetch_cols()`` from the underlying
        :class:`.ExecutionContext`.

        See :class:`.ExecutionContext` for details.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() or update() construct.

        """

        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert and not self.context.isupdate:
            raise exc.InvalidRequestError(
                "Statement is not an insert() or update() "
                "expression construct."
            )
        return self.context.prefetch_cols

    def supports_sane_rowcount(self):
        """Return ``supports_sane_rowcount`` from the dialect.

        See :attr:`_engine.CursorResult.rowcount` for background.

        """

        return self.dialect.supports_sane_rowcount

    def supports_sane_multi_rowcount(self):
        """Return ``supports_sane_multi_rowcount`` from the dialect.

        See :attr:`_engine.CursorResult.rowcount` for background.

        """

        return self.dialect.supports_sane_multi_rowcount

    @util.memoized_property
    def rowcount(self) -> int:
        """Return the 'rowcount' for this result.

        The primary purpose of 'rowcount' is to report the number of rows
        matched by the WHERE criterion of an UPDATE or DELETE statement
        executed once (i.e. for a single parameter set), which may then be
        compared to the number of rows expected to be updated or deleted as a
        means of asserting data integrity.

        This attribute is transferred from the ``cursor.rowcount`` attribute
        of the DBAPI before the cursor is closed, to support DBAPIs that
        don't make this value available after cursor close. Some DBAPIs may
        offer meaningful values for other kinds of statements, such as INSERT
        and SELECT statements as well. In order to retrieve
        ``cursor.rowcount`` for these statements, set the
        :paramref:`.Connection.execution_options.preserve_rowcount`
        execution option to True, which will cause the ``cursor.rowcount``
        value to be unconditionally memoized before any results are returned
        or the cursor is closed, regardless of statement type.

        For cases where the DBAPI does not support rowcount for a particular
        kind of statement and/or execution, the returned value will be ``-1``,
        which is delivered directly from the DBAPI and is part of :pep:`249`.
        All DBAPIs should support rowcount for single-parameter-set
        UPDATE and DELETE statements, however.

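        E.g., a sketch of such an integrity check, assuming a hypothetical
        ``users`` table::

            result = connection.execute(
                users.update()
                .where(users.c.user_id == 5)
                .values(user_name="new name")
            )
            assert result.rowcount == 1  # exactly one row matched
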
        .. note::

           Notes regarding :attr:`_engine.CursorResult.rowcount`:

           * This attribute returns the number of rows *matched*,
             which is not necessarily the same as the number of rows
             that were actually *modified*. For example, an UPDATE statement
             may have no net change on a given row if the SET values
             given are the same as those present in the row already.
             Such a row would be matched but not modified.
             On backends that feature both styles, such as MySQL,
             rowcount is configured to return the match
             count in all cases.

           * :attr:`_engine.CursorResult.rowcount` in the default case is
             *only* useful in conjunction with an UPDATE or DELETE statement,
             and only with a single set of parameters. For other kinds of
             statements, SQLAlchemy will not attempt to pre-memoize the value
             unless the
             :paramref:`.Connection.execution_options.preserve_rowcount`
             execution option is used. Note that contrary to :pep:`249`, many
             DBAPIs do not support rowcount values for statements that are
             not UPDATE or DELETE, particularly when rows are being returned
             which are not fully pre-buffered. DBAPIs that don't support
             rowcount for a particular kind of statement should return the
             value ``-1`` for such statements.

           * :attr:`_engine.CursorResult.rowcount` may not be meaningful
             when executing a single statement with multiple parameter sets
             (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
             values across multiple parameter sets and will return ``-1``
             when accessed.

           * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
             a correct population of :attr:`_engine.CursorResult.rowcount`
             when the :paramref:`.Connection.execution_options.preserve_rowcount`
             execution option is set to True.

           * Statements that use RETURNING may not support rowcount, returning
             a ``-1`` value instead.

        .. seealso::

            :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`

            :paramref:`.Connection.execution_options.preserve_rowcount`

        """  # noqa: E501
        try:
            return self.context.rowcount
        except BaseException as e:
            self.cursor_strategy.handle_exception(self, self.cursor, e)
            raise  # not called

    @property
    def lastrowid(self):
        """Return the 'lastrowid' accessor on the DBAPI cursor.

        This is a DBAPI specific method and is only functional
        for those backends which support it, for statements
        where it is appropriate. Its behavior is not
        consistent across backends.

        Usage of this method is normally unnecessary when
        using insert() expression constructs; the
        :attr:`~CursorResult.inserted_primary_key` attribute provides a
        tuple of primary key values for a newly inserted row,
        regardless of database backend.

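        E.g., a sketch; both the availability and the meaning of the value
        are backend-specific::

            result = connection.execute(users.insert(), {"user_name": "x"})
            rowid = result.lastrowid  # DBAPI-specific value
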
        """
        try:
            return self.context.get_lastrowid()
        except BaseException as e:
            self.cursor_strategy.handle_exception(self, self.cursor, e)

    @property
    def returns_rows(self):
        """True if this :class:`_engine.CursorResult` returns zero or more
        rows.

        I.e. if it is legal to call the methods
        :meth:`_engine.CursorResult.fetchone`,
        :meth:`_engine.CursorResult.fetchmany`, and
        :meth:`_engine.CursorResult.fetchall`.

        Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
        always be synonymous with whether or not the DBAPI cursor had a
        ``.description`` attribute, indicating the presence of result columns,
        noting that a cursor that returns zero rows still has a
        ``.description`` if a row-returning statement was emitted.

        This attribute should be True for all results that are against
        SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
        that use RETURNING. For INSERT/UPDATE/DELETE statements that were
        not using RETURNING, the value will usually be False, however
        there are some dialect-specific exceptions to this, such as the
        MSSQL / pyodbc dialect, where a SELECT is emitted inline in
        order to retrieve an inserted primary key value.
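
        E.g., a sketch, assuming ``text`` is imported from ``sqlalchemy``::

            result = connection.execute(text("SELECT 1"))
            assert result.returns_rows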

        """
        return self._metadata.returns_rows

    @property
    def is_insert(self):
        """True if this :class:`_engine.CursorResult` is the result
        of executing an expression language compiled
        :func:`_expression.insert` construct.

        When True, this implies that the
        :attr:`inserted_primary_key` attribute is accessible,
        assuming the statement did not include
        a user defined "returning" construct.

        """
        return self.context.isinsert

    def _fetchiter_impl(self):
        fetchone = self.cursor_strategy.fetchone

        while True:
            row = fetchone(self, self.cursor)
            if row is None:
                break
            yield row

    def _fetchone_impl(self, hard_close=False):
        return self.cursor_strategy.fetchone(self, self.cursor, hard_close)

    def _fetchall_impl(self):
        return self.cursor_strategy.fetchall(self, self.cursor)

    def _fetchmany_impl(self, size=None):
        return self.cursor_strategy.fetchmany(self, self.cursor, size)

    def _raw_row_iterator(self):
        return self._fetchiter_impl()

    def merge(
        self, *others: Result[Unpack[TupleAny]]
    ) -> MergedResult[Unpack[TupleAny]]:
        merged_result = super().merge(*others)
        # if this context collected a rowcount, report the merged rowcount
        # as the sum across all the merged results
        if self.context._has_rowcount:
            merged_result.rowcount = sum(
                cast("CursorResult[Any]", result).rowcount
                for result in (self,) + others
            )
        return merged_result

    def close(self) -> Any:
        """Close this :class:`_engine.CursorResult`.

        This closes out the underlying DBAPI cursor corresponding to the
        statement execution, if one is still present. Note that the DBAPI
        cursor is automatically released when the
        :class:`_engine.CursorResult` exhausts all available rows.
        :meth:`_engine.CursorResult.close` is generally an optional method
        except in the case when discarding a :class:`_engine.CursorResult`
        that still has additional rows pending for fetch.

        After this method is called, it is no longer valid to call upon
        the fetch methods, which will raise a :class:`.ResourceClosedError`
        on subsequent use.

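        E.g., a sketch of discarding a result that still has pending rows,
        assuming a hypothetical ``users`` table::

            result = connection.execute(users.select())
            first_row = result.fetchone()
            result.close()  # discard remaining rows, releasing the cursor
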
        .. seealso::

            :ref:`connections_toplevel`

        """
        self._soft_close(hard=True)

    @_generative
    def yield_per(self, num: int) -> Self:
        # record the yield_per value and hand it to the cursor strategy,
        # which may adjust its buffering behavior accordingly
        self._yield_per = num
        self.cursor_strategy.yield_per(self, self.cursor, num)
        return self


# "ResultProxy" is the legacy, pre-1.4 name for this class, retained
# here for backwards compatibility
ResultProxy = CursorResult