1# engine/result.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define generic result set constructs."""
9
10from __future__ import annotations
11
12from enum import Enum
13import functools
14import itertools
15import operator
16import typing
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import Generic
22from typing import Iterable
23from typing import Iterator
24from typing import List
25from typing import Mapping
26from typing import NoReturn
27from typing import Optional
28from typing import overload
29from typing import Sequence
30from typing import Set
31from typing import Tuple
32from typing import TYPE_CHECKING
33from typing import TypeVar
34from typing import Union
35
36from .row import Row
37from .row import RowMapping
38from .. import exc
39from .. import util
40from ..sql.base import _generative
41from ..sql.base import HasMemoized
42from ..sql.base import InPlaceGenerative
43from ..util import HasMemoized_ro_memoized_attribute
44from ..util import NONE_SET
45from ..util._has_cy import HAS_CYEXTENSION
46from ..util.typing import Literal
47from ..util.typing import Self
48
49if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
50 from ._py_row import tuplegetter as tuplegetter
51else:
52 from sqlalchemy.cyextension.resultproxy import tuplegetter as tuplegetter
53
54if typing.TYPE_CHECKING:
55 from ..sql.elements import SQLCoreOperations
56 from ..sql.type_api import _ResultProcessorType
57
# A lookup key for a column: either a string or a SQLCoreOperations object.
_KeyType = Union[str, "SQLCoreOperations[Any]"]
# Same as _KeyType, additionally allowing an integer.
_KeyIndexType = Union[_KeyType, int]

# is overridden in cursor using _CursorKeyMapRecType
_KeyMapRecType = Any

# Mapping of lookup key to its keymap record.
_KeyMapType = Mapping[_KeyType, _KeyMapRecType]


_RowData = Union[Row[Any], RowMapping, Any]
"""A generic form of "row" that accommodates for the different kinds of
"rows" that different result objects return, including row, row mapping, and
scalar values"""

_RawRowType = Tuple[Any, ...]
"""represents the kind of row we get from a DBAPI cursor"""

# _R: the kind of "row" a given result object returns
_R = TypeVar("_R", bound=_RowData)
# _T: an unconstrained type
_T = TypeVar("_T", bound=Any)
# _TP: a tuple type
_TP = TypeVar("_TP", bound=Tuple[Any, ...])

_InterimRowType = Union[_R, _RawRowType]
"""a catchall "anything" kind of return type that can be applied
across all the result types

"""

# Either a Row or an arbitrary object.
_InterimSupportsScalarsRowType = Union[Row[Any], Any]

# A sequence of result processors; individual entries may be None.
_ProcessorsType = Sequence[Optional["_ResultProcessorType[Any]"]]
# A callable that receives a sequence and returns a sequence, such as the
# object produced by ``tuplegetter()``.
_TupleGetterType = Callable[[Sequence[Any]], Sequence[Any]]
# A callable that receives one object and returns one object; used as the
# "strategy" of a unique filter.
_UniqueFilterType = Callable[[Any], Any]
# The state of a unique filter: a set paired with an optional strategy
# callable.
_UniqueFilterStateType = Tuple[Set[Any], Optional[_UniqueFilterType]]
91
92
class ResultMetaData:
    """Base for metadata about result rows.

    Subclasses implement the key lookup primitives (``_has_key()``,
    ``_index_for_key()``, ``_indexes_for_keys()``, ``_metadata_for_keys()``,
    ``_reduce()``, ``_for_freeze()``); the helpers defined here are written
    in terms of those primitives.

    """

    __slots__ = ()

    _tuplefilter: Optional[_TupleGetterType] = None
    _translated_indexes: Optional[Sequence[int]] = None
    _unique_filters: Optional[Sequence[Callable[[Any], Any]]] = None
    _keymap: _KeyMapType
    _keys: Sequence[str]
    _processors: Optional[_ProcessorsType]
    _key_to_index: Mapping[_KeyType, int]

    @property
    def keys(self) -> RMKeyView:
        """Return a new :class:`.RMKeyView` built from this metadata."""
        return RMKeyView(self)

    def _has_key(self, key: object) -> bool:
        """Return True if ``key`` is present; implemented by subclasses."""
        raise NotImplementedError()

    def _for_freeze(self) -> ResultMetaData:
        """Return metadata for a frozen result; implemented by subclasses."""
        raise NotImplementedError()

    @overload
    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: Literal[True] = ...
    ) -> NoReturn: ...

    @overload
    def _key_fallback(
        self,
        key: Any,
        err: Optional[Exception],
        raiseerr: Literal[False] = ...,
    ) -> None: ...

    @overload
    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: bool = ...
    ) -> Optional[NoReturn]: ...

    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: bool = True
    ) -> Optional[NoReturn]:
        """Raise ``KeyError(key)``, chained from ``err``.

        This base implementation asserts that ``raiseerr`` is true.

        """
        assert raiseerr
        raise KeyError(key) from err

    def _raise_for_ambiguous_column_name(
        self, rec: _KeyMapRecType
    ) -> NoReturn:
        """Not available on the base class; always raises."""
        raise NotImplementedError(
            "ambiguous column name logic is implemented for "
            "CursorResultMetaData"
        )

    def _index_for_key(
        self, key: _KeyIndexType, raiseerr: bool
    ) -> Optional[int]:
        """Return the index for ``key``; implemented by subclasses."""
        raise NotImplementedError()

    def _indexes_for_keys(
        self, keys: Sequence[_KeyIndexType]
    ) -> Sequence[int]:
        """Return the indexes for ``keys``; implemented by subclasses."""
        raise NotImplementedError()

    def _metadata_for_keys(
        self, keys: Sequence[_KeyIndexType]
    ) -> Iterator[_KeyMapRecType]:
        """Iterate keymap records for ``keys``; implemented by subclasses."""
        raise NotImplementedError()

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
        """Return metadata limited to ``keys``; implemented by subclasses."""
        raise NotImplementedError()

    def _getter(
        self, key: Any, raiseerr: bool = True
    ) -> Optional[Callable[[Row[Any]], Any]]:
        """Return an ``operator.itemgetter`` for the index of ``key``.

        ``None`` is returned when ``_index_for_key()`` produces ``None``.

        """
        position = self._index_for_key(key, raiseerr)
        return None if position is None else operator.itemgetter(position)

    def _row_as_tuple_getter(
        self, keys: Sequence[_KeyIndexType]
    ) -> _TupleGetterType:
        """Return a ``tuplegetter`` over the indexes of ``keys``."""
        return tuplegetter(*self._indexes_for_keys(keys))

    def _make_key_to_index(
        self, keymap: Mapping[_KeyType, Sequence[Any]], index: int
    ) -> Mapping[_KeyType, int]:
        """Build a ``{key: rec[index]}`` dictionary from ``keymap``.

        Records whose element at ``index`` is ``None`` are left out.

        """
        key_to_index: Dict[_KeyType, int] = {}
        for key, rec in keymap.items():
            position = rec[index]
            if position is not None:
                key_to_index[key] = position
        return key_to_index

    def _key_not_found(self, key: Any, attr_error: bool) -> NoReturn:
        """Raise the appropriate error for a failed lookup of ``key``.

        A key that is present in ``_keymap`` is handed to
        ``_raise_for_ambiguous_column_name()``.  Otherwise
        ``_key_fallback()`` is invoked; when ``attr_error`` is true, the
        ``KeyError`` it raises is converted to ``AttributeError``.

        """
        if key in self._keymap:
            # the index must be none in this case
            self._raise_for_ambiguous_column_name(self._keymap[key])
        elif attr_error:
            # unknown key, looked up via attribute access
            try:
                self._key_fallback(key, None)
            except KeyError as ke:
                raise AttributeError(ke.args[0]) from ke
        else:
            # unknown key
            self._key_fallback(key, None)

    @property
    def _effective_processors(self) -> Optional[_ProcessorsType]:
        """Return ``_processors``, or ``None`` if there is nothing to apply.

        ``None`` is returned when ``_processors`` is empty / ``None`` or
        when ``NONE_SET`` is a superset of its entries.

        """
        processors = self._processors
        if processors and not NONE_SET.issuperset(processors):
            return processors
        return None
211
212
class RMKeyView(typing.KeysView[Any]):
    """Keys view over the keys of a :class:`.ResultMetaData`.

    Iteration, length and comparison work from a list of the parent's
    non-``None`` keys captured at construction time; containment is
    delegated to the parent's ``_has_key()``.

    """

    __slots__ = ("_parent", "_keys")

    _parent: ResultMetaData
    _keys: Sequence[str]

    def __init__(self, parent: ResultMetaData):
        self._parent = parent
        # None entries in the parent's key list are not exposed
        self._keys = [name for name in parent._keys if name is not None]

    def __len__(self) -> int:
        return len(self._keys)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._keys!r})"

    def __iter__(self) -> Iterator[str]:
        return iter(self._keys)

    def __contains__(self, item: Any) -> bool:
        # integers are never reported as contained
        if not isinstance(item, int):
            # note this also includes special key fallback behaviors
            # which also don't seem to be tested in test_resultset right now
            return self._parent._has_key(item)
        return False

    def __eq__(self, other: Any) -> bool:
        # ordered, element-wise comparison against any iterable
        theirs = list(other)
        return theirs == list(self)

    def __ne__(self, other: Any) -> bool:
        theirs = list(other)
        return theirs != list(self)
245
246
class SimpleResultMetaData(ResultMetaData):
    """result metadata for in-memory collections.

    Each entry of ``_keymap`` is a record of the form
    ``(index, name, extras)``, reachable under the string name as well as
    under every element of ``extras``.

    """

    __slots__ = (
        "_keys",
        "_keymap",
        "_processors",
        "_tuplefilter",
        "_translated_indexes",
        "_unique_filters",
        "_key_to_index",
    )

    _keys: Sequence[str]

    def __init__(
        self,
        keys: Sequence[str],
        extra: Optional[Sequence[Any]] = None,
        _processors: Optional[_ProcessorsType] = None,
        _tuplefilter: Optional[_TupleGetterType] = None,
        _translated_indexes: Optional[Sequence[int]] = None,
        _unique_filters: Optional[Sequence[Callable[[Any], Any]]] = None,
    ):
        """Construct metadata for the given string keys.

        :param keys: names of the columns, in row order.
        :param extra: optional sequence parallel to ``keys``; each element
         is a tuple of additional lookup keys for that column, or a false
         value for none.
        :param _processors: stored as ``_processors``.
        :param _tuplefilter: stored as ``_tuplefilter``.
        :param _translated_indexes: stored as ``_translated_indexes``.
        :param _unique_filters: stored as ``_unique_filters``.

        """
        self._keys = list(keys)
        self._tuplefilter = _tuplefilter
        self._translated_indexes = _translated_indexes
        self._unique_filters = _unique_filters

        keymap: Dict[Any, Any] = {}
        if extra:
            for position, (name, extras) in enumerate(zip(self._keys, extra)):
                rec = (position, name, extras)
                # the name itself plus any alternate keys share one record
                for lookup in (name,) + (extras if extras else ()):
                    keymap[lookup] = rec
        else:
            for position, name in enumerate(self._keys):
                keymap[name] = (position, name, ())
        self._keymap = keymap

        self._processors = _processors

        self._key_to_index = self._make_key_to_index(self._keymap, 0)

    def _has_key(self, key: object) -> bool:
        """Return True if ``key`` is a key of ``_keymap``."""
        return key in self._keymap

    def _for_freeze(self) -> ResultMetaData:
        """Return a new :class:`.SimpleResultMetaData` with the same keys.

        The copy carries the keys, their extras and the unique filters
        (passed through the tuplefilter when both are present); processors,
        tuplefilter and translated indexes are not carried over.

        """
        filters = self._unique_filters
        tuplefilter = self._tuplefilter
        if filters and tuplefilter:
            filters = tuplefilter(filters)

        # TODO: are we freezing the result with or without uniqueness
        # applied?
        extras = [self._keymap[name][2] for name in self._keys]
        return SimpleResultMetaData(
            self._keys, extra=extras, _unique_filters=filters
        )

    def __getstate__(self) -> Dict[str, Any]:
        """Return the pickled state: keys and translated indexes only."""
        return dict(
            _keys=self._keys,
            _translated_indexes=self._translated_indexes,
        )

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Re-run ``__init__`` from pickled state.

        The tuplefilter is rebuilt from the translated indexes when those
        are present.

        """
        translated = state["_translated_indexes"]
        if translated:
            tuplefilter = tuplegetter(*translated)
        else:
            translated = tuplefilter = None
        self.__init__(  # type: ignore
            state["_keys"],
            _translated_indexes=translated,
            _tuplefilter=tuplefilter,
        )

    def _index_for_key(self, key: Any, raiseerr: bool = True) -> int:
        """Return the index stored in the keymap record for ``key``.

        An integer ``key`` is first converted to the string key at that
        position.  Keys not present in the keymap are passed to
        ``_key_fallback()``.

        """
        name = self._keys[key] if int in key.__class__.__mro__ else key
        try:
            rec = self._keymap[name]
        except KeyError as ke:
            rec = self._key_fallback(name, ke, raiseerr)

        return rec[0]  # type: ignore[no-any-return]

    def _indexes_for_keys(self, keys: Sequence[Any]) -> Sequence[int]:
        """Return the keymap indexes for ``keys`` as a list."""
        keymap = self._keymap
        return [keymap[key][0] for key in keys]

    def _metadata_for_keys(
        self, keys: Sequence[Any]
    ) -> Iterator[_KeyMapRecType]:
        """Yield the keymap record for each of ``keys``.

        Integer keys are converted to the string key at that position;
        keys not present in the keymap are passed to ``_key_fallback()``.

        """
        for key in keys:
            name = self._keys[key] if int in key.__class__.__mro__ else key

            try:
                rec = self._keymap[name]
            except KeyError as ke:
                rec = self._key_fallback(name, ke, True)

            yield rec

    def _reduce(self, keys: Sequence[Any]) -> ResultMetaData:
        """Return new metadata that selects only ``keys``, in that order.

        The new metadata receives a tuplefilter and translated indexes
        that pick the chosen columns; when this metadata already has
        translated indexes, the new ones are mapped through them.

        """
        # integer keys are converted to the string key at that position
        names = (
            self._keys[key] if int in key.__class__.__mro__ else key
            for key in keys
        )
        try:
            metadata_for_keys = [self._keymap[name] for name in names]
        except KeyError as ke:
            self._key_fallback(ke.args[0], ke, True)

        indexes: Sequence[int]
        new_keys: Sequence[str]
        extra: Sequence[Any]
        # transpose (index, name, extras) records into three sequences
        indexes, new_keys, extra = zip(*metadata_for_keys)

        translated = self._translated_indexes
        if translated:
            indexes = [translated[idx] for idx in indexes]

        return SimpleResultMetaData(
            new_keys,
            extra=extra,
            _tuplefilter=tuplegetter(*indexes),
            _translated_indexes=indexes,
            _processors=self._processors,
            _unique_filters=self._unique_filters,
        )
387
388
def result_tuple(
    fields: Sequence[str], extra: Optional[Any] = None
) -> Callable[[Iterable[Any]], Row[Any]]:
    """Return a callable that produces :class:`.Row` objects.

    A :class:`.SimpleResultMetaData` is built from ``fields`` and
    ``extra``; the returned ``functools.partial`` calls :class:`.Row` with
    that metadata, its effective processors and its key-to-index mapping
    pre-applied.

    """
    metadata = SimpleResultMetaData(fields, extra)
    processors = metadata._effective_processors
    key_to_index = metadata._key_to_index
    return functools.partial(Row, metadata, processors, key_to_index)
396
397
# A symbol that indicates to internal Result methods that "no row is
# returned".  None can't be used for this, because when a scalar filter is
# applied to rows, None may itself be a value that is returned.
class _NoRow(Enum):
    _NO_ROW = 0


# module-level shorthand for the single member of _NoRow
_NO_ROW = _NoRow._NO_ROW
406
407
class ResultInternal(InPlaceGenerative, Generic[_R]):
    """Row-fetching internals underlying :class:`.Result`.

    Subclasses provide the raw fetch primitives (``_fetchiter_impl()``,
    ``_fetchone_impl()``, ``_fetchmany_impl()``, ``_fetchall_impl()``) and
    ``_soft_close()``.  On top of those, this class builds memoized
    callables which take each raw row through up to three steps, in this
    order: row construction (``_row_getter``), uniquing
    (``_unique_filter_state``) and the ``_post_creational_filter``.

    """

    __slots__ = ()

    # when set, the Result whose ``_source_supports_scalars``,
    # ``_row_logging_fn`` and ``_yield_per`` are consulted in place of
    # this object's own
    _real_result: Optional[Result[Any]] = None
    # when False and the source supports scalars, raw objects are returned
    # without being wrapped in a Row
    _generate_rows: bool = True
    # optional callable applied to each Row produced by ``_row_getter``
    _row_logging_fn: Optional[Callable[[Any], Any]]

    # (set of identities already seen, optional strategy callable), or
    # None when no uniquing is applied
    _unique_filter_state: Optional[_UniqueFilterStateType] = None
    # optional callable applied to each row after uniquing
    _post_creational_filter: Optional[Callable[[Any], Any]] = None
    _is_cursor = False

    _metadata: ResultMetaData

    # when True, the fetch primitives deliver single objects which are
    # wrapped in a one-element tuple before a Row is built from them
    _source_supports_scalars: bool

    def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
        """Return an iterator of raw rows; implemented by subclasses."""
        raise NotImplementedError()

    def _fetchone_impl(
        self, hard_close: bool = False
    ) -> Optional[_InterimRowType[Row[Any]]]:
        """Return the next raw row, or None if there is none.

        Implemented by subclasses.

        """
        raise NotImplementedError()

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        """Return a list of raw rows; implemented by subclasses."""
        raise NotImplementedError()

    def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
        """Return all remaining raw rows; implemented by subclasses."""
        raise NotImplementedError()

    def _soft_close(self, hard: bool = False) -> None:
        """Close this result; implemented by subclasses."""
        raise NotImplementedError()

    @HasMemoized_ro_memoized_attribute
    def _row_getter(self) -> Optional[Callable[..., _R]]:
        """Build the callable that turns one raw row into a Row.

        Returns None when the source supports scalars and row generation
        is turned off, in which case callers pass raw objects through
        unchanged.  Otherwise the returned callable constructs a
        :class:`.Row` from the metadata, applying the metadata's
        tuplefilter to each raw row if one is present, and passing the new
        row through the real result's ``_row_logging_fn`` if one is set.

        """
        real_result: Result[Any] = (
            self._real_result
            if self._real_result
            else cast("Result[Any]", self)
        )

        if real_result._source_supports_scalars:
            if not self._generate_rows:
                # raw objects are handed back as-is; no Row is built
                return None
            else:
                _proc = Row

                # the source delivers single objects; each is wrapped in
                # a one-element tuple to serve as the Row's data
                def process_row(
                    metadata: ResultMetaData,
                    processors: Optional[_ProcessorsType],
                    key_to_index: Mapping[_KeyType, int],
                    scalar_obj: Any,
                ) -> Row[Any]:
                    return _proc(
                        metadata, processors, key_to_index, (scalar_obj,)
                    )

        else:
            process_row = Row  # type: ignore

        metadata = self._metadata

        key_to_index = metadata._key_to_index
        processors = metadata._effective_processors
        tf = metadata._tuplefilter

        if tf and not real_result._source_supports_scalars:
            # the processors are narrowed once, up front, with the same
            # tuplefilter that is applied to every raw row below
            if processors:
                processors = tf(processors)

            _make_row_orig: Callable[..., _R] = functools.partial(  # type: ignore  # noqa E501
                process_row, metadata, processors, key_to_index
            )

            fixed_tf = tf

            def make_row(row: _InterimRowType[Row[Any]]) -> _R:
                return _make_row_orig(fixed_tf(row))

        else:
            make_row = functools.partial(  # type: ignore
                process_row, metadata, processors, key_to_index
            )

        if real_result._row_logging_fn:
            # wrap the row maker so that each new row passes through the
            # logging function
            _log_row = real_result._row_logging_fn
            _make_row = make_row

            def make_row(row: _InterimRowType[Row[Any]]) -> _R:
                return _log_row(_make_row(row))  # type: ignore

        return make_row

    @HasMemoized_ro_memoized_attribute
    def _iterator_getter(self) -> Callable[..., Iterator[_R]]:
        """Build the generator function used to iterate this result.

        The returned function accepts the result object and yields rows
        from ``_fetchiter_impl()``, applying row construction, uniquing
        (if a unique filter is set) and the post-creational filter.

        """
        make_row = self._row_getter

        post_creational_filter = self._post_creational_filter

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            def iterrows(self: Result[Any]) -> Iterator[_R]:
                for raw_row in self._fetchiter_impl():
                    obj: _InterimRowType[Any] = (
                        make_row(raw_row) if make_row else raw_row
                    )
                    # the identity checked for uniqueness is the
                    # strategy's result, or the object itself
                    hashed = strategy(obj) if strategy else obj
                    if hashed in uniques:
                        continue
                    uniques.add(hashed)
                    if post_creational_filter:
                        obj = post_creational_filter(obj)
                    yield obj  # type: ignore

        else:

            def iterrows(self: Result[Any]) -> Iterator[_R]:
                for raw_row in self._fetchiter_impl():
                    row: _InterimRowType[Any] = (
                        make_row(raw_row) if make_row else raw_row
                    )
                    if post_creational_filter:
                        row = post_creational_filter(row)
                    yield row  # type: ignore

        return iterrows

    def _raw_all_rows(self) -> List[_R]:
        """Fetch all rows and apply row construction only.

        Neither uniquing nor the post-creational filter is applied.  A row
        getter must be available (asserted).

        """
        make_row = self._row_getter
        assert make_row is not None
        rows = self._fetchall_impl()
        return [make_row(row) for row in rows]

    def _allrows(self) -> List[_R]:
        """Fetch all rows, applying row construction, uniquing and the
        post-creational filter as configured.

        """
        post_creational_filter = self._post_creational_filter

        make_row = self._row_getter

        rows = self._fetchall_impl()
        made_rows: List[_InterimRowType[_R]]
        if make_row:
            made_rows = [make_row(row) for row in rows]
        else:
            made_rows = rows  # type: ignore

        interim_rows: List[_R]

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            # set.add() returns None, so "not uniques.add(...)" is always
            # true; it is there to record the identity as a side effect
            # for rows that were not already seen
            interim_rows = [
                made_row  # type: ignore
                for made_row, sig_row in [
                    (
                        made_row,
                        strategy(made_row) if strategy else made_row,
                    )
                    for made_row in made_rows
                ]
                if sig_row not in uniques and not uniques.add(sig_row)  # type: ignore # noqa: E501
            ]
        else:
            interim_rows = made_rows  # type: ignore

        if post_creational_filter:
            interim_rows = [
                post_creational_filter(row) for row in interim_rows
            ]
        return interim_rows

    @HasMemoized_ro_memoized_attribute
    def _onerow_getter(
        self,
    ) -> Callable[..., Union[Literal[_NoRow._NO_ROW], _R]]:
        """Build the function used to fetch a single row.

        The returned function accepts the result object and returns the
        next row from ``_fetchone_impl()`` after row construction,
        uniquing and the post-creational filter, or ``_NO_ROW`` when
        ``_fetchone_impl()`` returns None.

        """
        make_row = self._row_getter

        post_creational_filter = self._post_creational_filter

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
                _onerow = self._fetchone_impl
                # keep fetching until a row not seen before turns up, or
                # there are no more rows
                while True:
                    row = _onerow()
                    if row is None:
                        return _NO_ROW
                    else:
                        obj: _InterimRowType[Any] = (
                            make_row(row) if make_row else row
                        )
                        hashed = strategy(obj) if strategy else obj
                        if hashed in uniques:
                            continue
                        else:
                            uniques.add(hashed)
                        if post_creational_filter:
                            obj = post_creational_filter(obj)
                        return obj  # type: ignore

        else:

            def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
                row = self._fetchone_impl()
                if row is None:
                    return _NO_ROW
                else:
                    interim_row: _InterimRowType[Any] = (
                        make_row(row) if make_row else row
                    )
                    if post_creational_filter:
                        interim_row = post_creational_filter(interim_row)
                    return interim_row  # type: ignore

        return onerow

    @HasMemoized_ro_memoized_attribute
    def _manyrow_getter(self) -> Callable[..., List[_R]]:
        """Build the function used to fetch a batch of rows.

        The returned function accepts the result object and a row count
        (or None) and returns a list of rows from ``_fetchmany_impl()``
        after row construction, uniquing and the post-creational filter.
        With a unique filter in place, it keeps fetching until the
        requested number of unique rows is collected or a fetch returns
        no rows.

        """
        make_row = self._row_getter

        post_creational_filter = self._post_creational_filter

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            # construct rows from ``rows`` and return those whose identity
            # is not yet in ``uniques``, recording the new identities
            def filterrows(
                make_row: Optional[Callable[..., _R]],
                rows: List[Any],
                strategy: Optional[Callable[[List[Any]], Any]],
                uniques: Set[Any],
            ) -> List[_R]:
                if make_row:
                    rows = [make_row(row) for row in rows]

                if strategy:
                    made_rows = (
                        (made_row, strategy(made_row)) for made_row in rows
                    )
                else:
                    made_rows = ((made_row, made_row) for made_row in rows)
                # set.add() returns None, so "not uniques.add(...)" is
                # always true and only serves to record the identity
                return [
                    made_row
                    for made_row, sig_row in made_rows
                    if sig_row not in uniques and not uniques.add(sig_row)  # type: ignore # noqa: E501
                ]

            def manyrows(
                self: ResultInternal[_R], num: Optional[int]
            ) -> List[_R]:
                collect: List[_R] = []

                _manyrows = self._fetchmany_impl

                if num is None:
                    # if None is passed, we don't know the default
                    # manyrows number, DBAPI has this as cursor.arraysize
                    # different DBAPIs / fetch strategies may be different.
                    # do a fetch to find what the number is.  if there are
                    # only fewer rows left, then it doesn't matter.
                    real_result = (
                        self._real_result
                        if self._real_result
                        else cast("Result[Any]", self)
                    )
                    if real_result._yield_per:
                        num_required = num = real_result._yield_per
                    else:
                        rows = _manyrows(num)
                        num = len(rows)
                        assert make_row is not None
                        collect.extend(
                            filterrows(make_row, rows, strategy, uniques)
                        )
                        num_required = num - len(collect)
                else:
                    num_required = num

                assert num is not None

                # fetch the shortfall until ``num`` unique rows have been
                # collected or a fetch comes back empty
                while num_required:
                    rows = _manyrows(num_required)
                    if not rows:
                        break

                    collect.extend(
                        filterrows(make_row, rows, strategy, uniques)
                    )
                    num_required = num - len(collect)

                if post_creational_filter:
                    collect = [post_creational_filter(row) for row in collect]
                return collect

        else:

            def manyrows(
                self: ResultInternal[_R], num: Optional[int]
            ) -> List[_R]:
                if num is None:
                    # fall back to the real result's yield_per setting,
                    # which may itself be None
                    real_result = (
                        self._real_result
                        if self._real_result
                        else cast("Result[Any]", self)
                    )
                    num = real_result._yield_per

                rows: List[_InterimRowType[Any]] = self._fetchmany_impl(num)
                if make_row:
                    rows = [make_row(row) for row in rows]
                if post_creational_filter:
                    rows = [post_creational_filter(row) for row in rows]
                return rows  # type: ignore

        return manyrows

    @overload
    def _only_one_row(
        self: ResultInternal[Row[Any]],
        raise_for_second_row: bool,
        raise_for_none: bool,
        scalar: Literal[True],
    ) -> Any: ...

    @overload
    def _only_one_row(
        self,
        raise_for_second_row: bool,
        raise_for_none: Literal[True],
        scalar: bool,
    ) -> _R: ...

    @overload
    def _only_one_row(
        self,
        raise_for_second_row: bool,
        raise_for_none: bool,
        scalar: bool,
    ) -> Optional[_R]: ...

    def _only_one_row(
        self,
        raise_for_second_row: bool,
        raise_for_none: bool,
        scalar: bool,
    ) -> Optional[_R]:
        """Fetch the first row, optionally checking that no other follows.

        :param raise_for_second_row: if True, fetch beyond the first row
         and raise :class:`.MultipleResultsFound` if a further row is
         found.  When a unique filter is in place, further rows that match
         the first row are skipped rather than counted.  If False, the
         result is closed right after the first row.
        :param raise_for_none: if True, raise :class:`.NoResultFound` when
         there is no first row; otherwise None is returned in that case.
        :param scalar: if True, return the first element of the row (or
         the raw object, when the source supports scalars) and skip the
         post-creational filter.  When the source supports scalars this
         also sets ``_generate_rows`` to False on this object.

        """
        onerow = self._fetchone_impl

        row: Optional[_InterimRowType[Any]] = onerow(hard_close=True)
        if row is None:
            if raise_for_none:
                raise exc.NoResultFound(
                    "No row was found when one was required"
                )
            else:
                return None

        if scalar and self._source_supports_scalars:
            # the raw object is the scalar value; no Row is needed
            self._generate_rows = False
            make_row = None
        else:
            make_row = self._row_getter

        try:
            row = make_row(row) if make_row else row
        except:
            # close the result on any failure, then re-raise
            self._soft_close(hard=True)
            raise

        if raise_for_second_row:
            if self._unique_filter_state:
                # for no second row but uniqueness, need to essentially
                # consume the entire result :(
                uniques, strategy = self._unique_strategy

                existing_row_hash = strategy(row) if strategy else row

                while True:
                    next_row: Any = onerow(hard_close=True)
                    if next_row is None:
                        next_row = _NO_ROW
                        break

                    try:
                        next_row = make_row(next_row) if make_row else next_row

                        if strategy:
                            assert next_row is not _NO_ROW
                            if existing_row_hash == strategy(next_row):
                                continue
                        elif row == next_row:
                            continue
                        # here, we have a row and it's different
                        break
                    except:
                        # close the result on any failure, then re-raise
                        self._soft_close(hard=True)
                        raise
            else:
                next_row = onerow(hard_close=True)
                if next_row is None:
                    next_row = _NO_ROW

            if next_row is not _NO_ROW:
                self._soft_close(hard=True)
                raise exc.MultipleResultsFound(
                    "Multiple rows were found when exactly one was required"
                    if raise_for_none
                    else "Multiple rows were found when one or none "
                    "was required"
                )
        else:
            # if we checked for second row then that would have
            # closed us :)
            self._soft_close(hard=True)

        if not scalar:
            post_creational_filter = self._post_creational_filter
            if post_creational_filter:
                row = post_creational_filter(row)

        if scalar and make_row:
            # a Row was built; the scalar value is its first element
            return row[0]  # type: ignore
        else:
            return row  # type: ignore

    def _iter_impl(self) -> Iterator[_R]:
        """Return an iterator of rows, using ``_iterator_getter``."""
        return self._iterator_getter(self)

    def _next_impl(self) -> _R:
        """Return the next row, using ``_onerow_getter``.

        :raises StopIteration: when there is no further row.

        """
        row = self._onerow_getter(self)
        if row is _NO_ROW:
            raise StopIteration()
        else:
            return row

    @_generative
    def _column_slices(self, indexes: Sequence[_KeyIndexType]) -> Self:
        """Reduce the metadata to the given column keys / indexes.

        The metadata is left as is when the real result's source supports
        scalars and exactly one index is given.

        """
        real_result = (
            self._real_result
            if self._real_result
            else cast("Result[Any]", self)
        )

        if not real_result._source_supports_scalars or len(indexes) != 1:
            self._metadata = self._metadata._reduce(indexes)

        assert self._generate_rows

        return self

    @HasMemoized.memoized_attribute
    def _unique_strategy(self) -> _UniqueFilterStateType:
        """Return the ``(uniques, strategy)`` pair used for uniquing.

        A unique filter state must be present (asserted).  When that state
        carries no strategy and the metadata has unique filters, a
        strategy is derived from them: the first filter alone when raw
        scalar objects are being returned, otherwise a call to the row's
        ``_filter_on_values()`` method with the filters, narrowed by the
        metadata's tuplefilter if there is one.

        """
        assert self._unique_filter_state is not None
        uniques, strategy = self._unique_filter_state

        real_result = (
            self._real_result
            if self._real_result is not None
            else cast("Result[Any]", self)
        )

        if not strategy and self._metadata._unique_filters:
            if (
                real_result._source_supports_scalars
                and not self._generate_rows
            ):
                strategy = self._metadata._unique_filters[0]
            else:
                filters = self._metadata._unique_filters
                if self._metadata._tuplefilter:
                    filters = self._metadata._tuplefilter(filters)

                strategy = operator.methodcaller("_filter_on_values", filters)
        return uniques, strategy
883
884
885class _WithKeys:
886 __slots__ = ()
887
888 _metadata: ResultMetaData
889
890 # used mainly to share documentation on the keys method.
891 def keys(self) -> RMKeyView:
892 """Return an iterable view which yields the string keys that would
893 be represented by each :class:`_engine.Row`.
894
895 The keys can represent the labels of the columns returned by a core
896 statement or the names of the orm classes returned by an orm
897 execution.
898
899 The view also can be tested for key containment using the Python
900 ``in`` operator, which will test both for the string keys represented
901 in the view, as well as for alternate keys such as column objects.
902
903 .. versionchanged:: 1.4 a key view object is returned rather than a
904 plain list.
905
906
907 """
908 return self._metadata.keys
909
910
911class Result(_WithKeys, ResultInternal[Row[_TP]]):
912 """Represent a set of database results.
913
914 .. versionadded:: 1.4 The :class:`_engine.Result` object provides a
915 completely updated usage model and calling facade for SQLAlchemy
916 Core and SQLAlchemy ORM. In Core, it forms the basis of the
917 :class:`_engine.CursorResult` object which replaces the previous
918 :class:`_engine.ResultProxy` interface. When using the ORM, a
919 higher level object called :class:`_engine.ChunkedIteratorResult`
920 is normally used.
921
922 .. note:: In SQLAlchemy 1.4 and above, this object is
923 used for ORM results returned by :meth:`_orm.Session.execute`, which can
924 yield instances of ORM mapped objects either individually or within
925 tuple-like rows. Note that the :class:`_engine.Result` object does not
926 deduplicate instances or rows automatically as is the case with the
927 legacy :class:`_orm.Query` object. For in-Python de-duplication of
928 instances or rows, use the :meth:`_engine.Result.unique` modifier
929 method.
930
931 .. seealso::
932
933 :ref:`tutorial_fetching_rows` - in the :doc:`/tutorial/index`
934
935 """
936
937 __slots__ = ("_metadata", "__dict__")
938
939 _row_logging_fn: Optional[Callable[[Row[Any]], Row[Any]]] = None
940
941 _source_supports_scalars: bool = False
942
943 _yield_per: Optional[int] = None
944
945 _attributes: util.immutabledict[Any, Any] = util.immutabledict()
946
def __init__(self, cursor_metadata: ResultMetaData):
    """Construct a :class:`_engine.Result` for the given metadata.

    :param cursor_metadata: the :class:`.ResultMetaData` assigned to
     this result's ``_metadata`` attribute.

    """
    self._metadata = cursor_metadata
949
def __enter__(self) -> Self:
    """Enter a ``with`` block, returning this result itself."""
    return self
952
def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
    """Leave a ``with`` block by calling :meth:`close`.

    The exception details are not examined, and None is returned.

    """
    self.close()
955
def close(self) -> None:
    """close this :class:`_engine.Result`.

    The behavior of this method is implementation specific, and is
    not implemented by default.    The method should generally end
    the resources in use by the result object and also cause any
    subsequent iteration or row fetching to raise
    :class:`.ResourceClosedError`.

    .. versionadded:: 1.4.27 - ``.close()`` was previously not generally
       available for all :class:`_engine.Result` classes, instead only
       being available on the :class:`_engine.CursorResult` returned for
       Core statement executions. As most other result objects, namely the
       ones used by the ORM, are proxying a :class:`_engine.CursorResult`
       in any case, this allows the underlying cursor result to be closed
       from the outside facade for the case when the ORM query is using
       the ``yield_per`` execution option where it does not immediately
       exhaust and autoclose the database cursor.

    """
    # delegates to the implementation-provided _soft_close(), asking for
    # a "hard" close
    self._soft_close(hard=True)
977
@property
def _soft_closed(self) -> bool:
    """Boolean "soft closed" state of this result.

    Not implemented at this level; accessing it here raises
    ``NotImplementedError``.

    """
    raise NotImplementedError()
981
@property
def closed(self) -> bool:
    """return ``True`` if this :class:`_engine.Result` reports .closed

    Not implemented at this level; accessing it here raises
    ``NotImplementedError``.

    .. versionadded:: 1.4.43

    """
    raise NotImplementedError()
990
@_generative
def yield_per(self, num: int) -> Self:
    """Configure the row-fetching strategy to fetch ``num`` rows at a time.

    This impacts the underlying behavior of the result when iterating over
    the result object, or otherwise making use of  methods such as
    :meth:`_engine.Result.fetchone` that return one row at a time.   Data
    from the underlying cursor or other data source will be buffered up to
    this many rows in memory, and the buffered collection will then be
    yielded out one row at a time or as many rows are requested. Each time
    the buffer clears, it will be refreshed to this many rows or as many
    rows remain if fewer remain.

    The :meth:`_engine.Result.yield_per` method is generally used in
    conjunction with the
    :paramref:`_engine.Connection.execution_options.stream_results`
    execution option, which will allow the database dialect in use to make
    use of a server side cursor, if the DBAPI supports a specific "server
    side cursor" mode separate from its default mode of operation.

    .. tip::

        Consider using the
        :paramref:`_engine.Connection.execution_options.yield_per`
        execution option, which will simultaneously set
        :paramref:`_engine.Connection.execution_options.stream_results`
        to ensure the use of server side cursors, as well as automatically
        invoke the :meth:`_engine.Result.yield_per` method to establish
        a fixed row buffer size at once.

        The :paramref:`_engine.Connection.execution_options.yield_per`
        execution option is available for ORM operations, with
        :class:`_orm.Session`-oriented use described at
        :ref:`orm_queryguide_yield_per`. The Core-only version which works
        with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.

    .. versionadded:: 1.4

    :param num: number of rows to fetch each time the buffer is refilled.
     If set to a value below 1, fetches all rows for the next buffer.

    :return: this :class:`_engine.Result` object with the modifications
     given.

    .. seealso::

        :ref:`engine_stream_results` - describes Core behavior for
        :meth:`_engine.Result.yield_per`

        :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`

    """
    # the value is only recorded here, as ``_yield_per``
    self._yield_per = num
    return self
1042
@_generative
def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
    """Apply unique filtering to the objects returned by this
    :class:`_engine.Result`.

    When this filter is applied with no arguments, the rows or objects
    returned will be filtered such that each row is returned uniquely. The
    algorithm used to determine this uniqueness is by default the Python
    hashing identity of the whole tuple.   In some cases a specialized
    per-entity hashing scheme may be used, such as when using the ORM, a
    scheme is applied which  works against the primary key identity of
    returned objects.

    The unique filter is applied **after all other filters**, which means
    if the columns returned have been refined using a method such as the
    :meth:`_engine.Result.columns` or :meth:`_engine.Result.scalars`
    method, the uniquing is applied to **only the column or columns
    returned**.   This occurs regardless of the order in which these
    methods have been called upon the :class:`_engine.Result` object.

    The unique filter also changes the calculus used for methods like
    :meth:`_engine.Result.fetchmany` and :meth:`_engine.Result.partitions`.
    When using :meth:`_engine.Result.unique`, these methods will continue
    to yield the number of rows or objects requested, after uniquing
    has been applied.  However, this necessarily impacts the buffering
    behavior of the underlying cursor or datasource, such that multiple
    underlying calls to ``cursor.fetchmany()`` may be necessary in order
    to accumulate enough objects in order to provide a unique collection
    of the requested size.

    :param strategy: a callable that will be applied to rows or objects
     being iterated, which should return an object that represents the
     unique value of the row.   A Python ``set()`` is used to store
     these identities.   If not passed, a default uniqueness strategy
     is used which may have been assembled by the source of this
     :class:`_engine.Result` object.

    :return: this :class:`_engine.Result` object with the modifications
     given.

    """
    # each call installs a new, empty set of seen identities alongside
    # the given strategy
    self._unique_filter_state = (set(), strategy)
    return self
1083
    def columns(self, *col_expressions: _KeyIndexType) -> Self:
        r"""Establish the columns that should be returned in each row.

        This method may be used to limit the columns returned as well
        as to reorder them.   The given expressions are normally
        a series of integers or string key names.   They may also be
        appropriate :class:`.ColumnElement` objects which correspond to
        a given statement construct.

        .. versionchanged:: 2.0  Due to a bug in 1.4, the
           :meth:`_engine.Result.columns` method had an incorrect behavior
           where calling upon the method with just one index would cause the
           :class:`_engine.Result` object to yield scalar values rather than
           :class:`_engine.Row` objects.   In version 2.0, this behavior
           has been corrected such that calling upon
           :meth:`_engine.Result.columns` with a single index will
           produce a :class:`_engine.Result` object that continues
           to yield :class:`_engine.Row` objects, which include
           only a single column.

        E.g.::

            statement = select(table.c.x, table.c.y, table.c.z)
            result = connection.execute(statement)

            for z, y in result.columns("z", "y"):
                ...

        Example of using the column objects from the statement itself::

            for z, y in result.columns(
                statement.selected_columns.c.z, statement.selected_columns.c.y
            ):
                ...

        .. versionadded:: 1.4

        :param \*col_expressions: indicates columns to be returned.  Elements
         may be integer row indexes, string column names, or appropriate
         :class:`.ColumnElement` objects corresponding to a select construct.

        :return: this :class:`_engine.Result` object with the modifications
         given.

        """
        # the column selection itself is carried out by _column_slices()
        return self._column_slices(col_expressions)
1130
    # Typing-only overloads: a result typed as a one-element tuple yields a
    # ScalarResult of that element type when no index, or the literal index
    # 0, is given; any other call is typed as ScalarResult[Any].
    @overload
    def scalars(self: Result[Tuple[_T]]) -> ScalarResult[_T]: ...

    @overload
    def scalars(
        self: Result[Tuple[_T]], index: Literal[0]
    ) -> ScalarResult[_T]: ...

    @overload
    def scalars(self, index: _KeyIndexType = 0) -> ScalarResult[Any]: ...

    def scalars(self, index: _KeyIndexType = 0) -> ScalarResult[Any]:
        """Return a :class:`_engine.ScalarResult` filtering object which
        will return single elements rather than :class:`_row.Row` objects.

        E.g.::

            >>> result = conn.execute(text("select int_id from table"))
            >>> result.scalars().all()
            [1, 2, 3]

        When results are fetched from the :class:`_engine.ScalarResult`
        filtering object, the single column-row that would be returned by the
        :class:`_engine.Result` is instead returned as the column's value.

        .. versionadded:: 1.4

        :param index: integer or row key indicating the column to be fetched
         from each row, defaults to ``0`` indicating the first column.

        :return: a new :class:`_engine.ScalarResult` filtering object
         referring to this :class:`_engine.Result` object.

        """
        return ScalarResult(self, index)
1166
1167 def _getter(
1168 self, key: _KeyIndexType, raiseerr: bool = True
1169 ) -> Optional[Callable[[Row[Any]], Any]]:
1170 """return a callable that will retrieve the given key from a
1171 :class:`_engine.Row`.
1172
1173 """
1174 if self._source_supports_scalars:
1175 raise NotImplementedError(
1176 "can't use this function in 'only scalars' mode"
1177 )
1178 return self._metadata._getter(key, raiseerr)
1179
1180 def _tuple_getter(self, keys: Sequence[_KeyIndexType]) -> _TupleGetterType:
1181 """return a callable that will retrieve the given keys from a
1182 :class:`_engine.Row`.
1183
1184 """
1185 if self._source_supports_scalars:
1186 raise NotImplementedError(
1187 "can't use this function in 'only scalars' mode"
1188 )
1189 return self._metadata._row_as_tuple_getter(keys)
1190
    def mappings(self) -> MappingResult:
        """Apply a mappings filter to returned rows, returning an instance of
        :class:`_engine.MappingResult`.

        When this filter is applied, fetching rows will return
        :class:`_engine.RowMapping` objects instead of :class:`_engine.Row`
        objects.

        .. versionadded:: 1.4

        :return: a new :class:`_engine.MappingResult` filtering object
         referring to this :class:`_engine.Result` object.

        """

        # the wrapper is constructed around this same result object
        return MappingResult(self)
1207
    @property
    def t(self) -> TupleResult[_TP]:
        """Apply a "typed tuple" typing filter to returned rows.

        The :attr:`_engine.Result.t` attribute is a synonym for
        calling the :meth:`_engine.Result.tuples` method.

        .. versionadded:: 2.0

        """
        # this same object is returned; only the annotated type differs
        return self  # type: ignore
1219
    def tuples(self) -> TupleResult[_TP]:
        """Apply a "typed tuple" typing filter to returned rows.

        This method returns the same :class:`_engine.Result` object
        at runtime,
        however annotates as returning a :class:`_engine.TupleResult` object
        that will indicate to :pep:`484` typing tools that plain typed
        ``Tuple`` instances are returned rather than rows.  This allows
        tuple unpacking and ``__getitem__`` access of :class:`_engine.Row`
        objects to be typed, for those cases where the statement invoked
        itself included typing information.

        .. versionadded:: 2.0

        :return: the :class:`_engine.TupleResult` type at typing time.

        .. seealso::

            :attr:`_engine.Result.t` - shorter synonym

            :attr:`_engine.Row._t` - :class:`_engine.Row` version

        """

        # this same object is returned; only the annotated type differs
        return self  # type: ignore
1245
    def _raw_row_iterator(self) -> Iterator[_RowData]:
        """Return a safe iterator that yields raw row data.

        This is used by the :meth:`_engine.Result.merge` method
        to merge multiple compatible results together.

        The implementation here always raises ``NotImplementedError``.

        """
        raise NotImplementedError()
1254
    def __iter__(self) -> Iterator[Row[_TP]]:
        # iteration is delegated to _iter_impl()
        return self._iter_impl()
1257
    def __next__(self) -> Row[_TP]:
        # single-step iteration is delegated to _next_impl()
        return self._next_impl()
1260
1261 def partitions(
1262 self, size: Optional[int] = None
1263 ) -> Iterator[Sequence[Row[_TP]]]:
1264 """Iterate through sub-lists of rows of the size given.
1265
1266 Each list will be of the size given, excluding the last list to
1267 be yielded, which may have a small number of rows. No empty
1268 lists will be yielded.
1269
1270 The result object is automatically closed when the iterator
1271 is fully consumed.
1272
1273 Note that the backend driver will usually buffer the entire result
1274 ahead of time unless the
1275 :paramref:`.Connection.execution_options.stream_results` execution
1276 option is used indicating that the driver should not pre-buffer
1277 results, if possible. Not all drivers support this option and
1278 the option is silently ignored for those who do not.
1279
1280 When using the ORM, the :meth:`_engine.Result.partitions` method
1281 is typically more effective from a memory perspective when it is
1282 combined with use of the
1283 :ref:`yield_per execution option <orm_queryguide_yield_per>`,
1284 which instructs both the DBAPI driver to use server side cursors,
1285 if available, as well as instructs the ORM loading internals to only
1286 build a certain amount of ORM objects from a result at a time before
1287 yielding them out.
1288
1289 .. versionadded:: 1.4
1290
1291 :param size: indicate the maximum number of rows to be present
1292 in each list yielded. If None, makes use of the value set by
1293 the :meth:`_engine.Result.yield_per`, method, if it were called,
1294 or the :paramref:`_engine.Connection.execution_options.yield_per`
1295 execution option, which is equivalent in this regard. If
1296 yield_per weren't set, it makes use of the
1297 :meth:`_engine.Result.fetchmany` default, which may be backend
1298 specific and not well defined.
1299
1300 :return: iterator of lists
1301
1302 .. seealso::
1303
1304 :ref:`engine_stream_results`
1305
1306 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
1307
1308 """
1309
1310 getter = self._manyrow_getter
1311
1312 while True:
1313 partition = getter(self, size)
1314 if partition:
1315 yield partition
1316 else:
1317 break
1318
    def fetchall(self) -> Sequence[Row[_TP]]:
        """A synonym for the :meth:`_engine.Result.all` method."""

        # same delegation as all(): both return the value of _allrows()
        return self._allrows()
1323
1324 def fetchone(self) -> Optional[Row[_TP]]:
1325 """Fetch one row.
1326
1327 When all rows are exhausted, returns None.
1328
1329 This method is provided for backwards compatibility with
1330 SQLAlchemy 1.x.x.
1331
1332 To fetch the first row of a result only, use the
1333 :meth:`_engine.Result.first` method. To iterate through all
1334 rows, iterate the :class:`_engine.Result` object directly.
1335
1336 :return: a :class:`_engine.Row` object if no filters are applied,
1337 or ``None`` if no rows remain.
1338
1339 """
1340 row = self._onerow_getter(self)
1341 if row is _NO_ROW:
1342 return None
1343 else:
1344 return row
1345
    def fetchmany(self, size: Optional[int] = None) -> Sequence[Row[_TP]]:
        """Fetch many rows.

        When all rows are exhausted, returns an empty sequence.

        This method is provided for backwards compatibility with
        SQLAlchemy 1.x.x.

        To fetch rows in groups, use the :meth:`_engine.Result.partitions`
        method.

        :param size: the number of rows requested; passed along unchanged,
         including the default of ``None``, to the underlying row getter.

        :return: a sequence of :class:`_engine.Row` objects.

        .. seealso::

            :meth:`_engine.Result.partitions`

        """

        return self._manyrow_getter(self, size)
1366
    def all(self) -> Sequence[Row[_TP]]:
        """Return all rows in a sequence.

        Closes the result set after invocation.   Subsequent invocations
        will return an empty sequence.

        .. versionadded:: 1.4

        :return: a sequence of :class:`_engine.Row` objects.

        .. seealso::

            :ref:`engine_stream_results` - How to stream a large result set
            without loading it completely in Python.

        """

        return self._allrows()
1385
    def first(self) -> Optional[Row[_TP]]:
        """Fetch the first row or ``None`` if no row is present.

        Closes the result set and discards remaining rows.

        .. note::  This method returns one **row**, e.g. tuple, by default.
           To return exactly one single scalar value, that is, the first
           column of the first row, use the
           :meth:`_engine.Result.scalar` method,
           or combine :meth:`_engine.Result.scalars` and
           :meth:`_engine.Result.first`.

           Additionally, in contrast to the behavior of the legacy  ORM
           :meth:`_orm.Query.first` method, **no limit is applied** to the
           SQL query which was invoked to produce this
           :class:`_engine.Result`;
           for a DBAPI driver that buffers results in memory before yielding
           rows, all rows will be sent to the Python process and all but
           the first row will be discarded.

        :return: a :class:`_engine.Row` object, or None
         if no rows remain.

        .. seealso::

            :ref:`migration_20_unify_select`

            :meth:`_engine.Result.scalar`

            :meth:`_engine.Result.one`

        """

        # neither a missing row nor additional rows raise here
        return self._only_one_row(
            raise_for_second_row=False, raise_for_none=False, scalar=False
        )
1424
    def one_or_none(self) -> Optional[Row[_TP]]:
        """Return at most one result or raise an exception.

        Returns ``None`` if the result has no rows.
        Raises :class:`.MultipleResultsFound`
        if multiple rows are returned.

        .. versionadded:: 1.4

        :return: The first :class:`_engine.Row` or ``None`` if no row
         is available.

        :raises: :class:`.MultipleResultsFound`

        .. seealso::

            :meth:`_engine.Result.first`

            :meth:`_engine.Result.one`

        """
        # a second row raises; a missing row does not
        return self._only_one_row(
            raise_for_second_row=True, raise_for_none=False, scalar=False
        )
1449
    # Typing-only overloads: a result typed as a one-element tuple returns
    # that element type; any other result is typed as returning Any.
    @overload
    def scalar_one(self: Result[Tuple[_T]]) -> _T: ...

    @overload
    def scalar_one(self) -> Any: ...

    def scalar_one(self) -> Any:
        """Return exactly one scalar result or raise an exception.

        This is equivalent to calling :meth:`_engine.Result.scalars` and
        then :meth:`_engine.ScalarResult.one`.

        :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound`

        .. seealso::

            :meth:`_engine.ScalarResult.one`

            :meth:`_engine.Result.scalars`

        """
        # same row-count checks as one(), with scalar extraction enabled
        return self._only_one_row(
            raise_for_second_row=True, raise_for_none=True, scalar=True
        )
1472
    # Typing-only overloads: a result typed as a one-element tuple returns
    # Optional of that element type; otherwise Optional[Any].
    @overload
    def scalar_one_or_none(self: Result[Tuple[_T]]) -> Optional[_T]: ...

    @overload
    def scalar_one_or_none(self) -> Optional[Any]: ...

    def scalar_one_or_none(self) -> Optional[Any]:
        """Return exactly one scalar result or ``None``.

        This is equivalent to calling :meth:`_engine.Result.scalars` and
        then :meth:`_engine.ScalarResult.one_or_none`.

        :raises: :class:`.MultipleResultsFound`

        .. seealso::

            :meth:`_engine.ScalarResult.one_or_none`

            :meth:`_engine.Result.scalars`

        """
        # same row-count checks as one_or_none(), with scalar extraction
        return self._only_one_row(
            raise_for_second_row=True, raise_for_none=False, scalar=True
        )
1495
    def one(self) -> Row[_TP]:
        """Return exactly one row or raise an exception.

        Raises :class:`_exc.NoResultFound` if the result returns no
        rows, or :class:`_exc.MultipleResultsFound` if multiple rows
        would be returned.

        .. note::  This method returns one **row**, e.g. tuple, by default.
           To return exactly one single scalar value, that is, the first
           column of the first row, use the
           :meth:`_engine.Result.scalar_one` method, or combine
           :meth:`_engine.Result.scalars` and
           :meth:`_engine.Result.one`.

        .. versionadded:: 1.4

        :return: The first :class:`_engine.Row`.

        :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound`

        .. seealso::

            :meth:`_engine.Result.first`

            :meth:`_engine.Result.one_or_none`

            :meth:`_engine.Result.scalar_one`

        """
        # both a missing row and a second row raise
        return self._only_one_row(
            raise_for_second_row=True, raise_for_none=True, scalar=False
        )
1528
    # Typing-only overloads: a result typed as a one-element tuple returns
    # Optional of that element type; any other result is typed as Any.
    @overload
    def scalar(self: Result[Tuple[_T]]) -> Optional[_T]: ...

    @overload
    def scalar(self) -> Any: ...

    def scalar(self) -> Any:
        """Fetch the first column of the first row, and close the result set.

        Returns ``None`` if there are no rows to fetch.

        No validation is performed to test if additional rows remain.

        After calling this method, the object is fully closed,
        e.g. the :meth:`_engine.CursorResult.close`
        method will have been called.

        :return: a Python scalar value, or ``None`` if no rows remain.

        """
        # same row-count handling as first(), with scalar extraction enabled
        return self._only_one_row(
            raise_for_second_row=False, raise_for_none=False, scalar=True
        )
1552
    def freeze(self) -> FrozenResult[_TP]:
        """Return a callable object that will produce copies of this
        :class:`_engine.Result` when invoked.

        The callable object returned is an instance of
        :class:`_engine.FrozenResult`.

        This is used for result set caching.  The method must be called
        on the result when it has been unconsumed, and calling the method
        will consume the result fully.   When the :class:`_engine.FrozenResult`
        is retrieved from a cache, it can be called any number of times where
        it will produce a new :class:`_engine.Result` object each time
        against its stored set of rows.

        :return: a new :class:`_engine.FrozenResult` built from this
         :class:`_engine.Result`.

        .. seealso::

            :ref:`do_orm_execute_re_executing` - example usage within the
            ORM to implement a result-set cache.

        """

        return FrozenResult(self)
1575
    def merge(self, *others: Result[Any]) -> MergedResult[_TP]:
        """Merge this :class:`_engine.Result` with other compatible result
        objects.

        The object returned is an instance of :class:`_engine.MergedResult`,
        which will be composed of iterators from the given result
        objects.

        The new result will use the metadata from this result object.
        The subsequent result objects must be against an identical
        set of result / cursor metadata, otherwise the behavior is
        undefined.

        :param others: additional :class:`_engine.Result` objects to be
         merged along with this one.

        :return: a new :class:`_engine.MergedResult` object.

        """
        # this result is placed first in the tuple, followed by the others
        return MergedResult(self._metadata, (self,) + others)
1591
1592
class FilterResult(ResultInternal[_R]):
    """Base for wrappers around a :class:`_engine.Result` that return
    objects other than :class:`_engine.Row` objects, such as dictionaries
    or scalar objects.

    The additional result APIs :class:`_engine.MappingResult`,
    :class:`_engine.ScalarResult` and :class:`_engine.AsyncResult` all have
    :class:`_engine.FilterResult` as their common base.

    """

    __slots__ = (
        "_real_result",
        "_post_creational_filter",
        "_metadata",
        "_unique_filter_state",
        "__dict__",
    )

    _post_creational_filter: Optional[Callable[[Any], Any]]

    _real_result: Result[Any]

    def __enter__(self) -> Self:
        return self

    def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
        # exiting the context is handled by the wrapped result
        wrapped = self._real_result
        wrapped.__exit__(type_, value, traceback)

    @_generative
    def yield_per(self, num: int) -> Self:
        """Configure the row-fetching strategy to fetch ``num`` rows at a time.

        This method passes through to the :meth:`_engine.Result.yield_per`
        method of the wrapped result; refer to that method's documentation
        for usage notes.

        .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
           so that the method is available on all result set implementations

        .. seealso::

            :ref:`engine_stream_results` - describes Core behavior for
            :meth:`_engine.Result.yield_per`

            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`

        """
        # keep whatever object the wrapped result's yield_per() returns
        configured = self._real_result.yield_per(num)
        self._real_result = configured
        return self

    def _soft_close(self, hard: bool = False) -> None:
        wrapped = self._real_result
        wrapped._soft_close(hard=hard)

    @property
    def _soft_closed(self) -> bool:
        wrapped = self._real_result
        return wrapped._soft_closed

    @property
    def closed(self) -> bool:
        """Return ``True`` if the underlying :class:`_engine.Result` reports
        closed

        .. versionadded:: 1.4.43

        """
        wrapped = self._real_result
        return wrapped.closed

    def close(self) -> None:
        """Close this :class:`_engine.FilterResult`.

        The call is passed along to the wrapped :class:`_engine.Result`.

        .. versionadded:: 1.4.43

        """
        wrapped = self._real_result
        wrapped.close()

    @property
    def _attributes(self) -> Dict[Any, Any]:
        wrapped = self._real_result
        return wrapped._attributes

    # The four fetch primitives below all defer to the wrapped result.

    def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchiter_impl()

    def _fetchone_impl(
        self, hard_close: bool = False
    ) -> Optional[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchone_impl(hard_close=hard_close)

    def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchall_impl()

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchmany_impl(size=size)
1687
1688
class ScalarResult(FilterResult[_R]):
    """A wrapper for a :class:`_engine.Result` that returns scalar values
    rather than :class:`_row.Row` values.

    A :class:`_engine.ScalarResult` is obtained by calling the
    :meth:`_engine.Result.scalars` method.

    One special limitation of :class:`_engine.ScalarResult` is the absence
    of a ``fetchone()`` method.  The semantics of ``fetchone()`` are that a
    ``None`` value indicates no more results, which cannot work here because
    ``None`` as a row value would be indistinguishable from ``None`` as an
    indicator.  Use ``next(result)`` to receive values individually.

    """

    __slots__ = ()

    _generate_rows = False

    _post_creational_filter: Optional[Callable[[Any], Any]]

    def __init__(self, real_result: Result[Any], index: _KeyIndexType):
        self._real_result = real_result

        if not real_result._source_supports_scalars:
            # reduce the metadata to the requested column, then take
            # element 0 of what is produced for each row
            self._metadata = real_result._metadata._reduce([index])
            self._post_creational_filter = operator.itemgetter(0)
        else:
            # the metadata is used as is, with no post-creational filter
            self._metadata = real_result._metadata
            self._post_creational_filter = None

        self._unique_filter_state = real_result._unique_filter_state

    def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
        """Apply unique filtering to the objects returned by this
        :class:`_engine.ScalarResult`.

        Usage details are described at :meth:`_engine.Result.unique`.

        """
        seen: Set[Any] = set()
        self._unique_filter_state = (seen, strategy)
        return self

    def partitions(self, size: Optional[int] = None) -> Iterator[Sequence[_R]]:
        """Iterate through sub-lists of elements of the size given.

        Works like :meth:`_engine.Result.partitions`, with the difference
        that scalar values are returned rather than :class:`_engine.Row`
        objects.

        """

        fetch_chunk = self._manyrow_getter

        # keep fetching until the getter hands back an empty chunk
        chunk = fetch_chunk(self, size)
        while chunk:
            yield chunk
            chunk = fetch_chunk(self, size)

    def fetchall(self) -> Sequence[_R]:
        """A synonym for the :meth:`_engine.ScalarResult.all` method."""

        values = self._allrows()
        return values

    def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
        """Fetch many objects.

        Works like :meth:`_engine.Result.fetchmany`, with the difference
        that scalar values are returned rather than :class:`_engine.Row`
        objects.

        """
        fetch_chunk = self._manyrow_getter
        return fetch_chunk(self, size)

    def all(self) -> Sequence[_R]:
        """Return all scalar values in a sequence.

        Works like :meth:`_engine.Result.all`, with the difference
        that scalar values are returned rather than :class:`_engine.Row`
        objects.

        """
        values = self._allrows()
        return values

    def __iter__(self) -> Iterator[_R]:
        return self._iter_impl()

    def __next__(self) -> _R:
        return self._next_impl()

    def first(self) -> Optional[_R]:
        """Fetch the first object or ``None`` if no object is present.

        Works like :meth:`_engine.Result.first`, with the difference
        that scalar values are returned rather than :class:`_engine.Row`
        objects.

        """
        return self._only_one_row(
            raise_for_second_row=False,
            raise_for_none=False,
            scalar=False,
        )

    def one_or_none(self) -> Optional[_R]:
        """Return at most one object or raise an exception.

        Works like :meth:`_engine.Result.one_or_none`, with the difference
        that scalar values are returned rather than :class:`_engine.Row`
        objects.

        """
        return self._only_one_row(
            raise_for_second_row=True,
            raise_for_none=False,
            scalar=False,
        )

    def one(self) -> _R:
        """Return exactly one object or raise an exception.

        Works like :meth:`_engine.Result.one`, with the difference
        that scalar values are returned rather than :class:`_engine.Row`
        objects.

        """
        return self._only_one_row(
            raise_for_second_row=True,
            raise_for_none=True,
            scalar=False,
        )
1818
1819
class TupleResult(FilterResult[_R], util.TypingOnly):
    """A :class:`_engine.Result` that's typed as returning plain
    Python tuples instead of rows.

    Since :class:`_engine.Row` acts like a tuple in every way already,
    this class is a typing only class, regular :class:`_engine.Result` is
    still used at runtime.

    """

    __slots__ = ()

    # The method stubs below are declared for type checkers only.
    if TYPE_CHECKING:

        def partitions(
            self, size: Optional[int] = None
        ) -> Iterator[Sequence[_R]]:
            """Iterate through sub-lists of elements of the size given.

            Equivalent to :meth:`_engine.Result.partitions` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def fetchone(self) -> Optional[_R]:
            """Fetch one tuple.

            Equivalent to :meth:`_engine.Result.fetchone` except that
            tuple values, rather than :class:`_engine.Row`
            objects, are returned.

            """
            ...

        def fetchall(self) -> Sequence[_R]:
            """A synonym for the :meth:`_engine.TupleResult.all` method."""
            ...

        def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
            """Fetch many objects.

            Equivalent to :meth:`_engine.Result.fetchmany` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def all(self) -> Sequence[_R]:  # noqa: A001
            """Return all tuple values in a sequence.

            Equivalent to :meth:`_engine.Result.all` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def __iter__(self) -> Iterator[_R]: ...

        def __next__(self) -> _R: ...

        def first(self) -> Optional[_R]:
            """Fetch the first object or ``None`` if no object is present.

            Equivalent to :meth:`_engine.Result.first` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.


            """
            ...

        def one_or_none(self) -> Optional[_R]:
            """Return at most one object or raise an exception.

            Equivalent to :meth:`_engine.Result.one_or_none` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def one(self) -> _R:
            """Return exactly one object or raise an exception.

            Equivalent to :meth:`_engine.Result.one` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        @overload
        def scalar_one(self: TupleResult[Tuple[_T]]) -> _T: ...

        @overload
        def scalar_one(self) -> Any: ...

        def scalar_one(self) -> Any:
            """Return exactly one scalar result or raise an exception.

            This is equivalent to calling :meth:`_engine.Result.scalars`
            and then :meth:`_engine.ScalarResult.one`.

            .. seealso::

                :meth:`_engine.ScalarResult.one`

                :meth:`_engine.Result.scalars`

            """
            ...

        @overload
        def scalar_one_or_none(
            self: TupleResult[Tuple[_T]],
        ) -> Optional[_T]: ...

        @overload
        def scalar_one_or_none(self) -> Optional[Any]: ...

        def scalar_one_or_none(self) -> Optional[Any]:
            """Return exactly one or no scalar result.

            This is equivalent to calling :meth:`_engine.Result.scalars`
            and then :meth:`_engine.ScalarResult.one_or_none`.

            .. seealso::

                :meth:`_engine.ScalarResult.one_or_none`

                :meth:`_engine.Result.scalars`

            """
            ...

        @overload
        def scalar(self: TupleResult[Tuple[_T]]) -> Optional[_T]: ...

        @overload
        def scalar(self) -> Any: ...

        def scalar(self) -> Any:
            """Fetch the first column of the first row, and close the result
            set.

            Returns ``None`` if there are no rows to fetch.

            No validation is performed to test if additional rows remain.

            After calling this method, the object is fully closed,
            e.g. the :meth:`_engine.CursorResult.close`
            method will have been called.

            :return: a Python scalar value, or ``None`` if no rows remain.

            """
            ...
1981
1982
class MappingResult(_WithKeys, FilterResult[RowMapping]):
    """A wrapper for a :class:`_engine.Result` that returns dictionary values
    rather than :class:`_engine.Row` values.

    A :class:`_engine.MappingResult` is obtained by calling the
    :meth:`_engine.Result.mappings` method.

    """

    __slots__ = ()

    _generate_rows = True

    _post_creational_filter = operator.attrgetter("_mapping")

    def __init__(self, result: Result[Any]):
        self._real_result = result
        self._unique_filter_state = result._unique_filter_state

        metadata = result._metadata
        if result._source_supports_scalars:
            # for a scalars-only source, reduce the metadata to column 0
            metadata = metadata._reduce([0])
        self._metadata = metadata

    def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
        """Apply unique filtering to the objects returned by this
        :class:`_engine.MappingResult`.

        Usage details are described at :meth:`_engine.Result.unique`.

        """
        seen: Set[Any] = set()
        self._unique_filter_state = (seen, strategy)
        return self

    def columns(self, *col_expressions: _KeyIndexType) -> Self:
        r"""Establish the columns that should be returned in each row."""
        sliced = self._column_slices(col_expressions)
        return sliced

    def partitions(
        self, size: Optional[int] = None
    ) -> Iterator[Sequence[RowMapping]]:
        """Iterate through sub-lists of elements of the size given.

        Works like :meth:`_engine.Result.partitions`, with the difference
        that :class:`_engine.RowMapping` values are returned rather than
        :class:`_engine.Row` objects.

        """

        fetch_chunk = self._manyrow_getter

        # keep fetching until the getter hands back an empty chunk
        chunk = fetch_chunk(self, size)
        while chunk:
            yield chunk
            chunk = fetch_chunk(self, size)

    def fetchall(self) -> Sequence[RowMapping]:
        """A synonym for the :meth:`_engine.MappingResult.all` method."""

        mappings = self._allrows()
        return mappings

    def fetchone(self) -> Optional[RowMapping]:
        """Fetch one object.

        Works like :meth:`_engine.Result.fetchone`, with the difference
        that :class:`_engine.RowMapping` values are returned rather than
        :class:`_engine.Row` objects.

        """

        fetched = self._onerow_getter(self)
        # the internal "no row" marker is translated to None for callers
        return None if fetched is _NO_ROW else fetched

    def fetchmany(self, size: Optional[int] = None) -> Sequence[RowMapping]:
        """Fetch many objects.

        Works like :meth:`_engine.Result.fetchmany`, with the difference
        that :class:`_engine.RowMapping` values are returned rather than
        :class:`_engine.Row` objects.

        """

        fetch_chunk = self._manyrow_getter
        return fetch_chunk(self, size)

    def all(self) -> Sequence[RowMapping]:
        """Return all :class:`_engine.RowMapping` values in a sequence.

        Works like :meth:`_engine.Result.all`, with the difference
        that :class:`_engine.RowMapping` values are returned rather than
        :class:`_engine.Row` objects.

        """

        mappings = self._allrows()
        return mappings

    def __iter__(self) -> Iterator[RowMapping]:
        return self._iter_impl()

    def __next__(self) -> RowMapping:
        return self._next_impl()

    def first(self) -> Optional[RowMapping]:
        """Fetch the first object or ``None`` if no object is present.

        Works like :meth:`_engine.Result.first`, with the difference
        that :class:`_engine.RowMapping` values are returned rather than
        :class:`_engine.Row` objects.

        """
        return self._only_one_row(
            raise_for_second_row=False,
            raise_for_none=False,
            scalar=False,
        )

    def one_or_none(self) -> Optional[RowMapping]:
        """Return at most one object or raise an exception.

        Works like :meth:`_engine.Result.one_or_none`, with the difference
        that :class:`_engine.RowMapping` values are returned rather than
        :class:`_engine.Row` objects.

        """
        return self._only_one_row(
            raise_for_second_row=True,
            raise_for_none=False,
            scalar=False,
        )

    def one(self) -> RowMapping:
        """Return exactly one object or raise an exception.

        Works like :meth:`_engine.Result.one`, with the difference
        that :class:`_engine.RowMapping` values are returned rather than
        :class:`_engine.Row` objects.

        """
        return self._only_one_row(
            raise_for_second_row=True,
            raise_for_none=True,
            scalar=False,
        )
2123
2124
class FrozenResult(Generic[_TP]):
    """A snapshot of a :class:`_engine.Result` object held in a "frozen"
    state that is suitable for caching.

    Instances are obtained from the :meth:`_engine.Result.freeze` method,
    available on any :class:`_engine.Result` object.

    Each time the :class:`_engine.FrozenResult` is invoked as a callable,
    a new iterable :class:`_engine.Result` object is produced from the
    same fixed set of data::


        result = connection.execute(query)

        frozen = result.freeze()

        unfrozen_result_one = frozen()

        for row in unfrozen_result_one:
            print(row)

        unfrozen_result_two = frozen()
        rows = unfrozen_result_two.all()

        # ... etc

    .. versionadded:: 1.4

    .. seealso::

        :ref:`do_orm_execute_re_executing` - example usage within the
        ORM to implement a result-set cache.

        :func:`_orm.loading.merge_frozen_result` - ORM function to merge
        a frozen result back into a :class:`_orm.Session`.

    """

    # the captured data: the raw row iterator's items when the source
    # supports scalars, otherwise the return value of fetchall()
    data: Sequence[Any]

    def __init__(self, result: Result[_TP]):
        """Capture the metadata, attributes and all remaining data of
        ``result``.
        """
        self.metadata = result._metadata._for_freeze()
        supports_scalars = result._source_supports_scalars
        self._source_supports_scalars = supports_scalars
        self._attributes = result._attributes

        self.data = (
            list(result._raw_row_iterator())
            if supports_scalars
            else result.fetchall()
        )

    def rewrite_rows(self) -> Sequence[Sequence[Any]]:
        """Return the stored data as a new list of lists.

        When the source supports scalars, every stored element is wrapped
        in a one-element list; otherwise every stored row is copied into
        a list.
        """
        if not self._source_supports_scalars:
            return [list(stored_row) for stored_row in self.data]
        return [[value] for value in self.data]

    def with_new_rows(
        self, tuple_data: Sequence[Row[_TP]]
    ) -> FrozenResult[_TP]:
        """Return a new :class:`_engine.FrozenResult` that shares this
        one's metadata and attributes but holds ``tuple_data``.

        When the source supports scalars, only element ``0`` of each
        given row is stored; otherwise ``tuple_data`` is stored as given.
        """
        # __init__ is bypassed since there is no Result to freeze here
        clone = FrozenResult.__new__(FrozenResult)
        clone.metadata = self.metadata
        clone._attributes = self._attributes
        clone._source_supports_scalars = self._source_supports_scalars

        clone.data = (
            [new_row[0] for new_row in tuple_data]
            if self._source_supports_scalars
            else tuple_data
        )
        return clone

    def __call__(self) -> Result[_TP]:
        """Return a new :class:`_engine.IteratorResult` that iterates the
        stored data.
        """
        thawed: IteratorResult[_TP] = IteratorResult(
            self.metadata, iter(self.data)
        )
        thawed._attributes = self._attributes
        thawed._source_supports_scalars = self._source_supports_scalars
        return thawed
2203
2204
class IteratorResult(Result[_TP]):
    """A :class:`_engine.Result` whose data comes from a Python iterator
    of :class:`_engine.Row` objects or similar row-like data.

    .. versionadded:: 1.4

    """

    # class-level defaults; _soft_close() sets these on the instance
    _hard_closed = False
    _soft_closed = False

    def __init__(
        self,
        cursor_metadata: ResultMetaData,
        iterator: Iterator[_InterimSupportsScalarsRowType],
        raw: Optional[Result[Any]] = None,
        _source_supports_scalars: bool = False,
    ):
        """Store the metadata, the source iterator and the optional
        ``raw`` result on this object.
        """
        self._metadata = cursor_metadata
        self.iterator = iterator
        self.raw = raw
        self._source_supports_scalars = _source_supports_scalars

    @property
    def closed(self) -> bool:
        """Return ``True`` if this :class:`_engine.IteratorResult` has
        been closed

        .. versionadded:: 1.4.43

        """
        return self._hard_closed

    def _soft_close(self, hard: bool = False, **kw: Any) -> None:
        """Replace the iterator with an empty one and mark this result
        soft closed; additionally mark it hard closed when ``hard`` is
        true.

        The close is forwarded to ``self.raw`` when one is present.
        """
        if hard:
            self._hard_closed = True

        underlying = self.raw
        if underlying is not None:
            underlying._soft_close(hard=hard, **kw)

        self.iterator = iter([])
        self._reset_memoizations()
        self._soft_closed = True

    def _raise_hard_closed(self) -> NoReturn:
        """Raise :class:`.ResourceClosedError`."""
        raise exc.ResourceClosedError("This result object is closed.")

    def _raw_row_iterator(self) -> Iterator[_RowData]:
        """Return the current iterator; no closed check is made."""
        return self.iterator

    def _fetchiter_impl(self) -> Iterator[_InterimSupportsScalarsRowType]:
        """Return the current iterator, raising if hard closed."""
        if self._hard_closed:
            self._raise_hard_closed()
        return self.iterator

    def _fetchone_impl(
        self, hard_close: bool = False
    ) -> Optional[_InterimRowType[Row[Any]]]:
        """Return the next row, or ``None`` once the iterator is
        exhausted.

        Exhaustion triggers ``_soft_close(hard=hard_close)``.
        """
        if self._hard_closed:
            self._raise_hard_closed()

        next_row = next(self.iterator, _NO_ROW)
        if next_row is not _NO_ROW:
            return next_row

        self._soft_close(hard=hard_close)
        return None

    def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
        """Return all remaining rows as a list, then soft close."""
        if self._hard_closed:
            self._raise_hard_closed()

        # the soft close runs whether or not consuming the iterator raises
        try:
            remaining = list(self.iterator)
        finally:
            self._soft_close()
        return remaining

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        """Return up to ``size`` rows from the iterator as a list; a
        ``size`` of ``None`` places no limit on the slice.
        """
        if self._hard_closed:
            self._raise_hard_closed()

        limited = itertools.islice(self.iterator, size)
        return list(limited)
2286
2287
def null_result() -> IteratorResult[Any]:
    """Return an :class:`_engine.IteratorResult` built from a
    ``SimpleResultMetaData`` given an empty list and an empty iterator.
    """
    empty_metadata = SimpleResultMetaData([])
    no_rows = iter([])
    return IteratorResult(empty_metadata, no_rows)
2290
2291
class ChunkedIteratorResult(IteratorResult[_TP]):
    """An :class:`_engine.IteratorResult` driven by a callable that
    produces iterators.

    The ``chunks`` argument is a function which receives the number of
    rows to return in each chunk, or ``None`` for all rows.  It should
    return an un-consumed iterator of lists, each list being of the
    requested size.

    The function may be called again at any time; when that happens it
    should continue from the same result set while adjusting the chunk
    size to the one given.

    .. versionadded:: 1.4

    """

    def __init__(
        self,
        cursor_metadata: ResultMetaData,
        chunks: Callable[
            [Optional[int]], Iterator[Sequence[_InterimRowType[_R]]]
        ],
        source_supports_scalars: bool = False,
        raw: Optional[Result[Any]] = None,
        dynamic_yield_per: bool = False,
    ):
        """Store the arguments and build the initial row iterator by
        calling ``chunks(None)``.
        """
        self._metadata = cursor_metadata
        self.chunks = chunks
        self._source_supports_scalars = source_supports_scalars
        self.raw = raw

        # flatten the iterator of lists into a single iterator of rows
        initial_chunks = self.chunks(None)
        self.iterator = itertools.chain.from_iterable(initial_chunks)
        self.dynamic_yield_per = dynamic_yield_per

    @_generative
    def yield_per(self, num: int) -> Self:
        """Record ``num`` as the yield-per size and rebuild the row
        iterator from ``chunks(num)``.
        """
        # TODO: the iterator being replaced may still be holding onto a
        # chunk, which is thrown away here.  the yield_per cannot be
        # changed once any rows have been fetched.  either find a way to
        # enforce this, or we can't use itertools.chain and will instead
        # have to keep track.

        self._yield_per = num
        sized_chunks = self.chunks(num)
        self.iterator = itertools.chain.from_iterable(sized_chunks)
        return self

    def _soft_close(self, hard: bool = False, **kw: Any) -> None:
        """Soft close via the superclass, then replace ``chunks`` with a
        callable that returns an empty list.
        """
        super()._soft_close(hard=hard, **kw)
        self.chunks = lambda size: []  # type: ignore

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        """Fetch up to ``size`` rows via the superclass; when
        ``dynamic_yield_per`` is set, the row iterator is first rebuilt
        from ``chunks(size)``.
        """
        if self.dynamic_yield_per:
            resized_chunks = self.chunks(size)
            self.iterator = itertools.chain.from_iterable(resized_chunks)
        return super()._fetchmany_impl(size=size)
2347
2348
class MergedResult(IteratorResult[_TP]):
    """A :class:`_engine.Result` merged from any number of
    :class:`_engine.Result` objects.

    This is what the :meth:`_engine.Result.merge` method returns.

    .. versionadded:: 1.4

    """

    # plain attribute; set to True on the instance by a hard _soft_close()
    closed = False
    rowcount: Optional[int]

    def __init__(
        self, cursor_metadata: ResultMetaData, results: Sequence[Result[_TP]]
    ):
        """Chain the raw row iterators of ``results`` together and take
        filter / yield-per / scalar settings from the first result.
        """
        self._results = results

        # evaluated lazily: each member's raw iterator is requested only
        # as the chain reaches it
        row_sources = (member._raw_row_iterator() for member in results)
        super().__init__(
            cursor_metadata, itertools.chain.from_iterable(row_sources)
        )

        lead = results[0]
        self._unique_filter_state = lead._unique_filter_state
        self._yield_per = lead._yield_per

        # going to try something w/ this in next rev
        self._source_supports_scalars = lead._source_supports_scalars

        member_attributes = [member._attributes for member in results]
        self._attributes = self._attributes.merge_with(*member_attributes)

    def _soft_close(self, hard: bool = False, **kw: Any) -> None:
        """Forward the close to every merged result, and set ``closed``
        when ``hard`` is true.

        The superclass ``_soft_close`` is not invoked here.
        """
        for member in self._results:
            member._soft_close(hard=hard, **kw)
        if hard:
            self.closed = True