1# engine/result.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define generic result set constructs."""
9
10from __future__ import annotations
11
12from enum import Enum
13import functools
14import itertools
15import operator
16import typing
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import Generic
22from typing import Iterable
23from typing import Iterator
24from typing import List
25from typing import Mapping
26from typing import NoReturn
27from typing import Optional
28from typing import overload
29from typing import Sequence
30from typing import Set
31from typing import Tuple
32from typing import TYPE_CHECKING
33from typing import TypeVar
34from typing import Union
35
36from .row import Row
37from .row import RowMapping
38from .. import exc
39from .. import util
40from ..sql.base import _generative
41from ..sql.base import HasMemoized
42from ..sql.base import InPlaceGenerative
43from ..util import HasMemoized_ro_memoized_attribute
44from ..util import NONE_SET
45from ..util._has_cy import HAS_CYEXTENSION
46from ..util.typing import Literal
47from ..util.typing import Self
48
49if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
50 from ._py_row import tuplegetter as tuplegetter
51else:
52 from sqlalchemy.cyextension.resultproxy import tuplegetter as tuplegetter
53
54if typing.TYPE_CHECKING:
55 from typing import Type
56
57 from .. import inspection
58 from ..sql import roles
59 from ..sql._typing import _HasClauseElement
60 from ..sql.elements import SQLCoreOperations
61 from ..sql.type_api import _ResultProcessorType
62
# The kinds of object accepted as a lookup key for a column in a result:
# a string name, or one of several SQL expression / entity constructs.
_KeyType = Union[
    str,
    "SQLCoreOperations[Any]",
    "roles.TypedColumnsClauseRole[Any]",
    "roles.ColumnsClauseRole",
    "Type[Any]",
    "inspection.Inspectable[_HasClauseElement[Any]]",
]
# A lookup key as above, or a plain integer position.
_KeyIndexType = Union[_KeyType, int]

# is overridden in cursor using _CursorKeyMapRecType
_KeyMapRecType = Any

# Mapping of lookup key -> keymap record for that key.
_KeyMapType = Mapping[_KeyType, _KeyMapRecType]


_RowData = Union[Row[Any], RowMapping, Any]
"""A generic form of "row" that accommodates for the different kinds of
"rows" that different result objects return, including row, row mapping, and
scalar values"""

_RawRowType = Tuple[Any, ...]
"""represents the kind of row we get from a DBAPI cursor"""

# _R: the row type a given result object yields (bounded by _RowData).
_R = TypeVar("_R", bound=_RowData)
# _T: general purpose type variable.
_T = TypeVar("_T", bound=Any)
# _TP: tuple-shaped type variable; used to parametrize Row in Result.
_TP = TypeVar("_TP", bound=Tuple[Any, ...])

_InterimRowType = Union[_R, _RawRowType]
"""a catchall "anything" kind of return type that can be applied
across all the result types

"""

# Either a Row or a bare scalar object.
_InterimSupportsScalarsRowType = Union[Row[Any], Any]

# One optional result processor per column position.
_ProcessorsType = Sequence[Optional["_ResultProcessorType[Any]"]]
# Callable that picks a sub-sequence of values out of a sequence.
_TupleGetterType = Callable[[Sequence[Any]], Sequence[Any]]
# Callable that maps a row / object to the value used for uniqueness checks.
_UniqueFilterType = Callable[[Any], Any]
# (set of values already seen, optional uniqueness callable)
_UniqueFilterStateType = Tuple[Set[Any], Optional[_UniqueFilterType]]
103
104
class ResultMetaData:
    """Abstract description of the columns that make up each result row.

    Concrete subclasses provide the key lookup methods that raise
    ``NotImplementedError`` here; this base supplies the helpers that are
    written purely in terms of those lookups.
    """

    __slots__ = ()

    _tuplefilter: Optional[_TupleGetterType] = None
    _translated_indexes: Optional[Sequence[int]] = None
    _unique_filters: Optional[Sequence[Callable[[Any], Any]]] = None
    _keymap: _KeyMapType
    _keys: Sequence[str]
    _processors: Optional[_ProcessorsType]
    _key_to_index: Mapping[_KeyType, int]

    @property
    def keys(self) -> RMKeyView:
        """A new :class:`.RMKeyView` over this metadata."""
        return RMKeyView(self)

    def _has_key(self, key: object) -> bool:
        """Report whether ``key`` is known; implemented by subclasses."""
        raise NotImplementedError()

    def _for_freeze(self) -> ResultMetaData:
        """Return metadata suitable for a frozen result; implemented by
        subclasses."""
        raise NotImplementedError()

    @overload
    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: Literal[True] = ...
    ) -> NoReturn: ...

    @overload
    def _key_fallback(
        self,
        key: Any,
        err: Optional[Exception],
        raiseerr: Literal[False] = ...,
    ) -> None: ...

    @overload
    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: bool = ...
    ) -> Optional[NoReturn]: ...

    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: bool = True
    ) -> Optional[NoReturn]:
        """Handle a key that was not found in the keymap.

        This base version only supports ``raiseerr=True`` and always raises
        ``KeyError(key)``, chained from ``err``.
        """
        assert raiseerr
        raise KeyError(key) from err

    def _raise_for_ambiguous_column_name(
        self, rec: _KeyMapRecType
    ) -> NoReturn:
        """Raise for a keymap record that has no single index; not
        supported by this base class."""
        raise NotImplementedError(
            "ambiguous column name logic is implemented for "
            "CursorResultMetaData"
        )

    def _index_for_key(
        self, key: _KeyIndexType, raiseerr: bool
    ) -> Optional[int]:
        """Return the integer position for ``key``; implemented by
        subclasses."""
        raise NotImplementedError()

    def _indexes_for_keys(
        self, keys: Sequence[_KeyIndexType]
    ) -> Sequence[int]:
        """Return the integer positions for ``keys``; implemented by
        subclasses."""
        raise NotImplementedError()

    def _metadata_for_keys(
        self, keys: Sequence[_KeyIndexType]
    ) -> Iterator[_KeyMapRecType]:
        """Yield the keymap record for each of ``keys``; implemented by
        subclasses."""
        raise NotImplementedError()

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
        """Return metadata limited to ``keys``; implemented by subclasses."""
        raise NotImplementedError()

    def _getter(
        self, key: Any, raiseerr: bool = True
    ) -> Optional[Callable[[Row[Any]], Any]]:
        """Return an item getter for the position of ``key``, or ``None``
        when ``_index_for_key()`` reports no position."""
        position = self._index_for_key(key, raiseerr)
        if position is None:
            return None
        return operator.itemgetter(position)

    def _row_as_tuple_getter(
        self, keys: Sequence[_KeyIndexType]
    ) -> _TupleGetterType:
        """Return a ``tuplegetter`` over the positions of ``keys``."""
        return tuplegetter(*self._indexes_for_keys(keys))

    def _make_key_to_index(
        self, keymap: Mapping[_KeyType, Sequence[Any]], index: int
    ) -> Mapping[_KeyType, int]:
        """Build ``{key: rec[index]}`` from ``keymap``, leaving out the keys
        whose record holds ``None`` at ``index``."""
        key_to_index = {}
        for key, rec in keymap.items():
            if rec[index] is not None:
                key_to_index[key] = rec[index]
        return key_to_index

    def _key_not_found(self, key: Any, attr_error: bool) -> NoReturn:
        """Raise the appropriate error for a key with no usable index.

        :param key: the key that was looked up.
        :param attr_error: when True, an unknown key is reported as
         ``AttributeError`` (chained from the ``KeyError``) instead of
         ``KeyError``.
        """
        if key in self._keymap:
            # the index must be none in this case
            self._raise_for_ambiguous_column_name(self._keymap[key])
        elif attr_error:
            # unknown key, requested via attribute access
            try:
                self._key_fallback(key, None)
            except KeyError as ke:
                raise AttributeError(ke.args[0]) from ke
        else:
            # unknown key
            self._key_fallback(key, None)

    @property
    def _effective_processors(self) -> Optional[_ProcessorsType]:
        """The processors collection, or ``None`` when it is empty or
        consists only of members of ``NONE_SET``."""
        if self._processors and not NONE_SET.issuperset(self._processors):
            return self._processors
        return None
223
224
class RMKeyView(typing.KeysView[Any]):
    """Keys view over the string keys of a :class:`.ResultMetaData`.

    Iteration, length and equality work against a snapshot of the parent's
    ``_keys`` taken at construction time (``None`` entries are dropped).
    Containment is delegated to the parent's ``_has_key()``, so it may also
    accept non-string keys that the parent recognizes.
    """

    __slots__ = ("_parent", "_keys")

    _parent: ResultMetaData
    _keys: Sequence[str]

    def __init__(self, parent: ResultMetaData):
        self._parent = parent
        # entries of None in the parent's key list are not part of the view
        self._keys = [k for k in parent._keys if k is not None]

    def __len__(self) -> int:
        return len(self._keys)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._keys!r})"

    def __iter__(self) -> Iterator[str]:
        return iter(self._keys)

    def __contains__(self, item: Any) -> bool:
        # integer positions are never considered to be keys
        if isinstance(item, int):
            return False

        # note this also includes special key fallback behaviors
        # which also don't seem to be tested in test_resultset right now
        return self._parent._has_key(item)

    def __eq__(self, other: Any) -> bool:
        """Compare element-wise, in order, against any iterable.

        A non-iterable ``other`` yields ``NotImplemented`` so that ``==``
        evaluates to ``False`` rather than raising ``TypeError``.
        """
        try:
            other_keys = list(other)
        except TypeError:
            return NotImplemented
        return other_keys == list(self)

    def __ne__(self, other: Any) -> bool:
        """Inverse of ``__eq__``; a non-iterable ``other`` yields
        ``NotImplemented`` so that ``!=`` evaluates to ``True``."""
        try:
            other_keys = list(other)
        except TypeError:
            return NotImplemented
        return other_keys != list(self)
257
258
class SimpleResultMetaData(ResultMetaData):
    """Result metadata for in-memory collections, built from a plain
    sequence of key names."""

    __slots__ = (
        "_keys",
        "_keymap",
        "_processors",
        "_tuplefilter",
        "_translated_indexes",
        "_unique_filters",
        "_key_to_index",
    )

    _keys: Sequence[str]

    def __init__(
        self,
        keys: Sequence[str],
        extra: Optional[Sequence[Any]] = None,
        _processors: Optional[_ProcessorsType] = None,
        _tuplefilter: Optional[_TupleGetterType] = None,
        _translated_indexes: Optional[Sequence[int]] = None,
        _unique_filters: Optional[Sequence[Callable[[Any], Any]]] = None,
    ):
        """Build the keymap for ``keys``.

        :param keys: the string key for each position.
        :param extra: optional sequence, parallel to ``keys``, giving for
         each position a tuple of additional lookup keys that resolve to
         the same record.
        """
        self._keys = list(keys)
        self._processors = _processors
        self._tuplefilter = _tuplefilter
        self._translated_indexes = _translated_indexes
        self._unique_filters = _unique_filters

        # every record is (index, name, extras); it is registered under
        # the name itself and then under each extra key, later entries
        # replacing earlier ones
        keymap = {}
        if extra:
            for position, (name, extras) in enumerate(
                zip(self._keys, extra)
            ):
                rec = (position, name, extras)
                for lookup_key in (name,) + (extras if extras else ()):
                    keymap[lookup_key] = rec
        else:
            for position, name in enumerate(self._keys):
                keymap[name] = (position, name, ())
        self._keymap = keymap

        self._key_to_index = self._make_key_to_index(self._keymap, 0)

    def _has_key(self, key: object) -> bool:
        """True if ``key`` is present in the keymap."""
        return key in self._keymap

    def _for_freeze(self) -> ResultMetaData:
        """Return a new metadata object carrying the same keys and extra
        keys, along with the (tuple-filtered) unique filters."""
        frozen_filters = self._unique_filters
        if frozen_filters and self._tuplefilter:
            frozen_filters = self._tuplefilter(frozen_filters)

        extras = [self._keymap[name][2] for name in self._keys]

        # TODO: are we freezing the result with or without uniqueness
        # applied?
        return SimpleResultMetaData(
            self._keys, extra=extras, _unique_filters=frozen_filters
        )

    def __getstate__(self) -> Dict[str, Any]:
        """Pickle support: only the keys and translated indexes are
        stored."""
        state: Dict[str, Any] = {}
        state["_keys"] = self._keys
        state["_translated_indexes"] = self._translated_indexes
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Pickle support: re-run ``__init__`` from the stored keys,
        rebuilding the tuple filter from the translated indexes if any."""
        translated = state["_translated_indexes"]
        if translated:
            tuplefilter = tuplegetter(*translated)
        else:
            translated = tuplefilter = None
        self.__init__(  # type: ignore
            state["_keys"],
            _translated_indexes=translated,
            _tuplefilter=tuplefilter,
        )

    def _index_for_key(self, key: Any, raiseerr: bool = True) -> int:
        """Return the position recorded for ``key``; an integer key is
        first converted to the key name at that position."""
        lookup = self._keys[key] if int in key.__class__.__mro__ else key
        try:
            rec = self._keymap[lookup]
        except KeyError as ke:
            rec = self._key_fallback(lookup, ke, raiseerr)

        return rec[0]  # type: ignore[no-any-return]

    def _indexes_for_keys(self, keys: Sequence[Any]) -> Sequence[int]:
        """Return the recorded position of each of ``keys``."""
        keymap = self._keymap
        return [keymap[key][0] for key in keys]

    def _metadata_for_keys(
        self, keys: Sequence[Any]
    ) -> Iterator[_KeyMapRecType]:
        """Yield the keymap record for each of ``keys``; integer keys are
        first converted to the key name at that position."""
        for key in keys:
            lookup = self._keys[key] if int in key.__class__.__mro__ else key

            try:
                rec = self._keymap[lookup]
            except KeyError as ke:
                rec = self._key_fallback(lookup, ke, True)

            yield rec

    def _reduce(self, keys: Sequence[Any]) -> ResultMetaData:
        """Return new metadata that selects only ``keys``, in the order
        given, carrying a tuple filter for the selected positions."""
        selected = []
        try:
            for key in keys:
                if int in key.__class__.__mro__:
                    key = self._keys[key]
                selected.append(self._keymap[key])
        except KeyError as ke:
            self._key_fallback(ke.args[0], ke, True)

        indexes: Sequence[int]
        new_keys: Sequence[str]
        extra: Sequence[Any]
        # transpose the (index, name, extras) records into three sequences
        indexes, new_keys, extra = zip(*selected)

        if self._translated_indexes:
            # this metadata was itself reduced; map positions back through
            # the existing translation
            indexes = [self._translated_indexes[idx] for idx in indexes]

        return SimpleResultMetaData(
            new_keys,
            extra=extra,
            _tuplefilter=tuplegetter(*indexes),
            _translated_indexes=indexes,
            _processors=self._processors,
            _unique_filters=self._unique_filters,
        )
399
400
def result_tuple(
    fields: Sequence[str], extra: Optional[Any] = None
) -> Callable[[Iterable[Any]], Row[Any]]:
    """Return a callable that builds a :class:`.Row` from a sequence of
    values.

    A :class:`.SimpleResultMetaData` is created once from ``fields`` and
    ``extra`` and is bound, together with its effective processors and
    key-to-index mapping, as the leading arguments to ``Row``.
    """
    metadata = SimpleResultMetaData(fields, extra)
    processors = metadata._effective_processors
    key_to_index = metadata._key_to_index
    return functools.partial(Row, metadata, processors, key_to_index)
408
409
# a symbol that indicates to internal Result methods that
# "no row is returned".  We can't use None for those cases where a scalar
# filter is applied to rows.
class _NoRow(Enum):
    # single-member enum so that the sentinel can be spelled in
    # annotations as Literal[_NoRow._NO_ROW]
    _NO_ROW = 0


# module-level shorthand for the sentinel; compared using "is"
_NO_ROW = _NoRow._NO_ROW
418
419
class ResultInternal(InPlaceGenerative, Generic[_R]):
    """Row-fetching internals shared by result objects.

    The ``_fetch*_impl()`` and ``_soft_close()`` methods are primitives that
    raise ``NotImplementedError`` here.  The remaining methods are written
    in terms of them and add row construction, unique filtering and the
    "post creational" filter on top.
    """

    __slots__ = ()

    # when set, the Result that is consulted for _source_supports_scalars,
    # _row_logging_fn and _yield_per; otherwise self is used
    _real_result: Optional[Result[Any]] = None
    # when False and the source supports scalars, raw objects are passed
    # through without being wrapped in Row
    _generate_rows: bool = True
    _row_logging_fn: Optional[Callable[[Any], Any]]

    # (seen set, optional strategy) once uniquing is enabled, else None
    _unique_filter_state: Optional[_UniqueFilterStateType] = None
    # optional callable applied to each row after it is made and uniqued
    _post_creational_filter: Optional[Callable[[Any], Any]] = None
    _is_cursor = False

    _metadata: ResultMetaData

    _source_supports_scalars: bool

    def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
        """Return an iterator of raw rows; implemented by subclasses."""
        raise NotImplementedError()

    def _fetchone_impl(
        self, hard_close: bool = False
    ) -> Optional[_InterimRowType[Row[Any]]]:
        """Return the next raw row; callers treat ``None`` as "no row".

        Implemented by subclasses.  ``hard_close`` is passed as ``True`` by
        ``_only_one_row()``; its effect is up to the implementation.
        """
        raise NotImplementedError()

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        """Return a list of up to ``size`` raw rows; implemented by
        subclasses.  With ``size=None`` the number is chosen by the
        implementation."""
        raise NotImplementedError()

    def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
        """Return all remaining raw rows; implemented by subclasses."""
        raise NotImplementedError()

    def _soft_close(self, hard: bool = False) -> None:
        """Close this result; implemented by subclasses."""
        raise NotImplementedError()

    @HasMemoized_ro_memoized_attribute
    def _row_getter(self) -> Optional[Callable[..., _R]]:
        """Callable that turns one raw row into a row object.

        Is ``None`` when the source supplies scalar objects and
        ``_generate_rows`` is False, meaning raw objects are used as is.
        Otherwise the callable applies the metadata's tuple filter (if any)
        and builds a ``Row``, and finally passes the row through the real
        result's ``_row_logging_fn`` when one is set.
        """
        real_result: Result[Any] = (
            self._real_result
            if self._real_result
            else cast("Result[Any]", self)
        )

        if real_result._source_supports_scalars:
            if not self._generate_rows:
                return None
            else:
                _proc = Row

                # the source hands us a single object per row; wrap it in
                # a one-element tuple so that it can be made into a Row
                def process_row(
                    metadata: ResultMetaData,
                    processors: Optional[_ProcessorsType],
                    key_to_index: Mapping[_KeyType, int],
                    scalar_obj: Any,
                ) -> Row[Any]:
                    return _proc(
                        metadata, processors, key_to_index, (scalar_obj,)
                    )

        else:
            process_row = Row  # type: ignore

        metadata = self._metadata

        key_to_index = metadata._key_to_index
        processors = metadata._effective_processors
        tf = metadata._tuplefilter

        if tf and not real_result._source_supports_scalars:
            # the raw row is run through the tuple filter before the Row
            # is made, so run the processors through the same filter
            if processors:
                processors = tf(processors)

            _make_row_orig: Callable[..., _R] = functools.partial(  # type: ignore  # noqa E501
                process_row, metadata, processors, key_to_index
            )

            fixed_tf = tf

            def make_row(row: _InterimRowType[Row[Any]]) -> _R:
                return _make_row_orig(fixed_tf(row))

        else:
            make_row = functools.partial(  # type: ignore
                process_row, metadata, processors, key_to_index
            )

        if real_result._row_logging_fn:
            # wrap whichever make_row was chosen above with the logging
            # function
            _log_row = real_result._row_logging_fn
            _make_row = make_row

            def make_row(row: _InterimRowType[Row[Any]]) -> _R:
                return _log_row(_make_row(row))  # type: ignore

        return make_row

    @HasMemoized_ro_memoized_attribute
    def _iterator_getter(self) -> Callable[..., Iterator[_R]]:
        """Function which, given a result, iterates ``_fetchiter_impl()``
        and yields finished rows.

        Each raw row is made into a row (when a row getter is present),
        skipped if uniquing is enabled and its unique value was already
        seen, then passed through the post creational filter.
        """
        make_row = self._row_getter

        post_creational_filter = self._post_creational_filter

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            def iterrows(self: Result[Any]) -> Iterator[_R]:
                for raw_row in self._fetchiter_impl():
                    obj: _InterimRowType[Any] = (
                        make_row(raw_row) if make_row else raw_row
                    )
                    # without a strategy the object itself is the value
                    # stored in the "seen" set
                    hashed = strategy(obj) if strategy else obj
                    if hashed in uniques:
                        continue
                    uniques.add(hashed)
                    if post_creational_filter:
                        obj = post_creational_filter(obj)
                    yield obj  # type: ignore

        else:

            def iterrows(self: Result[Any]) -> Iterator[_R]:
                for raw_row in self._fetchiter_impl():
                    row: _InterimRowType[Any] = (
                        make_row(raw_row) if make_row else raw_row
                    )
                    if post_creational_filter:
                        row = post_creational_filter(row)
                    yield row  # type: ignore

        return iterrows

    def _raw_all_rows(self) -> List[_R]:
        """Fetch all rows and make them into row objects, applying neither
        the unique filter nor the post creational filter.  Requires that a
        row getter is present."""
        make_row = self._row_getter
        assert make_row is not None
        rows = self._fetchall_impl()
        return [make_row(row) for row in rows]

    def _allrows(self) -> List[_R]:
        """Fetch all rows, then apply row construction, the unique filter
        and the post creational filter, in that order."""
        post_creational_filter = self._post_creational_filter

        make_row = self._row_getter

        rows = self._fetchall_impl()
        made_rows: List[_InterimRowType[_R]]
        if make_row:
            made_rows = [make_row(row) for row in rows]
        else:
            made_rows = rows  # type: ignore

        interim_rows: List[_R]

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            # set.add() returns None, so "not uniques.add(...)" is always
            # true; it is there to record the value as seen in the same
            # expression that tests for it
            interim_rows = [
                made_row  # type: ignore
                for made_row, sig_row in [
                    (
                        made_row,
                        strategy(made_row) if strategy else made_row,
                    )
                    for made_row in made_rows
                ]
                if sig_row not in uniques and not uniques.add(sig_row)  # type: ignore # noqa: E501
            ]
        else:
            interim_rows = made_rows  # type: ignore

        if post_creational_filter:
            interim_rows = [
                post_creational_filter(row) for row in interim_rows
            ]
        return interim_rows

    @HasMemoized_ro_memoized_attribute
    def _onerow_getter(
        self,
    ) -> Callable[..., Union[Literal[_NoRow._NO_ROW], _R]]:
        """Function which, given a result, returns the next finished row,
        or ``_NO_ROW`` when ``_fetchone_impl()`` returns ``None``.

        With uniquing enabled, rows whose unique value was already seen are
        skipped and fetching continues.
        """
        make_row = self._row_getter

        post_creational_filter = self._post_creational_filter

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
                _onerow = self._fetchone_impl
                # keep fetching until an unseen row turns up or the rows
                # run out
                while True:
                    row = _onerow()
                    if row is None:
                        return _NO_ROW
                    else:
                        obj: _InterimRowType[Any] = (
                            make_row(row) if make_row else row
                        )
                        hashed = strategy(obj) if strategy else obj
                        if hashed in uniques:
                            continue
                        else:
                            uniques.add(hashed)
                        if post_creational_filter:
                            obj = post_creational_filter(obj)
                        return obj  # type: ignore

        else:

            def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
                row = self._fetchone_impl()
                if row is None:
                    return _NO_ROW
                else:
                    interim_row: _InterimRowType[Any] = (
                        make_row(row) if make_row else row
                    )
                    if post_creational_filter:
                        interim_row = post_creational_filter(interim_row)
                    return interim_row  # type: ignore

        return onerow

    @HasMemoized_ro_memoized_attribute
    def _manyrow_getter(self) -> Callable[..., List[_R]]:
        """Function which, given a result and an optional row count,
        returns a list of finished rows.

        With uniquing enabled, ``_fetchmany_impl()`` is called repeatedly
        until the requested number of unique rows has been collected or no
        more rows come back.
        """
        make_row = self._row_getter

        post_creational_filter = self._post_creational_filter

        if self._unique_filter_state:
            uniques, strategy = self._unique_strategy

            # makes rows out of a raw batch and returns only those whose
            # unique value has not been seen, recording them as seen
            def filterrows(
                make_row: Optional[Callable[..., _R]],
                rows: List[Any],
                strategy: Optional[Callable[[List[Any]], Any]],
                uniques: Set[Any],
            ) -> List[_R]:
                if make_row:
                    rows = [make_row(row) for row in rows]

                if strategy:
                    made_rows = (
                        (made_row, strategy(made_row)) for made_row in rows
                    )
                else:
                    made_rows = ((made_row, made_row) for made_row in rows)
                return [
                    made_row
                    for made_row, sig_row in made_rows
                    if sig_row not in uniques and not uniques.add(sig_row)  # type: ignore # noqa: E501
                ]

            def manyrows(
                self: ResultInternal[_R], num: Optional[int]
            ) -> List[_R]:
                collect: List[_R] = []

                _manyrows = self._fetchmany_impl

                if num is None:
                    # if None is passed, we don't know the default
                    # manyrows number, DBAPI has this as cursor.arraysize
                    # different DBAPIs / fetch strategies may be different.
                    # do a fetch to find what the number is.  if there are
                    # only fewer rows left, then it doesn't matter.
                    real_result = (
                        self._real_result
                        if self._real_result
                        else cast("Result[Any]", self)
                    )
                    if real_result._yield_per:
                        num_required = num = real_result._yield_per
                    else:
                        rows = _manyrows(num)
                        num = len(rows)
                        assert make_row is not None
                        collect.extend(
                            filterrows(make_row, rows, strategy, uniques)
                        )
                        num_required = num - len(collect)
                else:
                    num_required = num

                assert num is not None

                # top up with further fetches for as many rows as were
                # dropped as duplicates
                while num_required:
                    rows = _manyrows(num_required)
                    if not rows:
                        break

                    collect.extend(
                        filterrows(make_row, rows, strategy, uniques)
                    )
                    num_required = num - len(collect)

                if post_creational_filter:
                    collect = [post_creational_filter(row) for row in collect]
                return collect

        else:

            def manyrows(
                self: ResultInternal[_R], num: Optional[int]
            ) -> List[_R]:
                if num is None:
                    # fall back to the yield_per setting, which may itself
                    # be None
                    real_result = (
                        self._real_result
                        if self._real_result
                        else cast("Result[Any]", self)
                    )
                    num = real_result._yield_per

                rows: List[_InterimRowType[Any]] = self._fetchmany_impl(num)
                if make_row:
                    rows = [make_row(row) for row in rows]
                if post_creational_filter:
                    rows = [post_creational_filter(row) for row in rows]
                return rows  # type: ignore

        return manyrows

    @overload
    def _only_one_row(
        self: ResultInternal[Row[Any]],
        raise_for_second_row: bool,
        raise_for_none: bool,
        scalar: Literal[True],
    ) -> Any: ...

    @overload
    def _only_one_row(
        self,
        raise_for_second_row: bool,
        raise_for_none: Literal[True],
        scalar: bool,
    ) -> _R: ...

    @overload
    def _only_one_row(
        self,
        raise_for_second_row: bool,
        raise_for_none: bool,
        scalar: bool,
    ) -> Optional[_R]: ...

    def _only_one_row(
        self,
        raise_for_second_row: bool,
        raise_for_none: bool,
        scalar: bool,
    ) -> Optional[_R]:
        """Fetch the first row and close the result.

        :param raise_for_second_row: if True, fetch further to verify that
         no second row is present.  With uniquing enabled, rows equal to
         the first one under the unique strategy do not count as a second
         row.
        :param raise_for_none: if True, raise when there is no first row
         rather than returning ``None``.
        :param scalar: if True, return the first element of the row (or the
         raw object when the source supplies scalars) and skip the post
         creational filter.
        :raises NoResultFound: no row was found and ``raise_for_none`` is
         set.
        :raises MultipleResultsFound: a second row was found and
         ``raise_for_second_row`` is set.
        """
        onerow = self._fetchone_impl

        row: Optional[_InterimRowType[Any]] = onerow(hard_close=True)
        if row is None:
            if raise_for_none:
                raise exc.NoResultFound(
                    "No row was found when one was required"
                )
            else:
                return None

        if scalar and self._source_supports_scalars:
            # the raw object is already the scalar; don't make Row objects
            self._generate_rows = False
            make_row = None
        else:
            make_row = self._row_getter

        # bare "except" is used so that the result is closed on any
        # exception at all before it is re-raised
        try:
            row = make_row(row) if make_row else row
        except:
            self._soft_close(hard=True)
            raise

        if raise_for_second_row:
            if self._unique_filter_state:
                # for no second row but uniqueness, need to essentially
                # consume the entire result :(
                uniques, strategy = self._unique_strategy

                existing_row_hash = strategy(row) if strategy else row

                while True:
                    next_row: Any = onerow(hard_close=True)
                    if next_row is None:
                        next_row = _NO_ROW
                        break

                    try:
                        next_row = make_row(next_row) if make_row else next_row

                        if strategy:
                            assert next_row is not _NO_ROW
                            if existing_row_hash == strategy(next_row):
                                continue
                        elif row == next_row:
                            continue
                        # here, we have a row and it's different
                        break
                    except:
                        self._soft_close(hard=True)
                        raise
            else:
                next_row = onerow(hard_close=True)
                if next_row is None:
                    next_row = _NO_ROW

            if next_row is not _NO_ROW:
                self._soft_close(hard=True)
                raise exc.MultipleResultsFound(
                    "Multiple rows were found when exactly one was required"
                    if raise_for_none
                    else "Multiple rows were found when one or none "
                    "was required"
                )
        else:
            # if we checked for second row then that would have
            # closed us :)
            self._soft_close(hard=True)

        if not scalar:
            post_creational_filter = self._post_creational_filter
            if post_creational_filter:
                row = post_creational_filter(row)

        if scalar and make_row:
            # a Row was made; the scalar is its first element
            return row[0]  # type: ignore
        else:
            return row  # type: ignore

    def _iter_impl(self) -> Iterator[_R]:
        """Return an iterator of finished rows for this result."""
        return self._iterator_getter(self)

    def _next_impl(self) -> _R:
        """Return the next finished row, raising ``StopIteration`` when the
        one-row getter reports ``_NO_ROW``."""
        row = self._onerow_getter(self)
        if row is _NO_ROW:
            raise StopIteration()
        else:
            return row

    @_generative
    def _column_slices(self, indexes: Sequence[_KeyIndexType]) -> Self:
        """Limit / reorder the columns of each row to ``indexes``.

        The metadata is replaced by its reduced form, except when the
        source supplies scalars and exactly one index is given.
        """
        real_result = (
            self._real_result
            if self._real_result
            else cast("Result[Any]", self)
        )

        if not real_result._source_supports_scalars or len(indexes) != 1:
            self._metadata = self._metadata._reduce(indexes)

        assert self._generate_rows

        return self

    @HasMemoized.memoized_attribute
    def _unique_strategy(self) -> _UniqueFilterStateType:
        """The ``(seen set, strategy)`` pair in effect for uniquing.

        When no explicit strategy was given and the metadata carries unique
        filters, a strategy is derived from those filters; otherwise the
        strategy from ``_unique_filter_state`` is returned unchanged (it
        may be ``None``).
        """
        assert self._unique_filter_state is not None
        uniques, strategy = self._unique_filter_state

        real_result = (
            self._real_result
            if self._real_result is not None
            else cast("Result[Any]", self)
        )

        if not strategy and self._metadata._unique_filters:
            if (
                real_result._source_supports_scalars
                and not self._generate_rows
            ):
                # raw scalar objects are being returned; use the first
                # filter directly on the object
                strategy = self._metadata._unique_filters[0]
            else:
                filters = self._metadata._unique_filters
                if self._metadata._tuplefilter:
                    filters = self._metadata._tuplefilter(filters)

                # calls row._filter_on_values(filters) on each row
                strategy = operator.methodcaller("_filter_on_values", filters)
        return uniques, strategy
895
896
class _WithKeys:
    """Mixin providing the public ``keys()`` method for objects that carry
    a ``_metadata`` attribute."""

    __slots__ = ()

    _metadata: ResultMetaData

    # used mainly to share documentation on the keys method.
    def keys(self) -> RMKeyView:
        """Return an iterable view which yields the string keys that would
        be represented by each :class:`_engine.Row`.

        The keys can represent the labels of the columns returned by a core
        statement or the names of the orm classes returned by an orm
        execution.

        The view also can be tested for key containment using the Python
        ``in`` operator, which will test both for the string keys represented
        in the view, as well as for alternate keys such as column objects.

        .. versionchanged:: 1.4 a key view object is returned rather than a
           plain list.


        """
        return self._metadata.keys
921
922
923class Result(_WithKeys, ResultInternal[Row[_TP]]):
924 """Represent a set of database results.
925
926 .. versionadded:: 1.4 The :class:`_engine.Result` object provides a
927 completely updated usage model and calling facade for SQLAlchemy
928 Core and SQLAlchemy ORM. In Core, it forms the basis of the
929 :class:`_engine.CursorResult` object which replaces the previous
930 :class:`_engine.ResultProxy` interface. When using the ORM, a
931 higher level object called :class:`_engine.ChunkedIteratorResult`
932 is normally used.
933
934 .. note:: In SQLAlchemy 1.4 and above, this object is
935 used for ORM results returned by :meth:`_orm.Session.execute`, which can
936 yield instances of ORM mapped objects either individually or within
937 tuple-like rows. Note that the :class:`_engine.Result` object does not
938 deduplicate instances or rows automatically as is the case with the
939 legacy :class:`_orm.Query` object. For in-Python de-duplication of
940 instances or rows, use the :meth:`_engine.Result.unique` modifier
941 method.
942
943 .. seealso::
944
945 :ref:`tutorial_fetching_rows` - in the :doc:`/tutorial/index`
946
947 """
948
949 __slots__ = ("_metadata", "__dict__")
950
951 _row_logging_fn: Optional[Callable[[Row[Any]], Row[Any]]] = None
952
953 _source_supports_scalars: bool = False
954
955 _yield_per: Optional[int] = None
956
957 _attributes: util.immutabledict[Any, Any] = util.immutabledict()
958
    def __init__(self, cursor_metadata: ResultMetaData):
        """Construct a result that uses ``cursor_metadata`` as its
        ``_metadata``."""
        self._metadata = cursor_metadata
961
    def __enter__(self) -> Self:
        """Context manager entry; returns this same result object."""
        return self
964
    def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
        """Context manager exit; calls :meth:`close` whether or not an
        exception is in progress.  Returns ``None``, so an exception is not
        suppressed."""
        self.close()
967
    def close(self) -> None:
        """Close this :class:`_engine.Result`.

        The behavior of this method is implementation specific, and is
        not implemented by default. The method should generally end
        the resources in use by the result object and also cause any
        subsequent iteration or row fetching to raise
        :class:`.ResourceClosedError`.

        .. versionadded:: 1.4.27 - ``.close()`` was previously not generally
            available for all :class:`_engine.Result` classes, instead only
            being available on the :class:`_engine.CursorResult` returned for
            Core statement executions. As most other result objects, namely the
            ones used by the ORM, are proxying a :class:`_engine.CursorResult`
            in any case, this allows the underlying cursor result to be closed
            from the outside facade for the case when the ORM query is using
            the ``yield_per`` execution option where it does not immediately
            exhaust and autoclose the database cursor.

        """
        # delegates to the implementation's _soft_close() in "hard" mode
        self._soft_close(hard=True)
989
    @property
    def _soft_closed(self) -> bool:
        """Boolean state to be supplied by concrete result classes; the
        base implementation raises ``NotImplementedError``."""
        raise NotImplementedError()
993
    @property
    def closed(self) -> bool:
        """Return ``True`` if this :class:`_engine.Result` reports .closed

        The base implementation raises ``NotImplementedError``; the value
        is to be supplied by concrete result classes.

        .. versionadded:: 1.4.43

        """
        raise NotImplementedError()
1002
    @_generative
    def yield_per(self, num: int) -> Self:
        """Configure the row-fetching strategy to fetch ``num`` rows at a time.

        This impacts the underlying behavior of the result when iterating over
        the result object, or otherwise making use of methods such as
        :meth:`_engine.Result.fetchone` that return one row at a time. Data
        from the underlying cursor or other data source will be buffered up to
        this many rows in memory, and the buffered collection will then be
        yielded out one row at a time or as many rows are requested. Each time
        the buffer clears, it will be refreshed to this many rows or as many
        rows remain if fewer remain.

        The :meth:`_engine.Result.yield_per` method is generally used in
        conjunction with the
        :paramref:`_engine.Connection.execution_options.stream_results`
        execution option, which will allow the database dialect in use to make
        use of a server side cursor, if the DBAPI supports a specific "server
        side cursor" mode separate from its default mode of operation.

        .. tip::

            Consider using the
            :paramref:`_engine.Connection.execution_options.yield_per`
            execution option, which will simultaneously set
            :paramref:`_engine.Connection.execution_options.stream_results`
            to ensure the use of server side cursors, as well as automatically
            invoke the :meth:`_engine.Result.yield_per` method to establish
            a fixed row buffer size at once.

            The :paramref:`_engine.Connection.execution_options.yield_per`
            execution option is available for ORM operations, with
            :class:`_orm.Session`-oriented use described at
            :ref:`orm_queryguide_yield_per`. The Core-only version which works
            with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.

        .. versionadded:: 1.4

        :param num: number of rows to fetch each time the buffer is refilled.
         If set to a value below 1, fetches all rows for the next buffer.

        .. seealso::

            :ref:`engine_stream_results` - describes Core behavior for
            :meth:`_engine.Result.yield_per`

            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`

        """
        # the many-row getter falls back to this value when it is called
        # without an explicit number of rows
        self._yield_per = num
        return self
1054
    @_generative
    def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
        """Apply unique filtering to the objects returned by this
        :class:`_engine.Result`.

        When this filter is applied with no arguments, the rows or objects
        returned will be filtered such that each row is returned uniquely. The
        algorithm used to determine this uniqueness is by default the Python
        hashing identity of the whole tuple. In some cases a specialized
        per-entity hashing scheme may be used, such as when using the ORM, a
        scheme is applied which works against the primary key identity of
        returned objects.

        The unique filter is applied **after all other filters**, which means
        if the columns returned have been refined using a method such as the
        :meth:`_engine.Result.columns` or :meth:`_engine.Result.scalars`
        method, the uniquing is applied to **only the column or columns
        returned**. This occurs regardless of the order in which these
        methods have been called upon the :class:`_engine.Result` object.

        The unique filter also changes the calculus used for methods like
        :meth:`_engine.Result.fetchmany` and :meth:`_engine.Result.partitions`.
        When using :meth:`_engine.Result.unique`, these methods will continue
        to yield the number of rows or objects requested, after uniquing
        has been applied. However, this necessarily impacts the buffering
        behavior of the underlying cursor or datasource, such that multiple
        underlying calls to ``cursor.fetchmany()`` may be necessary in order
        to accumulate enough objects in order to provide a unique collection
        of the requested size.

        :param strategy: a callable that will be applied to rows or objects
         being iterated, which should return an object that represents the
         unique value of the row. A Python ``set()`` is used to store
         these identities. If not passed, a default uniqueness strategy
         is used which may have been assembled by the source of this
         :class:`_engine.Result` object.

        """
        # start with an empty "seen" set; it is filled in as rows are fetched
        self._unique_filter_state = (set(), strategy)
        return self
1095
1096 def columns(self, *col_expressions: _KeyIndexType) -> Self:
1097 r"""Establish the columns that should be returned in each row.
1098
1099 This method may be used to limit the columns returned as well
1100 as to reorder them. The given list of expressions are normally
1101 a series of integers or string key names. They may also be
1102 appropriate :class:`.ColumnElement` objects which correspond to
1103 a given statement construct.
1104
1105 .. versionchanged:: 2.0 Due to a bug in 1.4, the
1106 :meth:`_engine.Result.columns` method had an incorrect behavior
1107 where calling upon the method with just one index would cause the
1108 :class:`_engine.Result` object to yield scalar values rather than
1109 :class:`_engine.Row` objects. In version 2.0, this behavior
1110 has been corrected such that calling upon
1111 :meth:`_engine.Result.columns` with a single index will
1112 produce a :class:`_engine.Result` object that continues
1113 to yield :class:`_engine.Row` objects, which include
1114 only a single column.
1115
1116 E.g.::
1117
1118 statement = select(table.c.x, table.c.y, table.c.z)
1119 result = connection.execute(statement)
1120
1121 for z, y in result.columns("z", "y"):
1122 ...
1123
1124 Example of using the column objects from the statement itself::
1125
1126 for z, y in result.columns(
1127 statement.selected_columns.c.z, statement.selected_columns.c.y
1128 ):
1129 ...
1130
1131 .. versionadded:: 1.4
1132
1133 :param \*col_expressions: indicates columns to be returned. Elements
1134 may be integer row indexes, string column names, or appropriate
1135 :class:`.ColumnElement` objects corresponding to a select construct.
1136
1137 :return: this :class:`_engine.Result` object with the modifications
1138 given.
1139
1140 """
1141 return self._column_slices(col_expressions)
1142
1143 @overload
1144 def scalars(self: Result[Tuple[_T]]) -> ScalarResult[_T]: ...
1145
1146 @overload
1147 def scalars(
1148 self: Result[Tuple[_T]], index: Literal[0]
1149 ) -> ScalarResult[_T]: ...
1150
1151 @overload
1152 def scalars(self, index: _KeyIndexType = 0) -> ScalarResult[Any]: ...
1153
1154 def scalars(self, index: _KeyIndexType = 0) -> ScalarResult[Any]:
1155 """Return a :class:`_engine.ScalarResult` filtering object which
1156 will return single elements rather than :class:`_row.Row` objects.
1157
1158 E.g.::
1159
1160 >>> result = conn.execute(text("select int_id from table"))
1161 >>> result.scalars().all()
1162 [1, 2, 3]
1163
1164 When results are fetched from the :class:`_engine.ScalarResult`
1165 filtering object, the single column-row that would be returned by the
1166 :class:`_engine.Result` is instead returned as the column's value.
1167
1168 .. versionadded:: 1.4
1169
1170 :param index: integer or row key indicating the column to be fetched
1171 from each row, defaults to ``0`` indicating the first column.
1172
1173 :return: a new :class:`_engine.ScalarResult` filtering object referring
1174 to this :class:`_engine.Result` object.
1175
1176 """
1177 return ScalarResult(self, index)
1178
1179 def _getter(
1180 self, key: _KeyIndexType, raiseerr: bool = True
1181 ) -> Optional[Callable[[Row[Any]], Any]]:
1182 """return a callable that will retrieve the given key from a
1183 :class:`_engine.Row`.
1184
1185 """
1186 if self._source_supports_scalars:
1187 raise NotImplementedError(
1188 "can't use this function in 'only scalars' mode"
1189 )
1190 return self._metadata._getter(key, raiseerr)
1191
1192 def _tuple_getter(self, keys: Sequence[_KeyIndexType]) -> _TupleGetterType:
1193 """return a callable that will retrieve the given keys from a
1194 :class:`_engine.Row`.
1195
1196 """
1197 if self._source_supports_scalars:
1198 raise NotImplementedError(
1199 "can't use this function in 'only scalars' mode"
1200 )
1201 return self._metadata._row_as_tuple_getter(keys)
1202
1203 def mappings(self) -> MappingResult:
1204 """Apply a mappings filter to returned rows, returning an instance of
1205 :class:`_engine.MappingResult`.
1206
1207 When this filter is applied, fetching rows will return
1208 :class:`_engine.RowMapping` objects instead of :class:`_engine.Row`
1209 objects.
1210
1211 .. versionadded:: 1.4
1212
1213 :return: a new :class:`_engine.MappingResult` filtering object
1214 referring to this :class:`_engine.Result` object.
1215
1216 """
1217
1218 return MappingResult(self)
1219
1220 @property
1221 def t(self) -> TupleResult[_TP]:
1222 """Apply a "typed tuple" typing filter to returned rows.
1223
1224 The :attr:`_engine.Result.t` attribute is a synonym for
1225 calling the :meth:`_engine.Result.tuples` method.
1226
1227 .. versionadded:: 2.0
1228
1229 """
1230 return self # type: ignore
1231
1232 def tuples(self) -> TupleResult[_TP]:
1233 """Apply a "typed tuple" typing filter to returned rows.
1234
1235 This method returns the same :class:`_engine.Result` object
1236 at runtime,
1237 however annotates as returning a :class:`_engine.TupleResult` object
1238 that will indicate to :pep:`484` typing tools that plain typed
1239 ``Tuple`` instances are returned rather than rows. This allows
1240 tuple unpacking and ``__getitem__`` access of :class:`_engine.Row`
1241 objects to by typed, for those cases where the statement invoked
1242 itself included typing information.
1243
1244 .. versionadded:: 2.0
1245
1246 :return: the :class:`_engine.TupleResult` type at typing time.
1247
1248 .. seealso::
1249
1250 :attr:`_engine.Result.t` - shorter synonym
1251
1252 :attr:`_engine.Row._t` - :class:`_engine.Row` version
1253
1254 """
1255
1256 return self # type: ignore
1257
1258 def _raw_row_iterator(self) -> Iterator[_RowData]:
1259 """Return a safe iterator that yields raw row data.
1260
1261 This is used by the :meth:`_engine.Result.merge` method
1262 to merge multiple compatible results together.
1263
1264 """
1265 raise NotImplementedError()
1266
1267 def __iter__(self) -> Iterator[Row[_TP]]:
1268 return self._iter_impl()
1269
1270 def __next__(self) -> Row[_TP]:
1271 return self._next_impl()
1272
1273 def partitions(
1274 self, size: Optional[int] = None
1275 ) -> Iterator[Sequence[Row[_TP]]]:
1276 """Iterate through sub-lists of rows of the size given.
1277
1278 Each list will be of the size given, excluding the last list to
1279 be yielded, which may have a small number of rows. No empty
1280 lists will be yielded.
1281
1282 The result object is automatically closed when the iterator
1283 is fully consumed.
1284
1285 Note that the backend driver will usually buffer the entire result
1286 ahead of time unless the
1287 :paramref:`.Connection.execution_options.stream_results` execution
1288 option is used indicating that the driver should not pre-buffer
1289 results, if possible. Not all drivers support this option and
1290 the option is silently ignored for those who do not.
1291
1292 When using the ORM, the :meth:`_engine.Result.partitions` method
1293 is typically more effective from a memory perspective when it is
1294 combined with use of the
1295 :ref:`yield_per execution option <orm_queryguide_yield_per>`,
1296 which instructs both the DBAPI driver to use server side cursors,
1297 if available, as well as instructs the ORM loading internals to only
1298 build a certain amount of ORM objects from a result at a time before
1299 yielding them out.
1300
1301 .. versionadded:: 1.4
1302
1303 :param size: indicate the maximum number of rows to be present
1304 in each list yielded. If None, makes use of the value set by
1305 the :meth:`_engine.Result.yield_per`, method, if it were called,
1306 or the :paramref:`_engine.Connection.execution_options.yield_per`
1307 execution option, which is equivalent in this regard. If
1308 yield_per weren't set, it makes use of the
1309 :meth:`_engine.Result.fetchmany` default, which may be backend
1310 specific and not well defined.
1311
1312 :return: iterator of lists
1313
1314 .. seealso::
1315
1316 :ref:`engine_stream_results`
1317
1318 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
1319
1320 """
1321
1322 getter = self._manyrow_getter
1323
1324 while True:
1325 partition = getter(self, size)
1326 if partition:
1327 yield partition
1328 else:
1329 break
1330
1331 def fetchall(self) -> Sequence[Row[_TP]]:
1332 """A synonym for the :meth:`_engine.Result.all` method."""
1333
1334 return self._allrows()
1335
1336 def fetchone(self) -> Optional[Row[_TP]]:
1337 """Fetch one row.
1338
1339 When all rows are exhausted, returns None.
1340
1341 This method is provided for backwards compatibility with
1342 SQLAlchemy 1.x.x.
1343
1344 To fetch the first row of a result only, use the
1345 :meth:`_engine.Result.first` method. To iterate through all
1346 rows, iterate the :class:`_engine.Result` object directly.
1347
1348 :return: a :class:`_engine.Row` object if no filters are applied,
1349 or ``None`` if no rows remain.
1350
1351 """
1352 row = self._onerow_getter(self)
1353 if row is _NO_ROW:
1354 return None
1355 else:
1356 return row
1357
1358 def fetchmany(self, size: Optional[int] = None) -> Sequence[Row[_TP]]:
1359 """Fetch many rows.
1360
1361 When all rows are exhausted, returns an empty sequence.
1362
1363 This method is provided for backwards compatibility with
1364 SQLAlchemy 1.x.x.
1365
1366 To fetch rows in groups, use the :meth:`_engine.Result.partitions`
1367 method.
1368
1369 :return: a sequence of :class:`_engine.Row` objects.
1370
1371 .. seealso::
1372
1373 :meth:`_engine.Result.partitions`
1374
1375 """
1376
1377 return self._manyrow_getter(self, size)
1378
1379 def all(self) -> Sequence[Row[_TP]]:
1380 """Return all rows in a sequence.
1381
1382 Closes the result set after invocation. Subsequent invocations
1383 will return an empty sequence.
1384
1385 .. versionadded:: 1.4
1386
1387 :return: a sequence of :class:`_engine.Row` objects.
1388
1389 .. seealso::
1390
1391 :ref:`engine_stream_results` - How to stream a large result set
1392 without loading it completely in python.
1393
1394 """
1395
1396 return self._allrows()
1397
1398 def first(self) -> Optional[Row[_TP]]:
1399 """Fetch the first row or ``None`` if no row is present.
1400
1401 Closes the result set and discards remaining rows.
1402
1403 .. note:: This method returns one **row**, e.g. tuple, by default.
1404 To return exactly one single scalar value, that is, the first
1405 column of the first row, use the
1406 :meth:`_engine.Result.scalar` method,
1407 or combine :meth:`_engine.Result.scalars` and
1408 :meth:`_engine.Result.first`.
1409
1410 Additionally, in contrast to the behavior of the legacy ORM
1411 :meth:`_orm.Query.first` method, **no limit is applied** to the
1412 SQL query which was invoked to produce this
1413 :class:`_engine.Result`;
1414 for a DBAPI driver that buffers results in memory before yielding
1415 rows, all rows will be sent to the Python process and all but
1416 the first row will be discarded.
1417
1418 .. seealso::
1419
1420 :ref:`migration_20_unify_select`
1421
1422 :return: a :class:`_engine.Row` object, or None
1423 if no rows remain.
1424
1425 .. seealso::
1426
1427 :meth:`_engine.Result.scalar`
1428
1429 :meth:`_engine.Result.one`
1430
1431 """
1432
1433 return self._only_one_row(
1434 raise_for_second_row=False, raise_for_none=False, scalar=False
1435 )
1436
1437 def one_or_none(self) -> Optional[Row[_TP]]:
1438 """Return at most one result or raise an exception.
1439
1440 Returns ``None`` if the result has no rows.
1441 Raises :class:`.MultipleResultsFound`
1442 if multiple rows are returned.
1443
1444 .. versionadded:: 1.4
1445
1446 :return: The first :class:`_engine.Row` or ``None`` if no row
1447 is available.
1448
1449 :raises: :class:`.MultipleResultsFound`
1450
1451 .. seealso::
1452
1453 :meth:`_engine.Result.first`
1454
1455 :meth:`_engine.Result.one`
1456
1457 """
1458 return self._only_one_row(
1459 raise_for_second_row=True, raise_for_none=False, scalar=False
1460 )
1461
1462 @overload
1463 def scalar_one(self: Result[Tuple[_T]]) -> _T: ...
1464
1465 @overload
1466 def scalar_one(self) -> Any: ...
1467
1468 def scalar_one(self) -> Any:
1469 """Return exactly one scalar result or raise an exception.
1470
1471 This is equivalent to calling :meth:`_engine.Result.scalars` and
1472 then :meth:`_engine.ScalarResult.one`.
1473
1474 .. seealso::
1475
1476 :meth:`_engine.ScalarResult.one`
1477
1478 :meth:`_engine.Result.scalars`
1479
1480 """
1481 return self._only_one_row(
1482 raise_for_second_row=True, raise_for_none=True, scalar=True
1483 )
1484
1485 @overload
1486 def scalar_one_or_none(self: Result[Tuple[_T]]) -> Optional[_T]: ...
1487
1488 @overload
1489 def scalar_one_or_none(self) -> Optional[Any]: ...
1490
1491 def scalar_one_or_none(self) -> Optional[Any]:
1492 """Return exactly one scalar result or ``None``.
1493
1494 This is equivalent to calling :meth:`_engine.Result.scalars` and
1495 then :meth:`_engine.ScalarResult.one_or_none`.
1496
1497 .. seealso::
1498
1499 :meth:`_engine.ScalarResult.one_or_none`
1500
1501 :meth:`_engine.Result.scalars`
1502
1503 """
1504 return self._only_one_row(
1505 raise_for_second_row=True, raise_for_none=False, scalar=True
1506 )
1507
1508 def one(self) -> Row[_TP]:
1509 """Return exactly one row or raise an exception.
1510
1511 Raises :class:`_exc.NoResultFound` if the result returns no
1512 rows, or :class:`_exc.MultipleResultsFound` if multiple rows
1513 would be returned.
1514
1515 .. note:: This method returns one **row**, e.g. tuple, by default.
1516 To return exactly one single scalar value, that is, the first
1517 column of the first row, use the
1518 :meth:`_engine.Result.scalar_one` method, or combine
1519 :meth:`_engine.Result.scalars` and
1520 :meth:`_engine.Result.one`.
1521
1522 .. versionadded:: 1.4
1523
1524 :return: The first :class:`_engine.Row`.
1525
1526 :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound`
1527
1528 .. seealso::
1529
1530 :meth:`_engine.Result.first`
1531
1532 :meth:`_engine.Result.one_or_none`
1533
1534 :meth:`_engine.Result.scalar_one`
1535
1536 """
1537 return self._only_one_row(
1538 raise_for_second_row=True, raise_for_none=True, scalar=False
1539 )
1540
1541 @overload
1542 def scalar(self: Result[Tuple[_T]]) -> Optional[_T]: ...
1543
1544 @overload
1545 def scalar(self) -> Any: ...
1546
1547 def scalar(self) -> Any:
1548 """Fetch the first column of the first row, and close the result set.
1549
1550 Returns ``None`` if there are no rows to fetch.
1551
1552 No validation is performed to test if additional rows remain.
1553
1554 After calling this method, the object is fully closed,
1555 e.g. the :meth:`_engine.CursorResult.close`
1556 method will have been called.
1557
1558 :return: a Python scalar value, or ``None`` if no rows remain.
1559
1560 """
1561 return self._only_one_row(
1562 raise_for_second_row=False, raise_for_none=False, scalar=True
1563 )
1564
1565 def freeze(self) -> FrozenResult[_TP]:
1566 """Return a callable object that will produce copies of this
1567 :class:`_engine.Result` when invoked.
1568
1569 The callable object returned is an instance of
1570 :class:`_engine.FrozenResult`.
1571
1572 This is used for result set caching. The method must be called
1573 on the result when it has been unconsumed, and calling the method
1574 will consume the result fully. When the :class:`_engine.FrozenResult`
1575 is retrieved from a cache, it can be called any number of times where
1576 it will produce a new :class:`_engine.Result` object each time
1577 against its stored set of rows.
1578
1579 .. seealso::
1580
1581 :ref:`do_orm_execute_re_executing` - example usage within the
1582 ORM to implement a result-set cache.
1583
1584 """
1585
1586 return FrozenResult(self)
1587
1588 def merge(self, *others: Result[Any]) -> MergedResult[_TP]:
1589 """Merge this :class:`_engine.Result` with other compatible result
1590 objects.
1591
1592 The object returned is an instance of :class:`_engine.MergedResult`,
1593 which will be composed of iterators from the given result
1594 objects.
1595
1596 The new result will use the metadata from this result object.
1597 The subsequent result objects must be against an identical
1598 set of result / cursor metadata, otherwise the behavior is
1599 undefined.
1600
1601 """
1602 return MergedResult(self._metadata, (self,) + others)
1603
1604
class FilterResult(ResultInternal[_R]):
    """Base for objects that wrap a :class:`_engine.Result` and return
    something other than :class:`_engine.Row` objects, such as dictionaries
    or scalar objects.

    Additional result APIs are built on :class:`_engine.FilterResult`,
    including :class:`_engine.MappingResult`,
    :class:`_engine.ScalarResult` and :class:`_engine.AsyncResult`.

    """

    __slots__ = (
        "_real_result",
        "_post_creational_filter",
        "_metadata",
        "_unique_filter_state",
        "__dict__",
    )

    # either a callable or None; assigned by the concrete subclasses
    _post_creational_filter: Optional[Callable[[Any], Any]]

    # the wrapped Result that the methods below delegate to
    _real_result: Result[Any]

    def __enter__(self) -> Self:
        return self

    def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
        # exiting the context is handled by the wrapped result
        wrapped = self._real_result
        wrapped.__exit__(type_, value, traceback)

    @_generative
    def yield_per(self, num: int) -> Self:
        """Configure the row-fetching strategy to fetch ``num`` rows at a time.

        :meth:`_engine.FilterResult.yield_per` passes through to the
        :meth:`_engine.Result.yield_per` method; refer to that method's
        documentation for usage notes.

        .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
           so that the method is available on all result set implementations

        .. seealso::

            :ref:`engine_stream_results` - describes Core behavior for
            :meth:`_engine.Result.yield_per`

            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`

        """
        # keep whatever the wrapped result's yield_per() hands back
        configured = self._real_result.yield_per(num)
        self._real_result = configured
        return self

    def _soft_close(self, hard: bool = False) -> None:
        wrapped = self._real_result
        wrapped._soft_close(hard=hard)

    @property
    def _soft_closed(self) -> bool:
        wrapped = self._real_result
        return wrapped._soft_closed

    @property
    def closed(self) -> bool:
        """Return ``True`` if the underlying :class:`_engine.Result` reports
        closed

        .. versionadded:: 1.4.43

        """
        wrapped = self._real_result
        return wrapped.closed

    def close(self) -> None:
        """Close this :class:`_engine.FilterResult`.

        .. versionadded:: 1.4.43

        """
        wrapped = self._real_result
        wrapped.close()

    @property
    def _attributes(self) -> Dict[Any, Any]:
        wrapped = self._real_result
        return wrapped._attributes

    # the four fetch primitives below forward straight to the wrapped result

    def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchiter_impl()

    def _fetchone_impl(
        self, hard_close: bool = False
    ) -> Optional[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchone_impl(hard_close=hard_close)

    def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchall_impl()

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        wrapped = self._real_result
        return wrapped._fetchmany_impl(size=size)
1699
1700
class ScalarResult(FilterResult[_R]):
    """A wrapper for a :class:`_engine.Result` that returns scalar values
    rather than :class:`_row.Row` values.

    A :class:`_engine.ScalarResult` is obtained by calling the
    :meth:`_engine.Result.scalars` method.

    :class:`_engine.ScalarResult` has a special limitation in that it
    provides no ``fetchone()`` method.  The semantics of ``fetchone()`` are
    that the ``None`` value indicates no more results; this is not
    compatible with :class:`_engine.ScalarResult`, since there is no way to
    distinguish between ``None`` as a row value versus ``None`` as an
    indicator.  Use ``next(result)`` to receive values individually.

    """

    __slots__ = ()

    _generate_rows = False

    _post_creational_filter: Optional[Callable[[Any], Any]]

    def __init__(self, real_result: Result[Any], index: _KeyIndexType):
        self._real_result = real_result

        if not real_result._source_supports_scalars:
            # metadata is reduced to the requested key, and the filter
            # extracts element 0 of whatever it is applied to
            self._metadata = real_result._metadata._reduce([index])
            self._post_creational_filter = operator.itemgetter(0)
        else:
            # the source reports scalar support: metadata is used as-is
            # and no filter is installed
            self._metadata = real_result._metadata
            self._post_creational_filter = None

        self._unique_filter_state = real_result._unique_filter_state

    def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
        """Apply unique filtering to the objects returned by this
        :class:`_engine.ScalarResult`.

        Usage details are at :meth:`_engine.Result.unique`.

        """
        # start from an empty collection of seen identities
        seen: Set[Any] = set()
        self._unique_filter_state = (seen, strategy)
        return self

    def partitions(self, size: Optional[int] = None) -> Iterator[Sequence[_R]]:
        """Iterate through sub-lists of elements of the size given.

        Same as :meth:`_engine.Result.partitions`, except that
        scalar values, rather than :class:`_engine.Row` objects,
        are returned.

        """

        fetch_chunk = self._manyrow_getter

        # prime with the first chunk; the first empty chunk ends iteration
        chunk = fetch_chunk(self, size)
        while chunk:
            yield chunk
            chunk = fetch_chunk(self, size)

    def fetchall(self) -> Sequence[_R]:
        """A synonym for the :meth:`_engine.ScalarResult.all` method."""

        values = self._allrows()
        return values

    def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
        """Fetch many objects.

        Same as :meth:`_engine.Result.fetchmany`, except that
        scalar values, rather than :class:`_engine.Row` objects,
        are returned.

        """
        fetch_chunk = self._manyrow_getter
        return fetch_chunk(self, size)

    def all(self) -> Sequence[_R]:
        """Return all scalar values in a sequence.

        Same as :meth:`_engine.Result.all`, except that
        scalar values, rather than :class:`_engine.Row` objects,
        are returned.

        """
        values = self._allrows()
        return values

    def __iter__(self) -> Iterator[_R]:
        value_iterator = self._iter_impl()
        return value_iterator

    def __next__(self) -> _R:
        next_value = self._next_impl()
        return next_value

    def first(self) -> Optional[_R]:
        """Fetch the first object or ``None`` if no object is present.

        Same as :meth:`_engine.Result.first`, except that
        scalar values, rather than :class:`_engine.Row` objects,
        are returned.


        """
        value = self._only_one_row(
            scalar=False, raise_for_none=False, raise_for_second_row=False
        )
        return value

    def one_or_none(self) -> Optional[_R]:
        """Return at most one object or raise an exception.

        Same as :meth:`_engine.Result.one_or_none`, except that
        scalar values, rather than :class:`_engine.Row` objects,
        are returned.

        """
        value = self._only_one_row(
            scalar=False, raise_for_none=False, raise_for_second_row=True
        )
        return value

    def one(self) -> _R:
        """Return exactly one object or raise an exception.

        Same as :meth:`_engine.Result.one`, except that
        scalar values, rather than :class:`_engine.Row` objects,
        are returned.

        """
        value = self._only_one_row(
            scalar=False, raise_for_none=True, raise_for_second_row=True
        )
        return value
1830
1831
class TupleResult(FilterResult[_R], util.TypingOnly):
    """A :class:`_engine.Result` that's typed as returning plain
    Python tuples instead of rows.

    Since :class:`_engine.Row` acts like a tuple in every way already,
    this class is a typing only class, regular :class:`_engine.Result` is
    still used at runtime.

    """

    __slots__ = ()

    # Everything below is declared for type checkers only: each method is a
    # signature with a ``...`` body and is never defined at runtime.
    if TYPE_CHECKING:

        def partitions(
            self, size: Optional[int] = None
        ) -> Iterator[Sequence[_R]]:
            """Iterate through sub-lists of elements of the size given.

            Equivalent to :meth:`_engine.Result.partitions` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def fetchone(self) -> Optional[_R]:
            """Fetch one tuple.

            Equivalent to :meth:`_engine.Result.fetchone` except that
            tuple values, rather than :class:`_engine.Row`
            objects, are returned.

            """
            ...

        def fetchall(self) -> Sequence[_R]:
            """A synonym for the :meth:`_engine.Result.all` method, typed
            as returning tuple values."""
            ...

        def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
            """Fetch many objects.

            Equivalent to :meth:`_engine.Result.fetchmany` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def all(self) -> Sequence[_R]:  # noqa: A001
            """Return all tuple values in a sequence.

            Equivalent to :meth:`_engine.Result.all` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def __iter__(self) -> Iterator[_R]: ...

        def __next__(self) -> _R: ...

        def first(self) -> Optional[_R]:
            """Fetch the first object or ``None`` if no object is present.

            Equivalent to :meth:`_engine.Result.first` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.


            """
            ...

        def one_or_none(self) -> Optional[_R]:
            """Return at most one object or raise an exception.

            Equivalent to :meth:`_engine.Result.one_or_none` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        def one(self) -> _R:
            """Return exactly one object or raise an exception.

            Equivalent to :meth:`_engine.Result.one` except that
            tuple values, rather than :class:`_engine.Row` objects,
            are returned.

            """
            ...

        @overload
        def scalar_one(self: TupleResult[Tuple[_T]]) -> _T: ...

        @overload
        def scalar_one(self) -> Any: ...

        def scalar_one(self) -> Any:
            """Return exactly one scalar result or raise an exception.

            This is equivalent to calling :meth:`_engine.Result.scalars`
            and then :meth:`_engine.ScalarResult.one`.

            .. seealso::

                :meth:`_engine.ScalarResult.one`

                :meth:`_engine.Result.scalars`

            """
            ...

        @overload
        def scalar_one_or_none(
            self: TupleResult[Tuple[_T]],
        ) -> Optional[_T]: ...

        @overload
        def scalar_one_or_none(self) -> Optional[Any]: ...

        def scalar_one_or_none(self) -> Optional[Any]:
            """Return exactly one or no scalar result.

            This is equivalent to calling :meth:`_engine.Result.scalars`
            and then :meth:`_engine.ScalarResult.one_or_none`.

            .. seealso::

                :meth:`_engine.ScalarResult.one_or_none`

                :meth:`_engine.Result.scalars`

            """
            ...

        @overload
        def scalar(self: TupleResult[Tuple[_T]]) -> Optional[_T]: ...

        @overload
        def scalar(self) -> Any: ...

        def scalar(self) -> Any:
            """Fetch the first column of the first row, and close the result
            set.

            Returns ``None`` if there are no rows to fetch.

            No validation is performed to test if additional rows remain.

            After calling this method, the object is fully closed,
            e.g. the :meth:`_engine.CursorResult.close`
            method will have been called.

            :return: a Python scalar value, or ``None`` if no rows remain.

            """
            ...
1993
1994
class MappingResult(_WithKeys, FilterResult[RowMapping]):
    """A wrapper for a :class:`_engine.Result` that returns dictionary values
    rather than :class:`_engine.Row` values.

    A :class:`_engine.MappingResult` is obtained by calling the
    :meth:`_engine.Result.mappings` method.

    """

    __slots__ = ()

    _generate_rows = True

    # reads the ``_mapping`` attribute of whatever it is applied to
    _post_creational_filter = operator.attrgetter("_mapping")

    def __init__(self, result: Result[Any]):
        self._real_result = result
        self._unique_filter_state = result._unique_filter_state

        metadata = result._metadata
        if result._source_supports_scalars:
            # for a scalar-supporting source the metadata is reduced to
            # index 0
            metadata = metadata._reduce([0])
        self._metadata = metadata

    def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
        """Apply unique filtering to the objects returned by this
        :class:`_engine.MappingResult`.

        Usage details are at :meth:`_engine.Result.unique`.

        """
        # start from an empty collection of seen identities
        seen: Set[Any] = set()
        self._unique_filter_state = (seen, strategy)
        return self

    def columns(self, *col_expressions: _KeyIndexType) -> Self:
        """Establish the columns that should be returned in each row."""
        selection = col_expressions
        return self._column_slices(selection)

    def partitions(
        self, size: Optional[int] = None
    ) -> Iterator[Sequence[RowMapping]]:
        """Iterate through sub-lists of elements of the size given.

        Same as :meth:`_engine.Result.partitions`, except that
        :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
        objects, are returned.

        """

        fetch_chunk = self._manyrow_getter

        # prime with the first chunk; the first empty chunk ends iteration
        chunk = fetch_chunk(self, size)
        while chunk:
            yield chunk
            chunk = fetch_chunk(self, size)

    def fetchall(self) -> Sequence[RowMapping]:
        """A synonym for the :meth:`_engine.MappingResult.all` method."""

        mappings = self._allrows()
        return mappings

    def fetchone(self) -> Optional[RowMapping]:
        """Fetch one object.

        Same as :meth:`_engine.Result.fetchone`, except that
        :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
        objects, are returned.

        """

        fetched = self._onerow_getter(self)
        # the _NO_ROW marker is translated to None for the caller
        return None if fetched is _NO_ROW else fetched

    def fetchmany(self, size: Optional[int] = None) -> Sequence[RowMapping]:
        """Fetch many objects.

        Same as :meth:`_engine.Result.fetchmany`, except that
        :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
        objects, are returned.

        """

        fetch_chunk = self._manyrow_getter
        return fetch_chunk(self, size)

    def all(self) -> Sequence[RowMapping]:
        """Return all :class:`_engine.RowMapping` values in a sequence.

        Same as :meth:`_engine.Result.all`, except that
        :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
        objects, are returned.

        """

        mappings = self._allrows()
        return mappings

    def __iter__(self) -> Iterator[RowMapping]:
        mapping_iterator = self._iter_impl()
        return mapping_iterator

    def __next__(self) -> RowMapping:
        next_mapping = self._next_impl()
        return next_mapping

    def first(self) -> Optional[RowMapping]:
        """Fetch the first object or ``None`` if no object is present.

        Same as :meth:`_engine.Result.first`, except that
        :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
        objects, are returned.


        """
        mapping = self._only_one_row(
            scalar=False, raise_for_none=False, raise_for_second_row=False
        )
        return mapping

    def one_or_none(self) -> Optional[RowMapping]:
        """Return at most one object or raise an exception.

        Same as :meth:`_engine.Result.one_or_none`, except that
        :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
        objects, are returned.

        """
        mapping = self._only_one_row(
            scalar=False, raise_for_none=False, raise_for_second_row=True
        )
        return mapping

    def one(self) -> RowMapping:
        """Return exactly one object or raise an exception.

        Same as :meth:`_engine.Result.one`, except that
        :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
        objects, are returned.

        """
        mapping = self._only_one_row(
            scalar=False, raise_for_none=True, raise_for_second_row=True
        )
        return mapping
2135
2136
class FrozenResult(Generic[_TP]):
    """Represents a :class:`_engine.Result` object in a "frozen" state suitable
    for caching.

    A :class:`_engine.FrozenResult` is what the
    :meth:`_engine.Result.freeze` method of any :class:`_engine.Result`
    object returns.

    Each time the :class:`_engine.FrozenResult` is invoked as a callable,
    a new iterable :class:`_engine.Result` object is produced from the same
    fixed set of data::


        result = connection.execute(query)

        frozen = result.freeze()

        unfrozen_result_one = frozen()

        for row in unfrozen_result_one:
            print(row)

        unfrozen_result_two = frozen()
        rows = unfrozen_result_two.all()

        # ... etc

    .. versionadded:: 1.4

    .. seealso::

        :ref:`do_orm_execute_re_executing` - example usage within the
        ORM to implement a result-set cache.

        :func:`_orm.loading.merge_frozen_result` - ORM function to merge
        a frozen result back into a :class:`_orm.Session`.

    """

    # the captured rows; plain scalar elements when the source result
    # supports scalars, row objects otherwise
    data: Sequence[Any]

    def __init__(self, result: Result[_TP]):
        """Capture the metadata, attributes and all rows of ``result``.

        :param result: the :class:`_engine.Result` to freeze; its rows are
         consumed here.

        """
        self.metadata = result._metadata._for_freeze()
        supports_scalars = result._source_supports_scalars
        self._source_supports_scalars = supports_scalars
        self._attributes = result._attributes

        # scalar-capable sources are drained via the raw row iterator;
        # everything else goes through fetchall()
        self.data = (
            list(result._raw_row_iterator())
            if supports_scalars
            else result.fetchall()
        )

    def rewrite_rows(self) -> Sequence[Sequence[Any]]:
        """Return the frozen data as a list of lists, one list per row."""
        if not self._source_supports_scalars:
            return [list(row) for row in self.data]

        # scalar data: wrap each element in a single-element list
        return [[scalar] for scalar in self.data]

    def with_new_rows(
        self, tuple_data: Sequence[Row[_TP]]
    ) -> FrozenResult[_TP]:
        """Return a new :class:`_engine.FrozenResult` sharing this object's
        metadata and attributes but holding ``tuple_data`` as its rows.

        :param tuple_data: replacement rows; when the source supports
         scalars only the first element of each row is kept.

        """
        # bypass __init__, which would require a live Result to consume
        replacement = FrozenResult.__new__(FrozenResult)
        supports_scalars = self._source_supports_scalars

        replacement.metadata = self.metadata
        replacement._attributes = self._attributes
        replacement._source_supports_scalars = supports_scalars
        replacement.data = (
            [entry[0] for entry in tuple_data]
            if supports_scalars
            else tuple_data
        )
        return replacement

    def __call__(self) -> Result[_TP]:
        """Return a new :class:`_engine.IteratorResult` iterating over a
        fresh iterator of the frozen data.
        """
        thawed: IteratorResult[_TP] = IteratorResult(
            self.metadata,
            iter(self.data),
            _source_supports_scalars=self._source_supports_scalars,
        )
        thawed._attributes = self._attributes
        return thawed
2215
2216
class IteratorResult(Result[_TP]):
    """A :class:`_engine.Result` that gets data from a Python iterator of
    :class:`_engine.Row` objects or similar row-like data.

    .. versionadded:: 1.4

    """

    # class-level defaults; set to True per instance by _soft_close()
    _hard_closed = False
    _soft_closed = False

    def __init__(
        self,
        cursor_metadata: ResultMetaData,
        iterator: Iterator[_InterimSupportsScalarsRowType],
        raw: Optional[Result[Any]] = None,
        _source_supports_scalars: bool = False,
    ):
        """Construct the result.

        :param cursor_metadata: metadata stored as ``_metadata``.
        :param iterator: the iterator that rows are pulled from.
        :param raw: optional result that is also soft-closed whenever this
         result is soft-closed.
        :param _source_supports_scalars: stored as-is on the instance.

        """
        self._source_supports_scalars = _source_supports_scalars
        self.raw = raw
        self.iterator = iterator
        self._metadata = cursor_metadata

    @property
    def closed(self) -> bool:
        """Return ``True`` if this :class:`_engine.IteratorResult` has
        been closed

        .. versionadded:: 1.4.43

        """
        # only a hard close counts; a soft close alone leaves this False
        return self._hard_closed

    def _soft_close(self, hard: bool = False, **kw: Any) -> None:
        """Discard the iterator and mark this result as soft closed.

        :param hard: when true, additionally mark the result as hard
         closed.
        :param kw: forwarded, along with ``hard``, to the ``raw`` result's
         own ``_soft_close()`` when one is present.

        """
        if hard:
            self._hard_closed = True

        wrapped = self.raw
        if wrapped is not None:
            wrapped._soft_close(hard=hard, **kw)

        # swap in an empty iterator so later fetches produce no rows
        self.iterator = iter([])
        self._reset_memoizations()
        self._soft_closed = True

    def _raise_hard_closed(self) -> NoReturn:
        """Raise :class:`.ResourceClosedError`; never returns."""
        raise exc.ResourceClosedError("This result object is closed.")

    def _raw_row_iterator(self) -> Iterator[_RowData]:
        """Return the underlying iterator, without a hard-closed check."""
        return self.iterator

    def _fetchiter_impl(self) -> Iterator[_InterimSupportsScalarsRowType]:
        """Return the underlying iterator, raising if hard closed."""
        if self._hard_closed:
            self._raise_hard_closed()
        return self.iterator

    def _fetchone_impl(
        self, hard_close: bool = False
    ) -> Optional[_InterimRowType[Row[Any]]]:
        """Return the next row, or ``None`` once the iterator is exhausted.

        On exhaustion the result is soft-closed, passing ``hard_close``
        as the ``hard`` flag.

        """
        if self._hard_closed:
            self._raise_hard_closed()

        candidate = next(self.iterator, _NO_ROW)
        if candidate is not _NO_ROW:
            return candidate

        self._soft_close(hard=hard_close)
        return None

    def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
        """Return every remaining row as a list, then soft-close."""
        if self._hard_closed:
            self._raise_hard_closed()

        # the soft close happens even if draining the iterator raises
        try:
            remaining = list(self.iterator)
        finally:
            self._soft_close()
        return remaining

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        """Return up to ``size`` rows from the iterator as a list.

        A ``size`` of ``None`` places no upper bound on the slice.

        """
        if self._hard_closed:
            self._raise_hard_closed()

        window = itertools.islice(self.iterator, size)
        return list(window)
2298
2299
def null_result() -> IteratorResult[Any]:
    """Return an :class:`.IteratorResult` built from a
    :class:`.SimpleResultMetaData` given an empty list and from an empty
    iterator.
    """
    empty_metadata = SimpleResultMetaData([])
    no_rows = iter([])
    return IteratorResult(empty_metadata, no_rows)
2302
2303
class ChunkedIteratorResult(IteratorResult[_TP]):
    """An :class:`_engine.IteratorResult` that works from an
    iterator-producing callable.

    The given ``chunks`` argument is a function that is given a number of rows
    to return in each chunk, or ``None`` for all rows.  The function should
    then return an un-consumed iterator of lists, each list of the requested
    size.

    The function can be called at any time again, in which case it should
    continue from the same result set but adjust the chunk size as given.

    .. versionadded:: 1.4

    """

    def __init__(
        self,
        cursor_metadata: ResultMetaData,
        chunks: Callable[
            [Optional[int]], Iterator[Sequence[_InterimRowType[_R]]]
        ],
        source_supports_scalars: bool = False,
        raw: Optional[Result[Any]] = None,
        dynamic_yield_per: bool = False,
    ):
        """Construct the result.

        :param cursor_metadata: metadata stored as ``_metadata``.
        :param chunks: callable producing an iterator of row chunks; it is
         invoked here once with ``None``.
        :param source_supports_scalars: stored as-is on the instance.
        :param raw: optional underlying result, stored as ``raw``.
        :param dynamic_yield_per: when true, ``_fetchmany_impl()`` calls
         ``chunks`` again with the requested size on every fetch.

        """
        self._metadata = cursor_metadata
        self.chunks = chunks
        self._source_supports_scalars = source_supports_scalars
        self.raw = raw
        self.dynamic_yield_per = dynamic_yield_per

        # flatten the per-chunk sequences into a single stream of rows
        initial_chunks = self.chunks(None)
        self.iterator = itertools.chain.from_iterable(initial_chunks)

    @_generative
    def yield_per(self, num: int) -> Self:
        """Record ``num`` as the yield-per size and rebuild the row
        iterator from ``chunks(num)``.
        """
        # TODO: this throws away the iterator which may be holding
        # onto a chunk.   the yield_per cannot be changed once any
        # rows have been fetched.   either find a way to enforce this,
        # or we can't use itertools.chain and will instead have to
        # keep track.

        self._yield_per = num
        resized_chunks = self.chunks(num)
        self.iterator = itertools.chain.from_iterable(resized_chunks)
        return self

    def _soft_close(self, hard: bool = False, **kw: Any) -> None:
        """Soft-close via the superclass, then replace ``chunks`` with a
        callable that always returns an empty list.
        """
        super()._soft_close(hard=hard, **kw)
        self.chunks = lambda size: []  # type: ignore

    def _fetchmany_impl(
        self, size: Optional[int] = None
    ) -> List[_InterimRowType[Row[Any]]]:
        """Fetch up to ``size`` rows, first re-chunking with ``size`` when
        ``dynamic_yield_per`` is set.
        """
        if self.dynamic_yield_per:
            flatten = itertools.chain.from_iterable
            self.iterator = flatten(self.chunks(size))
        return super()._fetchmany_impl(size=size)
2359
2360
class MergedResult(IteratorResult[_TP]):
    """A :class:`_engine.Result` that is merged from any number of
    :class:`_engine.Result` objects.

    Returned by the :meth:`_engine.Result.merge` method.

    .. versionadded:: 1.4

    """

    # plain class attribute standing in for the ``closed`` property of
    # IteratorResult; set to True on the instance by a hard _soft_close()
    closed = False
    rowcount: Optional[int]

    def __init__(
        self, cursor_metadata: ResultMetaData, results: Sequence[Result[_TP]]
    ):
        """Construct a result chaining the rows of each of ``results``.

        :param cursor_metadata: metadata passed to the superclass
         constructor.
        :param results: the results to merge; the first one supplies the
         unique-filter state, the yield-per value and the
         ``_source_supports_scalars`` flag.

        """
        self._results = results

        # generator expression: each raw row iterator is requested only
        # when the chain actually advances to that result
        raw_iterators = (
            member._raw_row_iterator() for member in results
        )
        super().__init__(
            cursor_metadata, itertools.chain.from_iterable(raw_iterators)
        )

        lead = results[0]
        self._unique_filter_state = lead._unique_filter_state
        self._yield_per = lead._yield_per

        # going to try something w/ this in next rev
        self._source_supports_scalars = lead._source_supports_scalars

        self._attributes = self._attributes.merge_with(
            *(member._attributes for member in results)
        )

    def _soft_close(self, hard: bool = False, **kw: Any) -> None:
        """Soft-close each merged result, forwarding ``hard`` and ``kw``;
        on a hard close also set ``closed`` to ``True``.
        """
        for member in self._results:
            member._soft_close(hard=hard, **kw)

        if hard:
            self.closed = True