Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

720 statements  

1# engine/cursor.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Define cursor-specific result set constructs including 

9:class:`.CursorResult`.""" 

10 

11from __future__ import annotations 

12 

13import collections 

14import functools 

15import operator 

16import typing 

17from typing import Any 

18from typing import cast 

19from typing import ClassVar 

20from typing import Deque 

21from typing import Dict 

22from typing import Iterable 

23from typing import Iterator 

24from typing import List 

25from typing import Mapping 

26from typing import NoReturn 

27from typing import Optional 

28from typing import Sequence 

29from typing import Tuple 

30from typing import TYPE_CHECKING 

31from typing import TypeVar 

32from typing import Union 

33 

34from .result import IteratorResult 

35from .result import MergedResult 

36from .result import Result 

37from .result import ResultMetaData 

38from .result import SimpleResultMetaData 

39from .result import tuplegetter 

40from .row import Row 

41from .. import exc 

42from .. import util 

43from ..sql import elements 

44from ..sql import sqltypes 

45from ..sql import util as sql_util 

46from ..sql.base import _generative 

47from ..sql.compiler import ResultColumnsEntry 

48from ..sql.compiler import RM_NAME 

49from ..sql.compiler import RM_OBJECTS 

50from ..sql.compiler import RM_RENDERED_NAME 

51from ..sql.compiler import RM_TYPE 

52from ..sql.type_api import TypeEngine 

53from ..util import compat 

54from ..util.typing import Final 

55from ..util.typing import Literal 

56from ..util.typing import Self 

57 

58if typing.TYPE_CHECKING: 

59 from .base import Connection 

60 from .default import DefaultExecutionContext 

61 from .interfaces import _DBAPICursorDescription 

62 from .interfaces import _MutableCoreSingleExecuteParams 

63 from .interfaces import CoreExecuteOptionsParameter 

64 from .interfaces import DBAPICursor 

65 from .interfaces import DBAPIType 

66 from .interfaces import Dialect 

67 from .interfaces import ExecutionContext 

68 from .result import _KeyIndexType 

69 from .result import _KeyMapRecType 

70 from .result import _KeyMapType 

71 from .result import _KeyType 

72 from .result import _ProcessorsType 

73 from .result import _TupleGetterType 

74 from ..sql.schema import Column 

75 from ..sql.type_api import _ResultProcessorType 

76 

77 

78_T = TypeVar("_T", bound=Any) 

79TupleAny = Tuple[Any, ...] 

80 

81# metadata entry tuple indexes. 

82# using raw tuple is faster than namedtuple. 

83# these match up to the positions in 

84# _CursorKeyMapRecType 

85MD_INDEX: Final[Literal[0]] = 0 

86"""integer index in cursor.description 

87 

88""" 

89 

90MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1 

91"""integer index in compiled._result_columns""" 

92 

93MD_OBJECTS: Final[Literal[2]] = 2 

94"""other string keys and ColumnElement obj that can match. 

95 

96This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects 

97 

98""" 

99 

100MD_LOOKUP_KEY: Final[Literal[3]] = 3 

101"""string key we usually expect for key-based lookup 

102 

103this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name 

104""" 

105 

106 

107MD_RENDERED_NAME: Final[Literal[4]] = 4 

108"""name that is usually in cursor.description 

109 

this comes from compiler.RM_RENDERED_NAME / compiler.ResultColumnsEntry.keyname

111""" 

112 

113 

114MD_PROCESSOR: Final[Literal[5]] = 5 

115"""callable to process a result value into a row""" 

116 

117MD_UNTRANSLATED: Final[Literal[6]] = 6 

118"""raw name from cursor.description""" 

119 

120 

# Shape of one keymap record.  The tuple positions line up with the
# MD_* index constants defined above.
_CursorKeyMapRecType = Tuple[
    Optional[int],  # MD_INDEX, None means the record is ambiguously named
    int,  # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
    TupleAny,  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    Optional[str],  # MD_UNTRANSLATED
]

# mapping of lookup key -> keymap record
_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]

# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
# not None
_NonAmbigCursorKeyMapRecType = Tuple[
    int,  # MD_INDEX
    int,  # MD_RESULT_MAP_INDEX
    List[Any],  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    str,  # MD_UNTRANSLATED
]

# Intermediate tuple yielded by the _merge_*() generators of
# CursorResultMetaData before it is turned into a keymap record.
_MergeColTuple = Tuple[
    int,  # index in cursor.description
    Optional[int],  # index in the compiled result columns, if matched
    str,  # column name taken from cursor.description
    TypeEngine[Any],  # mapped type (NULLTYPE when nothing matched)
    "DBAPIType",  # type entry from cursor.description
    Optional[TupleAny],  # objects from the compiled column, if matched
    Optional[str],  # untranslated column name, if any
]

154 

155 

156class CursorResultMetaData(ResultMetaData): 

157 """Result metadata for DBAPI cursors.""" 

158 

159 __slots__ = ( 

160 "_keymap", 

161 "_processors", 

162 "_keys", 

163 "_keymap_by_result_column_idx", 

164 "_tuplefilter", 

165 "_translated_indexes", 

166 "_safe_for_cache", 

167 "_unpickled", 

168 "_key_to_index", 

169 # don't need _unique_filters support here for now. Can be added 

170 # if a need arises. 

171 ) 

172 

173 _keymap: _CursorKeyMapType 

174 _processors: _ProcessorsType 

175 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]] 

176 _unpickled: bool 

177 _safe_for_cache: bool 

178 _translated_indexes: Optional[List[int]] 

179 

180 returns_rows: ClassVar[bool] = True 

181 

182 def _has_key(self, key: Any) -> bool: 

183 return key in self._keymap 

184 

185 def _for_freeze(self) -> ResultMetaData: 

186 ambiguous = { 

187 rec[MD_LOOKUP_KEY] 

188 for rec in self._keymap.values() 

189 if rec[MD_INDEX] is None 

190 } 

191 return SimpleResultMetaData( 

192 self._keys, 

193 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys], 

194 _ambiguous_keys=frozenset(ambiguous) if ambiguous else None, 

195 ) 

196 

197 def _make_new_metadata( 

198 self, 

199 *, 

200 unpickled: bool, 

201 processors: _ProcessorsType, 

202 keys: Sequence[str], 

203 keymap: _KeyMapType, 

204 tuplefilter: Optional[_TupleGetterType], 

205 translated_indexes: Optional[List[int]], 

206 safe_for_cache: bool, 

207 keymap_by_result_column_idx: Any, 

208 ) -> CursorResultMetaData: 

209 new_obj = self.__class__.__new__(self.__class__) 

210 new_obj._unpickled = unpickled 

211 new_obj._processors = processors 

212 new_obj._keys = keys 

213 new_obj._keymap = keymap 

214 new_obj._tuplefilter = tuplefilter 

215 new_obj._translated_indexes = translated_indexes 

216 new_obj._safe_for_cache = safe_for_cache 

217 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx 

218 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX) 

219 return new_obj 

220 

221 def _remove_processors(self) -> CursorResultMetaData: 

222 assert not self._tuplefilter 

223 return self._make_new_metadata( 

224 unpickled=self._unpickled, 

225 processors=[None] * len(self._processors), 

226 tuplefilter=None, 

227 translated_indexes=None, 

228 keymap={ 

229 key: value[0:5] + (None,) + value[6:] 

230 for key, value in self._keymap.items() 

231 }, 

232 keys=self._keys, 

233 safe_for_cache=self._safe_for_cache, 

234 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

235 ) 

236 

237 def _splice_horizontally( 

238 self, other: CursorResultMetaData 

239 ) -> CursorResultMetaData: 

240 assert not self._tuplefilter 

241 

242 keymap = dict(self._keymap) 

243 offset = len(self._keys) 

244 

245 for key, value in other._keymap.items(): 

246 # int index should be None for ambiguous key 

247 if value[MD_INDEX] is not None and key not in keymap: 

248 md_index = value[MD_INDEX] + offset 

249 md_object = value[MD_RESULT_MAP_INDEX] + offset 

250 else: 

251 md_index = None 

252 md_object = -1 

253 keymap[key] = (md_index, md_object, *value[2:]) 

254 

255 return self._make_new_metadata( 

256 unpickled=self._unpickled, 

257 processors=self._processors + other._processors, # type: ignore 

258 tuplefilter=None, 

259 translated_indexes=None, 

260 keys=self._keys + other._keys, # type: ignore 

261 keymap=keymap, 

262 safe_for_cache=self._safe_for_cache, 

263 keymap_by_result_column_idx={ 

264 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry 

265 for metadata_entry in keymap.values() 

266 }, 

267 ) 

268 

269 def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData: 

270 recs = list(self._metadata_for_keys(keys)) 

271 

272 indexes = [rec[MD_INDEX] for rec in recs] 

273 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs] 

274 

275 if self._translated_indexes: 

276 indexes = [self._translated_indexes[idx] for idx in indexes] 

277 tup = tuplegetter(*indexes) 

278 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)] 

279 

280 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs} 

281 # TODO: need unit test for: 

282 # result = connection.execute("raw sql, no columns").scalars() 

283 # without the "or ()" it's failing because MD_OBJECTS is None 

284 keymap.update( 

285 (e, new_rec) 

286 for new_rec in new_recs 

287 for e in new_rec[MD_OBJECTS] or () 

288 ) 

289 

290 return self._make_new_metadata( 

291 unpickled=self._unpickled, 

292 processors=self._processors, 

293 keys=new_keys, 

294 tuplefilter=tup, 

295 translated_indexes=indexes, 

296 keymap=keymap, # type: ignore[arg-type] 

297 safe_for_cache=self._safe_for_cache, 

298 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

299 ) 

300 

301 def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData: 

302 """When using a cached Compiled construct that has a _result_map, 

303 for a new statement that used the cached Compiled, we need to ensure 

304 the keymap has the Column objects from our new statement as keys. 

305 So here we rewrite keymap with new entries for the new columns 

306 as matched to those of the cached statement. 

307 

308 """ 

309 

310 if not context.compiled or not context.compiled._result_columns: 

311 return self 

312 

313 compiled_statement = context.compiled.statement 

314 invoked_statement = context.invoked_statement 

315 

316 if TYPE_CHECKING: 

317 assert isinstance(invoked_statement, elements.ClauseElement) 

318 

319 if compiled_statement is invoked_statement: 

320 return self 

321 

322 assert invoked_statement is not None 

323 

324 # this is the most common path for Core statements when 

325 # caching is used. In ORM use, this codepath is not really used 

326 # as the _result_disable_adapt_to_context execution option is 

327 # set by the ORM. 

328 

329 # make a copy and add the columns from the invoked statement 

330 # to the result map. 

331 

332 keymap_by_position = self._keymap_by_result_column_idx 

333 

334 if keymap_by_position is None: 

335 # first retrieval from cache, this map will not be set up yet, 

336 # initialize lazily 

337 keymap_by_position = self._keymap_by_result_column_idx = { 

338 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry 

339 for metadata_entry in self._keymap.values() 

340 } 

341 

342 assert not self._tuplefilter 

343 return self._make_new_metadata( 

344 keymap=compat.dict_union( 

345 self._keymap, 

346 { 

347 new: keymap_by_position[idx] 

348 for idx, new in enumerate( 

349 invoked_statement._all_selected_columns 

350 ) 

351 if idx in keymap_by_position 

352 }, 

353 ), 

354 unpickled=self._unpickled, 

355 processors=self._processors, 

356 tuplefilter=None, 

357 translated_indexes=None, 

358 keys=self._keys, 

359 safe_for_cache=self._safe_for_cache, 

360 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

361 ) 

362 

    def __init__(
        self,
        parent: CursorResult[Any],
        cursor_description: _DBAPICursorDescription,
    ):
        """Build the keymap, keys and processors for a cursor result.

        :param parent: the result object; only its ``context`` attribute
         is read here.
        :param cursor_description: the DBAPI ``cursor.description``
         sequence to be merged with the compiled column information of
         the context, if any.

        """
        context = parent.context
        self._tuplefilter = None
        self._translated_indexes = None
        self._safe_for_cache = self._unpickled = False

        if context.result_column_struct:
            (
                result_columns,
                cols_are_ordered,
                textual_ordered,
                ad_hoc_textual,
                loose_column_name_matching,
            ) = context.result_column_struct
            num_ctx_cols = len(result_columns)
        else:
            # no compiled column information; every flag, the column
            # collection and the column count are all set to False
            result_columns = cols_are_ordered = (  # type: ignore
                num_ctx_cols
            ) = ad_hoc_textual = loose_column_name_matching = (
                textual_ordered
            ) = False

        # merge cursor.description with the column info
        # present in the compiled structure, if any
        raw = self._merge_cursor_description(
            context,
            cursor_description,
            result_columns,
            num_ctx_cols,
            cols_are_ordered,
            textual_ordered,
            ad_hoc_textual,
            loose_column_name_matching,
        )

        # processors in key order which are used when building up
        # a row
        self._processors = [
            metadata_entry[MD_PROCESSOR] for metadata_entry in raw
        ]

        # this is used when using this ResultMetaData in a Core-only cache
        # retrieval context.  it's initialized on first cache retrieval
        # when the _result_disable_adapt_to_context execution option
        # (which the ORM generally sets) is not set.
        self._keymap_by_result_column_idx = None

        # for compiled SQL constructs, copy additional lookup keys into
        # the key lookup map, such as Column objects, labels,
        # column keys and other names
        if num_ctx_cols:
            # keymap by primary string...
            by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

            if len(by_key) != num_ctx_cols:
                # if by-primary-string dictionary smaller than
                # number of columns, assume we have dupes; (this check
                # is also in place if string dictionary is bigger, as
                # can occur when '*' was used as one of the compiled columns,
                # which may or may not be suggestive of dupes), rewrite
                # dupe records with "None" for index which results in
                # ambiguous column exception when accessed.
                #
                # this is considered to be the less common case as it is not
                # common to have dupe column keys in a SELECT statement.
                #
                # new in 1.4: get the complete set of all possible keys,
                # strings, objects, whatever, that are dupes across two
                # different records, first.
                index_by_key: Dict[Any, Any] = {}
                dupes = set()
                for metadata_entry in raw:
                    for key in (metadata_entry[MD_RENDERED_NAME],) + (
                        metadata_entry[MD_OBJECTS] or ()
                    ):
                        idx = metadata_entry[MD_INDEX]
                        # if this key has been associated with more than one
                        # positional index, it's a dupe
                        if index_by_key.setdefault(key, idx) != idx:
                            dupes.add(key)

                # then put everything we have into the keymap excluding only
                # those keys that are dupes.
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                    if obj_elem not in dupes
                }

                # then for the dupe keys, put the "ambiguous column"
                # record into by_key.
                by_key.update(
                    {
                        key: (None, -1, (), key, key, None, None)
                        for key in dupes
                    }
                )

            else:
                # no dupes - copy secondary elements from compiled
                # columns into self._keymap.  this is the most common
                # codepath for Core / ORM statement executions before the
                # result metadata is cached
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                }
            # update keymap with primary string names taking
            # precedence
            self._keymap.update(by_key)
        else:
            # no compiled objects to map, just create keymap by primary string
            self._keymap = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

        # update keymap with "translated" names.  In SQLAlchemy this is a
        # sqlite only thing, and in fact impacting only extremely old SQLite
        # versions unlikely to be present in modern Python versions.
        # however, the pyhive third party dialect is
        # also using this hook, which means others still might use it as well.
        # I dislike having this awkward hook here but as long as we need
        # to use names in cursor.description in some cases we need to have
        # some hook to accomplish this.
        if not num_ctx_cols and context._translate_colname:
            # the untranslated name becomes an additional key pointing at
            # the same record as the translated lookup key
            self._keymap.update(
                {
                    metadata_entry[MD_UNTRANSLATED]: self._keymap[
                        metadata_entry[MD_LOOKUP_KEY]
                    ]
                    for metadata_entry in raw
                    if metadata_entry[MD_UNTRANSLATED]
                }
            )

        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)

511 

    def _merge_cursor_description(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        result_columns: Sequence[ResultColumnsEntry],
        num_ctx_cols: int,
        cols_are_ordered: bool,
        textual_ordered: bool,
        ad_hoc_textual: bool,
        loose_column_name_matching: bool,
    ) -> List[_CursorKeyMapRecType]:
        """Merge a cursor.description with compiled result column information.

        There are at least four separate strategies used here, selected
        depending on the type of SQL construct used to start with.

        The most common case is that of the compiled SQL expression construct,
        which generated the column names present in the raw SQL string and
        which has the identical number of columns as were reported by
        cursor.description.  In this case, we assume a 1-1 positional mapping
        between the entries in cursor.description and the compiled object.
        This is also the most performant case as we disregard extracting /
        decoding the column names present in cursor.description since we
        already have the desired name we generated in the compiled SQL
        construct.

        The next common case is that of the completely raw string SQL,
        such as passed to connection.execute().  In this case we have no
        compiled construct to work with, so we extract and decode the
        names from cursor.description and index those as the primary
        result row target keys.

        The remaining fairly common case is that of the textual SQL
        that includes at least partial column information; this is when
        we use a :class:`_expression.TextualSelect` construct.
        This construct may have
        unordered or ordered column information.  In the ordered case, we
        merge the cursor.description and the compiled construct's information
        positionally, and warn if there are additional description names
        present, however we still decode the names in cursor.description
        as we don't have a guarantee that the names in the columns match
        on these.  In the unordered case, we match names in cursor.description
        to that of the compiled construct based on name matching.
        In both of these cases, the cursor.description names and the column
        expression objects and names are indexed as result row target keys.

        The final case is much less common, where we have a compiled
        non-textual SQL expression construct, but the number of columns
        in cursor.description doesn't match what's in the compiled
        construct.  We make the guess here that there might be textual
        column expressions in the compiled construct that themselves include
        a comma in them causing them to split.  We do the same name-matching
        as with textual non-ordered columns.

        The name-matched system of merging is the same as that used by
        SQLAlchemy for all cases up through the 0.9 series.   Positional
        matching for compiled SQL expressions was introduced in 1.0 as a
        major performance feature, and positional matching for textual
        :class:`_expression.TextualSelect` objects in 1.1.
        As name matching is no longer
        a common case, it was acceptable to factor it into smaller generator-
        oriented methods that are easier to understand, but incur slightly
        more performance overhead.

        As side effects, this method (directly or through the helper
        generators it consumes) assigns ``self._keys`` and
        ``self._safe_for_cache``.

        :return: a list of keymap records, one per produced column, laid
         out according to the ``MD_*`` index constants.

        """

        if (
            num_ctx_cols
            and cols_are_ordered
            and not textual_ordered
            and num_ctx_cols == len(cursor_description)
        ):
            self._keys = [elem[0] for elem in result_columns]
            # pure positional 1-1 case; doesn't need to read
            # the names from cursor.description

            # most common case for Core and ORM

            # this metadata is safe to cache because we are guaranteed
            # to have the columns in the same order for new executions
            self._safe_for_cache = True
            return [
                (
                    idx,
                    idx,
                    rmap_entry[RM_OBJECTS],
                    rmap_entry[RM_NAME],
                    rmap_entry[RM_RENDERED_NAME],
                    context.get_result_processor(
                        rmap_entry[RM_TYPE],
                        rmap_entry[RM_RENDERED_NAME],
                        cursor_description[idx][1],
                    ),
                    None,
                )
                for idx, rmap_entry in enumerate(result_columns)
            ]
        else:
            # name-based or text-positional cases, where we need
            # to read cursor.description names

            if textual_ordered or (
                ad_hoc_textual and len(cursor_description) == num_ctx_cols
            ):
                self._safe_for_cache = True
                # textual positional case
                raw_iterator = self._merge_textual_cols_by_position(
                    context, cursor_description, result_columns
                )
            elif num_ctx_cols:
                # compiled SQL with a mismatch of description cols
                # vs. compiled cols, or textual w/ unordered columns
                # the order of columns can change if the query is
                # against a "select *", so not safe to cache
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_name(
                    context,
                    cursor_description,
                    result_columns,
                    loose_column_name_matching,
                )
            else:
                # no compiled SQL, just a raw string, order of columns
                # can change for "select *"
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_none(
                    context, cursor_description
                )

            # rearrange each merge tuple into keymap record order; the
            # cursor column name is used for both the lookup key and the
            # rendered name
            return [
                (
                    idx,
                    ridx,
                    obj,
                    cursor_colname,
                    cursor_colname,
                    context.get_result_processor(
                        mapped_type, cursor_colname, coltype
                    ),
                    untranslated,
                )  # type: ignore[misc]
                for (
                    idx,
                    ridx,
                    cursor_colname,
                    mapped_type,
                    coltype,
                    obj,
                    untranslated,
                ) in raw_iterator
            ]

663 

664 def _colnames_from_description( 

665 self, 

666 context: DefaultExecutionContext, 

667 cursor_description: _DBAPICursorDescription, 

668 ) -> Iterator[Tuple[int, str, Optional[str], DBAPIType]]: 

669 """Extract column names and data types from a cursor.description. 

670 

671 Applies unicode decoding, column translation, "normalization", 

672 and case sensitivity rules to the names based on the dialect. 

673 

674 """ 

675 

676 dialect = context.dialect 

677 translate_colname = context._translate_colname 

678 normalize_name = ( 

679 dialect.normalize_name if dialect.requires_name_normalize else None 

680 ) 

681 untranslated = None 

682 

683 self._keys = [] 

684 

685 for idx, rec in enumerate(cursor_description): 

686 colname = rec[0] 

687 coltype = rec[1] 

688 

689 if translate_colname: 

690 colname, untranslated = translate_colname(colname) 

691 

692 if normalize_name: 

693 colname = normalize_name(colname) 

694 

695 self._keys.append(colname) 

696 

697 yield idx, colname, untranslated, coltype 

698 

699 def _merge_textual_cols_by_position( 

700 self, 

701 context: DefaultExecutionContext, 

702 cursor_description: _DBAPICursorDescription, 

703 result_columns: Sequence[ResultColumnsEntry], 

704 ) -> Iterator[_MergeColTuple]: 

705 num_ctx_cols = len(result_columns) 

706 

707 if num_ctx_cols > len(cursor_description): 

708 util.warn( 

709 "Number of columns in textual SQL (%d) is " 

710 "smaller than number of columns requested (%d)" 

711 % (num_ctx_cols, len(cursor_description)) 

712 ) 

713 seen = set() 

714 

715 for ( 

716 idx, 

717 colname, 

718 untranslated, 

719 coltype, 

720 ) in self._colnames_from_description(context, cursor_description): 

721 if idx < num_ctx_cols: 

722 ctx_rec = result_columns[idx] 

723 obj = ctx_rec[RM_OBJECTS] 

724 ridx = idx 

725 mapped_type = ctx_rec[RM_TYPE] 

726 if obj[0] in seen: 

727 raise exc.InvalidRequestError( 

728 "Duplicate column expression requested " 

729 "in textual SQL: %r" % obj[0] 

730 ) 

731 seen.add(obj[0]) 

732 else: 

733 mapped_type = sqltypes.NULLTYPE 

734 obj = None 

735 ridx = None 

736 yield idx, ridx, colname, mapped_type, coltype, obj, untranslated 

737 

738 def _merge_cols_by_name( 

739 self, 

740 context: DefaultExecutionContext, 

741 cursor_description: _DBAPICursorDescription, 

742 result_columns: Sequence[ResultColumnsEntry], 

743 loose_column_name_matching: bool, 

744 ) -> Iterator[_MergeColTuple]: 

745 match_map = self._create_description_match_map( 

746 result_columns, loose_column_name_matching 

747 ) 

748 mapped_type: TypeEngine[Any] 

749 

750 for ( 

751 idx, 

752 colname, 

753 untranslated, 

754 coltype, 

755 ) in self._colnames_from_description(context, cursor_description): 

756 try: 

757 ctx_rec = match_map[colname] 

758 except KeyError: 

759 mapped_type = sqltypes.NULLTYPE 

760 obj = None 

761 result_columns_idx = None 

762 else: 

763 obj = ctx_rec[1] 

764 mapped_type = ctx_rec[2] 

765 result_columns_idx = ctx_rec[3] 

766 yield ( 

767 idx, 

768 result_columns_idx, 

769 colname, 

770 mapped_type, 

771 coltype, 

772 obj, 

773 untranslated, 

774 ) 

775 

    @classmethod
    def _create_description_match_map(
        cls,
        result_columns: Sequence[ResultColumnsEntry],
        loose_column_name_matching: bool = False,
    ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
        """when matching cursor.description to a set of names that are present
        in a Compiled object, as is the case with TextualSelect, get all the
        names we expect might match those in cursor.description.

        :param result_columns: compiled result column entries, indexed
         with the ``RM_*`` constants.
        :param loose_column_name_matching: when True, every element of an
         entry's ``RM_OBJECTS`` is also added as a key, unless that key
         is already present.
        :return: dictionary of key to
         ``(name, objects, type, index in result_columns)``.
        """

        d: Dict[
            Union[str, object],
            Tuple[str, TupleAny, TypeEngine[Any], int],
        ] = {}
        for ridx, elem in enumerate(result_columns):
            key = elem[RM_RENDERED_NAME]
            if key in d:
                # conflicting keyname - just add the column-linked objects
                # to the existing record.  if there is a duplicate column
                # name in the cursor description, this will allow all of those
                # objects to raise an ambiguous column error
                e_name, e_obj, e_type, e_ridx = d[key]
                # name and type of the first entry are kept; the result
                # index is replaced with that of the latest entry
                d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
            else:
                d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)

            # NOTE(review): this block is reconstructed at loop level (not
            # nested in the "else" above) from a source view that had lost
            # its indentation - confirm against the real file.
            if loose_column_name_matching:
                # when using a textual statement with an unordered set
                # of columns that line up, we are expecting the user
                # to be using label names in the SQL that match to the column
                # expressions.  Enable more liberal matching for this case;
                # duplicate keys that are ambiguous will be fixed later.
                for r_key in elem[RM_OBJECTS]:
                    d.setdefault(
                        r_key,
                        (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
                    )
        return d

815 

816 def _merge_cols_by_none( 

817 self, 

818 context: DefaultExecutionContext, 

819 cursor_description: _DBAPICursorDescription, 

820 ) -> Iterator[_MergeColTuple]: 

821 self._keys = [] 

822 

823 for ( 

824 idx, 

825 colname, 

826 untranslated, 

827 coltype, 

828 ) in self._colnames_from_description(context, cursor_description): 

829 yield ( 

830 idx, 

831 None, 

832 colname, 

833 sqltypes.NULLTYPE, 

834 coltype, 

835 None, 

836 untranslated, 

837 ) 

838 

839 if not TYPE_CHECKING: 

840 

841 def _key_fallback( 

842 self, key: Any, err: Optional[Exception], raiseerr: bool = True 

843 ) -> Optional[NoReturn]: 

844 if raiseerr: 

845 if self._unpickled and isinstance(key, elements.ColumnElement): 

846 raise exc.NoSuchColumnError( 

847 "Row was unpickled; lookup by ColumnElement " 

848 "is unsupported" 

849 ) from err 

850 else: 

851 raise exc.NoSuchColumnError( 

852 "Could not locate column in row for column '%s'" 

853 % util.string_or_unprintable(key) 

854 ) from err 

855 else: 

856 return None 

857 

858 def _raise_for_ambiguous_column_name( 

859 self, rec: _KeyMapRecType 

860 ) -> NoReturn: 

861 raise exc.InvalidRequestError( 

862 "Ambiguous column name '%s' in " 

863 "result set column descriptions" % rec[MD_LOOKUP_KEY] 

864 ) 

865 

866 def _index_for_key( 

867 self, key: _KeyIndexType, raiseerr: bool = True 

868 ) -> Optional[int]: 

869 # TODO: can consider pre-loading ints and negative ints 

870 # into _keymap - also no coverage here 

871 if isinstance(key, int): 

872 key = self._keys[key] 

873 

874 try: 

875 rec = self._keymap[key] 

876 except KeyError as ke: 

877 x = self._key_fallback(key, ke, raiseerr) 

878 assert x is None 

879 return None 

880 

881 index = rec[0] 

882 

883 if index is None: 

884 self._raise_for_ambiguous_column_name(rec) 

885 return index 

886 

887 def _indexes_for_keys( 

888 self, keys: Sequence[_KeyIndexType] 

889 ) -> Sequence[int]: 

890 try: 

891 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501 

892 except KeyError as ke: 

893 # ensure it raises 

894 CursorResultMetaData._key_fallback(self, ke.args[0], ke) 

895 

896 def _metadata_for_keys( 

897 self, keys: Sequence[_KeyIndexType] 

898 ) -> Iterator[_NonAmbigCursorKeyMapRecType]: 

899 for key in keys: 

900 if int in key.__class__.__mro__: 

901 key = self._keys[key] # type: ignore[index] 

902 

903 try: 

904 rec = self._keymap[key] # type: ignore[index] 

905 except KeyError as ke: 

906 # ensure it raises 

907 CursorResultMetaData._key_fallback(self, ke.args[0], ke) 

908 

909 index = rec[MD_INDEX] 

910 

911 if index is None: 

912 self._raise_for_ambiguous_column_name(rec) 

913 

914 yield cast(_NonAmbigCursorKeyMapRecType, rec) 

915 

916 def __getstate__(self) -> Dict[str, Any]: 

917 # TODO: consider serializing this as SimpleResultMetaData 

918 return { 

919 "_keymap": { 

920 key: ( 

921 rec[MD_INDEX], 

922 rec[MD_RESULT_MAP_INDEX], 

923 [], 

924 key, 

925 rec[MD_RENDERED_NAME], 

926 None, 

927 None, 

928 ) 

929 for key, rec in self._keymap.items() 

930 if isinstance(key, (str, int)) 

931 }, 

932 "_keys": self._keys, 

933 "_translated_indexes": self._translated_indexes, 

934 } 

935 

936 def __setstate__(self, state: Dict[str, Any]) -> None: 

937 self._processors = [None for _ in range(len(state["_keys"]))] 

938 self._keymap = state["_keymap"] 

939 self._keymap_by_result_column_idx = None 

940 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX) 

941 self._keys = state["_keys"] 

942 self._unpickled = True 

943 if state["_translated_indexes"]: 

944 self._translated_indexes = cast( 

945 "List[int]", state["_translated_indexes"] 

946 ) 

947 self._tuplefilter = tuplegetter(*self._translated_indexes) 

948 else: 

949 self._translated_indexes = self._tuplefilter = None 

950 

951 

class ResultFetchStrategy:
    """Define a fetching strategy for a result object.

    This base class only fixes the method signatures: the close and
    fetch methods raise ``NotImplementedError``, ``yield_per`` does
    nothing and ``handle_exception`` re-raises the given error.

    .. versionadded:: 1.4

    """

    __slots__ = ()

    # class-level default; no alternate description
    alternate_cursor_description: Optional[_DBAPICursorDescription] = None

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Not implemented on the base strategy."""
        raise NotImplementedError()

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Not implemented on the base strategy."""
        raise NotImplementedError()

    def yield_per(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        num: int,
    ) -> None:
        """Do nothing; the base strategy ignores ``num``."""
        return None

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Not implemented on the base strategy."""
        raise NotImplementedError()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Not implemented on the base strategy."""
        raise NotImplementedError()

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        """Not implemented on the base strategy."""
        raise NotImplementedError()

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Re-raise ``err`` unchanged."""
        raise err

1012 

1013 

class NoCursorFetchStrategy(ResultFetchStrategy):
    """Cursor strategy for a result that has no open cursor.

    There are two varieties of this strategy, one for DQL and one for
    DML (and also DDL), each of which represent a result that had a cursor
    but no longer has one.

    Closing is a no-op, and every fetch method delegates to
    ``_non_result()``, which the two varieties implement.

    """

    __slots__ = ()

    def soft_close(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Do nothing; there is no cursor to release."""

    def hard_close(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Do nothing; there is no cursor to release."""

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Delegate to ``_non_result()`` with ``None`` as the default."""
        no_row = None
        return self._non_result(result, no_row)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Delegate to ``_non_result()`` with a new empty list."""
        no_rows: List[Any] = []
        return self._non_result(result, no_rows)

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Delegate to ``_non_result()`` with a new empty list."""
        no_rows: List[Any] = []
        return self._non_result(result, no_rows)

    def _non_result(
        self,
        result: CursorResult[Any],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Produce the outcome of a fetch; not implemented at this level."""
        raise NotImplementedError

1067 

1068 

class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DQL result that has no open cursor.

    This is a result set that can return rows, i.e. for a SELECT, or for an
    INSERT, UPDATE, DELETE that includes RETURNING.   However it is in the
    state where the cursor is closed and no rows remain available.  The
    owning result object may or may not be "hard closed", which determines
    if the fetch methods send empty results or raise for closed result.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Any],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Return ``default`` unless the result is closed.

        :raises ResourceClosedError: when ``result.closed`` is true; the
         error is chained from ``err``.
        """
        if not result.closed:
            return default

        raise exc.ResourceClosedError(
            "This result object is closed."
        ) from err


# module-level instance of the DQL "no cursor" strategy
_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()

1097 

1098 

class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DML result that has no open cursor.

    This is a result set that does not return rows, i.e. for an INSERT,
    UPDATE, DELETE that does not include RETURNING.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Any],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Hand off to the metadata's ``_we_dont_return_rows()``.

        ``default`` is not used by this variety.
        """
        metadata = result._metadata
        # we only expect to have a _NoResultMetaData() here right now.
        assert not metadata.returns_rows
        metadata._we_dont_return_rows(err)  # type: ignore[union-attr]


# module-level instance of the DML "no cursor" strategy
_NO_CURSOR_DML = NoCursorDMLFetchStrategy()

1121 

1122 

class CursorFetchStrategy(ResultFetchStrategy):
    """Call fetch methods from a DBAPI cursor.

    Alternate versions of this class may instead buffer the rows from
    cursors or not use cursors at all.

    """

    __slots__ = ()

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the DQL "no cursor" strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the DQL "no cursor" strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Pass ``err`` to the connection's DBAPI exception handler."""
        connection = result.connection
        connection._handle_dbapi_exception(
            err, None, None, dbapi_cursor, result.context
        )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Replace the result's strategy with a buffered one.

        The new strategy starts with an empty buffer, uses ``num`` as its
        ``max_row_buffer`` and has buffer growth switched off.
        """
        buffered = BufferedRowCursorFetchStrategy(
            dbapi_cursor,
            {"max_row_buffer": num},
            initial_buffer=collections.deque(),
            growth_factor=0,
        )
        result.cursor_strategy = buffered

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Fetch one row from the cursor.

        When the cursor returns ``None`` the result is soft closed, with
        ``hard_close`` forwarded as the ``hard`` flag.
        """
        try:
            fetched = dbapi_cursor.fetchone()
            if fetched is None:
                result._soft_close(hard=hard_close)
            return fetched
        except BaseException as caught:
            self.handle_exception(result, dbapi_cursor, caught)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Fetch a batch of rows from the cursor.

        ``size`` is forwarded only when it is not ``None``.  An empty
        batch soft closes the result.
        """
        try:
            batch = (
                dbapi_cursor.fetchmany()
                if size is None
                else dbapi_cursor.fetchmany(size)
            )

            if not batch:
                result._soft_close()
            return batch
        except BaseException as caught:
            self.handle_exception(result, dbapi_cursor, caught)

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        """Fetch all rows from the cursor, then soft close the result."""
        try:
            everything = dbapi_cursor.fetchall()
            result._soft_close()
            return everything
        except BaseException as caught:
            self.handle_exception(result, dbapi_cursor, caught)


# module-level instance of the plain cursor strategy
_DEFAULT_FETCH = CursorFetchStrategy()

1209 

1210 

class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
    """A cursor fetch strategy with row buffering behavior.

    This strategy buffers the contents of a selection of rows
    before ``fetchone()`` is called.  This is to allow the results of
    ``cursor.description`` to be available immediately, when
    interfacing with a DB-API that requires rows to be consumed before
    this information is available (currently psycopg2, when used with
    server-side cursors).

    The pre-fetching behavior fetches only one row initially, and then
    multiplies its buffer size by a fixed growth factor with each
    successive need for additional rows, up to the ``max_row_buffer``
    size, which defaults to 1000::

        with psycopg2_engine.connect() as conn:

            result = conn.execution_options(
                stream_results=True, max_row_buffer=50
            ).execute(text("select * from table"))

    .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.

    .. seealso::

        :ref:`psycopg2_execution_options`
    """

    __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")

    def __init__(
        self,
        dbapi_cursor: DBAPICursor,
        execution_options: CoreExecuteOptionsParameter,
        growth_factor: int = 5,
        initial_buffer: Optional[Deque[Any]] = None,
    ) -> None:
        """Set up the buffer and its sizing parameters.

        :param dbapi_cursor: cursor to read from; one row is fetched from
         it right away unless ``initial_buffer`` is given.
        :param execution_options: consulted for ``max_row_buffer``
         (default 1000).
        :param growth_factor: multiplier applied to the buffer size when
         rows are buffered; a false value disables growth and makes the
         buffer size equal to ``max_row_buffer``.
        :param initial_buffer: deque used as the row buffer as-is.
        """
        self._max_row_buffer = execution_options.get("max_row_buffer", 1000)

        if initial_buffer is None:
            initial_buffer = collections.deque(dbapi_cursor.fetchmany(1))
        self._rowbuffer = initial_buffer
        self._growth_factor = growth_factor

        self._bufsize = (
            min(self._max_row_buffer, self._growth_factor)
            if growth_factor
            else self._max_row_buffer
        )

    @classmethod
    def create(
        cls, result: CursorResult[Any]
    ) -> BufferedRowCursorFetchStrategy:
        """Build a strategy from a result's cursor and execution options."""
        options = result.context.execution_options
        return BufferedRowCursorFetchStrategy(result.cursor, options)

    def _buffer_rows(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> None:
        """this is currently used only by fetchone()."""

        wanted = self._bufsize
        try:
            # a buffer size below one means "take everything"
            fetched = (
                dbapi_cursor.fetchall()
                if wanted < 1
                else dbapi_cursor.fetchmany(wanted)
            )
        except BaseException as caught:
            self.handle_exception(result, dbapi_cursor, caught)

        if not fetched:
            return

        self._rowbuffer = collections.deque(fetched)
        can_grow = self._growth_factor and wanted < self._max_row_buffer
        if can_grow:
            self._bufsize = min(
                self._max_row_buffer, wanted * self._growth_factor
            )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Fix the buffer size at ``num`` and switch off growth."""
        self._growth_factor = 0
        self._max_row_buffer = self._bufsize = num

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Pop the next buffered row, refilling the buffer when empty.

        Returns ``None`` after soft closing the result when no rows could
        be buffered.
        """
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        self._buffer_rows(result, dbapi_cursor)
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        try:
            result._soft_close(hard=hard_close)
        except BaseException as caught:
            self.handle_exception(result, dbapi_cursor, caught)
        return None

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Return up to ``size`` rows, topping the buffer up as needed.

        With ``size`` of ``None`` this is the same as ``fetchall()``.
        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffered = self._rowbuffer
        shortfall = size - len(buffered)
        exhausted = False
        if shortfall > 0:
            try:
                extra = dbapi_cursor.fetchmany(shortfall)
            except BaseException as caught:
                self.handle_exception(result, dbapi_cursor, caught)
            else:
                if extra:
                    buffered.extend(extra)
                else:
                    # defer closing since it may clear the row buffer
                    exhausted = True

        batch = [buffered.popleft() for _ in range(min(size, len(buffered)))]
        if exhausted:
            result._soft_close()
        return batch

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Return buffered rows followed by all rows left on the cursor.

        The buffer is emptied and the result soft closed afterwards.
        """
        try:
            everything = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
            self._rowbuffer.clear()
            result._soft_close()
            return everything
        except BaseException as caught:
            self.handle_exception(result, dbapi_cursor, caught)

1365 

1366 

class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
    """A cursor strategy that buffers rows fully upon creation.

    Used for operations where a result is to be delivered
    after the database conversation can not be continued,
    such as MSSQL INSERT...OUTPUT after an autocommit.

    """

    __slots__ = ("_rowbuffer", "alternate_cursor_description")

    def __init__(
        self,
        dbapi_cursor: Optional[DBAPICursor],
        alternate_description: Optional[_DBAPICursorDescription] = None,
        initial_buffer: Optional[Iterable[Any]] = None,
    ):
        """Fill the row buffer.

        :param dbapi_cursor: cursor to drain with ``fetchall()``; must not
         be ``None`` when ``initial_buffer`` is omitted.
        :param alternate_description: stored as
         ``alternate_cursor_description``.
        :param initial_buffer: rows to buffer instead of reading the
         cursor.
        """
        self.alternate_cursor_description = alternate_description
        if initial_buffer is None:
            assert dbapi_cursor is not None
            self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
        else:
            self._rowbuffer = collections.deque(initial_buffer)

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> Any:
        """Do nothing; the rows are already fully buffered."""

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Pop the next buffered row.

        When the buffer is empty the result is soft closed and ``None``
        is returned.
        """
        if not self._rowbuffer:
            result._soft_close(hard=hard_close)
            return None
        return self._rowbuffer.popleft()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Pop up to ``size`` buffered rows.

        With ``size`` of ``None`` this is the same as ``fetchall()``.  An
        empty batch soft closes the result.
        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffered = self._rowbuffer
        take = min(size, len(buffered))
        batch = [buffered.popleft() for _ in range(take)]
        if not batch:
            result._soft_close()
        return batch

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Hand back the current buffer and start a new empty one.

        The result is soft closed before returning.
        """
        drained, self._rowbuffer = self._rowbuffer, collections.deque()
        result._soft_close()
        return drained

1442 

1443 

class _NoResultMetaData(ResultMetaData):
    """Metadata stand-in for a result that does not return rows.

    Every accessor defined here raises ``ResourceClosedError`` by way of
    ``_we_dont_return_rows()``.
    """

    __slots__ = ()

    returns_rows = False

    def _we_dont_return_rows(
        self, err: Optional[BaseException] = None
    ) -> NoReturn:
        """Raise ``ResourceClosedError``, chained from ``err``."""
        raise exc.ResourceClosedError(
            "This result object does not return rows. "
            "It has been closed automatically."
        ) from err

    def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
        """Always raises; see ``_we_dont_return_rows()``."""
        self._we_dont_return_rows()

    def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; see ``_we_dont_return_rows()``."""
        self._we_dont_return_rows()

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; see ``_we_dont_return_rows()``."""
        self._we_dont_return_rows()

    @property
    def _keymap(self) -> NoReturn:  # type: ignore[override]
        """Always raises; see ``_we_dont_return_rows()``."""
        self._we_dont_return_rows()

    @property
    def _key_to_index(self) -> NoReturn:  # type: ignore[override]
        """Always raises; see ``_we_dont_return_rows()``."""
        self._we_dont_return_rows()

    @property
    def _processors(self) -> NoReturn:  # type: ignore[override]
        """Always raises; see ``_we_dont_return_rows()``."""
        self._we_dont_return_rows()

    @property
    def keys(self) -> NoReturn:
        """Always raises; see ``_we_dont_return_rows()``."""
        self._we_dont_return_rows()


# module-level instance of the "no rows" metadata
_NO_RESULT_METADATA = _NoResultMetaData()

1484 

1485 

def null_dml_result() -> IteratorResult[Any]:
    """Return a soft-closed, empty result that has "no rows" metadata.

    The result wraps an empty iterator and a new ``_NoResultMetaData``
    instance.
    """
    no_rows = iter([])
    empty_result: IteratorResult[Any] = IteratorResult(
        _NoResultMetaData(), no_rows
    )
    empty_result._soft_close()
    return empty_result

1490 

1491 

1492class CursorResult(Result[_T]): 

1493 """A Result that is representing state from a DBAPI cursor. 

1494 

1495 .. versionchanged:: 1.4 The :class:`.CursorResult`

1496 class replaces the previous :class:`.ResultProxy` interface.

1497 This class is based on the :class:`.Result` calling API

1498 which provides an updated usage model and calling facade for 

1499 SQLAlchemy Core and SQLAlchemy ORM. 

1500 

1501 Returns database rows via the :class:`.Row` class, which provides 

1502 additional API features and behaviors on top of the raw data returned by 

1503 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` 

1504 method, other kinds of objects may also be returned. 

1505 

1506 .. seealso:: 

1507 

1508 :ref:`tutorial_selecting_data` - introductory material for accessing 

1509 :class:`_engine.CursorResult` and :class:`.Row` objects. 

1510 

1511 """ 

1512 

1513 __slots__ = ( 

1514 "context", 

1515 "dialect", 

1516 "cursor", 

1517 "cursor_strategy", 

1518 "_echo", 

1519 "connection", 

1520 ) 

1521 

1522 _metadata: Union[CursorResultMetaData, _NoResultMetaData] 

1523 _no_result_metadata = _NO_RESULT_METADATA 

1524 _soft_closed: bool = False 

1525 closed: bool = False 

1526 _is_cursor = True 

1527 

1528 context: DefaultExecutionContext 

1529 dialect: Dialect 

1530 cursor_strategy: ResultFetchStrategy 

1531 connection: Connection 

1532 

def __init__(
    self,
    context: DefaultExecutionContext,
    cursor_strategy: ResultFetchStrategy,
    cursor_description: Optional[_DBAPICursorDescription],
):
    """Set up the result for an execution context.

    :param context: execution context; supplies the dialect, the cursor
     and the root connection.
    :param cursor_strategy: fetch strategy to start with.
    :param cursor_description: the cursor description, or ``None``, in
     which case the "no result" metadata is installed and no row getter
     is prepared.
    """
    self.context = context
    self.dialect = context.dialect
    self.cursor = context.cursor
    self.cursor_strategy = cursor_strategy
    self.connection = context.root_connection
    self._echo = echo = (
        self.connection._echo and context.engine._should_log_debug()
    )

    if cursor_description is None:
        assert context._num_sentinel_cols == 0
        self._metadata = self._no_result_metadata
        return

    # inline of Result._row_getter(), set up an initial row
    # getter assuming no transformations will be called as this
    # is the most common case

    metadata = self._init_metadata(context, cursor_description)

    build_row: Any = functools.partial(
        Row,
        metadata,
        metadata._effective_processors,
        metadata._key_to_index,
    )

    if context._num_sentinel_cols:
        # slice the trailing sentinel columns off each raw row before
        # the Row is built
        strip_sentinels = operator.itemgetter(
            slice(-context._num_sentinel_cols)
        )

        def _sliced_row(raw_data: Any) -> Any:
            return build_row(strip_sentinels(raw_data))

        row_without_sentinels = _sliced_row
    else:
        row_without_sentinels = build_row

    if echo:
        log = self.context.connection._log_debug

        def _log_row(row: Any) -> Any:
            log("Row %r", sql_util._repr_row(row))
            return row

        self._row_logging_fn = _log_row

        def _make_and_log_row(row: Any) -> Any:
            return _log_row(row_without_sentinels(row))

        make_row = _make_and_log_row
    else:
        make_row = row_without_sentinels  # type: ignore[assignment]

    self._set_memoized_attribute("_row_getter", make_row)

1595 

def _init_metadata(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
) -> CursorResultMetaData:
    """Create or reuse the metadata for this result and install it.

    With a compiled statement, metadata cached on the compiled object is
    reused when present; otherwise new metadata is built and cached when
    it reports itself safe for caching.  Without one, new metadata is
    always built.  Column names are logged when echo is enabled.
    """
    compiled = context.compiled

    if not compiled:
        self._metadata = metadata = CursorResultMetaData(
            self, cursor_description
        )
    else:
        cached = compiled._cached_metadata
        if cached:
            metadata = cached
        else:
            metadata = CursorResultMetaData(self, cursor_description)
            if metadata._safe_for_cache:
                compiled._cached_metadata = metadata

        # result rewrite/ adapt step.  this is to suit the case
        # when we are invoked against a cached Compiled object, we want
        # to rewrite the ResultMetaData to reflect the Column objects
        # that are in our current SQL statement object, not the one
        # that is associated with the cached Compiled object.
        # the Compiled object may also tell us to not
        # actually do this step; this is to support the ORM where
        # it is to produce a new Result object in any case, and will
        # be using the cached Column objects against this database result
        # so we don't want to rewrite them.
        #
        # Basically this step suits the use case where the end user
        # is using Core SQL expressions and is accessing columns in the
        # result row using row._mapping[table.c.column].
        if (
            not context.execution_options.get(
                "_result_disable_adapt_to_context", False
            )
            and compiled._result_columns
            and context.cache_hit is context.dialect.CACHE_HIT
            and compiled.statement is not context.invoked_statement  # type: ignore[comparison-overlap] # noqa: E501
        ):
            metadata = metadata._adapt_to_context(context)  # type: ignore[assignment] # noqa: E501

        self._metadata = metadata

    if self._echo:
        column_names = tuple(x[0] for x in cursor_description)
        context.connection._log_debug("Col %r", column_names)
    return metadata

1647 

1648 def _soft_close(self, hard: bool = False) -> None: 

1649 """Soft close this :class:`_engine.CursorResult`. 

1650 

1651 This releases all DBAPI cursor resources, but leaves the 

1652 CursorResult "open" from a semantic perspective, meaning the 

1653 fetchXXX() methods will continue to return empty results. 

1654 

1655 This method is called automatically when: 

1656 

1657 * all result rows are exhausted using the fetchXXX() methods. 

1658 * cursor.description is None. 

1659 

1660 This method is **not public**, but is documented in order to clarify 

1661 the "autoclose" process used. 

1662 

1663 .. seealso:: 

1664 

1665 :meth:`_engine.CursorResult.close` 

1666 

1667 

1668 """ 

1669 

1670 if (not hard and self._soft_closed) or (hard and self.closed): 

1671 return 

1672 

1673 if hard: 

1674 self.closed = True 

1675 self.cursor_strategy.hard_close(self, self.cursor) 

1676 else: 

1677 self.cursor_strategy.soft_close(self, self.cursor) 

1678 

1679 if not self._soft_closed: 

1680 cursor = self.cursor 

1681 self.cursor = None # type: ignore 

1682 self.connection._safe_close_cursor(cursor) 

1683 self._soft_closed = True 

1684 

@property
def inserted_primary_key_rows(self) -> List[Optional[Any]]:
    """Return the value of
    :attr:`_engine.CursorResult.inserted_primary_key`
    as a row contained within a list; some dialects may support a
    multiple row form as well.

    .. note:: As indicated below, in current SQLAlchemy versions this
       accessor is only useful beyond what's already supplied by
       :attr:`_engine.CursorResult.inserted_primary_key` when using the
       :ref:`postgresql_psycopg2` dialect.   Future versions hope to
       generalize this feature to more dialects.

    The accessor exists for dialects offering the feature that the
    :ref:`psycopg2_executemany_mode` feature implements, currently
    **only the psycopg2 dialect**: many rows may be INSERTed at once
    while server-generated primary key values can still be returned.

    * **When using the psycopg2 dialect, or other dialects that may
      support "fast executemany" style inserts in upcoming releases** :
      when an INSERT statement is invoked with a list of rows as the
      second argument to :meth:`_engine.Connection.execute`, this
      accessor provides a list of rows, each holding the primary key
      value of one row that was INSERTed.

    * **When using all other dialects / backends that don't yet support
      this feature**: the accessor is only useful for **single row INSERT
      statements**, and returns the same information as
      :attr:`_engine.CursorResult.inserted_primary_key` within a
      single-element list.  When an INSERT statement is executed together
      with a list of rows to be INSERTed, the list holds one row per
      inserted row, however it contains ``None`` for any
      server-generated values.

    Future releases of SQLAlchemy will further generalize the
    "fast execution helper" feature of psycopg2 to suit other dialects,
    thus allowing this accessor to be of more general use.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct, is not an insert()
    construct, or used explicit returning.

    .. versionadded:: 1.4

    .. seealso::

        :attr:`_engine.CursorResult.inserted_primary_key`

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )
    if context._is_explicit_returning:
        raise exc.InvalidRequestError(
            "Can't call inserted_primary_key "
            "when returning() "
            "is used."
        )

    return context.inserted_primary_key_rows  # type: ignore[no-any-return]  # noqa: E501

1746 

@property
def inserted_primary_key(self) -> Optional[Any]:
    """Return the primary key for the row just inserted.

    The return value is a :class:`_result.Row` object representing
    a named tuple of primary key values in the order in which the
    primary key columns are configured in the source
    :class:`_schema.Table`.

    .. versionchanged:: 1.4.8 - the
       :attr:`_engine.CursorResult.inserted_primary_key`
       value is now a named tuple via the :class:`_result.Row` class,
       rather than a plain tuple.

    This accessor only applies to single row :func:`_expression.insert`
    constructs which did not explicitly specify
    :meth:`_expression.Insert.returning`.    Support for multirow inserts,
    while not yet available for most backends, would be accessed using
    the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.

    Note that primary key columns which specify a server_default clause, or
    otherwise do not qualify as "autoincrement" columns (see the notes at
    :class:`_schema.Column`), and were generated using the database-side
    default, will appear in this list as ``None`` unless the backend
    supports "returning" and the insert statement executed with the
    "implicit returning" enabled.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() construct, and also when the statement was an
    executemany call.

    """

    if self.context.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if primary key "
            "returning is supported, please "
            "use .inserted_primary_key_rows."
        )

    # first row of the list form, or None when that list is empty
    pk_rows = self.inserted_primary_key_rows
    return pk_rows[0] if pk_rows else None

1792 

def last_updated_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of updated parameters from this
    execution.

    For an executemany call the whole list of compiled parameter sets is
    returned, otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an update() construct.

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isupdate:
        raise exc.InvalidRequestError(
            "Statement is not an update() expression construct."
        )

    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1818 

def last_inserted_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of inserted parameters from this
    execution.

    For an executemany call the whole list of compiled parameter sets is
    returned, otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() construct.

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )

    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1844 

@property
def returned_defaults_rows(
    self,
) -> Optional[Sequence[Row[Any]]]:
    """Return a list of rows each containing the values of default
    columns that were fetched using
    the :meth:`.ValuesBase.return_defaults` feature.

    The return value is a list of :class:`.Row` objects, taken from the
    execution context's ``returned_default_rows``.

    .. versionadded:: 1.4

    """
    context = self.context
    return context.returned_default_rows

1859 

def splice_horizontally(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "horizontally splices"
    together the rows of this :class:`.CursorResult` with that of another
    :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "horizontally splices" means that for each row in the first and second
    result sets, a new row that concatenates the two rows together is
    produced, which then becomes the new row.  The incoming
    :class:`.CursorResult` must have the identical number of rows.  It is
    typically expected that the two result sets come from the same sort
    order as well, as the result rows are spliced together based on their
    position in the result.

    Both results are read to the end before any rows are combined.

    The expected use case here is so that multiple INSERT..RETURNING
    statements (which definitely need to be sorted) against different
    tables can produce a single result that looks like a JOIN of those two
    tables.

    E.g.::

        r1 = connection.execute(
            users.insert().returning(
                users.c.user_name,
                users.c.user_id,
                sort_by_parameter_order=True,
            ),
            user_values,
        )

        r2 = connection.execute(
            addresses.insert().returning(
                addresses.c.address_id,
                addresses.c.address,
                addresses.c.user_id,
                sort_by_parameter_order=True,
            ),
            address_values,
        )

        rows = r1.splice_horizontally(r2).all()
        assert rows == [
            ("john", 1, 1, "foo@bar.com", 1),
            ("jack", 2, 2, "bar@bat.com", 2),
        ]

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_vertically`


    """

    spliced = self._generate()

    left_rows = list(self._raw_row_iterator())
    right_rows = list(other._raw_row_iterator())
    combined_rows = [
        tuple(left) + tuple(right)
        for left, right in zip(left_rows, right_rows)
    ]

    spliced._metadata = spliced._metadata._splice_horizontally(other._metadata)  # type: ignore[union-attr, arg-type] # noqa: E501

    spliced.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=combined_rows,
    )
    spliced._reset_memoizations()
    return spliced

1932 

def splice_vertically(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "vertically splices",
    i.e. "extends", the rows of this :class:`.CursorResult` with that of
    another :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "vertically splices" means the rows of the given result are appended to
    the rows of this cursor result.  The incoming :class:`.CursorResult`
    must have rows that represent the identical list of columns in the
    identical order as they are in this :class:`.CursorResult`.

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_horizontally`

    """
    extended = self._generate()

    # this result's rows first, then the other result's rows
    stacked_rows = list(self._raw_row_iterator())
    stacked_rows.extend(other._raw_row_iterator())

    extended.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=stacked_rows,
    )
    extended._reset_memoizations()
    return extended

1964 

def _rewind(self, rows: Any) -> Self:
    """Reset this result, in place, so that it delivers the given rowset.

    This is used internally for the case where an :class:`.Insert`
    construct combines the use of :meth:`.Insert.return_defaults` along
    with the "supplemental columns" feature.

    """

    if self._echo:
        self.context.connection._log_debug(
            "CursorResult rewound %d row(s)", len(rows)
        )

    # the incoming rows are expected to be Row objects on which the
    # processors have already run, so swap in metadata that has the
    # processors removed
    current_metadata = cast(CursorResultMetaData, self._metadata)
    self._metadata = current_metadata._remove_processors()

    # TODO: if these are Row objects, can we save on not having to
    # re-make new Row objects out of them a second time?  is that
    # what's actually happening right now?  maybe look into this
    rewound_strategy = FullyBufferedCursorFetchStrategy(
        None, initial_buffer=rows
    )
    self.cursor_strategy = rewound_strategy
    self._reset_memoizations()
    return self

1996 

@property
def returned_defaults(self) -> Optional[Row[Any]]:
    """Return the default-column values that were fetched by means of
    the :meth:`.ValuesBase.return_defaults` feature.

    The value is a :class:`.Row` instance, or ``None`` if
    :meth:`.ValuesBase.return_defaults` was not used or if the
    backend does not support RETURNING.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the statement
    was executed as an executemany.

    .. seealso::

        :meth:`.ValuesBase.return_defaults`

    """
    context = self.context

    if context.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if return defaults "
            "is supported, please use .returned_defaults_rows."
        )

    # only the first row is reported; an empty or missing collection
    # means there is nothing to return
    default_rows = context.returned_default_rows
    return default_rows[0] if default_rows else None

2023 

def lastrow_has_defaults(self) -> bool:
    """Delegate to ``lastrow_has_defaults()`` of the underlying
    :class:`.ExecutionContext` and return its result.

    See :class:`.ExecutionContext` for details.

    """
    execution_context = self.context
    return execution_context.lastrow_has_defaults()

2033 

def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the ``postfetch_cols`` value of the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """
    context = self.context

    # guard: the statement must have been compiled
    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )

    # guard: only INSERT and UPDATE are accepted
    if not (context.isinsert or context.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )

    return context.postfetch_cols

2056 

def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return the ``prefetch_cols`` value of the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """
    context = self.context

    # guard: the statement must have been compiled
    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )

    # guard: only INSERT and UPDATE are accepted
    if not (context.isinsert or context.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )

    return context.prefetch_cols

2079 

def supports_sane_rowcount(self) -> bool:
    """Report the dialect's ``supports_sane_rowcount`` setting.

    See :attr:`_engine.CursorResult.rowcount` for background.

    """
    dialect = self.dialect
    return dialect.supports_sane_rowcount

2088 

def supports_sane_multi_rowcount(self) -> bool:
    """Report the dialect's ``supports_sane_multi_rowcount`` setting.

    See :attr:`_engine.CursorResult.rowcount` for background.

    """
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

2097 

@util.memoized_property
def rowcount(self) -> int:
    """Return the 'rowcount' for this result.

    'rowcount' exists primarily to report the number of rows matched by
    the WHERE criterion of an UPDATE or DELETE statement executed once
    (i.e. for a single parameter set); that number may then be compared
    to the number of rows expected to be updated or deleted as a means
    of asserting data integrity.

    The value is transferred from the ``cursor.rowcount`` attribute of
    the DBAPI before the cursor is closed, to support DBAPIs that don't
    make this value available after cursor close.  Some DBAPIs may offer
    meaningful values for other kinds of statements as well, such as
    INSERT and SELECT statements.  In order to retrieve
    ``cursor.rowcount`` for these statements, set the
    :paramref:`.Connection.execution_options.preserve_rowcount`
    execution option to True, which will cause the ``cursor.rowcount``
    value to be unconditionally memoized before any results are returned
    or the cursor is closed, regardless of statement type.

    For cases where the DBAPI does not support rowcount for a particular
    kind of statement and/or execution, the returned value will be
    ``-1``, which is delivered directly from the DBAPI and is part of
    :pep:`249`.  All DBAPIs should support rowcount for
    single-parameter-set UPDATE and DELETE statements, however.

    .. note::

       Notes regarding :attr:`_engine.CursorResult.rowcount`:


       * This attribute returns the number of rows *matched*,
         which is not necessarily the same as the number of rows
         that were actually *modified*.  For example, an UPDATE statement
         may have no net change on a given row if the SET values
         given are the same as those present in the row already.
         Such a row would be matched but not modified.
         On backends that feature both styles, such as MySQL,
         rowcount is configured to return the match
         count in all cases.

       * :attr:`_engine.CursorResult.rowcount` in the default case is
         *only* useful in conjunction with an UPDATE or DELETE statement,
         and only with a single set of parameters.  For other kinds of
         statements, SQLAlchemy will not attempt to pre-memoize the value
         unless the
         :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is used.  Note that contrary to :pep:`249`, many
         DBAPIs do not support rowcount values for statements that are not
         UPDATE or DELETE, particularly when rows are being returned which
         are not fully pre-buffered.  DBAPIs that don't support rowcount
         for a particular kind of statement should return the value ``-1``
         for such statements.

       * :attr:`_engine.CursorResult.rowcount` may not be meaningful
         when executing a single statement with multiple parameter sets
         (i.e. an :term:`executemany`).  Most DBAPIs do not sum "rowcount"
         values across multiple parameter sets and will return ``-1``
         when accessed.

       * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
         a correct population of :attr:`_engine.CursorResult.rowcount`
         when the :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is set to True.

       * Statements that use RETURNING may not support rowcount, returning
         a ``-1`` value instead.

    .. seealso::

        :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`

        :paramref:`.Connection.execution_options.preserve_rowcount`

    """  # noqa: E501
    try:
        count = self.context.rowcount
    except BaseException as err:
        # any failure is routed through the cursor strategy's handler
        self.cursor_strategy.handle_exception(self, self.cursor, err)
        raise  # not called
    return count

2179 

@property
def lastrowid(self) -> int:
    """Return the 'lastrowid' accessor on the DBAPI cursor.

    This is a DBAPI specific method and is only functional
    for those backends which support it, for statements
    where it is appropriate.  Its behavior is not
    consistent across backends.

    Usage of this method is normally unnecessary when
    using insert() expression constructs; the
    :attr:`~CursorResult.inserted_primary_key` attribute provides a
    tuple of primary key values for a newly inserted row,
    regardless of database backend.

    Any exception raised while fetching the value is first passed to
    the cursor strategy's ``handle_exception()`` and is then propagated
    to the caller.

    """
    try:
        return self.context.get_lastrowid()
    except BaseException as e:
        self.cursor_strategy.handle_exception(self, self.cursor, e)
        # handle_exception() is expected to raise on its own; re-raise
        # explicitly so this accessor can never fall through and hand
        # back None, matching what the ``rowcount`` accessor does
        raise  # not called

2200 

@property
def returns_rows(self) -> bool:
    """True if this :class:`_engine.CursorResult` returns zero or more
    rows.

    In other words, whether it is legal to call the methods
    :meth:`_engine.CursorResult.fetchone`,
    :meth:`_engine.CursorResult.fetchmany` and
    :meth:`_engine.CursorResult.fetchall`.

    Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
    always be synonymous with whether or not the DBAPI cursor had a
    ``.description`` attribute, indicating the presence of result columns,
    noting that a cursor that returns zero rows still has a
    ``.description`` if a row-returning statement was emitted.

    This attribute should be True for all results that are against
    SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
    that use RETURNING.  For INSERT/UPDATE/DELETE statements that were
    not using RETURNING, the value will usually be False, however
    there are some dialect-specific exceptions to this, such as when
    using the MSSQL / pyodbc dialect a SELECT is emitted inline in
    order to retrieve an inserted primary key value.

    .. seealso::

        :meth:`.Result.close`

        :attr:`.Result.closed`

    """
    # the flag is kept on the result metadata
    metadata = self._metadata
    return metadata.returns_rows

2233 

@property
def is_insert(self) -> bool:
    """True if this :class:`_engine.CursorResult` is the result
    of executing an expression language compiled
    :func:`_expression.insert` construct.

    When True, this implies that the
    :attr:`inserted_primary_key` attribute is accessible,
    assuming the statement did not include
    a user defined "returning" construct.

    """
    # the flag is read from the execution context
    execution_context = self.context
    return execution_context.isinsert

2247 

def _fetchiter_impl(self) -> Iterator[Any]:
    # Yield rows one at a time from the cursor strategy's fetchone()
    # until it reports None.
    fetch_next = self.cursor_strategy.fetchone

    row = fetch_next(self, self.cursor)
    while row is not None:
        yield row
        row = fetch_next(self, self.cursor)

2256 

def _fetchone_impl(self, hard_close: bool = False) -> Any:
    # Hand off to the cursor strategy, passing along ``hard_close``.
    strategy = self.cursor_strategy
    return strategy.fetchone(self, self.cursor, hard_close)

2259 

def _fetchall_impl(self) -> Any:
    # Hand off to the cursor strategy's fetchall().
    strategy = self.cursor_strategy
    return strategy.fetchall(self, self.cursor)

2262 

def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
    # Hand off to the cursor strategy's fetchmany(), passing ``size``
    # through unchanged (including None).
    strategy = self.cursor_strategy
    return strategy.fetchmany(self, self.cursor, size)

2265 

def _raw_row_iterator(self) -> Any:
    # The raw row iterator is simply the one produced by
    # _fetchiter_impl().
    row_iterator = self._fetchiter_impl()
    return row_iterator

2268 

def merge(self, *others: Result[Any]) -> MergedResult[Any]:
    # Merge via the superclass, then, when the execution context reports
    # a rowcount, set the merged result's rowcount to the sum of the
    # rowcounts of this result and all of the others.
    merged_result = super().merge(*others)

    if self.context._has_rowcount:
        rowcount_total = 0
        for result in (self, *others):
            rowcount_total += cast("CursorResult[Any]", result).rowcount
        merged_result.rowcount = rowcount_total

    return merged_result

2277 

def close(self) -> None:
    """Close this :class:`_engine.CursorResult`.

    The underlying DBAPI cursor corresponding to the statement execution
    is closed out, if one is still present.  Note that the DBAPI cursor
    is automatically released when the :class:`_engine.CursorResult`
    exhausts all available rows.  :meth:`_engine.CursorResult.close` is
    generally an optional method except in the case when discarding a
    :class:`_engine.CursorResult` that still has additional rows pending
    for fetch.

    After this method is called, it is no longer valid to call upon
    the fetch methods, which will raise a :class:`.ResourceClosedError`
    on subsequent use.

    .. seealso::

        :ref:`connections_toplevel`

    """
    # an explicit close is carried out as a "hard" soft-close
    hard_close = True
    self._soft_close(hard=hard_close)

2299 

@_generative
def yield_per(self, num: int) -> Self:
    # Record the requested size on this result first, then pass the same
    # size on to the cursor strategy.
    self._yield_per = num
    strategy = self.cursor_strategy
    strategy.yield_per(self, self.cursor, num)
    return self

2305 

2306 

# ``ResultProxy`` is a second module-level name bound to the very same
# class object as ``CursorResult``.
ResultProxy = CursorResult