1# engine/result.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Define generic result set constructs."""
9
10from __future__ import annotations
11
12from enum import Enum
13import functools
14import itertools
15import operator
16import typing
17from typing import Any
18from typing import Callable
19from typing import cast
20from typing import Dict
21from typing import Generic
22from typing import Iterable
23from typing import Iterator
24from typing import List
25from typing import Mapping
26from typing import NoReturn
27from typing import Optional
28from typing import overload
29from typing import Sequence
30from typing import Set
31from typing import Tuple
32from typing import TYPE_CHECKING
33from typing import TypeVar
34from typing import Union
35
36from .row import Row
37from .row import RowMapping
38from .. import exc
39from .. import util
40from ..sql.base import _generative
41from ..sql.base import HasMemoized
42from ..sql.base import InPlaceGenerative
43from ..util import HasMemoized_ro_memoized_attribute
44from ..util import NONE_SET
45from ..util._has_cy import HAS_CYEXTENSION
46from ..util.typing import Literal
47from ..util.typing import Self
48
49if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
50 from ._py_row import tuplegetter as tuplegetter
51else:
52 from sqlalchemy.cyextension.resultproxy import tuplegetter as tuplegetter
53
54if typing.TYPE_CHECKING:
55 from typing import Type
56
57 from .. import inspection
58 from ..sql import roles
59 from ..sql._typing import _HasClauseElement
60 from ..sql.elements import SQLCoreOperations
61 from ..sql.type_api import _ResultProcessorType
62
63_KeyType = Union[
64 str,
65 "SQLCoreOperations[Any]",
66 "roles.TypedColumnsClauseRole[Any]",
67 "roles.ColumnsClauseRole",
68 "Type[Any]",
69 "inspection.Inspectable[_HasClauseElement[Any]]",
70]
71_KeyIndexType = Union[_KeyType, int]
72
73# is overridden in cursor using _CursorKeyMapRecType
74_KeyMapRecType = Any
75
76_KeyMapType = Mapping[_KeyType, _KeyMapRecType]
77
78
79_RowData = Union[Row[Any], RowMapping, Any]
80"""A generic form of "row" that accommodates for the different kinds of
81"rows" that different result objects return, including row, row mapping, and
82scalar values"""
83
84_RawRowType = Tuple[Any, ...]
85"""represents the kind of row we get from a DBAPI cursor"""
86
87_R = TypeVar("_R", bound=_RowData)
88_T = TypeVar("_T", bound=Any)
89_TP = TypeVar("_TP", bound=Tuple[Any, ...])
90
91_InterimRowType = Union[_R, _RawRowType]
92"""a catchall "anything" kind of return type that can be applied
93across all the result types
94
95"""
96
97_InterimSupportsScalarsRowType = Union[Row[Any], Any]
98
99_ProcessorsType = Sequence[Optional["_ResultProcessorType[Any]"]]
100_TupleGetterType = Callable[[Sequence[Any]], Sequence[Any]]
101_UniqueFilterType = Callable[[Any], Any]
102_UniqueFilterStateType = Tuple[Set[Any], Optional[_UniqueFilterType]]
103
104
105class ResultMetaData:
106 """Base for metadata about result rows."""
107
108 __slots__ = ()
109
110 _tuplefilter: Optional[_TupleGetterType] = None
111 _translated_indexes: Optional[Sequence[int]] = None
112 _unique_filters: Optional[Sequence[Callable[[Any], Any]]] = None
113 _keymap: _KeyMapType
114 _keys: Sequence[str]
115 _processors: Optional[_ProcessorsType]
116 _key_to_index: Mapping[_KeyType, int]
117
118 @property
119 def keys(self) -> RMKeyView:
120 return RMKeyView(self)
121
122 def _has_key(self, key: object) -> bool:
123 raise NotImplementedError()
124
125 def _for_freeze(self) -> ResultMetaData:
126 raise NotImplementedError()
127
128 @overload
129 def _key_fallback(
130 self, key: Any, err: Optional[Exception], raiseerr: Literal[True] = ...
131 ) -> NoReturn: ...
132
133 @overload
134 def _key_fallback(
135 self,
136 key: Any,
137 err: Optional[Exception],
138 raiseerr: Literal[False] = ...,
139 ) -> None: ...
140
141 @overload
142 def _key_fallback(
143 self, key: Any, err: Optional[Exception], raiseerr: bool = ...
144 ) -> Optional[NoReturn]: ...
145
146 def _key_fallback(
147 self, key: Any, err: Optional[Exception], raiseerr: bool = True
148 ) -> Optional[NoReturn]:
149 assert raiseerr
150 raise KeyError(key) from err
151
152 def _raise_for_ambiguous_column_name(
153 self, rec: _KeyMapRecType
154 ) -> NoReturn:
155 raise NotImplementedError(
156 "ambiguous column name logic is implemented for "
157 "CursorResultMetaData"
158 )
159
160 def _index_for_key(
161 self, key: _KeyIndexType, raiseerr: bool
162 ) -> Optional[int]:
163 raise NotImplementedError()
164
165 def _indexes_for_keys(
166 self, keys: Sequence[_KeyIndexType]
167 ) -> Sequence[int]:
168 raise NotImplementedError()
169
170 def _metadata_for_keys(
171 self, keys: Sequence[_KeyIndexType]
172 ) -> Iterator[_KeyMapRecType]:
173 raise NotImplementedError()
174
175 def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
176 raise NotImplementedError()
177
178 def _getter(
179 self, key: Any, raiseerr: bool = True
180 ) -> Optional[Callable[[Row[Any]], Any]]:
181 index = self._index_for_key(key, raiseerr)
182
183 if index is not None:
184 return operator.itemgetter(index)
185 else:
186 return None
187
188 def _row_as_tuple_getter(
189 self, keys: Sequence[_KeyIndexType]
190 ) -> _TupleGetterType:
191 indexes = self._indexes_for_keys(keys)
192 return tuplegetter(*indexes)
193
194 def _make_key_to_index(
195 self, keymap: Mapping[_KeyType, Sequence[Any]], index: int
196 ) -> Mapping[_KeyType, int]:
197 return {
198 key: rec[index]
199 for key, rec in keymap.items()
200 if rec[index] is not None
201 }
202
203 def _key_not_found(self, key: Any, attr_error: bool) -> NoReturn:
204 if key in self._keymap:
205 # the index must be none in this case
206 self._raise_for_ambiguous_column_name(self._keymap[key])
207 else:
208 # unknown key
209 if attr_error:
210 try:
211 self._key_fallback(key, None)
212 except KeyError as ke:
213 raise AttributeError(ke.args[0]) from ke
214 else:
215 self._key_fallback(key, None)
216
217 @property
218 def _effective_processors(self) -> Optional[_ProcessorsType]:
219 if not self._processors or NONE_SET.issuperset(self._processors):
220 return None
221 else:
222 return self._processors
223
224
225class RMKeyView(typing.KeysView[Any]):
226 __slots__ = ("_parent", "_keys")
227
228 _parent: ResultMetaData
229 _keys: Sequence[str]
230
231 def __init__(self, parent: ResultMetaData):
232 self._parent = parent
233 self._keys = [k for k in parent._keys if k is not None]
234
235 def __len__(self) -> int:
236 return len(self._keys)
237
238 def __repr__(self) -> str:
239 return "{0.__class__.__name__}({0._keys!r})".format(self)
240
241 def __iter__(self) -> Iterator[str]:
242 return iter(self._keys)
243
244 def __contains__(self, item: Any) -> bool:
245 if isinstance(item, int):
246 return False
247
248 # note this also includes special key fallback behaviors
249 # which also don't seem to be tested in test_resultset right now
250 return self._parent._has_key(item)
251
252 def __eq__(self, other: Any) -> bool:
253 return list(other) == list(self)
254
255 def __ne__(self, other: Any) -> bool:
256 return list(other) != list(self)
257
258
259class SimpleResultMetaData(ResultMetaData):
260 """result metadata for in-memory collections."""
261
262 __slots__ = (
263 "_keys",
264 "_keymap",
265 "_processors",
266 "_tuplefilter",
267 "_translated_indexes",
268 "_unique_filters",
269 "_key_to_index",
270 "_ambiguous_keys",
271 )
272
273 _keys: Sequence[str]
274
275 def __init__(
276 self,
277 keys: Sequence[str],
278 extra: Optional[Sequence[Any]] = None,
279 _processors: Optional[_ProcessorsType] = None,
280 _tuplefilter: Optional[_TupleGetterType] = None,
281 _translated_indexes: Optional[Sequence[int]] = None,
282 _unique_filters: Optional[Sequence[Callable[[Any], Any]]] = None,
283 _ambiguous_keys: Optional[frozenset[str]] = None,
284 ):
285 self._keys = list(keys)
286 self._tuplefilter = _tuplefilter
287 self._translated_indexes = _translated_indexes
288 self._unique_filters = _unique_filters
289 if extra:
290 recs_names = [
291 (
292 (name,) + (extras if extras else ()),
293 (index, name, extras),
294 )
295 for index, (name, extras) in enumerate(zip(self._keys, extra))
296 ]
297 else:
298 recs_names = [
299 ((name,), (index, name, ()))
300 for index, name in enumerate(self._keys)
301 ]
302
303 self._keymap = {key: rec for keys, rec in recs_names for key in keys}
304
305 if _ambiguous_keys:
306 for name in _ambiguous_keys.intersection(self._keymap):
307 rec = self._keymap[name]
308 self._keymap[name] = (None,) + rec[1:]
309
310 self._processors = _processors
311
312 self._ambiguous_keys = _ambiguous_keys
313
314 self._key_to_index = self._make_key_to_index(self._keymap, 0)
315
316 def _has_key(self, key: object) -> bool:
317 return key in self._keymap
318
319 def _for_freeze(self) -> ResultMetaData:
320 unique_filters = self._unique_filters
321 if unique_filters and self._tuplefilter:
322 unique_filters = self._tuplefilter(unique_filters)
323
324 # TODO: are we freezing the result with or without uniqueness
325 # applied?
326 return SimpleResultMetaData(
327 self._keys,
328 extra=[self._keymap[key][2] for key in self._keys],
329 _unique_filters=unique_filters,
330 _ambiguous_keys=self._ambiguous_keys,
331 )
332
333 def __getstate__(self) -> Dict[str, Any]:
334 return {
335 "_keys": self._keys,
336 "_translated_indexes": self._translated_indexes,
337 "_ambiguous_keys": self._ambiguous_keys,
338 }
339
340 def __setstate__(self, state: Dict[str, Any]) -> None:
341 if state["_translated_indexes"]:
342 _translated_indexes = state["_translated_indexes"]
343 _tuplefilter = tuplegetter(*_translated_indexes)
344 else:
345 _translated_indexes = _tuplefilter = None
346 self.__init__( # type: ignore
347 state["_keys"],
348 _translated_indexes=_translated_indexes,
349 _tuplefilter=_tuplefilter,
350 _ambiguous_keys=state.get("_ambiguous_keys"),
351 )
352
353 def _index_for_key(self, key: Any, raiseerr: bool = True) -> int:
354 if int in key.__class__.__mro__:
355 key = self._keys[key]
356 try:
357 rec = self._keymap[key]
358 except KeyError as ke:
359 rec = self._key_fallback(key, ke, raiseerr)
360
361 if rec[0] is None:
362 self._raise_for_ambiguous_column_name(rec)
363 return rec[0] # type: ignore[no-any-return]
364
365 def _raise_for_ambiguous_column_name(
366 self, rec: _KeyMapRecType
367 ) -> NoReturn:
368 raise exc.InvalidRequestError(
369 "Ambiguous column name '%s' in "
370 "result set column descriptions" % rec[1]
371 )
372
373 def _indexes_for_keys(self, keys: Sequence[Any]) -> Sequence[int]:
374 # only used by the ORM with Column objects; does not need
375 # ambiguous column name support
376 return [self._keymap[key][0] for key in keys]
377
378 def _metadata_for_keys(
379 self, keys: Sequence[Any]
380 ) -> Iterator[_KeyMapRecType]:
381 for key in keys:
382 if int in key.__class__.__mro__:
383 key = self._keys[key]
384
385 try:
386 rec = self._keymap[key]
387 except KeyError as ke:
388 rec = self._key_fallback(key, ke, True)
389
390 if rec[0] is None:
391 self._raise_for_ambiguous_column_name(rec)
392
393 yield rec
394
395 def _reduce(self, keys: Sequence[Any]) -> ResultMetaData:
396 try:
397 metadata_for_keys = [
398 self._keymap[
399 self._keys[key] if int in key.__class__.__mro__ else key
400 ]
401 for key in keys
402 ]
403 except KeyError as ke:
404 self._key_fallback(ke.args[0], ke, True)
405
406 indexes: Sequence[int]
407 new_keys: Sequence[str]
408 extra: Sequence[Any]
409 indexes, new_keys, extra = zip(*metadata_for_keys)
410
411 if self._translated_indexes:
412 indexes = [self._translated_indexes[idx] for idx in indexes]
413
414 tup = tuplegetter(*indexes)
415
416 new_metadata = SimpleResultMetaData(
417 new_keys,
418 extra=extra,
419 _tuplefilter=tup,
420 _translated_indexes=indexes,
421 _processors=self._processors,
422 _unique_filters=self._unique_filters,
423 )
424
425 return new_metadata
426
427
428def result_tuple(
429 fields: Sequence[str], extra: Optional[Any] = None
430) -> Callable[[Iterable[Any]], Row[Any]]:
431 parent = SimpleResultMetaData(fields, extra)
432 return functools.partial(
433 Row, parent, parent._effective_processors, parent._key_to_index
434 )
435
436
437# a symbol that indicates to internal Result methods that
438# "no row is returned". We can't use None for those cases where a scalar
439# filter is applied to rows.
440class _NoRow(Enum):
441 _NO_ROW = 0
442
443
444_NO_ROW = _NoRow._NO_ROW
445
446
447class ResultInternal(InPlaceGenerative, Generic[_R]):
448 __slots__ = ()
449
450 _real_result: Optional[Result[Any]] = None
451 _generate_rows: bool = True
452 _row_logging_fn: Optional[Callable[[Any], Any]]
453
454 _unique_filter_state: Optional[_UniqueFilterStateType] = None
455 _post_creational_filter: Optional[Callable[[Any], Any]] = None
456 _is_cursor = False
457
458 _metadata: ResultMetaData
459
460 _source_supports_scalars: bool
461
462 def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
463 raise NotImplementedError()
464
465 def _fetchone_impl(
466 self, hard_close: bool = False
467 ) -> Optional[_InterimRowType[Row[Any]]]:
468 raise NotImplementedError()
469
470 def _fetchmany_impl(
471 self, size: Optional[int] = None
472 ) -> List[_InterimRowType[Row[Any]]]:
473 raise NotImplementedError()
474
475 def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
476 raise NotImplementedError()
477
478 def _soft_close(self, hard: bool = False) -> None:
479 raise NotImplementedError()
480
481 @HasMemoized_ro_memoized_attribute
482 def _row_getter(self) -> Optional[Callable[..., _R]]:
483 real_result: Result[Any] = (
484 self._real_result
485 if self._real_result
486 else cast("Result[Any]", self)
487 )
488
489 if real_result._source_supports_scalars:
490 if not self._generate_rows:
491 return None
492 else:
493 _proc = Row
494
495 def process_row(
496 metadata: ResultMetaData,
497 processors: Optional[_ProcessorsType],
498 key_to_index: Mapping[_KeyType, int],
499 scalar_obj: Any,
500 ) -> Row[Any]:
501 return _proc(
502 metadata, processors, key_to_index, (scalar_obj,)
503 )
504
505 else:
506 process_row = Row # type: ignore
507
508 metadata = self._metadata
509
510 key_to_index = metadata._key_to_index
511 processors = metadata._effective_processors
512 tf = metadata._tuplefilter
513
514 if tf and not real_result._source_supports_scalars:
515 if processors:
516 processors = tf(processors)
517
518 _make_row_orig: Callable[..., _R] = functools.partial( # type: ignore # noqa E501
519 process_row, metadata, processors, key_to_index
520 )
521
522 fixed_tf = tf
523
524 def make_row(row: _InterimRowType[Row[Any]]) -> _R:
525 return _make_row_orig(fixed_tf(row))
526
527 else:
528 make_row = functools.partial( # type: ignore
529 process_row, metadata, processors, key_to_index
530 )
531
532 if real_result._row_logging_fn:
533 _log_row = real_result._row_logging_fn
534 _make_row = make_row
535
536 def make_row(row: _InterimRowType[Row[Any]]) -> _R:
537 return _log_row(_make_row(row)) # type: ignore
538
539 return make_row
540
541 @HasMemoized_ro_memoized_attribute
542 def _iterator_getter(self) -> Callable[..., Iterator[_R]]:
543 make_row = self._row_getter
544
545 post_creational_filter = self._post_creational_filter
546
547 if self._unique_filter_state:
548 uniques, strategy = self._unique_strategy
549
550 def iterrows(self: Result[Any]) -> Iterator[_R]:
551 for raw_row in self._fetchiter_impl():
552 obj: _InterimRowType[Any] = (
553 make_row(raw_row) if make_row else raw_row
554 )
555 hashed = strategy(obj) if strategy else obj
556 if hashed in uniques:
557 continue
558 uniques.add(hashed)
559 if post_creational_filter:
560 obj = post_creational_filter(obj)
561 yield obj # type: ignore
562
563 else:
564
565 def iterrows(self: Result[Any]) -> Iterator[_R]:
566 for raw_row in self._fetchiter_impl():
567 row: _InterimRowType[Any] = (
568 make_row(raw_row) if make_row else raw_row
569 )
570 if post_creational_filter:
571 row = post_creational_filter(row)
572 yield row # type: ignore
573
574 return iterrows
575
576 def _raw_all_rows(self) -> List[_R]:
577 make_row = self._row_getter
578 assert make_row is not None
579 rows = self._fetchall_impl()
580 return [make_row(row) for row in rows]
581
582 def _allrows(self) -> List[_R]:
583 post_creational_filter = self._post_creational_filter
584
585 make_row = self._row_getter
586
587 rows = self._fetchall_impl()
588 made_rows: List[_InterimRowType[_R]]
589 if make_row:
590 made_rows = [make_row(row) for row in rows]
591 else:
592 made_rows = rows # type: ignore
593
594 interim_rows: List[_R]
595
596 if self._unique_filter_state:
597 uniques, strategy = self._unique_strategy
598
599 interim_rows = [
600 made_row # type: ignore
601 for made_row, sig_row in [
602 (
603 made_row,
604 strategy(made_row) if strategy else made_row,
605 )
606 for made_row in made_rows
607 ]
608 if sig_row not in uniques and not uniques.add(sig_row) # type: ignore # noqa: E501
609 ]
610 else:
611 interim_rows = made_rows # type: ignore
612
613 if post_creational_filter:
614 interim_rows = [
615 post_creational_filter(row) for row in interim_rows
616 ]
617 return interim_rows
618
619 @HasMemoized_ro_memoized_attribute
620 def _onerow_getter(
621 self,
622 ) -> Callable[..., Union[Literal[_NoRow._NO_ROW], _R]]:
623 make_row = self._row_getter
624
625 post_creational_filter = self._post_creational_filter
626
627 if self._unique_filter_state:
628 uniques, strategy = self._unique_strategy
629
630 def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
631 _onerow = self._fetchone_impl
632 while True:
633 row = _onerow()
634 if row is None:
635 return _NO_ROW
636 else:
637 obj: _InterimRowType[Any] = (
638 make_row(row) if make_row else row
639 )
640 hashed = strategy(obj) if strategy else obj
641 if hashed in uniques:
642 continue
643 else:
644 uniques.add(hashed)
645 if post_creational_filter:
646 obj = post_creational_filter(obj)
647 return obj # type: ignore
648
649 else:
650
651 def onerow(self: Result[Any]) -> Union[_NoRow, _R]:
652 row = self._fetchone_impl()
653 if row is None:
654 return _NO_ROW
655 else:
656 interim_row: _InterimRowType[Any] = (
657 make_row(row) if make_row else row
658 )
659 if post_creational_filter:
660 interim_row = post_creational_filter(interim_row)
661 return interim_row # type: ignore
662
663 return onerow
664
665 @HasMemoized_ro_memoized_attribute
666 def _manyrow_getter(self) -> Callable[..., List[_R]]:
667 make_row = self._row_getter
668
669 post_creational_filter = self._post_creational_filter
670
671 if self._unique_filter_state:
672 uniques, strategy = self._unique_strategy
673
674 def filterrows(
675 make_row: Optional[Callable[..., _R]],
676 rows: List[Any],
677 strategy: Optional[Callable[[List[Any]], Any]],
678 uniques: Set[Any],
679 ) -> List[_R]:
680 if make_row:
681 rows = [make_row(row) for row in rows]
682
683 if strategy:
684 made_rows = (
685 (made_row, strategy(made_row)) for made_row in rows
686 )
687 else:
688 made_rows = ((made_row, made_row) for made_row in rows)
689 return [
690 made_row
691 for made_row, sig_row in made_rows
692 if sig_row not in uniques and not uniques.add(sig_row) # type: ignore # noqa: E501
693 ]
694
695 def manyrows(
696 self: ResultInternal[_R], num: Optional[int]
697 ) -> List[_R]:
698 collect: List[_R] = []
699
700 _manyrows = self._fetchmany_impl
701
702 if num is None:
703 # if None is passed, we don't know the default
704 # manyrows number, DBAPI has this as cursor.arraysize
705 # different DBAPIs / fetch strategies may be different.
706 # do a fetch to find what the number is. if there are
707 # only fewer rows left, then it doesn't matter.
708 real_result = (
709 self._real_result
710 if self._real_result
711 else cast("Result[Any]", self)
712 )
713 if real_result._yield_per:
714 num_required = num = real_result._yield_per
715 else:
716 rows = _manyrows(num)
717 num = len(rows)
718 assert make_row is not None
719 collect.extend(
720 filterrows(make_row, rows, strategy, uniques)
721 )
722 num_required = num - len(collect)
723 else:
724 num_required = num
725
726 assert num is not None
727
728 while num_required:
729 rows = _manyrows(num_required)
730 if not rows:
731 break
732
733 collect.extend(
734 filterrows(make_row, rows, strategy, uniques)
735 )
736 num_required = num - len(collect)
737
738 if post_creational_filter:
739 collect = [post_creational_filter(row) for row in collect]
740 return collect
741
742 else:
743
744 def manyrows(
745 self: ResultInternal[_R], num: Optional[int]
746 ) -> List[_R]:
747 if num is None:
748 real_result = (
749 self._real_result
750 if self._real_result
751 else cast("Result[Any]", self)
752 )
753 num = real_result._yield_per
754
755 rows: List[_InterimRowType[Any]] = self._fetchmany_impl(num)
756 if make_row:
757 rows = [make_row(row) for row in rows]
758 if post_creational_filter:
759 rows = [post_creational_filter(row) for row in rows]
760 return rows # type: ignore
761
762 return manyrows
763
764 @overload
765 def _only_one_row(
766 self: ResultInternal[Row[Any]],
767 raise_for_second_row: bool,
768 raise_for_none: bool,
769 scalar: Literal[True],
770 ) -> Any: ...
771
772 @overload
773 def _only_one_row(
774 self,
775 raise_for_second_row: bool,
776 raise_for_none: Literal[True],
777 scalar: bool,
778 ) -> _R: ...
779
780 @overload
781 def _only_one_row(
782 self,
783 raise_for_second_row: bool,
784 raise_for_none: bool,
785 scalar: bool,
786 ) -> Optional[_R]: ...
787
788 def _only_one_row(
789 self,
790 raise_for_second_row: bool,
791 raise_for_none: bool,
792 scalar: bool,
793 ) -> Optional[_R]:
794 onerow = self._fetchone_impl
795
796 row: Optional[_InterimRowType[Any]] = onerow(hard_close=True)
797 if row is None:
798 if raise_for_none:
799 raise exc.NoResultFound(
800 "No row was found when one was required"
801 )
802 else:
803 return None
804
805 if scalar and self._source_supports_scalars:
806 self._generate_rows = False
807 make_row = None
808 else:
809 make_row = self._row_getter
810
811 try:
812 row = make_row(row) if make_row else row
813 except:
814 self._soft_close(hard=True)
815 raise
816
817 if raise_for_second_row:
818 if self._unique_filter_state:
819 # for no second row but uniqueness, need to essentially
820 # consume the entire result :(
821 uniques, strategy = self._unique_strategy
822
823 existing_row_hash = strategy(row) if strategy else row
824
825 while True:
826 next_row: Any = onerow(hard_close=True)
827 if next_row is None:
828 next_row = _NO_ROW
829 break
830
831 try:
832 next_row = make_row(next_row) if make_row else next_row
833
834 if strategy:
835 assert next_row is not _NO_ROW
836 if existing_row_hash == strategy(next_row):
837 continue
838 elif row == next_row:
839 continue
840 # here, we have a row and it's different
841 break
842 except:
843 self._soft_close(hard=True)
844 raise
845 else:
846 next_row = onerow(hard_close=True)
847 if next_row is None:
848 next_row = _NO_ROW
849
850 if next_row is not _NO_ROW:
851 self._soft_close(hard=True)
852 raise exc.MultipleResultsFound(
853 "Multiple rows were found when exactly one was required"
854 if raise_for_none
855 else "Multiple rows were found when one or none "
856 "was required"
857 )
858 else:
859 # if we checked for second row then that would have
860 # closed us :)
861 self._soft_close(hard=True)
862
863 if not scalar:
864 post_creational_filter = self._post_creational_filter
865 if post_creational_filter:
866 row = post_creational_filter(row)
867
868 if scalar and make_row:
869 return row[0] # type: ignore
870 else:
871 return row # type: ignore
872
873 def _iter_impl(self) -> Iterator[_R]:
874 return self._iterator_getter(self)
875
876 def _next_impl(self) -> _R:
877 row = self._onerow_getter(self)
878 if row is _NO_ROW:
879 raise StopIteration()
880 else:
881 return row
882
883 @_generative
884 def _column_slices(self, indexes: Sequence[_KeyIndexType]) -> Self:
885 real_result = (
886 self._real_result
887 if self._real_result
888 else cast("Result[Any]", self)
889 )
890
891 if not real_result._source_supports_scalars or len(indexes) != 1:
892 self._metadata = self._metadata._reduce(indexes)
893
894 assert self._generate_rows
895
896 return self
897
898 @HasMemoized.memoized_attribute
899 def _unique_strategy(self) -> _UniqueFilterStateType:
900 assert self._unique_filter_state is not None
901 uniques, strategy = self._unique_filter_state
902
903 real_result = (
904 self._real_result
905 if self._real_result is not None
906 else cast("Result[Any]", self)
907 )
908
909 if not strategy and self._metadata._unique_filters:
910 if (
911 real_result._source_supports_scalars
912 and not self._generate_rows
913 ):
914 strategy = self._metadata._unique_filters[0]
915 else:
916 filters = self._metadata._unique_filters
917 if self._metadata._tuplefilter:
918 filters = self._metadata._tuplefilter(filters)
919
920 strategy = operator.methodcaller("_filter_on_values", filters)
921 return uniques, strategy
922
923
924class _WithKeys:
925 __slots__ = ()
926
927 _metadata: ResultMetaData
928
929 # used mainly to share documentation on the keys method.
930 def keys(self) -> RMKeyView:
931 """Return an iterable view which yields the string keys that would
932 be represented by each :class:`_engine.Row`.
933
934 The keys can represent the labels of the columns returned by a core
935 statement or the names of the orm classes returned by an orm
936 execution.
937
938 The view also can be tested for key containment using the Python
939 ``in`` operator, which will test both for the string keys represented
940 in the view, as well as for alternate keys such as column objects.
941
942 .. versionchanged:: 1.4 a key view object is returned rather than a
943 plain list.
944
945
946 """
947 return self._metadata.keys
948
949
950class Result(_WithKeys, ResultInternal[Row[_TP]]):
951 """Represent a set of database results.
952
953 .. versionadded:: 1.4 The :class:`_engine.Result` object provides a
954 completely updated usage model and calling facade for SQLAlchemy
955 Core and SQLAlchemy ORM. In Core, it forms the basis of the
956 :class:`_engine.CursorResult` object which replaces the previous
957 :class:`_engine.ResultProxy` interface. When using the ORM, a
958 higher level object called :class:`_engine.ChunkedIteratorResult`
959 is normally used.
960
961 .. note:: In SQLAlchemy 1.4 and above, this object is
962 used for ORM results returned by :meth:`_orm.Session.execute`, which can
963 yield instances of ORM mapped objects either individually or within
964 tuple-like rows. Note that the :class:`_engine.Result` object does not
965 deduplicate instances or rows automatically as is the case with the
966 legacy :class:`_orm.Query` object. For in-Python de-duplication of
967 instances or rows, use the :meth:`_engine.Result.unique` modifier
968 method.
969
970 .. seealso::
971
972 :ref:`tutorial_fetching_rows` - in the :doc:`/tutorial/index`
973
974 """
975
976 __slots__ = ("_metadata", "__dict__")
977
978 _row_logging_fn: Optional[Callable[[Row[Any]], Row[Any]]] = None
979
980 _source_supports_scalars: bool = False
981
982 _yield_per: Optional[int] = None
983
984 _attributes: util.immutabledict[Any, Any] = util.immutabledict()
985
986 def __init__(self, cursor_metadata: ResultMetaData):
987 self._metadata = cursor_metadata
988
989 def __enter__(self) -> Self:
990 return self
991
992 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
993 self.close()
994
995 def close(self) -> None:
996 """Hard close this :class:`_engine.Result`.
997
998 The behavior of this method is implementation specific, and is
999 not implemented by default. The method should generally end
1000 the resources in use by the result object and also cause any
1001 subsequent iteration or row fetching to raise
1002 :class:`.ResourceClosedError`.
1003
1004 .. versionadded:: 1.4.27 - ``.close()`` was previously not generally
1005 available for all :class:`_engine.Result` classes, instead only
1006 being available on the :class:`_engine.CursorResult` returned for
1007 Core statement executions. As most other result objects, namely the
1008 ones used by the ORM, are proxying a :class:`_engine.CursorResult`
1009 in any case, this allows the underlying cursor result to be closed
1010 from the outside facade for the case when the ORM query is using
1011 the ``yield_per`` execution option where it does not immediately
1012 exhaust and autoclose the database cursor.
1013
1014 """
1015 self._soft_close(hard=True)
1016
1017 @property
1018 def _soft_closed(self) -> bool:
1019 raise NotImplementedError()
1020
1021 @property
1022 def closed(self) -> bool:
1023 """Return ``True`` if this :class:`_engine.Result` was **hard closed**
1024 by explicitly calling the :meth:`close` method.
1025
1026 The attribute is **not** True if the :class:`_engine.Result` was only
1027 **soft closed**; a "soft close" is the style of close that takes place
1028 for example when the :class:`.CursorResult` is returned for a DML
1029 only statement without RETURNING, or when all result rows are fetched.
1030
1031 .. seealso::
1032
1033 :attr:`.CursorResult.returns_rows` - attribute specific to
1034 :class:`.CursorResult` which indicates if the result is one that
1035 may return zero or more rows
1036
1037 """
1038 raise NotImplementedError()
1039
1040 @_generative
1041 def yield_per(self, num: int) -> Self:
1042 """Configure the row-fetching strategy to fetch ``num`` rows at a time.
1043
1044 This impacts the underlying behavior of the result when iterating over
1045 the result object, or otherwise making use of methods such as
1046 :meth:`_engine.Result.fetchone` that return one row at a time. Data
1047 from the underlying cursor or other data source will be buffered up to
1048 this many rows in memory, and the buffered collection will then be
1049 yielded out one row at a time or as many rows are requested. Each time
1050 the buffer clears, it will be refreshed to this many rows or as many
1051 rows remain if fewer remain.
1052
1053 The :meth:`_engine.Result.yield_per` method is generally used in
1054 conjunction with the
1055 :paramref:`_engine.Connection.execution_options.stream_results`
1056 execution option, which will allow the database dialect in use to make
1057 use of a server side cursor, if the DBAPI supports a specific "server
1058 side cursor" mode separate from its default mode of operation.
1059
1060 .. tip::
1061
1062 Consider using the
1063 :paramref:`_engine.Connection.execution_options.yield_per`
1064 execution option, which will simultaneously set
1065 :paramref:`_engine.Connection.execution_options.stream_results`
1066 to ensure the use of server side cursors, as well as automatically
1067 invoke the :meth:`_engine.Result.yield_per` method to establish
1068 a fixed row buffer size at once.
1069
1070 The :paramref:`_engine.Connection.execution_options.yield_per`
1071 execution option is available for ORM operations, with
1072 :class:`_orm.Session`-oriented use described at
1073 :ref:`orm_queryguide_yield_per`. The Core-only version which works
1074 with :class:`_engine.Connection` is new as of SQLAlchemy 1.4.40.
1075
1076 .. versionadded:: 1.4
1077
1078 :param num: number of rows to fetch each time the buffer is refilled.
1079 If set to a value below 1, fetches all rows for the next buffer.
1080
1081 .. seealso::
1082
1083 :ref:`engine_stream_results` - describes Core behavior for
1084 :meth:`_engine.Result.yield_per`
1085
1086 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
1087
1088 """
1089 self._yield_per = num
1090 return self
1091
1092 @_generative
1093 def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
1094 """Apply unique filtering to the objects returned by this
1095 :class:`_engine.Result`.
1096
1097 When this filter is applied with no arguments, the rows or objects
1098 returned will filtered such that each row is returned uniquely. The
1099 algorithm used to determine this uniqueness is by default the Python
1100 hashing identity of the whole tuple. In some cases a specialized
1101 per-entity hashing scheme may be used, such as when using the ORM, a
1102 scheme is applied which works against the primary key identity of
1103 returned objects.
1104
1105 The unique filter is applied **after all other filters**, which means
1106 if the columns returned have been refined using a method such as the
1107 :meth:`_engine.Result.columns` or :meth:`_engine.Result.scalars`
1108 method, the uniquing is applied to **only the column or columns
1109 returned**. This occurs regardless of the order in which these
1110 methods have been called upon the :class:`_engine.Result` object.
1111
1112 The unique filter also changes the calculus used for methods like
1113 :meth:`_engine.Result.fetchmany` and :meth:`_engine.Result.partitions`.
1114 When using :meth:`_engine.Result.unique`, these methods will continue
1115 to yield the number of rows or objects requested, after uniquing
1116 has been applied. However, this necessarily impacts the buffering
1117 behavior of the underlying cursor or datasource, such that multiple
1118 underlying calls to ``cursor.fetchmany()`` may be necessary in order
1119 to accumulate enough objects in order to provide a unique collection
1120 of the requested size.
1121
1122 :param strategy: a callable that will be applied to rows or objects
1123 being iterated, which should return an object that represents the
1124 unique value of the row. A Python ``set()`` is used to store
1125 these identities. If not passed, a default uniqueness strategy
1126 is used which may have been assembled by the source of this
1127 :class:`_engine.Result` object.
1128
1129 """
1130 self._unique_filter_state = (set(), strategy)
1131 return self
1132
1133 def columns(self, *col_expressions: _KeyIndexType) -> Self:
1134 r"""Establish the columns that should be returned in each row.
1135
1136 This method may be used to limit the columns returned as well
1137 as to reorder them. The given list of expressions are normally
1138 a series of integers or string key names. They may also be
1139 appropriate :class:`.ColumnElement` objects which correspond to
1140 a given statement construct.
1141
1142 .. versionchanged:: 2.0 Due to a bug in 1.4, the
1143 :meth:`_engine.Result.columns` method had an incorrect behavior
1144 where calling upon the method with just one index would cause the
1145 :class:`_engine.Result` object to yield scalar values rather than
1146 :class:`_engine.Row` objects. In version 2.0, this behavior
1147 has been corrected such that calling upon
1148 :meth:`_engine.Result.columns` with a single index will
1149 produce a :class:`_engine.Result` object that continues
1150 to yield :class:`_engine.Row` objects, which include
1151 only a single column.
1152
1153 E.g.::
1154
1155 statement = select(table.c.x, table.c.y, table.c.z)
1156 result = connection.execute(statement)
1157
1158 for z, y in result.columns("z", "y"):
1159 ...
1160
1161 Example of using the column objects from the statement itself::
1162
1163 for z, y in result.columns(
1164 statement.selected_columns.c.z, statement.selected_columns.c.y
1165 ):
1166 ...
1167
1168 .. versionadded:: 1.4
1169
1170 :param \*col_expressions: indicates columns to be returned. Elements
1171 may be integer row indexes, string column names, or appropriate
1172 :class:`.ColumnElement` objects corresponding to a select construct.
1173
1174 :return: this :class:`_engine.Result` object with the modifications
1175 given.
1176
1177 """
1178 return self._column_slices(col_expressions)
1179
1180 @overload
1181 def scalars(self: Result[Tuple[_T]]) -> ScalarResult[_T]: ...
1182
1183 @overload
1184 def scalars(
1185 self: Result[Tuple[_T]], index: Literal[0]
1186 ) -> ScalarResult[_T]: ...
1187
1188 @overload
1189 def scalars(self, index: _KeyIndexType = 0) -> ScalarResult[Any]: ...
1190
1191 def scalars(self, index: _KeyIndexType = 0) -> ScalarResult[Any]:
1192 """Return a :class:`_engine.ScalarResult` filtering object which
1193 will return single elements rather than :class:`_row.Row` objects.
1194
1195 E.g.::
1196
1197 >>> result = conn.execute(text("select int_id from table"))
1198 >>> result.scalars().all()
1199 [1, 2, 3]
1200
1201 When results are fetched from the :class:`_engine.ScalarResult`
1202 filtering object, the single column-row that would be returned by the
1203 :class:`_engine.Result` is instead returned as the column's value.
1204
1205 .. versionadded:: 1.4
1206
1207 :param index: integer or row key indicating the column to be fetched
1208 from each row, defaults to ``0`` indicating the first column.
1209
1210 :return: a new :class:`_engine.ScalarResult` filtering object referring
1211 to this :class:`_engine.Result` object.
1212
1213 """
1214 return ScalarResult(self, index)
1215
1216 def _getter(
1217 self, key: _KeyIndexType, raiseerr: bool = True
1218 ) -> Optional[Callable[[Row[Any]], Any]]:
1219 """return a callable that will retrieve the given key from a
1220 :class:`_engine.Row`.
1221
1222 """
1223 if self._source_supports_scalars:
1224 raise NotImplementedError(
1225 "can't use this function in 'only scalars' mode"
1226 )
1227 return self._metadata._getter(key, raiseerr)
1228
1229 def _tuple_getter(self, keys: Sequence[_KeyIndexType]) -> _TupleGetterType:
1230 """return a callable that will retrieve the given keys from a
1231 :class:`_engine.Row`.
1232
1233 """
1234 if self._source_supports_scalars:
1235 raise NotImplementedError(
1236 "can't use this function in 'only scalars' mode"
1237 )
1238 return self._metadata._row_as_tuple_getter(keys)
1239
1240 def mappings(self) -> MappingResult:
1241 """Apply a mappings filter to returned rows, returning an instance of
1242 :class:`_engine.MappingResult`.
1243
1244 When this filter is applied, fetching rows will return
1245 :class:`_engine.RowMapping` objects instead of :class:`_engine.Row`
1246 objects.
1247
1248 .. versionadded:: 1.4
1249
1250 :return: a new :class:`_engine.MappingResult` filtering object
1251 referring to this :class:`_engine.Result` object.
1252
1253 """
1254
1255 return MappingResult(self)
1256
1257 @property
1258 def t(self) -> TupleResult[_TP]:
1259 """Apply a "typed tuple" typing filter to returned rows.
1260
1261 The :attr:`_engine.Result.t` attribute is a synonym for
1262 calling the :meth:`_engine.Result.tuples` method.
1263
1264 .. versionadded:: 2.0
1265
1266 """
1267 return self # type: ignore
1268
1269 def tuples(self) -> TupleResult[_TP]:
1270 """Apply a "typed tuple" typing filter to returned rows.
1271
1272 This method returns the same :class:`_engine.Result` object
1273 at runtime,
1274 however annotates as returning a :class:`_engine.TupleResult` object
1275 that will indicate to :pep:`484` typing tools that plain typed
1276 ``Tuple`` instances are returned rather than rows. This allows
1277 tuple unpacking and ``__getitem__`` access of :class:`_engine.Row`
1278 objects to by typed, for those cases where the statement invoked
1279 itself included typing information.
1280
1281 .. versionadded:: 2.0
1282
1283 :return: the :class:`_engine.TupleResult` type at typing time.
1284
1285 .. seealso::
1286
1287 :attr:`_engine.Result.t` - shorter synonym
1288
1289 :attr:`_engine.Row._t` - :class:`_engine.Row` version
1290
1291 """
1292
1293 return self # type: ignore
1294
1295 def _raw_row_iterator(self) -> Iterator[_RowData]:
1296 """Return a safe iterator that yields raw row data.
1297
1298 This is used by the :meth:`_engine.Result.merge` method
1299 to merge multiple compatible results together.
1300
1301 """
1302 raise NotImplementedError()
1303
1304 def __iter__(self) -> Iterator[Row[_TP]]:
1305 return self._iter_impl()
1306
1307 def __next__(self) -> Row[_TP]:
1308 return self._next_impl()
1309
1310 def partitions(
1311 self, size: Optional[int] = None
1312 ) -> Iterator[Sequence[Row[_TP]]]:
1313 """Iterate through sub-lists of rows of the size given.
1314
1315 Each list will be of the size given, excluding the last list to
1316 be yielded, which may have a small number of rows. No empty
1317 lists will be yielded.
1318
1319 The result object is automatically closed when the iterator
1320 is fully consumed.
1321
1322 Note that the backend driver will usually buffer the entire result
1323 ahead of time unless the
1324 :paramref:`.Connection.execution_options.stream_results` execution
1325 option is used indicating that the driver should not pre-buffer
1326 results, if possible. Not all drivers support this option and
1327 the option is silently ignored for those who do not.
1328
1329 When using the ORM, the :meth:`_engine.Result.partitions` method
1330 is typically more effective from a memory perspective when it is
1331 combined with use of the
1332 :ref:`yield_per execution option <orm_queryguide_yield_per>`,
1333 which instructs both the DBAPI driver to use server side cursors,
1334 if available, as well as instructs the ORM loading internals to only
1335 build a certain amount of ORM objects from a result at a time before
1336 yielding them out.
1337
1338 .. versionadded:: 1.4
1339
1340 :param size: indicate the maximum number of rows to be present
1341 in each list yielded. If None, makes use of the value set by
1342 the :meth:`_engine.Result.yield_per`, method, if it were called,
1343 or the :paramref:`_engine.Connection.execution_options.yield_per`
1344 execution option, which is equivalent in this regard. If
1345 yield_per weren't set, it makes use of the
1346 :meth:`_engine.Result.fetchmany` default, which may be backend
1347 specific and not well defined.
1348
1349 :return: iterator of lists
1350
1351 .. seealso::
1352
1353 :ref:`engine_stream_results`
1354
1355 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
1356
1357 """
1358
1359 getter = self._manyrow_getter
1360
1361 while True:
1362 partition = getter(self, size)
1363 if partition:
1364 yield partition
1365 else:
1366 break
1367
1368 def fetchall(self) -> Sequence[Row[_TP]]:
1369 """A synonym for the :meth:`_engine.Result.all` method."""
1370
1371 return self._allrows()
1372
1373 def fetchone(self) -> Optional[Row[_TP]]:
1374 """Fetch one row.
1375
1376 When all rows are exhausted, returns None.
1377
1378 This method is provided for backwards compatibility with
1379 SQLAlchemy 1.x.x.
1380
1381 To fetch the first row of a result only, use the
1382 :meth:`_engine.Result.first` method. To iterate through all
1383 rows, iterate the :class:`_engine.Result` object directly.
1384
1385 :return: a :class:`_engine.Row` object if no filters are applied,
1386 or ``None`` if no rows remain.
1387
1388 """
1389 row = self._onerow_getter(self)
1390 if row is _NO_ROW:
1391 return None
1392 else:
1393 return row
1394
1395 def fetchmany(self, size: Optional[int] = None) -> Sequence[Row[_TP]]:
1396 """Fetch many rows.
1397
1398 When all rows are exhausted, returns an empty sequence.
1399
1400 This method is provided for backwards compatibility with
1401 SQLAlchemy 1.x.x.
1402
1403 To fetch rows in groups, use the :meth:`_engine.Result.partitions`
1404 method.
1405
1406 :return: a sequence of :class:`_engine.Row` objects.
1407
1408 .. seealso::
1409
1410 :meth:`_engine.Result.partitions`
1411
1412 """
1413
1414 return self._manyrow_getter(self, size)
1415
1416 def all(self) -> Sequence[Row[_TP]]:
1417 """Return all rows in a sequence.
1418
1419 Closes the result set after invocation. Subsequent invocations
1420 will return an empty sequence.
1421
1422 .. versionadded:: 1.4
1423
1424 :return: a sequence of :class:`_engine.Row` objects.
1425
1426 .. seealso::
1427
1428 :ref:`engine_stream_results` - How to stream a large result set
1429 without loading it completely in python.
1430
1431 """
1432
1433 return self._allrows()
1434
1435 def first(self) -> Optional[Row[_TP]]:
1436 """Fetch the first row or ``None`` if no row is present.
1437
1438 Closes the result set and discards remaining rows.
1439
1440 .. note:: This method returns one **row**, e.g. tuple, by default.
1441 To return exactly one single scalar value, that is, the first
1442 column of the first row, use the
1443 :meth:`_engine.Result.scalar` method,
1444 or combine :meth:`_engine.Result.scalars` and
1445 :meth:`_engine.Result.first`.
1446
1447 Additionally, in contrast to the behavior of the legacy ORM
1448 :meth:`_orm.Query.first` method, **no limit is applied** to the
1449 SQL query which was invoked to produce this
1450 :class:`_engine.Result`;
1451 for a DBAPI driver that buffers results in memory before yielding
1452 rows, all rows will be sent to the Python process and all but
1453 the first row will be discarded.
1454
1455 .. seealso::
1456
1457 :ref:`migration_20_unify_select`
1458
1459 :return: a :class:`_engine.Row` object, or None
1460 if no rows remain.
1461
1462 .. seealso::
1463
1464 :meth:`_engine.Result.scalar`
1465
1466 :meth:`_engine.Result.one`
1467
1468 """
1469
1470 return self._only_one_row(
1471 raise_for_second_row=False, raise_for_none=False, scalar=False
1472 )
1473
1474 def one_or_none(self) -> Optional[Row[_TP]]:
1475 """Return at most one result or raise an exception.
1476
1477 Returns ``None`` if the result has no rows.
1478 Raises :class:`.MultipleResultsFound`
1479 if multiple rows are returned.
1480
1481 .. versionadded:: 1.4
1482
1483 :return: The first :class:`_engine.Row` or ``None`` if no row
1484 is available.
1485
1486 :raises: :class:`.MultipleResultsFound`
1487
1488 .. seealso::
1489
1490 :meth:`_engine.Result.first`
1491
1492 :meth:`_engine.Result.one`
1493
1494 """
1495 return self._only_one_row(
1496 raise_for_second_row=True, raise_for_none=False, scalar=False
1497 )
1498
1499 @overload
1500 def scalar_one(self: Result[Tuple[_T]]) -> _T: ...
1501
1502 @overload
1503 def scalar_one(self) -> Any: ...
1504
1505 def scalar_one(self) -> Any:
1506 """Return exactly one scalar result or raise an exception.
1507
1508 This is equivalent to calling :meth:`_engine.Result.scalars` and
1509 then :meth:`_engine.ScalarResult.one`.
1510
1511 .. seealso::
1512
1513 :meth:`_engine.ScalarResult.one`
1514
1515 :meth:`_engine.Result.scalars`
1516
1517 """
1518 return self._only_one_row(
1519 raise_for_second_row=True, raise_for_none=True, scalar=True
1520 )
1521
1522 @overload
1523 def scalar_one_or_none(self: Result[Tuple[_T]]) -> Optional[_T]: ...
1524
1525 @overload
1526 def scalar_one_or_none(self) -> Optional[Any]: ...
1527
1528 def scalar_one_or_none(self) -> Optional[Any]:
1529 """Return exactly one scalar result or ``None``.
1530
1531 This is equivalent to calling :meth:`_engine.Result.scalars` and
1532 then :meth:`_engine.ScalarResult.one_or_none`.
1533
1534 .. seealso::
1535
1536 :meth:`_engine.ScalarResult.one_or_none`
1537
1538 :meth:`_engine.Result.scalars`
1539
1540 """
1541 return self._only_one_row(
1542 raise_for_second_row=True, raise_for_none=False, scalar=True
1543 )
1544
1545 def one(self) -> Row[_TP]:
1546 """Return exactly one row or raise an exception.
1547
1548 Raises :class:`_exc.NoResultFound` if the result returns no
1549 rows, or :class:`_exc.MultipleResultsFound` if multiple rows
1550 would be returned.
1551
1552 .. note:: This method returns one **row**, e.g. tuple, by default.
1553 To return exactly one single scalar value, that is, the first
1554 column of the first row, use the
1555 :meth:`_engine.Result.scalar_one` method, or combine
1556 :meth:`_engine.Result.scalars` and
1557 :meth:`_engine.Result.one`.
1558
1559 .. versionadded:: 1.4
1560
1561 :return: The first :class:`_engine.Row`.
1562
1563 :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound`
1564
1565 .. seealso::
1566
1567 :meth:`_engine.Result.first`
1568
1569 :meth:`_engine.Result.one_or_none`
1570
1571 :meth:`_engine.Result.scalar_one`
1572
1573 """
1574 return self._only_one_row(
1575 raise_for_second_row=True, raise_for_none=True, scalar=False
1576 )
1577
1578 @overload
1579 def scalar(self: Result[Tuple[_T]]) -> Optional[_T]: ...
1580
1581 @overload
1582 def scalar(self) -> Any: ...
1583
1584 def scalar(self) -> Any:
1585 """Fetch the first column of the first row, and close the result set.
1586
1587 Returns ``None`` if there are no rows to fetch.
1588
1589 No validation is performed to test if additional rows remain.
1590
1591 After calling this method, the object is fully closed,
1592 e.g. the :meth:`_engine.CursorResult.close`
1593 method will have been called.
1594
1595 :return: a Python scalar value, or ``None`` if no rows remain.
1596
1597 """
1598 return self._only_one_row(
1599 raise_for_second_row=False, raise_for_none=False, scalar=True
1600 )
1601
1602 def freeze(self) -> FrozenResult[_TP]:
1603 """Return a callable object that will produce copies of this
1604 :class:`_engine.Result` when invoked.
1605
1606 The callable object returned is an instance of
1607 :class:`_engine.FrozenResult`.
1608
1609 This is used for result set caching. The method must be called
1610 on the result when it has been unconsumed, and calling the method
1611 will consume the result fully. When the :class:`_engine.FrozenResult`
1612 is retrieved from a cache, it can be called any number of times where
1613 it will produce a new :class:`_engine.Result` object each time
1614 against its stored set of rows.
1615
1616 .. seealso::
1617
1618 :ref:`do_orm_execute_re_executing` - example usage within the
1619 ORM to implement a result-set cache.
1620
1621 """
1622
1623 return FrozenResult(self)
1624
1625 def merge(self, *others: Result[Any]) -> MergedResult[_TP]:
1626 """Merge this :class:`_engine.Result` with other compatible result
1627 objects.
1628
1629 The object returned is an instance of :class:`_engine.MergedResult`,
1630 which will be composed of iterators from the given result
1631 objects.
1632
1633 The new result will use the metadata from this result object.
1634 The subsequent result objects must be against an identical
1635 set of result / cursor metadata, otherwise the behavior is
1636 undefined.
1637
1638 """
1639 return MergedResult(self._metadata, (self,) + others)
1640
1641
1642class FilterResult(ResultInternal[_R]):
1643 """A wrapper for a :class:`_engine.Result` that returns objects other than
1644 :class:`_engine.Row` objects, such as dictionaries or scalar objects.
1645
1646 :class:`_engine.FilterResult` is the common base for additional result
1647 APIs including :class:`_engine.MappingResult`,
1648 :class:`_engine.ScalarResult` and :class:`_engine.AsyncResult`.
1649
1650 """
1651
1652 __slots__ = (
1653 "_real_result",
1654 "_post_creational_filter",
1655 "_metadata",
1656 "_unique_filter_state",
1657 "__dict__",
1658 )
1659
1660 _post_creational_filter: Optional[Callable[[Any], Any]]
1661
1662 _real_result: Result[Any]
1663
1664 def __enter__(self) -> Self:
1665 return self
1666
1667 def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
1668 self._real_result.__exit__(type_, value, traceback)
1669
1670 @_generative
1671 def yield_per(self, num: int) -> Self:
1672 """Configure the row-fetching strategy to fetch ``num`` rows at a time.
1673
1674 The :meth:`_engine.FilterResult.yield_per` method is a pass through
1675 to the :meth:`_engine.Result.yield_per` method. See that method's
1676 documentation for usage notes.
1677
1678 .. versionadded:: 1.4.40 - added :meth:`_engine.FilterResult.yield_per`
1679 so that the method is available on all result set implementations
1680
1681 .. seealso::
1682
1683 :ref:`engine_stream_results` - describes Core behavior for
1684 :meth:`_engine.Result.yield_per`
1685
1686 :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel`
1687
1688 """
1689 self._real_result = self._real_result.yield_per(num)
1690 return self
1691
1692 def _soft_close(self, hard: bool = False) -> None:
1693 self._real_result._soft_close(hard=hard)
1694
1695 @property
1696 def _soft_closed(self) -> bool:
1697 return self._real_result._soft_closed
1698
1699 @property
1700 def closed(self) -> bool:
1701 """Return ``True`` if this :class:`_engine.Result` being
1702 proxied by this :class:`_engine.FilterResult` was
1703 **hard closed** by explicitly calling the :meth:`_engine.Result.close`
1704 method.
1705
1706 This is a direct proxy for the :attr:`_engine.Result.closed` attribute;
1707 see that attribute for details.
1708
1709 """
1710 return self._real_result.closed
1711
1712 def close(self) -> None:
1713 """Close this :class:`_engine.FilterResult`.
1714
1715 .. versionadded:: 1.4.43
1716
1717 """
1718 self._real_result.close()
1719
1720 @property
1721 def _attributes(self) -> Dict[Any, Any]:
1722 return self._real_result._attributes
1723
1724 def _fetchiter_impl(self) -> Iterator[_InterimRowType[Row[Any]]]:
1725 return self._real_result._fetchiter_impl()
1726
1727 def _fetchone_impl(
1728 self, hard_close: bool = False
1729 ) -> Optional[_InterimRowType[Row[Any]]]:
1730 return self._real_result._fetchone_impl(hard_close=hard_close)
1731
1732 def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
1733 return self._real_result._fetchall_impl()
1734
1735 def _fetchmany_impl(
1736 self, size: Optional[int] = None
1737 ) -> List[_InterimRowType[Row[Any]]]:
1738 return self._real_result._fetchmany_impl(size=size)
1739
1740
1741class ScalarResult(FilterResult[_R]):
1742 """A wrapper for a :class:`_engine.Result` that returns scalar values
1743 rather than :class:`_row.Row` values.
1744
1745 The :class:`_engine.ScalarResult` object is acquired by calling the
1746 :meth:`_engine.Result.scalars` method.
1747
1748 A special limitation of :class:`_engine.ScalarResult` is that it has
1749 no ``fetchone()`` method; since the semantics of ``fetchone()`` are that
1750 the ``None`` value indicates no more results, this is not compatible
1751 with :class:`_engine.ScalarResult` since there is no way to distinguish
1752 between ``None`` as a row value versus ``None`` as an indicator. Use
1753 ``next(result)`` to receive values individually.
1754
1755 """
1756
1757 __slots__ = ()
1758
1759 _generate_rows = False
1760
1761 _post_creational_filter: Optional[Callable[[Any], Any]]
1762
1763 def __init__(self, real_result: Result[Any], index: _KeyIndexType):
1764 self._real_result = real_result
1765
1766 if real_result._source_supports_scalars:
1767 self._metadata = real_result._metadata
1768 self._post_creational_filter = None
1769 else:
1770 self._metadata = real_result._metadata._reduce([index])
1771 self._post_creational_filter = operator.itemgetter(0)
1772
1773 self._unique_filter_state = real_result._unique_filter_state
1774
1775 def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
1776 """Apply unique filtering to the objects returned by this
1777 :class:`_engine.ScalarResult`.
1778
1779 See :meth:`_engine.Result.unique` for usage details.
1780
1781 """
1782 self._unique_filter_state = (set(), strategy)
1783 return self
1784
1785 def partitions(self, size: Optional[int] = None) -> Iterator[Sequence[_R]]:
1786 """Iterate through sub-lists of elements of the size given.
1787
1788 Equivalent to :meth:`_engine.Result.partitions` except that
1789 scalar values, rather than :class:`_engine.Row` objects,
1790 are returned.
1791
1792 """
1793
1794 getter = self._manyrow_getter
1795
1796 while True:
1797 partition = getter(self, size)
1798 if partition:
1799 yield partition
1800 else:
1801 break
1802
1803 def fetchall(self) -> Sequence[_R]:
1804 """A synonym for the :meth:`_engine.ScalarResult.all` method."""
1805
1806 return self._allrows()
1807
1808 def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
1809 """Fetch many objects.
1810
1811 Equivalent to :meth:`_engine.Result.fetchmany` except that
1812 scalar values, rather than :class:`_engine.Row` objects,
1813 are returned.
1814
1815 """
1816 return self._manyrow_getter(self, size)
1817
1818 def all(self) -> Sequence[_R]:
1819 """Return all scalar values in a sequence.
1820
1821 Equivalent to :meth:`_engine.Result.all` except that
1822 scalar values, rather than :class:`_engine.Row` objects,
1823 are returned.
1824
1825 """
1826 return self._allrows()
1827
1828 def __iter__(self) -> Iterator[_R]:
1829 return self._iter_impl()
1830
1831 def __next__(self) -> _R:
1832 return self._next_impl()
1833
1834 def first(self) -> Optional[_R]:
1835 """Fetch the first object or ``None`` if no object is present.
1836
1837 Equivalent to :meth:`_engine.Result.first` except that
1838 scalar values, rather than :class:`_engine.Row` objects,
1839 are returned.
1840
1841
1842 """
1843 return self._only_one_row(
1844 raise_for_second_row=False, raise_for_none=False, scalar=False
1845 )
1846
1847 def one_or_none(self) -> Optional[_R]:
1848 """Return at most one object or raise an exception.
1849
1850 Equivalent to :meth:`_engine.Result.one_or_none` except that
1851 scalar values, rather than :class:`_engine.Row` objects,
1852 are returned.
1853
1854 """
1855 return self._only_one_row(
1856 raise_for_second_row=True, raise_for_none=False, scalar=False
1857 )
1858
1859 def one(self) -> _R:
1860 """Return exactly one object or raise an exception.
1861
1862 Equivalent to :meth:`_engine.Result.one` except that
1863 scalar values, rather than :class:`_engine.Row` objects,
1864 are returned.
1865
1866 """
1867 return self._only_one_row(
1868 raise_for_second_row=True, raise_for_none=True, scalar=False
1869 )
1870
1871
1872class TupleResult(FilterResult[_R], util.TypingOnly):
1873 """A :class:`_engine.Result` that's typed as returning plain
1874 Python tuples instead of rows.
1875
1876 Since :class:`_engine.Row` acts like a tuple in every way already,
1877 this class is a typing only class, regular :class:`_engine.Result` is
1878 still used at runtime.
1879
1880 """
1881
1882 __slots__ = ()
1883
1884 if TYPE_CHECKING:
1885
1886 def partitions(
1887 self, size: Optional[int] = None
1888 ) -> Iterator[Sequence[_R]]:
1889 """Iterate through sub-lists of elements of the size given.
1890
1891 Equivalent to :meth:`_engine.Result.partitions` except that
1892 tuple values, rather than :class:`_engine.Row` objects,
1893 are returned.
1894
1895 """
1896 ...
1897
1898 def fetchone(self) -> Optional[_R]:
1899 """Fetch one tuple.
1900
1901 Equivalent to :meth:`_engine.Result.fetchone` except that
1902 tuple values, rather than :class:`_engine.Row`
1903 objects, are returned.
1904
1905 """
1906 ...
1907
1908 def fetchall(self) -> Sequence[_R]:
1909 """A synonym for the :meth:`_engine.ScalarResult.all` method."""
1910 ...
1911
1912 def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]:
1913 """Fetch many objects.
1914
1915 Equivalent to :meth:`_engine.Result.fetchmany` except that
1916 tuple values, rather than :class:`_engine.Row` objects,
1917 are returned.
1918
1919 """
1920 ...
1921
1922 def all(self) -> Sequence[_R]: # noqa: A001
1923 """Return all scalar values in a sequence.
1924
1925 Equivalent to :meth:`_engine.Result.all` except that
1926 tuple values, rather than :class:`_engine.Row` objects,
1927 are returned.
1928
1929 """
1930 ...
1931
1932 def __iter__(self) -> Iterator[_R]: ...
1933
1934 def __next__(self) -> _R: ...
1935
1936 def first(self) -> Optional[_R]:
1937 """Fetch the first object or ``None`` if no object is present.
1938
1939 Equivalent to :meth:`_engine.Result.first` except that
1940 tuple values, rather than :class:`_engine.Row` objects,
1941 are returned.
1942
1943
1944 """
1945 ...
1946
1947 def one_or_none(self) -> Optional[_R]:
1948 """Return at most one object or raise an exception.
1949
1950 Equivalent to :meth:`_engine.Result.one_or_none` except that
1951 tuple values, rather than :class:`_engine.Row` objects,
1952 are returned.
1953
1954 """
1955 ...
1956
1957 def one(self) -> _R:
1958 """Return exactly one object or raise an exception.
1959
1960 Equivalent to :meth:`_engine.Result.one` except that
1961 tuple values, rather than :class:`_engine.Row` objects,
1962 are returned.
1963
1964 """
1965 ...
1966
1967 @overload
1968 def scalar_one(self: TupleResult[Tuple[_T]]) -> _T: ...
1969
1970 @overload
1971 def scalar_one(self) -> Any: ...
1972
1973 def scalar_one(self) -> Any:
1974 """Return exactly one scalar result or raise an exception.
1975
1976 This is equivalent to calling :meth:`_engine.Result.scalars`
1977 and then :meth:`_engine.ScalarResult.one`.
1978
1979 .. seealso::
1980
1981 :meth:`_engine.ScalarResult.one`
1982
1983 :meth:`_engine.Result.scalars`
1984
1985 """
1986 ...
1987
1988 @overload
1989 def scalar_one_or_none(
1990 self: TupleResult[Tuple[_T]],
1991 ) -> Optional[_T]: ...
1992
1993 @overload
1994 def scalar_one_or_none(self) -> Optional[Any]: ...
1995
1996 def scalar_one_or_none(self) -> Optional[Any]:
1997 """Return exactly one or no scalar result.
1998
1999 This is equivalent to calling :meth:`_engine.Result.scalars`
2000 and then :meth:`_engine.ScalarResult.one_or_none`.
2001
2002 .. seealso::
2003
2004 :meth:`_engine.ScalarResult.one_or_none`
2005
2006 :meth:`_engine.Result.scalars`
2007
2008 """
2009 ...
2010
2011 @overload
2012 def scalar(self: TupleResult[Tuple[_T]]) -> Optional[_T]: ...
2013
2014 @overload
2015 def scalar(self) -> Any: ...
2016
2017 def scalar(self) -> Any:
2018 """Fetch the first column of the first row, and close the result
2019 set.
2020
2021 Returns ``None`` if there are no rows to fetch.
2022
2023 No validation is performed to test if additional rows remain.
2024
2025 After calling this method, the object is fully closed,
2026 e.g. the :meth:`_engine.CursorResult.close`
2027 method will have been called.
2028
2029 :return: a Python scalar value , or ``None`` if no rows remain.
2030
2031 """
2032 ...
2033
2034
2035class MappingResult(_WithKeys, FilterResult[RowMapping]):
2036 """A wrapper for a :class:`_engine.Result` that returns dictionary values
2037 rather than :class:`_engine.Row` values.
2038
2039 The :class:`_engine.MappingResult` object is acquired by calling the
2040 :meth:`_engine.Result.mappings` method.
2041
2042 """
2043
2044 __slots__ = ()
2045
2046 _generate_rows = True
2047
2048 _post_creational_filter = operator.attrgetter("_mapping")
2049
2050 def __init__(self, result: Result[Any]):
2051 self._real_result = result
2052 self._unique_filter_state = result._unique_filter_state
2053 self._metadata = result._metadata
2054 if result._source_supports_scalars:
2055 self._metadata = self._metadata._reduce([0])
2056
2057 def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self:
2058 """Apply unique filtering to the objects returned by this
2059 :class:`_engine.MappingResult`.
2060
2061 See :meth:`_engine.Result.unique` for usage details.
2062
2063 """
2064 self._unique_filter_state = (set(), strategy)
2065 return self
2066
2067 def columns(self, *col_expressions: _KeyIndexType) -> Self:
2068 """Establish the columns that should be returned in each row."""
2069 return self._column_slices(col_expressions)
2070
2071 def partitions(
2072 self, size: Optional[int] = None
2073 ) -> Iterator[Sequence[RowMapping]]:
2074 """Iterate through sub-lists of elements of the size given.
2075
2076 Equivalent to :meth:`_engine.Result.partitions` except that
2077 :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
2078 objects, are returned.
2079
2080 """
2081
2082 getter = self._manyrow_getter
2083
2084 while True:
2085 partition = getter(self, size)
2086 if partition:
2087 yield partition
2088 else:
2089 break
2090
2091 def fetchall(self) -> Sequence[RowMapping]:
2092 """A synonym for the :meth:`_engine.MappingResult.all` method."""
2093
2094 return self._allrows()
2095
2096 def fetchone(self) -> Optional[RowMapping]:
2097 """Fetch one object.
2098
2099 Equivalent to :meth:`_engine.Result.fetchone` except that
2100 :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
2101 objects, are returned.
2102
2103 """
2104
2105 row = self._onerow_getter(self)
2106 if row is _NO_ROW:
2107 return None
2108 else:
2109 return row
2110
2111 def fetchmany(self, size: Optional[int] = None) -> Sequence[RowMapping]:
2112 """Fetch many objects.
2113
2114 Equivalent to :meth:`_engine.Result.fetchmany` except that
2115 :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
2116 objects, are returned.
2117
2118 """
2119
2120 return self._manyrow_getter(self, size)
2121
2122 def all(self) -> Sequence[RowMapping]:
2123 """Return all scalar values in a sequence.
2124
2125 Equivalent to :meth:`_engine.Result.all` except that
2126 :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
2127 objects, are returned.
2128
2129 """
2130
2131 return self._allrows()
2132
2133 def __iter__(self) -> Iterator[RowMapping]:
2134 return self._iter_impl()
2135
2136 def __next__(self) -> RowMapping:
2137 return self._next_impl()
2138
2139 def first(self) -> Optional[RowMapping]:
2140 """Fetch the first object or ``None`` if no object is present.
2141
2142 Equivalent to :meth:`_engine.Result.first` except that
2143 :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
2144 objects, are returned.
2145
2146
2147 """
2148 return self._only_one_row(
2149 raise_for_second_row=False, raise_for_none=False, scalar=False
2150 )
2151
2152 def one_or_none(self) -> Optional[RowMapping]:
2153 """Return at most one object or raise an exception.
2154
2155 Equivalent to :meth:`_engine.Result.one_or_none` except that
2156 :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
2157 objects, are returned.
2158
2159 """
2160 return self._only_one_row(
2161 raise_for_second_row=True, raise_for_none=False, scalar=False
2162 )
2163
2164 def one(self) -> RowMapping:
2165 """Return exactly one object or raise an exception.
2166
2167 Equivalent to :meth:`_engine.Result.one` except that
2168 :class:`_engine.RowMapping` values, rather than :class:`_engine.Row`
2169 objects, are returned.
2170
2171 """
2172 return self._only_one_row(
2173 raise_for_second_row=True, raise_for_none=True, scalar=False
2174 )
2175
2176
2177class FrozenResult(Generic[_TP]):
2178 """Represents a :class:`_engine.Result` object in a "frozen" state suitable
2179 for caching.
2180
2181 The :class:`_engine.FrozenResult` object is returned from the
2182 :meth:`_engine.Result.freeze` method of any :class:`_engine.Result`
2183 object.
2184
2185 A new iterable :class:`_engine.Result` object is generated from a fixed
2186 set of data each time the :class:`_engine.FrozenResult` is invoked as
2187 a callable::
2188
2189
2190 result = connection.execute(query)
2191
2192 frozen = result.freeze()
2193
2194 unfrozen_result_one = frozen()
2195
2196 for row in unfrozen_result_one:
2197 print(row)
2198
2199 unfrozen_result_two = frozen()
2200 rows = unfrozen_result_two.all()
2201
2202 # ... etc
2203
2204 .. versionadded:: 1.4
2205
2206 .. seealso::
2207
2208 :ref:`do_orm_execute_re_executing` - example usage within the
2209 ORM to implement a result-set cache.
2210
2211 :func:`_orm.loading.merge_frozen_result` - ORM function to merge
2212 a frozen result back into a :class:`_orm.Session`.
2213
2214 """
2215
2216 data: Sequence[Any]
2217
2218 def __init__(self, result: Result[_TP]):
2219 self.metadata = result._metadata._for_freeze()
2220 self._source_supports_scalars = result._source_supports_scalars
2221 self._attributes = result._attributes
2222
2223 if self._source_supports_scalars:
2224 self.data = list(result._raw_row_iterator())
2225 else:
2226 self.data = result.fetchall()
2227
2228 def rewrite_rows(self) -> Sequence[Sequence[Any]]:
2229 if self._source_supports_scalars:
2230 return [[elem] for elem in self.data]
2231 else:
2232 return [list(row) for row in self.data]
2233
2234 def with_new_rows(
2235 self, tuple_data: Sequence[Row[_TP]]
2236 ) -> FrozenResult[_TP]:
2237 fr = FrozenResult.__new__(FrozenResult)
2238 fr.metadata = self.metadata
2239 fr._attributes = self._attributes
2240 fr._source_supports_scalars = self._source_supports_scalars
2241
2242 if self._source_supports_scalars:
2243 fr.data = [d[0] for d in tuple_data]
2244 else:
2245 fr.data = tuple_data
2246 return fr
2247
2248 def __call__(self) -> Result[_TP]:
2249 result: IteratorResult[_TP] = IteratorResult(
2250 self.metadata, iter(self.data)
2251 )
2252 result._attributes = self._attributes
2253 result._source_supports_scalars = self._source_supports_scalars
2254 return result
2255
2256
2257class IteratorResult(Result[_TP]):
2258 """A :class:`_engine.Result` that gets data from a Python iterator of
2259 :class:`_engine.Row` objects or similar row-like data.
2260
2261 .. versionadded:: 1.4
2262
2263 """
2264
2265 _hard_closed = False
2266 _soft_closed = False
2267
2268 def __init__(
2269 self,
2270 cursor_metadata: ResultMetaData,
2271 iterator: Iterator[_InterimSupportsScalarsRowType],
2272 raw: Optional[Result[Any]] = None,
2273 _source_supports_scalars: bool = False,
2274 ):
2275 self._metadata = cursor_metadata
2276 self.iterator = iterator
2277 self.raw = raw
2278 self._source_supports_scalars = _source_supports_scalars
2279
2280 @property
2281 def closed(self) -> bool:
2282 """Return ``True`` if this :class:`_engine.IteratorResult` was
2283 **hard closed** by explicitly calling the :meth:`_engine.Result.close`
2284 method.
2285
2286 The attribute is **not** True if the :class:`_engine.Result` was only
2287 **soft closed**; a "soft close" is the style of close that takes place
2288 for example when the :class:`.CursorResult` is returned for a DML
2289 only statement without RETURNING, or when all result rows are fetched.
2290
2291 .. seealso::
2292
2293 :attr:`.CursorResult.returns_rows` - attribute specific to
2294 :class:`.CursorResult` which indicates if the result is one that
2295 may return zero or more rows
2296
2297 """
2298 return self._hard_closed
2299
2300 def _soft_close(self, hard: bool = False, **kw: Any) -> None:
2301 if hard:
2302 self._hard_closed = True
2303 if self.raw is not None:
2304 self.raw._soft_close(hard=hard, **kw)
2305 self.iterator = iter([])
2306 self._reset_memoizations()
2307 self._soft_closed = True
2308
2309 def _raise_hard_closed(self) -> NoReturn:
2310 raise exc.ResourceClosedError("This result object is closed.")
2311
2312 def _raw_row_iterator(self) -> Iterator[_RowData]:
2313 return self.iterator
2314
2315 def _fetchiter_impl(self) -> Iterator[_InterimSupportsScalarsRowType]:
2316 if self._hard_closed:
2317 self._raise_hard_closed()
2318 return self.iterator
2319
2320 def _fetchone_impl(
2321 self, hard_close: bool = False
2322 ) -> Optional[_InterimRowType[Row[Any]]]:
2323 if self._hard_closed:
2324 self._raise_hard_closed()
2325
2326 row = next(self.iterator, _NO_ROW)
2327 if row is _NO_ROW:
2328 self._soft_close(hard=hard_close)
2329 return None
2330 else:
2331 return row
2332
2333 def _fetchall_impl(self) -> List[_InterimRowType[Row[Any]]]:
2334 if self._hard_closed:
2335 self._raise_hard_closed()
2336 try:
2337 return list(self.iterator)
2338 finally:
2339 self._soft_close()
2340
2341 def _fetchmany_impl(
2342 self, size: Optional[int] = None
2343 ) -> List[_InterimRowType[Row[Any]]]:
2344 if self._hard_closed:
2345 self._raise_hard_closed()
2346
2347 return list(itertools.islice(self.iterator, 0, size))
2348
2349
2350def null_result() -> IteratorResult[Any]:
2351 return IteratorResult(SimpleResultMetaData([]), iter([]))
2352
2353
2354class ChunkedIteratorResult(IteratorResult[_TP]):
2355 """An :class:`_engine.IteratorResult` that works from an
2356 iterator-producing callable.
2357
2358 The given ``chunks`` argument is a function that is given a number of rows
2359 to return in each chunk, or ``None`` for all rows. The function should
2360 then return an un-consumed iterator of lists, each list of the requested
2361 size.
2362
2363 The function can be called at any time again, in which case it should
2364 continue from the same result set but adjust the chunk size as given.
2365
2366 .. versionadded:: 1.4
2367
2368 """
2369
2370 def __init__(
2371 self,
2372 cursor_metadata: ResultMetaData,
2373 chunks: Callable[
2374 [Optional[int]], Iterator[Sequence[_InterimRowType[_R]]]
2375 ],
2376 source_supports_scalars: bool = False,
2377 raw: Optional[Result[Any]] = None,
2378 dynamic_yield_per: bool = False,
2379 ):
2380 self._metadata = cursor_metadata
2381 self.chunks = chunks
2382 self._source_supports_scalars = source_supports_scalars
2383 self.raw = raw
2384 self.iterator = itertools.chain.from_iterable(self.chunks(None))
2385 self.dynamic_yield_per = dynamic_yield_per
2386
2387 @_generative
2388 def yield_per(self, num: int) -> Self:
2389 # TODO: this throws away the iterator which may be holding
2390 # onto a chunk. the yield_per cannot be changed once any
2391 # rows have been fetched. either find a way to enforce this,
2392 # or we can't use itertools.chain and will instead have to
2393 # keep track.
2394
2395 self._yield_per = num
2396 self.iterator = itertools.chain.from_iterable(self.chunks(num))
2397 return self
2398
2399 def _soft_close(self, hard: bool = False, **kw: Any) -> None:
2400 super()._soft_close(hard=hard, **kw)
2401 self.chunks = lambda size: [] # type: ignore
2402
2403 def _fetchmany_impl(
2404 self, size: Optional[int] = None
2405 ) -> List[_InterimRowType[Row[Any]]]:
2406 if self.dynamic_yield_per:
2407 self.iterator = itertools.chain.from_iterable(self.chunks(size))
2408 return super()._fetchmany_impl(size=size)
2409
2410
2411class MergedResult(IteratorResult[_TP]):
2412 """A :class:`_engine.Result` that is merged from any number of
2413 :class:`_engine.Result` objects.
2414
2415 Returned by the :meth:`_engine.Result.merge` method.
2416
2417 .. versionadded:: 1.4
2418
2419 """
2420
2421 closed = False
2422 rowcount: Optional[int]
2423
2424 def __init__(
2425 self, cursor_metadata: ResultMetaData, results: Sequence[Result[_TP]]
2426 ):
2427 self._results = results
2428 super().__init__(
2429 cursor_metadata,
2430 itertools.chain.from_iterable(
2431 r._raw_row_iterator() for r in results
2432 ),
2433 )
2434
2435 self._unique_filter_state = results[0]._unique_filter_state
2436 self._yield_per = results[0]._yield_per
2437
2438 # going to try something w/ this in next rev
2439 self._source_supports_scalars = results[0]._source_supports_scalars
2440
2441 self._attributes = self._attributes.merge_with(
2442 *[r._attributes for r in results]
2443 )
2444
2445 def _soft_close(self, hard: bool = False, **kw: Any) -> None:
2446 for r in self._results:
2447 r._soft_close(hard=hard, **kw)
2448 if hard:
2449 self.closed = True