# engine/cursor.py
# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: allow-untyped-defs, allow-untyped-calls

"""Define cursor-specific result set constructs including
:class:`.CursorResult`."""


from __future__ import annotations

import collections
import functools
import operator
import typing
from typing import Any
from typing import cast
from typing import ClassVar
from typing import Dict
from typing import Iterator
from typing import List
from typing import Mapping
from typing import NoReturn
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

from .result import IteratorResult
from .result import MergedResult
from .result import Result
from .result import ResultMetaData
from .result import SimpleResultMetaData
from .result import tuplegetter
from .row import Row
from .. import exc
from .. import util
from ..sql import elements
from ..sql import sqltypes
from ..sql import util as sql_util
from ..sql.base import _generative
from ..sql.compiler import ResultColumnsEntry
from ..sql.compiler import RM_NAME
from ..sql.compiler import RM_OBJECTS
from ..sql.compiler import RM_RENDERED_NAME
from ..sql.compiler import RM_TYPE
from ..sql.type_api import TypeEngine
from ..util import compat
from ..util.typing import Literal
from ..util.typing import Self
from ..util.typing import TupleAny
from ..util.typing import TypeVarTuple
from ..util.typing import Unpack


if typing.TYPE_CHECKING:
    from .base import Connection
    from .default import DefaultExecutionContext
    from .interfaces import _DBAPICursorDescription
    from .interfaces import DBAPICursor
    from .interfaces import Dialect
    from .interfaces import ExecutionContext
    from .result import _KeyIndexType
    from .result import _KeyMapRecType
    from .result import _KeyMapType
    from .result import _KeyType
    from .result import _ProcessorsType
    from .result import _TupleGetterType
    from ..sql.type_api import _ResultProcessorType


_Ts = TypeVarTuple("_Ts")


# metadata entry tuple indexes.
# using raw tuple is faster than namedtuple.
# these match up to the positions in
# _CursorKeyMapRecType
MD_INDEX: Literal[0] = 0
"""integer index in cursor.description

"""

MD_RESULT_MAP_INDEX: Literal[1] = 1
"""integer index in compiled._result_columns"""

MD_OBJECTS: Literal[2] = 2
"""other string keys and ColumnElement obj that can match.

This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects

"""

MD_LOOKUP_KEY: Literal[3] = 3
"""string key we usually expect for key-based lookup

this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
"""


MD_RENDERED_NAME: Literal[4] = 4
"""name that is usually in cursor.description

this comes from compiler.RM_RENDERED_NAME / compiler.ResultColumnsEntry.keyname
"""


MD_PROCESSOR: Literal[5] = 5
"""callable to process a result value into a row"""

MD_UNTRANSLATED: Literal[6] = 6
"""raw name from cursor.description"""


_CursorKeyMapRecType = Tuple[
    Optional[int],  # MD_INDEX, None means the record is ambiguously named
    int,  # MD_RESULT_MAP_INDEX
    List[Any],  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    Optional[str],  # MD_UNTRANSLATED
]

_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]

# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
# not None
_NonAmbigCursorKeyMapRecType = Tuple[
    int,
    int,
    List[Any],
    str,
    str,
    Optional["_ResultProcessorType[Any]"],
    str,
]
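
# Illustrative sketch (``rec`` is a hypothetical record, not part of the
# module): given a keymap record of the tuple form above, the MD_*
# constants index into it directly, e.g.
#
#     idx = rec[MD_INDEX]            # position in cursor.description,
#                                    # or None if the name is ambiguous
#     processor = rec[MD_PROCESSOR]  # result-value processor, or None
#
# raw tuple indexing with these module-level constants is used instead of
# namedtuple attribute access for speed, per the note above.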


class CursorResultMetaData(ResultMetaData):
    """Result metadata for DBAPI cursors."""

    __slots__ = (
        "_keymap",
        "_processors",
        "_keys",
        "_keymap_by_result_column_idx",
        "_tuplefilter",
        "_translated_indexes",
        "_safe_for_cache",
        "_unpickled",
        "_key_to_index",
        # don't need _unique_filters support here for now.  Can be added
        # if a need arises.
    )

    _keymap: _CursorKeyMapType
    _processors: _ProcessorsType
    _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
    _unpickled: bool
    _safe_for_cache: bool
    _translated_indexes: Optional[List[int]]

    returns_rows: ClassVar[bool] = True

    def _has_key(self, key: Any) -> bool:
        return key in self._keymap

    def _for_freeze(self) -> ResultMetaData:
        return SimpleResultMetaData(
            self._keys,
            extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
        )

    def _make_new_metadata(
        self,
        *,
        unpickled: bool,
        processors: _ProcessorsType,
        keys: Sequence[str],
        keymap: _KeyMapType,
        tuplefilter: Optional[_TupleGetterType],
        translated_indexes: Optional[List[int]],
        safe_for_cache: bool,
        keymap_by_result_column_idx: Any,
    ) -> CursorResultMetaData:
        new_obj = self.__class__.__new__(self.__class__)
        new_obj._unpickled = unpickled
        new_obj._processors = processors
        new_obj._keys = keys
        new_obj._keymap = keymap
        new_obj._tuplefilter = tuplefilter
        new_obj._translated_indexes = translated_indexes
        new_obj._safe_for_cache = safe_for_cache
        new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
        new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
        return new_obj

    def _remove_processors(self) -> CursorResultMetaData:
        assert not self._tuplefilter
        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=[None] * len(self._processors),
            tuplefilter=None,
            translated_indexes=None,
            keymap={
                key: value[0:5] + (None,) + value[6:]
                for key, value in self._keymap.items()
            },
            keys=self._keys,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )

    def _splice_horizontally(
        self, other: CursorResultMetaData
    ) -> CursorResultMetaData:
        assert not self._tuplefilter

        keymap = dict(self._keymap)
        offset = len(self._keys)
        keymap.update(
            {
                key: (
                    # int index should be None for ambiguous key
                    (
                        value[0] + offset
                        if value[0] is not None and key not in keymap
                        else None
                    ),
                    value[1] + offset,
                    *value[2:],
                )
                for key, value in other._keymap.items()
            }
        )
        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=self._processors + other._processors,  # type: ignore
            tuplefilter=None,
            translated_indexes=None,
            keys=self._keys + other._keys,  # type: ignore
            keymap=keymap,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx={
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in keymap.values()
            },
        )

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
        recs = list(self._metadata_for_keys(keys))

        indexes = [rec[MD_INDEX] for rec in recs]
        new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]

        if self._translated_indexes:
            indexes = [self._translated_indexes[idx] for idx in indexes]
        tup = tuplegetter(*indexes)
        new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]

        keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
        # TODO: need unit test for:
        # result = connection.execute("raw sql, no columns").scalars()
        # without the "or ()" it's failing because MD_OBJECTS is None
        keymap.update(
            (e, new_rec)
            for new_rec in new_recs
            for e in new_rec[MD_OBJECTS] or ()
        )

        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=self._processors,
            keys=new_keys,
            tuplefilter=tup,
            translated_indexes=indexes,
            keymap=keymap,  # type: ignore[arg-type]
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )
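
    # Illustrative sketch (hypothetical values, not executed here): for a
    # result with keys ("a", "b", "c"), ``_reduce(("a", "c"))`` selects the
    # two matching records, builds ``tuplegetter(0, 2)`` as the new
    # tuplefilter, and renumbers MD_INDEX so that the reduced rows expose
    # the selected columns at positions 0 and 1.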

    def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
        """When using a cached Compiled construct that has a _result_map,
        for a new statement that used the cached Compiled, we need to ensure
        the keymap has the Column objects from our new statement as keys.
        So here we rewrite keymap with new entries for the new columns
        as matched to those of the cached statement.

        """

        if not context.compiled or not context.compiled._result_columns:
            return self

        compiled_statement = context.compiled.statement
        invoked_statement = context.invoked_statement

        if TYPE_CHECKING:
            assert isinstance(invoked_statement, elements.ClauseElement)

        if compiled_statement is invoked_statement:
            return self

        assert invoked_statement is not None

        # this is the most common path for Core statements when
        # caching is used.  In ORM use, this codepath is not really used
        # as the _result_disable_adapt_to_context execution option is
        # set by the ORM.

        # make a copy and add the columns from the invoked statement
        # to the result map.

        keymap_by_position = self._keymap_by_result_column_idx

        if keymap_by_position is None:
            # first retrieval from cache, this map will not be set up yet,
            # initialize lazily
            keymap_by_position = self._keymap_by_result_column_idx = {
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in self._keymap.values()
            }

        assert not self._tuplefilter
        return self._make_new_metadata(
            keymap=compat.dict_union(
                self._keymap,
                {
                    new: keymap_by_position[idx]
                    for idx, new in enumerate(
                        invoked_statement._all_selected_columns
                    )
                    if idx in keymap_by_position
                },
            ),
            unpickled=self._unpickled,
            processors=self._processors,
            tuplefilter=None,
            translated_indexes=None,
            keys=self._keys,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )
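
    # Illustrative scenario (a sketch, not executed here): two separately
    # constructed statements that produce the same cache key but contain
    # distinct, positionally-equivalent column objects, e.g. two separate
    # ``select(literal_column("q"))`` constructs, may share one cached
    # Compiled.  When the second statement executes against the first's
    # cached form, the rewrite above adds the invoked statement's own
    # column objects as keymap keys so that ``row._mapping[col]`` works
    # for the statement that was actually invoked.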

    def __init__(
        self,
        parent: CursorResult[Unpack[TupleAny]],
        cursor_description: _DBAPICursorDescription,
    ):
        context = parent.context
        self._tuplefilter = None
        self._translated_indexes = None
        self._safe_for_cache = self._unpickled = False

        if context.result_column_struct:
            (
                result_columns,
                cols_are_ordered,
                textual_ordered,
                ad_hoc_textual,
                loose_column_name_matching,
            ) = context.result_column_struct
            num_ctx_cols = len(result_columns)
        else:
            result_columns = cols_are_ordered = (  # type: ignore
                num_ctx_cols
            ) = ad_hoc_textual = loose_column_name_matching = (
                textual_ordered
            ) = False

        # merge cursor.description with the column info
        # present in the compiled structure, if any
        raw = self._merge_cursor_description(
            context,
            cursor_description,
            result_columns,
            num_ctx_cols,
            cols_are_ordered,
            textual_ordered,
            ad_hoc_textual,
            loose_column_name_matching,
        )

        # processors in key order which are used when building up
        # a row
        self._processors = [
            metadata_entry[MD_PROCESSOR] for metadata_entry in raw
        ]

        # this is used when using this ResultMetaData in a Core-only cache
        # retrieval context.  it's initialized on first cache retrieval
        # when the _result_disable_adapt_to_context execution option
        # (which the ORM generally sets) is not set.
        self._keymap_by_result_column_idx = None

        # for compiled SQL constructs, copy additional lookup keys into
        # the key lookup map, such as Column objects, labels,
        # column keys and other names
        if num_ctx_cols:
            # keymap by primary string...
            by_key = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

            if len(by_key) != num_ctx_cols:
                # if by-primary-string dictionary smaller than
                # number of columns, assume we have dupes; (this check
                # is also in place if string dictionary is bigger, as
                # can occur when '*' was used as one of the compiled columns,
                # which may or may not be suggestive of dupes), rewrite
                # dupe records with "None" for index which results in
                # ambiguous column exception when accessed.
                #
                # this is considered to be the less common case as it is not
                # common to have dupe column keys in a SELECT statement.
                #
                # new in 1.4: get the complete set of all possible keys,
                # strings, objects, whatever, that are dupes across two
                # different records, first.
                index_by_key: Dict[Any, Any] = {}
                dupes = set()
                for metadata_entry in raw:
                    for key in (metadata_entry[MD_RENDERED_NAME],) + (
                        metadata_entry[MD_OBJECTS] or ()
                    ):
                        idx = metadata_entry[MD_INDEX]
                        # if this key has been associated with more than one
                        # positional index, it's a dupe
                        if index_by_key.setdefault(key, idx) != idx:
                            dupes.add(key)

                # then put everything we have into the keymap excluding only
                # those keys that are dupes.
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                    if obj_elem not in dupes
                }

                # then for the dupe keys, put the "ambiguous column"
                # record into by_key.
                by_key.update(
                    {
                        key: (None, None, [], key, key, None, None)
                        for key in dupes
                    }
                )

            else:
                # no dupes - copy secondary elements from compiled
                # columns into self._keymap.  this is the most common
                # codepath for Core / ORM statement executions before the
                # result metadata is cached
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                }
            # update keymap with primary string names taking
            # precedence
            self._keymap.update(by_key)
        else:
            # no compiled objects to map, just create keymap by primary string
            self._keymap = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

        # update keymap with "translated" names.  In SQLAlchemy this is a
        # sqlite only thing, and in fact impacting only extremely old SQLite
        # versions unlikely to be present in modern Python versions.
        # however, the pyhive third party dialect is
        # also using this hook, which means others still might use it as well.
        # I dislike having this awkward hook here but as long as we need
        # to use names in cursor.description in some cases we need to have
        # some hook to accomplish this.
        if not num_ctx_cols and context._translate_colname:
            self._keymap.update(
                {
                    metadata_entry[MD_UNTRANSLATED]: self._keymap[
                        metadata_entry[MD_LOOKUP_KEY]
                    ]
                    for metadata_entry in raw
                    if metadata_entry[MD_UNTRANSLATED]
                }
            )

        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)

    def _merge_cursor_description(
        self,
        context,
        cursor_description,
        result_columns,
        num_ctx_cols,
        cols_are_ordered,
        textual_ordered,
        ad_hoc_textual,
        loose_column_name_matching,
    ):
        """Merge a cursor.description with compiled result column information.

        There are at least four separate strategies used here, selected
        depending on the type of SQL construct used to start with.

        The most common case is that of the compiled SQL expression construct,
        which generated the column names present in the raw SQL string and
        which has the identical number of columns as were reported by
        cursor.description.  In this case, we assume a 1-1 positional mapping
        between the entries in cursor.description and the compiled object.
        This is also the most performant case as we disregard extracting /
        decoding the column names present in cursor.description since we
        already have the desired name we generated in the compiled SQL
        construct.

        The next common case is that of the completely raw string SQL,
        such as passed to connection.execute().  In this case we have no
        compiled construct to work with, so we extract and decode the
        names from cursor.description and index those as the primary
        result row target keys.

        The remaining fairly common case is that of the textual SQL
        that includes at least partial column information; this is when
        we use a :class:`_expression.TextualSelect` construct.
        This construct may have
        unordered or ordered column information.  In the ordered case, we
        merge the cursor.description and the compiled construct's information
        positionally, and warn if there are additional description names
        present, however we still decode the names in cursor.description
        as we don't have a guarantee that the names in the columns match
        on these.  In the unordered case, we match names in cursor.description
        to that of the compiled construct based on name matching.
        In both of these cases, the cursor.description names and the column
        expression objects and names are indexed as result row target keys.

        The final case is much less common, where we have a compiled
        non-textual SQL expression construct, but the number of columns
        in cursor.description doesn't match what's in the compiled
        construct.  We make the guess here that there might be textual
        column expressions in the compiled construct that themselves include
        a comma in them causing them to split.  We do the same name-matching
        as with textual non-ordered columns.

        The name-matched system of merging is the same as that used by
        SQLAlchemy for all cases up through the 0.9 series.   Positional
        matching for compiled SQL expressions was introduced in 1.0 as a
        major performance feature, and positional matching for textual
        :class:`_expression.TextualSelect` objects in 1.1.
        As name matching is no longer
        a common case, it was acceptable to factor it into smaller generator-
        oriented methods that are easier to understand, but incur slightly
        more performance overhead.

        """

        if (
            num_ctx_cols
            and cols_are_ordered
            and not textual_ordered
            and num_ctx_cols == len(cursor_description)
        ):
            self._keys = [elem[0] for elem in result_columns]
            # pure positional 1-1 case; doesn't need to read
            # the names from cursor.description

            # most common case for Core and ORM

            # this metadata is safe to cache because we are guaranteed
            # to have the columns in the same order for new executions
            self._safe_for_cache = True
            return [
                (
                    idx,
                    idx,
                    rmap_entry[RM_OBJECTS],
                    rmap_entry[RM_NAME],
                    rmap_entry[RM_RENDERED_NAME],
                    context.get_result_processor(
                        rmap_entry[RM_TYPE],
                        rmap_entry[RM_RENDERED_NAME],
                        cursor_description[idx][1],
                    ),
                    None,
                )
                for idx, rmap_entry in enumerate(result_columns)
            ]
        else:
            # name-based or text-positional cases, where we need
            # to read cursor.description names

            if textual_ordered or (
                ad_hoc_textual and len(cursor_description) == num_ctx_cols
            ):
                self._safe_for_cache = True
                # textual positional case
                raw_iterator = self._merge_textual_cols_by_position(
                    context, cursor_description, result_columns
                )
            elif num_ctx_cols:
                # compiled SQL with a mismatch of description cols
                # vs. compiled cols, or textual w/ unordered columns
                # the order of columns can change if the query is
                # against a "select *", so not safe to cache
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_name(
                    context,
                    cursor_description,
                    result_columns,
                    loose_column_name_matching,
                )
            else:
                # no compiled SQL, just a raw string, order of columns
                # can change for "select *"
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_none(
                    context, cursor_description
                )

            return [
                (
                    idx,
                    ridx,
                    obj,
                    cursor_colname,
                    cursor_colname,
                    context.get_result_processor(
                        mapped_type, cursor_colname, coltype
                    ),
                    untranslated,
                )
                for (
                    idx,
                    ridx,
                    cursor_colname,
                    mapped_type,
                    coltype,
                    obj,
                    untranslated,
                ) in raw_iterator
            ]
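
    # Illustrative sketch (hypothetical statements, not executed here) of
    # which strategy the above typically selects:
    #
    #     conn.execute(select(t.c.a, t.c.b))        # positional 1-1 fast path
    #     conn.execute(text("select a, b from t"))  # _merge_cols_by_none
    #     conn.execute(
    #         text("select a, b from t").columns(t.c.a, t.c.b)
    #     )                                         # textual positional merge
    #
    # each strategy ultimately yields the same seven-element record shape
    # consumed by __init__: (MD_INDEX, MD_RESULT_MAP_INDEX, MD_OBJECTS,
    # MD_LOOKUP_KEY, MD_RENDERED_NAME, MD_PROCESSOR, MD_UNTRANSLATED).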

    def _colnames_from_description(self, context, cursor_description):
        """Extract column names and data types from a cursor.description.

        Applies unicode decoding, column translation, "normalization",
        and case sensitivity rules to the names based on the dialect.

        """

        dialect = context.dialect
        translate_colname = context._translate_colname
        normalize_name = (
            dialect.normalize_name if dialect.requires_name_normalize else None
        )
        untranslated = None

        self._keys = []

        for idx, rec in enumerate(cursor_description):
            colname = rec[0]
            coltype = rec[1]

            if translate_colname:
                colname, untranslated = translate_colname(colname)

            if normalize_name:
                colname = normalize_name(colname)

            self._keys.append(colname)

            yield idx, colname, untranslated, coltype
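
    # For example (a sketch; exact behavior is dialect-specific), on a
    # backend whose dialect sets ``requires_name_normalize``, such as
    # Oracle, a cursor.description name like "USER_ID" is normalized to
    # "user_id" here, so that result-row key lookup matches the lower-case
    # names used on the SQLAlchemy side.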

    def _merge_textual_cols_by_position(
        self, context, cursor_description, result_columns
    ):
        num_ctx_cols = len(result_columns)

        if num_ctx_cols > len(cursor_description):
            util.warn(
                "Number of columns in textual SQL (%d) is "
                "smaller than number of columns requested (%d)"
                % (len(cursor_description), num_ctx_cols)
            )
        seen = set()

        for (
            idx,
            colname,
            untranslated,
            coltype,
        ) in self._colnames_from_description(context, cursor_description):
            if idx < num_ctx_cols:
                ctx_rec = result_columns[idx]
                obj = ctx_rec[RM_OBJECTS]
                ridx = idx
                mapped_type = ctx_rec[RM_TYPE]
                if obj[0] in seen:
                    raise exc.InvalidRequestError(
                        "Duplicate column expression requested "
                        "in textual SQL: %r" % obj[0]
                    )
                seen.add(obj[0])
            else:
                mapped_type = sqltypes.NULLTYPE
                obj = None
                ridx = None
            yield idx, ridx, colname, mapped_type, coltype, obj, untranslated

    def _merge_cols_by_name(
        self,
        context,
        cursor_description,
        result_columns,
        loose_column_name_matching,
    ):
        match_map = self._create_description_match_map(
            result_columns, loose_column_name_matching
        )
        mapped_type: TypeEngine[Any]

        for (
            idx,
            colname,
            untranslated,
            coltype,
        ) in self._colnames_from_description(context, cursor_description):
            try:
                ctx_rec = match_map[colname]
            except KeyError:
                mapped_type = sqltypes.NULLTYPE
                obj = None
                result_columns_idx = None
            else:
                obj = ctx_rec[1]
                mapped_type = ctx_rec[2]
                result_columns_idx = ctx_rec[3]
            yield (
                idx,
                result_columns_idx,
                colname,
                mapped_type,
                coltype,
                obj,
                untranslated,
            )

    @classmethod
    def _create_description_match_map(
        cls,
        result_columns: List[ResultColumnsEntry],
        loose_column_name_matching: bool = False,
    ) -> Dict[
        Union[str, object], Tuple[str, Tuple[Any, ...], TypeEngine[Any], int]
    ]:
        """when matching cursor.description to a set of names that are present
        in a Compiled object, as is the case with TextualSelect, get all the
        names we expect might match those in cursor.description.
        """

        d: Dict[
            Union[str, object],
            Tuple[str, Tuple[Any, ...], TypeEngine[Any], int],
        ] = {}
        for ridx, elem in enumerate(result_columns):
            key = elem[RM_RENDERED_NAME]
            if key in d:
                # conflicting keyname - just add the column-linked objects
                # to the existing record.  if there is a duplicate column
                # name in the cursor description, this will allow all of those
                # objects to raise an ambiguous column error
                e_name, e_obj, e_type, e_ridx = d[key]
                d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
            else:
                d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)

            if loose_column_name_matching:
                # when using a textual statement with an unordered set
                # of columns that line up, we are expecting the user
                # to be using label names in the SQL that match to the column
                # expressions.  Enable more liberal matching for this case;
                # duplicate keys that are ambiguous will be fixed later.
                for r_key in elem[RM_OBJECTS]:
                    d.setdefault(
                        r_key,
                        (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
                    )
        return d
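
    # Illustrative sketch (hypothetical columns, not executed here): for a
    # TextualSelect such as
    #
    #     text("select id, name from users").columns(
    #         users.c.id, users.c.name
    #     )
    #
    # the match map is keyed by each entry's rendered name, e.g.
    # roughly {"id": ("id", (users.c.id, ...), Integer(), 0), "name": ...},
    # so that cursor.description names can be matched back to the column
    # expressions by name rather than by position.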

    def _merge_cols_by_none(self, context, cursor_description):
        for (
            idx,
            colname,
            untranslated,
            coltype,
        ) in self._colnames_from_description(context, cursor_description):
            yield (
                idx,
                None,
                colname,
                sqltypes.NULLTYPE,
                coltype,
                None,
                untranslated,
            )

    if not TYPE_CHECKING:

        def _key_fallback(
            self, key: Any, err: Optional[Exception], raiseerr: bool = True
        ) -> Optional[NoReturn]:
            if raiseerr:
                if self._unpickled and isinstance(key, elements.ColumnElement):
                    raise exc.NoSuchColumnError(
                        "Row was unpickled; lookup by ColumnElement "
                        "is unsupported"
                    ) from err
                else:
                    raise exc.NoSuchColumnError(
                        "Could not locate column in row for column '%s'"
                        % util.string_or_unprintable(key)
                    ) from err
            else:
                return None

    def _raise_for_ambiguous_column_name(self, rec):
        raise exc.InvalidRequestError(
            "Ambiguous column name '%s' in "
            "result set column descriptions" % rec[MD_LOOKUP_KEY]
        )

    def _index_for_key(self, key: Any, raiseerr: bool = True) -> Optional[int]:
        # TODO: can consider pre-loading ints and negative ints
        # into _keymap - also no coverage here
        if isinstance(key, int):
            key = self._keys[key]

        try:
            rec = self._keymap[key]
        except KeyError as ke:
            x = self._key_fallback(key, ke, raiseerr)
            assert x is None
            return None

        index = rec[0]

        if index is None:
            self._raise_for_ambiguous_column_name(rec)
        return index

    def _indexes_for_keys(self, keys):
        try:
            return [self._keymap[key][0] for key in keys]
        except KeyError as ke:
            # ensure it raises
            CursorResultMetaData._key_fallback(self, ke.args[0], ke)

    def _metadata_for_keys(
        self, keys: Sequence[Any]
    ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
        for key in keys:
            if int in key.__class__.__mro__:
                key = self._keys[key]

            try:
                rec = self._keymap[key]
            except KeyError as ke:
                # ensure it raises
                CursorResultMetaData._key_fallback(self, ke.args[0], ke)

            index = rec[MD_INDEX]

            if index is None:
                self._raise_for_ambiguous_column_name(rec)

            yield cast(_NonAmbigCursorKeyMapRecType, rec)

    def __getstate__(self):
        # TODO: consider serializing this as SimpleResultMetaData
        return {
            "_keymap": {
                key: (
                    rec[MD_INDEX],
                    rec[MD_RESULT_MAP_INDEX],
                    [],
                    key,
                    rec[MD_RENDERED_NAME],
                    None,
                    None,
                )
                for key, rec in self._keymap.items()
                if isinstance(key, (str, int))
            },
            "_keys": self._keys,
            "_translated_indexes": self._translated_indexes,
        }

    def __setstate__(self, state):
        self._processors = [None for _ in range(len(state["_keys"]))]
        self._keymap = state["_keymap"]
        self._keymap_by_result_column_idx = None
        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
        self._keys = state["_keys"]
        self._unpickled = True
        if state["_translated_indexes"]:
            self._translated_indexes = cast(
                "List[int]", state["_translated_indexes"]
            )
            self._tuplefilter = tuplegetter(*self._translated_indexes)
        else:
            self._translated_indexes = self._tuplefilter = None
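
    # Pickling sketch (illustrative, not executed here): only string and
    # integer keys survive __getstate__, with processors and MD_OBJECTS
    # dropped, so after a round trip such as
    #
    #     metadata = pickle.loads(pickle.dumps(metadata))
    #
    # lookups by string name still work, while lookups by ColumnElement
    # raise NoSuchColumnError via _key_fallback, as _unpickled is True.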


class ResultFetchStrategy:
    """Define a fetching strategy for a result object.

    .. versionadded:: 1.4

    """

    __slots__ = ()

    alternate_cursor_description: Optional[_DBAPICursorDescription] = None

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        raise NotImplementedError()

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        raise NotImplementedError()

    def yield_per(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
        num: int,
    ) -> None:
        return

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        raise NotImplementedError()

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        raise NotImplementedError()

    def fetchall(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        raise NotImplementedError()

    def handle_exception(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        raise err


class NoCursorFetchStrategy(ResultFetchStrategy):
    """Cursor strategy for a result that has no open cursor.

    There are two varieties of this strategy, one for DQL and one for
    DML (and also DDL), each of which represents a result that had a cursor
    but no longer has one.

    """

    __slots__ = ()

    def soft_close(self, result, dbapi_cursor):
        pass

    def hard_close(self, result, dbapi_cursor):
        pass

    def fetchone(self, result, dbapi_cursor, hard_close=False):
        return self._non_result(result, None)

    def fetchmany(self, result, dbapi_cursor, size=None):
        return self._non_result(result, [])

    def fetchall(self, result, dbapi_cursor):
        return self._non_result(result, [])

    def _non_result(self, result, default, err=None):
        raise NotImplementedError()


class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DQL result that has no open cursor.

    This is a result set that can return rows, i.e. for a SELECT, or for an
    INSERT, UPDATE, DELETE that includes RETURNING.  However it is in the state
    where the cursor is closed and no rows remain available.  The owning result
    object may or may not be "hard closed", which determines if the fetch
    methods send empty results or raise for a closed result.

    """

    __slots__ = ()

    def _non_result(self, result, default, err=None):
        if result.closed:
            raise exc.ResourceClosedError(
                "This result object is closed."
            ) from err
        else:
            return default


_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()


class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DML result that has no open cursor.

    This is a result set that does not return rows, i.e. for an INSERT,
    UPDATE, DELETE that does not include RETURNING.

    """

    __slots__ = ()

    def _non_result(self, result, default, err=None):
        # we only expect to have a _NoResultMetaData() here right now.
        assert not result._metadata.returns_rows
        result._metadata._we_dont_return_rows(err)


_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
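
# Behavior sketch for the two no-cursor strategies above (hypothetical
# usage, not executed here):
#
#     result = conn.execute(select(t))
#     result.fetchall()   # exhausts rows; cursor is soft-closed
#     result.fetchall()   # DQL variant: returns [] (empty result)
#     result.close()
#     result.fetchall()   # DQL variant: raises ResourceClosedError
#
#     result = conn.execute(t.insert().values(x=1))
#     result.fetchone()   # DML variant: always raises ResourceClosedError,
#                         # "This result object does not return rows."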


class CursorFetchStrategy(ResultFetchStrategy):
    """Call fetch methods from a DBAPI cursor.

    Alternate versions of this class may instead buffer the rows from
    cursors or not use cursors at all.

    """

    __slots__ = ()

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        result.cursor_strategy = _NO_CURSOR_DQL

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        result.cursor_strategy = _NO_CURSOR_DQL

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        result.connection._handle_dbapi_exception(
            err, None, None, dbapi_cursor, result.context
        )

    def yield_per(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        num: int,
    ) -> None:
        result.cursor_strategy = BufferedRowCursorFetchStrategy(
            dbapi_cursor,
            {"max_row_buffer": num},
            initial_buffer=collections.deque(),
            growth_factor=0,
        )

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        try:
            row = dbapi_cursor.fetchone()
            if row is None:
                result._soft_close(hard=hard_close)
            return row
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        try:
            if size is None:
                l = dbapi_cursor.fetchmany()
            else:
                l = dbapi_cursor.fetchmany(size)

            if not l:
                result._soft_close()
            return l
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        try:
            rows = dbapi_cursor.fetchall()
            result._soft_close()
            return rows
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)


_DEFAULT_FETCH = CursorFetchStrategy()


class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
    """A cursor fetch strategy with row buffering behavior.

    This strategy buffers the contents of a selection of rows
    before ``fetchone()`` is called.  This is to allow the results of
    ``cursor.description`` to be available immediately, when
    interfacing with a DB-API that requires rows to be consumed before
    this information is available (currently psycopg2, when used with
    server-side cursors).

    The pre-fetching behavior fetches only one row initially, and then
    grows its buffer size multiplicatively by a fixed growth factor with
    each successive need for additional rows, up to the ``max_row_buffer``
    size, which defaults to 1000::

        with psycopg2_engine.connect() as conn:

            result = conn.execution_options(
                stream_results=True, max_row_buffer=50
            ).execute(text("select * from table"))

    .. versionchanged:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.

    .. seealso::

        :ref:`psycopg2_execution_options`
    """

    __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")

    def __init__(
        self,
        dbapi_cursor,
        execution_options,
        growth_factor=5,
        initial_buffer=None,
    ):
        self._max_row_buffer = execution_options.get("max_row_buffer", 1000)

        if initial_buffer is not None:
            self._rowbuffer = initial_buffer
        else:
            self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
        self._growth_factor = growth_factor

        if growth_factor:
            self._bufsize = min(self._max_row_buffer, self._growth_factor)
        else:
            self._bufsize = self._max_row_buffer

    @classmethod
    def create(cls, result):
        return BufferedRowCursorFetchStrategy(
            result.cursor,
            result.context.execution_options,
        )

    def _buffer_rows(self, result, dbapi_cursor):
        """this is currently used only by fetchone()."""

        size = self._bufsize
        try:
            if size < 1:
                new_rows = dbapi_cursor.fetchall()
            else:
                new_rows = dbapi_cursor.fetchmany(size)
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

        if not new_rows:
            return
        self._rowbuffer = collections.deque(new_rows)
        if self._growth_factor and size < self._max_row_buffer:
            self._bufsize = min(
                self._max_row_buffer, size * self._growth_factor
            )
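
    # Growth sketch with the default growth_factor of 5 and max_row_buffer
    # of 1000: one row is fetched up front in __init__, then successive
    # refills here request 5, 25, 125, 625 rows, after which the buffer
    # size is capped at 1000 for each subsequent fetchmany() refill.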

    def yield_per(self, result, dbapi_cursor, num):
        self._growth_factor = 0
        self._max_row_buffer = self._bufsize = num

    def soft_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(self, result, dbapi_cursor, hard_close=False):
        if not self._rowbuffer:
            self._buffer_rows(result, dbapi_cursor)
            if not self._rowbuffer:
                try:
                    result._soft_close(hard=hard_close)
                except BaseException as e:
                    self.handle_exception(result, dbapi_cursor, e)
                return None
        return self._rowbuffer.popleft()

    def fetchmany(self, result, dbapi_cursor, size=None):
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        rb = self._rowbuffer
        lb = len(rb)
        close = False
        if size > lb:
            try:
                new = dbapi_cursor.fetchmany(size - lb)
            except BaseException as e:
                self.handle_exception(result, dbapi_cursor, e)
            else:
                if not new:
                    # defer closing since it may clear the row buffer
                    close = True
                else:
                    rb.extend(new)

        res = [rb.popleft() for _ in range(min(size, len(rb)))]
        if close:
            result._soft_close()
        return res

    def fetchall(self, result, dbapi_cursor):
        try:
            ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
            self._rowbuffer.clear()
            result._soft_close()
            return ret
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)


class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
    """A cursor strategy that buffers rows fully upon creation.

    Used for operations where a result is to be delivered
    after the database conversation cannot be continued,
    such as MSSQL INSERT...OUTPUT after an autocommit.

    """

    __slots__ = ("_rowbuffer", "alternate_cursor_description")

    def __init__(
        self, dbapi_cursor, alternate_description=None, initial_buffer=None
    ):
        self.alternate_cursor_description = alternate_description
        if initial_buffer is not None:
            self._rowbuffer = collections.deque(initial_buffer)
        else:
            self._rowbuffer = collections.deque(dbapi_cursor.fetchall())

    def yield_per(self, result, dbapi_cursor, num):
        pass

    def soft_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(self, result, dbapi_cursor):
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(self, result, dbapi_cursor, hard_close=False):
        if self._rowbuffer:
            return self._rowbuffer.popleft()
        else:
            result._soft_close(hard=hard_close)
            return None

    def fetchmany(self, result, dbapi_cursor, size=None):
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        rb = self._rowbuffer
        rows = [rb.popleft() for _ in range(min(size, len(rb)))]
        if not rows:
            result._soft_close()
        return rows

    def fetchall(self, result, dbapi_cursor):
        ret = self._rowbuffer
        self._rowbuffer = collections.deque()
        result._soft_close()
        return ret



class _NoResultMetaData(ResultMetaData):
    __slots__ = ()

    returns_rows = False

    def _we_dont_return_rows(self, err=None):
        raise exc.ResourceClosedError(
            "This result object does not return rows. "
            "It has been closed automatically."
        ) from err

    def _index_for_key(self, keys, raiseerr):
        self._we_dont_return_rows()

    def _metadata_for_keys(self, key):
        self._we_dont_return_rows()

    def _reduce(self, keys):
        self._we_dont_return_rows()

    @property
    def _keymap(self):
        self._we_dont_return_rows()

    @property
    def _key_to_index(self):
        self._we_dont_return_rows()

    @property
    def _processors(self):
        self._we_dont_return_rows()

    @property
    def keys(self):
        self._we_dont_return_rows()


_NO_RESULT_METADATA = _NoResultMetaData()


def null_dml_result() -> IteratorResult[Any]:
    it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
    it._soft_close()
    return it


class CursorResult(Result[Unpack[_Ts]]):
    """A Result that is representing state from a DBAPI cursor.

    .. versionchanged:: 1.4  The :class:`.CursorResult`
       class replaces the previous :class:`.ResultProxy` interface.
       These classes are based on the :class:`.Result` calling API
       which provides an updated usage model and calling facade for
       SQLAlchemy Core and SQLAlchemy ORM.

    Returns database rows via the :class:`.Row` class, which provides
    additional API features and behaviors on top of the raw data returned by
    the DBAPI.  Through the use of filters such as the :meth:`.Result.scalars`
    method, other kinds of objects may also be returned.

    .. seealso::

        :ref:`tutorial_selecting_data` - introductory material for accessing
        :class:`_engine.CursorResult` and :class:`.Row` objects.

    """

    __slots__ = (
        "context",
        "dialect",
        "cursor",
        "cursor_strategy",
        "_echo",
        "connection",
    )

    _metadata: Union[CursorResultMetaData, _NoResultMetaData]
    _no_result_metadata = _NO_RESULT_METADATA
    _soft_closed: bool = False
    closed: bool = False
    _is_cursor = True

    context: DefaultExecutionContext
    dialect: Dialect
    cursor_strategy: ResultFetchStrategy
    connection: Connection

    def __init__(
        self,
        context: DefaultExecutionContext,
        cursor_strategy: ResultFetchStrategy,
        cursor_description: Optional[_DBAPICursorDescription],
    ):
        self.context = context
        self.dialect = context.dialect
        self.cursor = context.cursor
        self.cursor_strategy = cursor_strategy
        self.connection = context.root_connection
        self._echo = echo = (
            self.connection._echo and context.engine._should_log_debug()
        )

        if cursor_description is not None:
            # inline of Result._row_getter(), set up an initial row
            # getter assuming no transformations will be called as this
            # is the most common case

            metadata = self._init_metadata(context, cursor_description)

            _make_row: Any
            _make_row = functools.partial(
                Row,
                metadata,
                metadata._effective_processors,
                metadata._key_to_index,
            )

            if context._num_sentinel_cols:
                sentinel_filter = operator.itemgetter(
                    slice(-context._num_sentinel_cols)
                )

                def _sliced_row(raw_data):
                    return _make_row(sentinel_filter(raw_data))

                sliced_row = _sliced_row
            else:
                sliced_row = _make_row

            if echo:
                log = self.context.connection._log_debug

                def _log_row(row):
                    log("Row %r", sql_util._repr_row(row))
                    return row

                self._row_logging_fn = _log_row

                def _make_row_2(row):
                    return _log_row(sliced_row(row))

                make_row = _make_row_2
            else:
                make_row = sliced_row
            self._set_memoized_attribute("_row_getter", make_row)

        else:
            assert context._num_sentinel_cols == 0
            self._metadata = self._no_result_metadata

    def _init_metadata(self, context, cursor_description):
        if context.compiled:
            compiled = context.compiled

            if compiled._cached_metadata:
                metadata = compiled._cached_metadata
            else:
                metadata = CursorResultMetaData(self, cursor_description)
                if metadata._safe_for_cache:
                    compiled._cached_metadata = metadata

            # result rewrite / adapt step.  this is to suit the case
            # when we are invoked against a cached Compiled object, we want
            # to rewrite the ResultMetaData to reflect the Column objects
            # that are in our current SQL statement object, not the one
            # that is associated with the cached Compiled object.
            # the Compiled object may also tell us to not
            # actually do this step; this is to support the ORM where
            # it is to produce a new Result object in any case, and will
            # be using the cached Column objects against this database result
            # so we don't want to rewrite them.
            #
            # Basically this step suits the use case where the end user
            # is using Core SQL expressions and is accessing columns in the
            # result row using row._mapping[table.c.column].
            if (
                not context.execution_options.get(
                    "_result_disable_adapt_to_context", False
                )
                and compiled._result_columns
                and context.cache_hit is context.dialect.CACHE_HIT
                and compiled.statement is not context.invoked_statement
            ):
                metadata = metadata._adapt_to_context(context)

            self._metadata = metadata

        else:
            self._metadata = metadata = CursorResultMetaData(
                self, cursor_description
            )
            if self._echo:
                context.connection._log_debug(
                    "Col %r", tuple(x[0] for x in cursor_description)
                )
        return metadata

    def _soft_close(self, hard=False):
        """Soft close this :class:`_engine.CursorResult`.

        This releases all DBAPI cursor resources, but leaves the
        CursorResult "open" from a semantic perspective, meaning the
        fetchXXX() methods will continue to return empty results.

        This method is called automatically when:

        * all result rows are exhausted using the fetchXXX() methods.
        * cursor.description is None.

        This method is **not public**, but is documented in order to clarify
        the "autoclose" process used.

        .. seealso::

            :meth:`_engine.CursorResult.close`

        """

        if (not hard and self._soft_closed) or (hard and self.closed):
            return

        if hard:
            self.closed = True
            self.cursor_strategy.hard_close(self, self.cursor)
        else:
            self.cursor_strategy.soft_close(self, self.cursor)

        if not self._soft_closed:
            cursor = self.cursor
            self.cursor = None  # type: ignore
            self.connection._safe_close_cursor(cursor)
            self._soft_closed = True
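
    # State sketch for the close steps above (illustrative, not executed
    # here): a soft close swaps in the _NO_CURSOR_DQL strategy and releases
    # the DBAPI cursor, so subsequent fetches return empty results; a hard
    # close additionally sets ``closed = True``, after which fetches raise
    # ResourceClosedError.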

    @property
    def inserted_primary_key_rows(self):
        """Return the value of
        :attr:`_engine.CursorResult.inserted_primary_key`
        as a row contained within a list; some dialects may support a
        multiple row form as well.

        .. note:: As indicated below, in current SQLAlchemy versions this
           accessor is only useful beyond what's already supplied by
           :attr:`_engine.CursorResult.inserted_primary_key` when using the
           :ref:`postgresql_psycopg2` dialect.  Future versions hope to
           generalize this feature to more dialects.

        This accessor is added to support dialects that offer the feature
        that is currently implemented by the :ref:`psycopg2_executemany_mode`
        feature, currently **only the psycopg2 dialect**, which provides
        for many rows to be INSERTed at once while still retaining the
        behavior of being able to return server-generated primary key values.

        * **When using the psycopg2 dialect, or other dialects that may
          support "fast executemany" style inserts in upcoming releases**:
          When invoking an INSERT statement while passing a list of rows as
          the second argument to :meth:`_engine.Connection.execute`, this
          accessor will then provide a list of rows, where each row contains
          the primary key value for each row that was INSERTed.

        * **When using all other dialects / backends that don't yet support
          this feature**: This accessor is only useful for **single row
          INSERT statements**, and returns the same information as that of
          the :attr:`_engine.CursorResult.inserted_primary_key` within a
          single-element list.  When an INSERT statement is executed in
          conjunction with a list of rows to be INSERTed, the list will
          contain one row per row inserted in the statement, however it will
          contain ``None`` for any server-generated values.

        Future releases of SQLAlchemy will further generalize the
        "fast execution helper" feature of psycopg2 to suit other dialects,
        thus allowing this accessor to be of more general use.

        .. versionadded:: 1.4

        .. seealso::

            :attr:`_engine.CursorResult.inserted_primary_key`

        """
        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert:
            raise exc.InvalidRequestError(
                "Statement is not an insert() expression construct."
            )
        elif self.context._is_explicit_returning:
            raise exc.InvalidRequestError(
                "Can't call inserted_primary_key "
                "when returning() "
                "is used."
            )
        return self.context.inserted_primary_key_rows

    @property
    def inserted_primary_key(self):
        """Return the primary key for the row just inserted.

        The return value is a :class:`_result.Row` object representing
        a named tuple of primary key values in the order in which the
        primary key columns are configured in the source
        :class:`_schema.Table`.

        .. versionchanged:: 1.4.8 - the
           :attr:`_engine.CursorResult.inserted_primary_key`
           value is now a named tuple via the :class:`_result.Row` class,
           rather than a plain tuple.

        This accessor only applies to single row :func:`_expression.insert`
        constructs which did not explicitly specify
        :meth:`_expression.Insert.returning`.  Support for multirow inserts,
        while not yet available for most backends, would be accessed using
        the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.

        Note that primary key columns which specify a server_default clause,
        or otherwise do not qualify as "autoincrement" columns (see the notes
        at :class:`_schema.Column`), and were generated using the
        database-side default, will appear in this list as ``None`` unless
        the backend supports "returning" and the insert statement was
        executed with "implicit returning" enabled.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() construct.

        """

        if self.context.executemany:
            raise exc.InvalidRequestError(
                "This statement was an executemany call; if primary key "
                "returning is supported, please "
                "use .inserted_primary_key_rows."
            )

        ikp = self.inserted_primary_key_rows
        if ikp:
            return ikp[0]
        else:
            return None

    def last_updated_params(self):
        """Return the collection of updated parameters from this
        execution.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an update() construct.

        """
        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isupdate:
            raise exc.InvalidRequestError(
                "Statement is not an update() expression construct."
            )
        elif self.context.executemany:
            return self.context.compiled_parameters
        else:
            return self.context.compiled_parameters[0]

    def last_inserted_params(self):
        """Return the collection of inserted parameters from this
        execution.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() construct.

        """
        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert:
            raise exc.InvalidRequestError(
                "Statement is not an insert() expression construct."
            )
        elif self.context.executemany:
            return self.context.compiled_parameters
        else:
            return self.context.compiled_parameters[0]

    @property
    def returned_defaults_rows(self):
        """Return a list of rows each containing the values of default
        columns that were fetched using
        the :meth:`.ValuesBase.return_defaults` feature.

        The return value is a list of :class:`.Row` objects.

        .. versionadded:: 1.4

        """
        return self.context.returned_default_rows

    def splice_horizontally(self, other):
        """Return a new :class:`.CursorResult` that "horizontally splices"
        together the rows of this :class:`.CursorResult` with that of another
        :class:`.CursorResult`.

        .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
           not intended for general use.

        "horizontally splices" means that for each row in the first and second
        result sets, a new row that concatenates the two rows together is
        produced.  The incoming :class:`.CursorResult` must have the identical
        number of rows.  It is typically expected that the two result sets
        come from the same sort order as well, as the result rows are spliced
        together based on their position in the result.

        The expected use case here is so that multiple INSERT..RETURNING
        statements (which definitely need to be sorted) against different
        tables can produce a single result that looks like a JOIN of those two
        tables.

        E.g.::

            r1 = connection.execute(
                users.insert().returning(
                    users.c.user_name,
                    users.c.user_id,
                    sort_by_parameter_order=True
                ),
                user_values
            )

            r2 = connection.execute(
                addresses.insert().returning(
                    addresses.c.address_id,
                    addresses.c.address,
                    addresses.c.user_id,
                    sort_by_parameter_order=True
                ),
                address_values
            )

            rows = r1.splice_horizontally(r2).all()
            assert (
                rows ==
                [
                    ("john", 1, 1, "foo@bar.com", 1),
                    ("jack", 2, 2, "bar@bat.com", 2),
                ]
            )

        .. versionadded:: 2.0

        .. seealso::

            :meth:`.CursorResult.splice_vertically`

        """

        clone = self._generate()
        total_rows = [
            tuple(r1) + tuple(r2)
            for r1, r2 in zip(
                list(self._raw_row_iterator()),
                list(other._raw_row_iterator()),
            )
        ]

        clone._metadata = clone._metadata._splice_horizontally(other._metadata)

        clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
            None,
            initial_buffer=total_rows,
        )
        clone._reset_memoizations()
        return clone

    def splice_vertically(self, other):
        """Return a new :class:`.CursorResult` that "vertically splices",
        i.e. "extends", the rows of this :class:`.CursorResult` with that of
        another :class:`.CursorResult`.

        .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
           not intended for general use.

        "vertically splices" means the rows of the given result are appended
        to the rows of this cursor result.  The incoming
        :class:`.CursorResult` must have rows that represent the identical
        list of columns in the identical order as they are in this
        :class:`.CursorResult`.

        .. versionadded:: 2.0

        .. seealso::

            :meth:`.CursorResult.splice_horizontally`

        """
        clone = self._generate()
        total_rows = list(self._raw_row_iterator()) + list(
            other._raw_row_iterator()
        )

        clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
            None,
            initial_buffer=total_rows,
        )
        clone._reset_memoizations()
        return clone

    def _rewind(self, rows):
        """rewind this result back to the given rowset.

        this is used internally for the case where an :class:`.Insert`
        construct combines the use of
        :meth:`.Insert.return_defaults` along with the
        "supplemental columns" feature.

        """

        if self._echo:
            self.context.connection._log_debug(
                "CursorResult rewound %d row(s)", len(rows)
            )

        # the rows given are expected to be Row objects, so we
        # have to clear out processors which have already run on these
        # rows
        self._metadata = cast(
            CursorResultMetaData, self._metadata
        )._remove_processors()

        self.cursor_strategy = FullyBufferedCursorFetchStrategy(
            None,
            # TODO: if these are Row objects, can we save on not having to
            # re-make new Row objects out of them a second time? is that
            # what's actually happening right now?  maybe look into this
            initial_buffer=rows,
        )
        self._reset_memoizations()
        return self
1881
1882 @property
1883 def returned_defaults(self):
1884 """Return the values of default columns that were fetched using
1885 the :meth:`.ValuesBase.return_defaults` feature.
1886
1887 The value is an instance of :class:`.Row`, or ``None``
1888 if :meth:`.ValuesBase.return_defaults` was not used or if the
1889 backend does not support RETURNING.
1890
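        For example, a minimal sketch, assuming a hypothetical ``users``
        table with a server-generated ``created_at`` column::

            result = connection.execute(
                users.insert().return_defaults(),
                {"user_name": "some name"},
            )

            # a Row containing the fetched server-generated values
            created_at = result.returned_defaults.created_at
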
        .. seealso::

            :meth:`.ValuesBase.return_defaults`

        """

        if self.context.executemany:
            raise exc.InvalidRequestError(
                "This statement was an executemany call; if return defaults "
                "is supported, please use .returned_defaults_rows."
            )

        rows = self.context.returned_default_rows
        if rows:
            return rows[0]
        else:
            return None

    def lastrow_has_defaults(self):
        """Return ``lastrow_has_defaults()`` from the underlying
        :class:`.ExecutionContext`.

        See :class:`.ExecutionContext` for details.

        """

        return self.context.lastrow_has_defaults()

    def postfetch_cols(self):
        """Return ``postfetch_cols()`` from the underlying
        :class:`.ExecutionContext`.

        See :class:`.ExecutionContext` for details.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() or update() construct.

        """

        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert and not self.context.isupdate:
            raise exc.InvalidRequestError(
                "Statement is not an insert() or update() "
                "expression construct."
            )
        return self.context.postfetch_cols

    def prefetch_cols(self):
        """Return ``prefetch_cols()`` from the underlying
        :class:`.ExecutionContext`.

        See :class:`.ExecutionContext` for details.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() or update() construct.

        """

        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert and not self.context.isupdate:
            raise exc.InvalidRequestError(
                "Statement is not an insert() or update() "
                "expression construct."
            )
        return self.context.prefetch_cols

    def supports_sane_rowcount(self):
        """Return ``supports_sane_rowcount`` from the dialect.

        See :attr:`_engine.CursorResult.rowcount` for background.

        """

        return self.dialect.supports_sane_rowcount

    def supports_sane_multi_rowcount(self):
        """Return ``supports_sane_multi_rowcount`` from the dialect.

        See :attr:`_engine.CursorResult.rowcount` for background.

        """

        return self.dialect.supports_sane_multi_rowcount

    @util.memoized_property
    def rowcount(self) -> int:
        """Return the 'rowcount' for this result.

        The primary purpose of 'rowcount' is to report the number of rows
        matched by the WHERE criterion of an UPDATE or DELETE statement
        executed once (i.e. for a single parameter set), which may then be
        compared to the number of rows expected to be updated or deleted as a
        means of asserting data integrity.

        This attribute is transferred from the ``cursor.rowcount`` attribute
        of the DBAPI before the cursor is closed, to support DBAPIs that
        don't make this value available after cursor close. Some DBAPIs may
        offer meaningful values for other kinds of statements, such as INSERT
        and SELECT statements as well. In order to retrieve ``cursor.rowcount``
        for these statements, set the
        :paramref:`.Connection.execution_options.preserve_rowcount`
        execution option to True, which will cause the ``cursor.rowcount``
        value to be unconditionally memoized before any results are returned
        or the cursor is closed, regardless of statement type.

        For cases where the DBAPI does not support rowcount for a particular
        kind of statement and/or execution, the returned value will be ``-1``,
        which is delivered directly from the DBAPI and is part of :pep:`249`.
        All DBAPIs should support rowcount for single-parameter-set
        UPDATE and DELETE statements, however.

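        For example, a minimal sketch of the integrity-check pattern
        described above, assuming a hypothetical ``users`` table::

            result = connection.execute(
                users.update()
                .where(users.c.user_id == 5)
                .values(user_name="new name")
            )

            # one row was expected to match the WHERE criterion
            if result.rowcount != 1:
                raise RuntimeError("expected exactly one row to be updated")
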
        .. note::

           Notes regarding :attr:`_engine.CursorResult.rowcount`:

           * This attribute returns the number of rows *matched*,
             which is not necessarily the same as the number of rows
             that were actually *modified*. For example, an UPDATE statement
             may have no net change on a given row if the SET values
             given are the same as those present in the row already.
             Such a row would be matched but not modified.
             On backends that feature both styles, such as MySQL,
             rowcount is configured to return the match
             count in all cases.

           * :attr:`_engine.CursorResult.rowcount` in the default case is
             *only* useful in conjunction with an UPDATE or DELETE statement,
             and only with a single set of parameters. For other kinds of
             statements, SQLAlchemy will not attempt to pre-memoize the value
             unless the
             :paramref:`.Connection.execution_options.preserve_rowcount`
             execution option is used. Note that contrary to :pep:`249`, many
             DBAPIs do not support rowcount values for statements that are not
             UPDATE or DELETE, particularly when rows are being returned which
             are not fully pre-buffered. DBAPIs that don't support rowcount
             for a particular kind of statement should return the value ``-1``
             for such statements.

           * :attr:`_engine.CursorResult.rowcount` may not be meaningful
             when executing a single statement with multiple parameter sets
             (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
             values across multiple parameter sets and will return ``-1``
             when accessed.

           * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
             a correct population of :attr:`_engine.CursorResult.rowcount`
             when the :paramref:`.Connection.execution_options.preserve_rowcount`
             execution option is set to True (see the sketch following this
             list).

           * Statements that use RETURNING may not support rowcount, returning
             a ``-1`` value instead.

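        For example, a sketch of enabling the
        :paramref:`.Connection.execution_options.preserve_rowcount` option so
        that ``cursor.rowcount`` is memoized for an INSERT as well (the
        ``users`` table is hypothetical)::

            result = connection.execution_options(
                preserve_rowcount=True
            ).execute(users.insert(), {"user_name": "some name"})

            # memoized unconditionally before the cursor is closed
            inserted = result.rowcount
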
        .. seealso::

            :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`

            :paramref:`.Connection.execution_options.preserve_rowcount`

        """  # noqa: E501
        try:
            return self.context.rowcount
        except BaseException as e:
            self.cursor_strategy.handle_exception(self, self.cursor, e)
            raise  # not called

    @property
    def lastrowid(self):
        """Return the 'lastrowid' accessor on the DBAPI cursor.

        This is a DBAPI-specific method and is only functional
        for those backends which support it, for statements
        where it is appropriate. Its behavior is not
        consistent across backends.

        Usage of this method is normally unnecessary when
        using insert() expression constructs; the
        :attr:`~CursorResult.inserted_primary_key` attribute provides a
        tuple of primary key values for a newly inserted row,
        regardless of database backend.

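        For example, a sketch of the portable alternative, assuming a
        hypothetical ``users`` table with an autoincrementing primary key::

            result = connection.execute(
                users.insert(), {"user_name": "some name"}
            )

            # works across backends, unlike .lastrowid
            user_id = result.inserted_primary_key[0]
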
        """
        try:
            return self.context.get_lastrowid()
        except BaseException as e:
            self.cursor_strategy.handle_exception(self, self.cursor, e)

    @property
    def returns_rows(self):
        """True if this :class:`_engine.CursorResult` returns zero or more
        rows.

        I.e. if it is legal to call the methods
        :meth:`_engine.CursorResult.fetchone`,
        :meth:`_engine.CursorResult.fetchmany`, and
        :meth:`_engine.CursorResult.fetchall`.

        Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
        always be synonymous with whether or not the DBAPI cursor had a
        ``.description`` attribute, indicating the presence of result columns,
        noting that a cursor that returns zero rows still has a
        ``.description`` if a row-returning statement was emitted.

        This attribute should be True for all results that are against
        SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
        that use RETURNING. For INSERT/UPDATE/DELETE statements that were
        not using RETURNING, the value will usually be False; however,
        there are some dialect-specific exceptions to this, such as the
        MSSQL / pyodbc dialect, where a SELECT is emitted inline in
        order to retrieve an inserted primary key value.

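        For example, a sketch of the typical contrast, assuming a
        hypothetical ``users`` table::

            result = connection.execute(select(users))
            assert result.returns_rows  # SELECT always produces result columns

            result = connection.execute(users.delete())
            assert not result.returns_rows  # no RETURNING was used
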
        """
        return self._metadata.returns_rows

    @property
    def is_insert(self):
        """True if this :class:`_engine.CursorResult` is the result
        of executing an expression language compiled
        :func:`_expression.insert` construct.

        When True, this implies that the
        :attr:`inserted_primary_key` attribute is accessible,
        assuming the statement did not include
        a user defined "returning" construct.

        """
        return self.context.isinsert

    def _fetchiter_impl(self):
        fetchone = self.cursor_strategy.fetchone

        while True:
            row = fetchone(self, self.cursor)
            if row is None:
                break
            yield row

    def _fetchone_impl(self, hard_close=False):
        return self.cursor_strategy.fetchone(self, self.cursor, hard_close)

    def _fetchall_impl(self):
        return self.cursor_strategy.fetchall(self, self.cursor)

    def _fetchmany_impl(self, size=None):
        return self.cursor_strategy.fetchmany(self, self.cursor, size)

    def _raw_row_iterator(self):
        return self._fetchiter_impl()

    def merge(
        self, *others: Result[Unpack[TupleAny]]
    ) -> MergedResult[Unpack[TupleAny]]:
        merged_result = super().merge(*others)
        if self.context._has_rowcount:
            # when rowcount was tracked for this statement, sum the
            # rowcounts of all merged results so the total stays meaningful
            merged_result.rowcount = sum(
                cast("CursorResult[Any]", result).rowcount
                for result in (self,) + others
            )
        return merged_result

    def close(self) -> Any:
        """Close this :class:`_engine.CursorResult`.

        This closes out the underlying DBAPI cursor corresponding to the
        statement execution, if one is still present. Note that the DBAPI
        cursor is automatically released when the :class:`_engine.CursorResult`
        exhausts all available rows. :meth:`_engine.CursorResult.close` is
        generally an optional method except in the case when discarding a
        :class:`_engine.CursorResult` that still has additional rows pending
        for fetch.

        After this method is called, it is no longer valid to call upon
        the fetch methods, which will raise a :class:`.ResourceClosedError`
        on subsequent use.

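        For example, a sketch of discarding a result that still has rows
        pending, assuming a hypothetical ``users`` table::

            result = connection.execute(select(users))
            row = result.fetchone()

            # release the DBAPI cursor without fetching the remaining rows
            result.close()
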
        .. seealso::

            :ref:`connections_toplevel`

        """
        self._soft_close(hard=True)

    @_generative
    def yield_per(self, num: int) -> Self:
        # see Result.yield_per for documentation; additionally configures
        # the cursor fetch strategy to buffer rows in batches of ``num``
        self._yield_per = num
        self.cursor_strategy.yield_per(self, self.cursor, num)
        return self


# "ResultProxy" is the 1.x series name for this class, retained as a
# backwards-compatible alias
ResultProxy = CursorResult