Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

719 statements  

1# engine/cursor.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Define cursor-specific result set constructs including 

9:class:`.CursorResult`.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15import functools 

16import operator 

17import typing 

18from typing import Any 

19from typing import cast 

20from typing import ClassVar 

21from typing import Deque 

22from typing import Dict 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Mapping 

27from typing import NoReturn 

28from typing import Optional 

29from typing import Sequence 

30from typing import Tuple 

31from typing import TYPE_CHECKING 

32from typing import TypeVar 

33from typing import Union 

34 

35from .result import IteratorResult 

36from .result import MergedResult 

37from .result import Result 

38from .result import ResultMetaData 

39from .result import SimpleResultMetaData 

40from .result import tuplegetter 

41from .row import Row 

42from .. import exc 

43from .. import util 

44from ..sql import elements 

45from ..sql import sqltypes 

46from ..sql import util as sql_util 

47from ..sql.base import _generative 

48from ..sql.compiler import ResultColumnsEntry 

49from ..sql.compiler import RM_NAME 

50from ..sql.compiler import RM_OBJECTS 

51from ..sql.compiler import RM_RENDERED_NAME 

52from ..sql.compiler import RM_TYPE 

53from ..sql.type_api import TypeEngine 

54from ..util import compat 

55from ..util.typing import Final 

56from ..util.typing import Literal 

57from ..util.typing import Self 

58 

59 

60if typing.TYPE_CHECKING: 

61 from .base import Connection 

62 from .default import DefaultExecutionContext 

63 from .interfaces import _DBAPICursorDescription 

64 from .interfaces import _MutableCoreSingleExecuteParams 

65 from .interfaces import CoreExecuteOptionsParameter 

66 from .interfaces import DBAPICursor 

67 from .interfaces import DBAPIType 

68 from .interfaces import Dialect 

69 from .interfaces import ExecutionContext 

70 from .result import _KeyIndexType 

71 from .result import _KeyMapRecType 

72 from .result import _KeyMapType 

73 from .result import _KeyType 

74 from .result import _ProcessorsType 

75 from .result import _TupleGetterType 

76 from ..sql.schema import Column 

77 from ..sql.type_api import _ResultProcessorType 

78 

79 

# Type variable bound to ``Any``.
_T = TypeVar("_T", bound=Any)
# Shorthand for a tuple of any length holding arbitrary values.
TupleAny = Tuple[Any, ...]

# metadata entry tuple indexes.
# using raw tuple is faster than namedtuple.
# these match up to the positions in
# _CursorKeyMapRecType
MD_INDEX: Final[Literal[0]] = 0
"""integer index in cursor.description

"""

MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
"""integer index in compiled._result_columns"""

MD_OBJECTS: Final[Literal[2]] = 2
"""other string keys and ColumnElement obj that can match.

This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects

"""

MD_LOOKUP_KEY: Final[Literal[3]] = 3
"""string key we usually expect for key-based lookup

this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
"""


# NOTE: the docstring below refers to "compiler.RENDERED_NAME"; the constant
# this module actually imports from ..sql.compiler is RM_RENDERED_NAME.
MD_RENDERED_NAME: Final[Literal[4]] = 4
"""name that is usually in cursor.description

this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
"""


MD_PROCESSOR: Final[Literal[5]] = 5
"""callable to process a result value into a row"""

MD_UNTRANSLATED: Final[Literal[6]] = 6
"""raw name from cursor.description"""


# One keymap record.  The element positions correspond to the MD_* index
# constants defined above.
_CursorKeyMapRecType = Tuple[
    Optional[int],  # MD_INDEX, None means the record is ambiguously named
    int,  # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
    TupleAny,  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    Optional[str],  # MD_UNTRANSLATED
]

# Mapping from a lookup key to its keymap record.
_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]

# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
# not None
# NOTE(review): positions 2 and 6 are declared here as List[Any] and str,
# whereas _CursorKeyMapRecType declares TupleAny and Optional[str] -- confirm
# whether that difference is intentional.
_NonAmbigCursorKeyMapRecType = Tuple[
    int,  # MD_INDEX
    int,  # MD_RESULT_MAP_INDEX
    List[Any],  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    str,  # MD_UNTRANSLATED
]

# Intermediate tuple yielded by the _merge_*cols* generator methods of
# CursorResultMetaData before it is rearranged into a keymap record.
_MergeColTuple = Tuple[
    int,  # index of the column in cursor.description
    Optional[int],  # index into the compiled result columns, if matched
    str,  # column name taken from cursor.description
    TypeEngine[Any],  # mapped SQL type (NULLTYPE when unmatched)
    "DBAPIType",  # type entry from cursor.description
    Optional[TupleAny],  # matched column objects, if any
    Optional[str],  # untranslated column name, if any
]

156 

157 

class CursorResultMetaData(ResultMetaData):
    """Result metadata for DBAPI cursors.

    The central structure is ``_keymap``, which maps every supported lookup
    key (string names, column objects, ...) to a record tuple whose
    positions are given by the module-level ``MD_*`` constants.

    """

    __slots__ = (
        "_keymap",
        "_processors",
        "_keys",
        "_keymap_by_result_column_idx",
        "_tuplefilter",
        "_translated_indexes",
        "_safe_for_cache",
        "_unpickled",
        "_key_to_index",
        # don't need _unique_filters support here for now.  Can be added
        # if a need arises.
    )

    _keymap: _CursorKeyMapType
    _processors: _ProcessorsType
    _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
    _unpickled: bool
    _safe_for_cache: bool
    _translated_indexes: Optional[List[int]]

    returns_rows: ClassVar[bool] = True

    def _has_key(self, key: Any) -> bool:
        """Return True if ``key`` is present in the keymap."""
        return key in self._keymap

    def _for_freeze(self) -> ResultMetaData:
        """Return a :class:`.SimpleResultMetaData` built from this object.

        The new metadata receives this object's keys, plus the
        ``MD_OBJECTS`` entry of each key's record as ``extra``.

        """
        return SimpleResultMetaData(
            self._keys,
            extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
        )

    def _make_new_metadata(
        self,
        *,
        unpickled: bool,
        processors: _ProcessorsType,
        keys: Sequence[str],
        keymap: _KeyMapType,
        tuplefilter: Optional[_TupleGetterType],
        translated_indexes: Optional[List[int]],
        safe_for_cache: bool,
        keymap_by_result_column_idx: Any,
    ) -> CursorResultMetaData:
        """Build a new instance of this class from explicit state.

        ``__init__`` is bypassed; each keyword argument is assigned to the
        slot of the same name and ``_key_to_index`` is derived from
        ``keymap``.

        """
        new_obj = self.__class__.__new__(self.__class__)
        new_obj._unpickled = unpickled
        new_obj._processors = processors
        new_obj._keys = keys
        new_obj._keymap = keymap
        new_obj._tuplefilter = tuplefilter
        new_obj._translated_indexes = translated_indexes
        new_obj._safe_for_cache = safe_for_cache
        new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
        new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
        return new_obj

    def _remove_processors(self) -> CursorResultMetaData:
        """Return a copy of this metadata with all processors set to None.

        Both the ``_processors`` list and the ``MD_PROCESSOR`` slot of every
        keymap record are blanked out.  Must not be called when a tuple
        filter is present.

        """
        assert not self._tuplefilter
        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=[None] * len(self._processors),
            tuplefilter=None,
            translated_indexes=None,
            keymap={
                # rebuild each record with None in the MD_PROCESSOR position
                key: value[:MD_PROCESSOR]
                + (None,)
                + value[MD_PROCESSOR + 1 :]
                for key, value in self._keymap.items()
            },
            keys=self._keys,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )

    def _splice_horizontally(
        self, other: CursorResultMetaData
    ) -> CursorResultMetaData:
        """Return new metadata holding this object's columns followed by
        those of ``other``.

        Records from ``other`` have their positional indexes shifted by the
        number of keys in this object.  A key of ``other`` that is already
        ambiguous, or that is also present in this object's keymap, is
        stored as an ambiguous record (index ``None``, result map index
        ``-1``).

        """
        assert not self._tuplefilter

        keymap = dict(self._keymap)
        offset = len(self._keys)

        for key, value in other._keymap.items():
            # int index should be None for ambiguous key
            if value[MD_INDEX] is not None and key not in keymap:
                md_index = value[MD_INDEX] + offset
                md_object = value[MD_RESULT_MAP_INDEX] + offset
            else:
                md_index = None
                md_object = -1
            keymap[key] = (md_index, md_object, *value[2:])

        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=self._processors + other._processors,  # type: ignore
            tuplefilter=None,
            translated_indexes=None,
            keys=self._keys + other._keys,  # type: ignore
            keymap=keymap,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx={
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in keymap.values()
            },
        )

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
        """Return new metadata restricted to ``keys``, in the order given.

        The returned metadata carries a tuple getter that picks the selected
        positions, and records renumbered ``0..n-1``.  Raises (via
        ``_metadata_for_keys``) if a key is missing or ambiguous.

        """
        recs = list(self._metadata_for_keys(keys))

        indexes = [rec[MD_INDEX] for rec in recs]
        new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]

        if self._translated_indexes:
            # this metadata was itself reduced before; map back to the
            # positions of the underlying row
            indexes = [self._translated_indexes[idx] for idx in indexes]
        tup = tuplegetter(*indexes)
        new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]

        keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
        # TODO: need unit test for:
        # result = connection.execute("raw sql, no columns").scalars()
        # without the "or ()" it's failing because MD_OBJECTS is None
        keymap.update(
            (e, new_rec)
            for new_rec in new_recs
            for e in new_rec[MD_OBJECTS] or ()
        )

        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=self._processors,
            keys=new_keys,
            tuplefilter=tup,
            translated_indexes=indexes,
            keymap=keymap,  # type: ignore[arg-type]
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )

    def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
        """When using a cached Compiled construct that has a _result_map,
        for a new statement that used the cached Compiled, we need to ensure
        the keymap has the Column objects from our new statement as keys.
        So here we rewrite keymap with new entries for the new columns
        as matched to those of the cached statement.

        Returns ``self`` unchanged when there is no compiled construct, no
        compiled result columns, or the invoked statement is the compiled
        statement itself.

        """

        if not context.compiled or not context.compiled._result_columns:
            return self

        compiled_statement = context.compiled.statement
        invoked_statement = context.invoked_statement

        if TYPE_CHECKING:
            assert isinstance(invoked_statement, elements.ClauseElement)

        if compiled_statement is invoked_statement:
            return self

        assert invoked_statement is not None

        # this is the most common path for Core statements when
        # caching is used.  In ORM use, this codepath is not really used
        # as the _result_disable_adapt_to_context execution option is
        # set by the ORM.

        # make a copy and add the columns from the invoked statement
        # to the result map.

        keymap_by_position = self._keymap_by_result_column_idx

        if keymap_by_position is None:
            # first retrieval from cache, this map will not be set up yet,
            # initialize lazily
            keymap_by_position = self._keymap_by_result_column_idx = {
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in self._keymap.values()
            }

        assert not self._tuplefilter
        return self._make_new_metadata(
            keymap=compat.dict_union(
                self._keymap,
                {
                    new: keymap_by_position[idx]
                    for idx, new in enumerate(
                        invoked_statement._all_selected_columns
                    )
                    if idx in keymap_by_position
                },
            ),
            unpickled=self._unpickled,
            processors=self._processors,
            tuplefilter=None,
            translated_indexes=None,
            keys=self._keys,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )

    def __init__(
        self,
        parent: CursorResult[Any],
        cursor_description: _DBAPICursorDescription,
    ):
        """Build metadata for ``parent`` from a DBAPI ``cursor.description``.

        The description is merged with the compiled column information
        found on ``parent.context`` (if any) to produce the raw records,
        from which the processor list and the keymap are derived.

        """
        context = parent.context
        self._tuplefilter = None
        self._translated_indexes = None
        self._safe_for_cache = self._unpickled = False

        if context.result_column_struct:
            (
                result_columns,
                cols_are_ordered,
                textual_ordered,
                ad_hoc_textual,
                loose_column_name_matching,
            ) = context.result_column_struct
            num_ctx_cols = len(result_columns)
        else:
            result_columns = cols_are_ordered = (  # type: ignore
                num_ctx_cols
            ) = ad_hoc_textual = loose_column_name_matching = (
                textual_ordered
            ) = False

        # merge cursor.description with the column info
        # present in the compiled structure, if any
        raw = self._merge_cursor_description(
            context,
            cursor_description,
            result_columns,
            num_ctx_cols,
            cols_are_ordered,
            textual_ordered,
            ad_hoc_textual,
            loose_column_name_matching,
        )

        # processors in key order which are used when building up
        # a row
        self._processors = [
            metadata_entry[MD_PROCESSOR] for metadata_entry in raw
        ]

        # this is used when using this ResultMetaData in a Core-only cache
        # retrieval context.  it's initialized on first cache retrieval
        # when the _result_disable_adapt_to_context execution option
        # (which the ORM generally sets) is not set.
        self._keymap_by_result_column_idx = None

        # for compiled SQL constructs, copy additional lookup keys into
        # the key lookup map, such as Column objects, labels,
        # column keys and other names
        if num_ctx_cols:
            # keymap by primary string...
            by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

            if len(by_key) != num_ctx_cols:
                # if by-primary-string dictionary smaller than
                # number of columns, assume we have dupes; (this check
                # is also in place if string dictionary is bigger, as
                # can occur when '*' was used as one of the compiled columns,
                # which may or may not be suggestive of dupes), rewrite
                # dupe records with "None" for index which results in
                # ambiguous column exception when accessed.
                #
                # this is considered to be the less common case as it is not
                # common to have dupe column keys in a SELECT statement.
                #
                # new in 1.4: get the complete set of all possible keys,
                # strings, objects, whatever, that are dupes across two
                # different records, first.
                index_by_key: Dict[Any, Any] = {}
                dupes = set()
                for metadata_entry in raw:
                    # the positional index is the same for every key of
                    # this record, so look it up once
                    idx = metadata_entry[MD_INDEX]
                    for key in (metadata_entry[MD_RENDERED_NAME],) + (
                        metadata_entry[MD_OBJECTS] or ()
                    ):
                        # if this key has been associated with more than one
                        # positional index, it's a dupe
                        if index_by_key.setdefault(key, idx) != idx:
                            dupes.add(key)

                # then put everything we have into the keymap excluding only
                # those keys that are dupes.
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                    if obj_elem not in dupes
                }

                # then for the dupe keys, put the "ambiguous column"
                # record into by_key.
                by_key.update(
                    {
                        key: (None, -1, (), key, key, None, None)
                        for key in dupes
                    }
                )

            else:
                # no dupes - copy secondary elements from compiled
                # columns into self._keymap.  this is the most common
                # codepath for Core / ORM statement executions before the
                # result metadata is cached
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                }
            # update keymap with primary string names taking
            # precedence
            self._keymap.update(by_key)
        else:
            # no compiled objects to map, just create keymap by primary string
            self._keymap = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

        # update keymap with "translated" names.  In SQLAlchemy this is a
        # sqlite only thing, and in fact impacting only extremely old SQLite
        # versions unlikely to be present in modern Python versions.
        # however, the pyhive third party dialect is
        # also using this hook, which means others still might use it as well.
        # I dislike having this awkward hook here but as long as we need
        # to use names in cursor.description in some cases we need to have
        # some hook to accomplish this.
        if not num_ctx_cols and context._translate_colname:
            self._keymap.update(
                {
                    metadata_entry[MD_UNTRANSLATED]: self._keymap[
                        metadata_entry[MD_LOOKUP_KEY]
                    ]
                    for metadata_entry in raw
                    if metadata_entry[MD_UNTRANSLATED]
                }
            )

        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)

    def _merge_cursor_description(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        result_columns: Sequence[ResultColumnsEntry],
        num_ctx_cols: int,
        cols_are_ordered: bool,
        textual_ordered: bool,
        ad_hoc_textual: bool,
        loose_column_name_matching: bool,
    ) -> List[_CursorKeyMapRecType]:
        """Merge a cursor.description with compiled result column information.

        There are at least four separate strategies used here, selected
        depending on the type of SQL construct used to start with.

        The most common case is that of the compiled SQL expression construct,
        which generated the column names present in the raw SQL string and
        which has the identical number of columns as were reported by
        cursor.description.  In this case, we assume a 1-1 positional mapping
        between the entries in cursor.description and the compiled object.
        This is also the most performant case as we disregard extracting /
        decoding the column names present in cursor.description since we
        already have the desired name we generated in the compiled SQL
        construct.

        The next common case is that of the completely raw string SQL,
        such as passed to connection.execute().  In this case we have no
        compiled construct to work with, so we extract and decode the
        names from cursor.description and index those as the primary
        result row target keys.

        The remaining fairly common case is that of the textual SQL
        that includes at least partial column information; this is when
        we use a :class:`_expression.TextualSelect` construct.
        This construct may have
        unordered or ordered column information.  In the ordered case, we
        merge the cursor.description and the compiled construct's information
        positionally, and warn if there are additional description names
        present, however we still decode the names in cursor.description
        as we don't have a guarantee that the names in the columns match
        on these.  In the unordered case, we match names in cursor.description
        to that of the compiled construct based on name matching.
        In both of these cases, the cursor.description names and the column
        expression objects and names are indexed as result row target keys.

        The final case is much less common, where we have a compiled
        non-textual SQL expression construct, but the number of columns
        in cursor.description doesn't match what's in the compiled
        construct.  We make the guess here that there might be textual
        column expressions in the compiled construct that themselves include
        a comma in them causing them to split.  We do the same name-matching
        as with textual non-ordered columns.

        The name-matched system of merging is the same as that used by
        SQLAlchemy for all cases up through the 0.9 series.   Positional
        matching for compiled SQL expressions was introduced in 1.0 as a
        major performance feature, and positional matching for textual
        :class:`_expression.TextualSelect` objects in 1.1.
        As name matching is no longer
        a common case, it was acceptable to factor it into smaller generator-
        oriented methods that are easier to understand, but incur slightly
        more performance overhead.

        As side effects this method assigns ``self._keys`` and
        ``self._safe_for_cache``.

        """

        if (
            num_ctx_cols
            and cols_are_ordered
            and not textual_ordered
            and num_ctx_cols == len(cursor_description)
        ):
            self._keys = [elem[0] for elem in result_columns]
            # pure positional 1-1 case; doesn't need to read
            # the names from cursor.description

            # most common case for Core and ORM

            # this metadata is safe to cache because we are guaranteed
            # to have the columns in the same order for new executions
            self._safe_for_cache = True
            return [
                (
                    idx,
                    idx,
                    rmap_entry[RM_OBJECTS],
                    rmap_entry[RM_NAME],
                    rmap_entry[RM_RENDERED_NAME],
                    context.get_result_processor(
                        rmap_entry[RM_TYPE],
                        rmap_entry[RM_RENDERED_NAME],
                        cursor_description[idx][1],
                    ),
                    None,
                )
                for idx, rmap_entry in enumerate(result_columns)
            ]
        else:
            # name-based or text-positional cases, where we need
            # to read cursor.description names

            if textual_ordered or (
                ad_hoc_textual and len(cursor_description) == num_ctx_cols
            ):
                self._safe_for_cache = True
                # textual positional case
                raw_iterator = self._merge_textual_cols_by_position(
                    context, cursor_description, result_columns
                )
            elif num_ctx_cols:
                # compiled SQL with a mismatch of description cols
                # vs. compiled cols, or textual w/ unordered columns
                # the order of columns can change if the query is
                # against a "select *", so not safe to cache
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_name(
                    context,
                    cursor_description,
                    result_columns,
                    loose_column_name_matching,
                )
            else:
                # no compiled SQL, just a raw string, order of columns
                # can change for "select *"
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_none(
                    context, cursor_description
                )

            # rearrange each _MergeColTuple into a keymap record; the
            # cursor column name serves as both lookup key and rendered name
            return [
                (
                    idx,
                    ridx,
                    obj,
                    cursor_colname,
                    cursor_colname,
                    context.get_result_processor(
                        mapped_type, cursor_colname, coltype
                    ),
                    untranslated,
                )  # type: ignore[misc]
                for (
                    idx,
                    ridx,
                    cursor_colname,
                    mapped_type,
                    coltype,
                    obj,
                    untranslated,
                ) in raw_iterator
            ]

    def _colnames_from_description(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
    ) -> Iterator[Tuple[int, str, Optional[str], DBAPIType]]:
        """Extract column names and data types from a cursor.description.

        Applies unicode decoding, column translation, "normalization",
        and case sensitivity rules to the names based on the dialect.

        Yields ``(index, name, untranslated name, DBAPI type)`` tuples and,
        as a side effect, resets ``self._keys`` and appends each resulting
        name to it as the generator is consumed.

        """

        dialect = context.dialect
        translate_colname = context._translate_colname
        normalize_name = (
            dialect.normalize_name if dialect.requires_name_normalize else None
        )
        untranslated = None

        self._keys = []

        for idx, rec in enumerate(cursor_description):
            colname = rec[0]
            coltype = rec[1]

            if translate_colname:
                colname, untranslated = translate_colname(colname)

            if normalize_name:
                colname = normalize_name(colname)

            self._keys.append(colname)

            yield idx, colname, untranslated, coltype

    def _merge_textual_cols_by_position(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        result_columns: Sequence[ResultColumnsEntry],
    ) -> Iterator[_MergeColTuple]:
        """Match cursor.description to ``result_columns`` by position.

        Description entries beyond the number of result columns are yielded
        with ``NULLTYPE`` and no column objects.  A warning is emitted when
        more columns were requested than the cursor returned.

        :raises InvalidRequestError: if the first object of a result column
         has already been seen for an earlier position.

        """
        num_ctx_cols = len(result_columns)
        num_description_cols = len(cursor_description)

        if num_ctx_cols > num_description_cols:
            # the first number is what the textual SQL actually returned,
            # the second is how many columns were requested of it
            util.warn(
                "Number of columns in textual SQL (%d) is "
                "smaller than number of columns requested (%d)"
                % (num_description_cols, num_ctx_cols)
            )
        seen = set()

        for (
            idx,
            colname,
            untranslated,
            coltype,
        ) in self._colnames_from_description(context, cursor_description):
            if idx < num_ctx_cols:
                ctx_rec = result_columns[idx]
                obj = ctx_rec[RM_OBJECTS]
                ridx = idx
                mapped_type = ctx_rec[RM_TYPE]
                if obj[0] in seen:
                    raise exc.InvalidRequestError(
                        "Duplicate column expression requested "
                        "in textual SQL: %r" % obj[0]
                    )
                seen.add(obj[0])
            else:
                mapped_type = sqltypes.NULLTYPE
                obj = None
                ridx = None
            yield idx, ridx, colname, mapped_type, coltype, obj, untranslated

    def _merge_cols_by_name(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        result_columns: Sequence[ResultColumnsEntry],
        loose_column_name_matching: bool,
    ) -> Iterator[_MergeColTuple]:
        """Match cursor.description to ``result_columns`` by column name.

        Description names with no entry in the match map are yielded with
        ``NULLTYPE``, no column objects and no result column index.

        """
        match_map = self._create_description_match_map(
            result_columns, loose_column_name_matching
        )
        mapped_type: TypeEngine[Any]

        for (
            idx,
            colname,
            untranslated,
            coltype,
        ) in self._colnames_from_description(context, cursor_description):
            try:
                ctx_rec = match_map[colname]
            except KeyError:
                mapped_type = sqltypes.NULLTYPE
                obj = None
                result_columns_idx = None
            else:
                # match map records are (name, objects, type, index)
                obj = ctx_rec[1]
                mapped_type = ctx_rec[2]
                result_columns_idx = ctx_rec[3]
            yield (
                idx,
                result_columns_idx,
                colname,
                mapped_type,
                coltype,
                obj,
                untranslated,
            )

    @classmethod
    def _create_description_match_map(
        cls,
        result_columns: Sequence[ResultColumnsEntry],
        loose_column_name_matching: bool = False,
    ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
        """when matching cursor.description to a set of names that are present
        in a Compiled object, as is the case with TextualSelect, get all the
        names we expect might match those in cursor.description.

        Returns a dictionary keyed by rendered name (and, with
        ``loose_column_name_matching``, also by each column object) whose
        values are ``(name, objects, type, result column index)`` tuples.
        """

        d: Dict[
            Union[str, object],
            Tuple[str, TupleAny, TypeEngine[Any], int],
        ] = {}
        for ridx, elem in enumerate(result_columns):
            key = elem[RM_RENDERED_NAME]
            if key in d:
                # conflicting keyname - just add the column-linked objects
                # to the existing record.  if there is a duplicate column
                # name in the cursor description, this will allow all of those
                # objects to raise an ambiguous column error
                e_name, e_obj, e_type, e_ridx = d[key]
                d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
            else:
                d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)

            if loose_column_name_matching:
                # when using a textual statement with an unordered set
                # of columns that line up, we are expecting the user
                # to be using label names in the SQL that match to the column
                # expressions.  Enable more liberal matching for this case;
                # duplicate keys that are ambiguous will be fixed later.
                for r_key in elem[RM_OBJECTS]:
                    d.setdefault(
                        r_key,
                        (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
                    )
        return d

    def _merge_cols_by_none(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
    ) -> Iterator[_MergeColTuple]:
        """Yield merge tuples built from cursor.description alone.

        Used when there are no compiled columns to match against: every
        column gets ``NULLTYPE``, no column objects and no result column
        index.

        """
        self._keys = []

        for (
            idx,
            colname,
            untranslated,
            coltype,
        ) in self._colnames_from_description(context, cursor_description):
            yield (
                idx,
                None,
                colname,
                sqltypes.NULLTYPE,
                coltype,
                None,
                untranslated,
            )

    if not TYPE_CHECKING:

        def _key_fallback(
            self, key: Any, err: Optional[Exception], raiseerr: bool = True
        ) -> Optional[NoReturn]:
            """Handle a key that was not found in the keymap.

            Raises :class:`.NoSuchColumnError` chained from ``err`` when
            ``raiseerr`` is true, otherwise returns None.

            """
            if raiseerr:
                if self._unpickled and isinstance(key, elements.ColumnElement):
                    raise exc.NoSuchColumnError(
                        "Row was unpickled; lookup by ColumnElement "
                        "is unsupported"
                    ) from err
                else:
                    raise exc.NoSuchColumnError(
                        "Could not locate column in row for column '%s'"
                        % util.string_or_unprintable(key)
                    ) from err
            else:
                return None

    def _raise_for_ambiguous_column_name(
        self, rec: _KeyMapRecType
    ) -> NoReturn:
        """Raise :class:`.InvalidRequestError` naming the lookup key of an
        ambiguous keymap record."""
        raise exc.InvalidRequestError(
            "Ambiguous column name '%s' in "
            "result set column descriptions" % rec[MD_LOOKUP_KEY]
        )

    def _index_for_key(
        self, key: _KeyIndexType, raiseerr: bool = True
    ) -> Optional[int]:
        """Return the positional index recorded for ``key``.

        An integer ``key`` is first translated to the key name at that
        position.  Returns None for an unknown key when ``raiseerr`` is
        false; raises for an ambiguous key.

        """
        # TODO: can consider pre-loading ints and negative ints
        # into _keymap - also no coverage here
        if isinstance(key, int):
            key = self._keys[key]

        try:
            rec = self._keymap[key]
        except KeyError as ke:
            x = self._key_fallback(key, ke, raiseerr)
            assert x is None
            return None

        index = rec[MD_INDEX]

        if index is None:
            self._raise_for_ambiguous_column_name(rec)
        return index

    def _indexes_for_keys(
        self, keys: Sequence[_KeyIndexType]
    ) -> Sequence[int]:
        """Return the recorded positional index for each of ``keys``.

        Raises :class:`.NoSuchColumnError` for the first key not present in
        the keymap.

        """
        try:
            return [self._keymap[key][MD_INDEX] for key in keys]  # type: ignore[index,misc] # noqa: E501
        except KeyError as ke:
            # ensure it raises
            CursorResultMetaData._key_fallback(self, ke.args[0], ke)

    def _metadata_for_keys(
        self, keys: Sequence[_KeyIndexType]
    ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
        """Yield the keymap record for each of ``keys``.

        Integer keys are translated to the key name at that position.
        Raises for keys that are missing or ambiguous.

        """
        for key in keys:
            if int in key.__class__.__mro__:
                key = self._keys[key]  # type: ignore[index]

            try:
                rec = self._keymap[key]  # type: ignore[index]
            except KeyError as ke:
                # ensure it raises
                CursorResultMetaData._key_fallback(self, ke.args[0], ke)

            index = rec[MD_INDEX]

            if index is None:
                self._raise_for_ambiguous_column_name(rec)

            yield cast(_NonAmbigCursorKeyMapRecType, rec)

    def __getstate__(self) -> Dict[str, Any]:
        """Return the picklable state of this metadata.

        Only keymap entries keyed by ``str`` or ``int`` are kept; their
        column objects, processor and untranslated name are dropped.

        """
        # TODO: consider serializing this as SimpleResultMetaData
        return {
            "_keymap": {
                key: (
                    rec[MD_INDEX],
                    rec[MD_RESULT_MAP_INDEX],
                    [],
                    key,
                    rec[MD_RENDERED_NAME],
                    None,
                    None,
                )
                for key, rec in self._keymap.items()
                if isinstance(key, (str, int))
            },
            "_keys": self._keys,
            "_translated_indexes": self._translated_indexes,
        }

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore this metadata from the state made by ``__getstate__``.

        The restored object has no processors and is flagged as unpickled.

        """
        self._processors = [None] * len(state["_keys"])
        self._keymap = state["_keymap"]
        self._keymap_by_result_column_idx = None
        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
        self._keys = state["_keys"]
        self._unpickled = True
        # this slot is not part of the pickled state; give it a conservative
        # value so that methods which read it (_reduce(),
        # _remove_processors(), ...) don't fail with AttributeError on an
        # unpickled instance
        self._safe_for_cache = False
        if state["_translated_indexes"]:
            self._translated_indexes = cast(
                "List[int]", state["_translated_indexes"]
            )
            self._tuplefilter = tuplegetter(*self._translated_indexes)
        else:
            self._translated_indexes = self._tuplefilter = None

946 

947 

class ResultFetchStrategy:
    """Base class describing how a result object fetches its rows.

    Most hooks raise ``NotImplementedError`` here and are meant to be
    supplied by subclasses; ``yield_per()`` does nothing and
    ``handle_exception()`` re-raises the error it is given.

    .. versionadded:: 1.4

    """

    __slots__ = ()

    # class-level default; None unless a subclass or instance overrides it
    alternate_cursor_description: Optional[_DBAPICursorDescription] = None

    def soft_close(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Soft-close hook; not implemented on the base class."""
        raise NotImplementedError()

    def hard_close(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Hard-close hook; not implemented on the base class."""
        raise NotImplementedError()

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Hook receiving a ``yield_per`` size; a no-op on the base class."""
        return None

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Single-row fetch hook; not implemented on the base class."""
        raise NotImplementedError()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Multi-row fetch hook; not implemented on the base class."""
        raise NotImplementedError()

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Fetch-everything hook; not implemented on the base class."""
        raise NotImplementedError()

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Exception hook; the base class re-raises ``err`` unchanged."""
        raise err

1008 

1009 

class NoCursorFetchStrategy(ResultFetchStrategy):
    """Cursor strategy for a result that has no open cursor.

    There are two varieties of this strategy, one for DQL and one for
    DML (and also DDL), each of which represent a result that had a cursor
    but no longer has one.

    Closing is a no-op here, and every fetch method funnels into
    ``_non_result()``, which the concrete varieties implement.

    """

    __slots__ = ()

    def soft_close(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Nothing to release; do nothing."""

    def hard_close(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Nothing to release; do nothing."""

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Return ``_non_result()`` with a default of ``None``."""
        return self._non_result(result, None)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Return ``_non_result()`` with a default of an empty list."""
        return self._non_result(result, [])

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Return ``_non_result()`` with a default of an empty list."""
        return self._non_result(result, [])

    def _non_result(
        self,
        result: CursorResult[Any],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Produce the "no rows" outcome; implemented by subclasses."""
        raise NotImplementedError()

1063 

1064 

class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DQL result that has no open cursor.

    This is a result set that can return rows, i.e. for a SELECT, or for an
    INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
    where the cursor is closed and no rows remain available.  The owning result
    object may or may not be "hard closed", which determines if the fetch
    methods send empty results or raise for closed result.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Any],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Return ``default`` unless the result is closed.

        Raises :class:`.ResourceClosedError`, chained from ``err``, when
        ``result.closed`` is true.
        """
        if not result.closed:
            return default

        raise exc.ResourceClosedError(
            "This result object is closed."
        ) from err

1090 

1091 

# Module-level NoCursorDQLFetchStrategy instance; CursorFetchStrategy's
# soft_close() and hard_close() install it as ``result.cursor_strategy``.
_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()

1093 

1094 

class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DML result that has no open cursor.

    This is a result set that does not return rows, i.e. for an INSERT,
    UPDATE, DELETE that does not include RETURNING.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Any],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Delegate to the metadata's ``_we_dont_return_rows()``.

        ``default`` is not used; ``err`` is forwarded to the metadata
        method.
        """
        metadata = result._metadata

        # we only expect to have a _NoResultMetaData() here right now.
        assert not metadata.returns_rows
        metadata._we_dont_return_rows(err)  # type: ignore[union-attr]

1114 

1115 

# Module-level instance of the stateless NoCursorDMLFetchStrategy.
_NO_CURSOR_DML = NoCursorDMLFetchStrategy()

1117 

1118 

class CursorFetchStrategy(ResultFetchStrategy):
    """Call fetch methods from a DBAPI cursor.

    Alternate versions of this class may instead buffer the rows from
    cursors or not use cursors at all.

    """

    __slots__ = ()

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the "no cursor" DQL strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the "no cursor" DQL strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Hand ``err`` to the connection's DBAPI exception handler."""
        # NOTE(review): annotated NoReturn, so the connection's handler
        # is presumably expected to always raise - confirm.
        result.connection._handle_dbapi_exception(
            err, None, None, dbapi_cursor, result.context
        )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Replace the result's strategy with a buffered one.

        The new strategy starts with an empty buffer, uses ``num`` as
        its ``max_row_buffer`` and has a growth factor of zero.
        """
        result.cursor_strategy = BufferedRowCursorFetchStrategy(
            dbapi_cursor,
            {"max_row_buffer": num},
            initial_buffer=collections.deque(),
            growth_factor=0,
        )

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Fetch one row from the cursor.

        When the cursor returns ``None`` the result is soft closed
        (hard closed if ``hard_close`` is set).  Any exception is
        routed through ``handle_exception()``.
        """
        try:
            fetched = dbapi_cursor.fetchone()
            if fetched is None:
                result._soft_close(hard=hard_close)
            return fetched
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Fetch a batch of rows from the cursor.

        With ``size=None`` the cursor's ``fetchmany()`` is called
        without arguments.  An empty batch soft closes the result.
        Any exception is routed through ``handle_exception()``.
        """
        try:
            batch = (
                dbapi_cursor.fetchmany()
                if size is None
                else dbapi_cursor.fetchmany(size)
            )

            if not batch:
                result._soft_close()
            return batch
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        """Fetch all remaining rows, then soft close the result.

        Any exception is routed through ``handle_exception()``.
        """
        try:
            remaining = dbapi_cursor.fetchall()
            result._soft_close()
            return remaining
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

1202 

1203 

# Module-level instance of the stateless CursorFetchStrategy.
_DEFAULT_FETCH = CursorFetchStrategy()

1205 

1206 

class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
    """A cursor fetch strategy with row buffering behavior.

    This strategy buffers the contents of a selection of rows
    before ``fetchone()`` is called.  This is to allow the results of
    ``cursor.description`` to be available immediately, when
    interfacing with a DB-API that requires rows to be consumed before
    this information is available (currently psycopg2, when used with
    server-side cursors).

    The pre-fetching behavior fetches only one row initially, and then
    grows its buffer size with each successive need for additional rows,
    up to the ``max_row_buffer`` size, which defaults to 1000::

        with psycopg2_engine.connect() as conn:

            result = conn.execution_options(
                stream_results=True, max_row_buffer=50
            ).execute(text("select * from table"))

    .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.

    .. seealso::

        :ref:`psycopg2_execution_options`
    """

    __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")

    def __init__(
        self,
        dbapi_cursor: DBAPICursor,
        execution_options: CoreExecuteOptionsParameter,
        growth_factor: int = 5,
        initial_buffer: Optional[Deque[Any]] = None,
    ) -> None:
        """Set up the buffer and its sizing parameters.

        :param dbapi_cursor: cursor to pre-fetch from; only consulted
         when no ``initial_buffer`` is given.
        :param execution_options: mapping read for ``max_row_buffer``
         (default 1000).
        :param growth_factor: multiplier applied to the buffer size on
         each refill; a falsy value fixes the size at
         ``max_row_buffer``.
        :param initial_buffer: deque to use as the row buffer as-is,
         instead of pre-fetching one row.
        """
        max_rows = execution_options.get("max_row_buffer", 1000)
        self._max_row_buffer = max_rows

        if initial_buffer is None:
            # pre-fetch a single row
            self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
        else:
            self._rowbuffer = initial_buffer
        self._growth_factor = growth_factor

        if growth_factor:
            self._bufsize = min(self._max_row_buffer, self._growth_factor)
        else:
            self._bufsize = self._max_row_buffer

    @classmethod
    def create(
        cls, result: CursorResult[Any]
    ) -> BufferedRowCursorFetchStrategy:
        """Build a strategy from a result's cursor and execution options."""
        return BufferedRowCursorFetchStrategy(
            result.cursor,
            result.context.execution_options,
        )

    def _buffer_rows(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> None:
        """this is currently used only by fetchone()."""

        fetch_size = self._bufsize
        try:
            # a size below one means "take everything that is left"
            if fetch_size < 1:
                fetched = dbapi_cursor.fetchall()
            else:
                fetched = dbapi_cursor.fetchmany(fetch_size)
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

        if fetched:
            self._rowbuffer = collections.deque(fetched)
            if self._growth_factor and fetch_size < self._max_row_buffer:
                # grow for the next refill, capped at the maximum
                self._bufsize = min(
                    self._max_row_buffer, fetch_size * self._growth_factor
                )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Fix the buffer size at ``num`` and disable growth."""
        self._growth_factor = 0
        self._max_row_buffer = self._bufsize = num

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Return the next buffered row, refilling the buffer if empty.

        When a refill yields nothing, the result is soft closed (hard
        closed if ``hard_close`` is set) and ``None`` is returned.
        """
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        self._buffer_rows(result, dbapi_cursor)
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        try:
            result._soft_close(hard=hard_close)
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)
        return None

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Return up to ``size`` rows, drawing on the buffer first.

        ``size=None`` behaves like ``fetchall()``.  If the buffer holds
        fewer than ``size`` rows the shortfall is requested from the
        cursor; an empty reply soft closes the result after the
        buffered rows have been collected.
        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffered = self._rowbuffer
        shortfall = size - len(buffered)
        exhausted = False
        if shortfall > 0:
            try:
                fetched = dbapi_cursor.fetchmany(shortfall)
            except BaseException as e:
                self.handle_exception(result, dbapi_cursor, e)
            else:
                if fetched:
                    buffered.extend(fetched)
                else:
                    # defer closing since it may clear the row buffer
                    exhausted = True

        rows = [buffered.popleft() for _ in range(min(size, len(buffered)))]
        if exhausted:
            result._soft_close()
        return rows

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Return buffered rows plus everything left on the cursor.

        The buffer is emptied and the result soft closed.  Any
        exception is routed through ``handle_exception()``.
        """
        try:
            rows = list(self._rowbuffer)
            rows += list(dbapi_cursor.fetchall())
            self._rowbuffer.clear()
            result._soft_close()
            return rows
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

1361 

1362 

class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
    """A cursor strategy that buffers rows fully upon creation.

    Used for operations where a result is to be delivered
    after the database conversation can not be continued,
    such as MSSQL INSERT...OUTPUT after an autocommit.

    """

    __slots__ = ("_rowbuffer", "alternate_cursor_description")

    def __init__(
        self,
        dbapi_cursor: Optional[DBAPICursor],
        alternate_description: Optional[_DBAPICursorDescription] = None,
        initial_buffer: Optional[Iterable[Any]] = None,
    ):
        """Fill the buffer from ``initial_buffer`` or from the cursor.

        :param dbapi_cursor: cursor whose ``fetchall()`` supplies the
         rows; may be ``None`` only when ``initial_buffer`` is given.
        :param alternate_description: stored as
         ``alternate_cursor_description``.
        :param initial_buffer: iterable of rows copied into a new deque.
        """
        self.alternate_cursor_description = alternate_description
        if initial_buffer is None:
            assert dbapi_cursor is not None
            source_rows = dbapi_cursor.fetchall()
        else:
            source_rows = initial_buffer
        self._rowbuffer = collections.deque(source_rows)

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> Any:
        """No-op; all rows are already buffered."""

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then apply the parent's hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Pop and return the next buffered row.

        With an empty buffer the result is soft closed (hard closed if
        ``hard_close`` is set) and ``None`` is returned.
        """
        if not self._rowbuffer:
            result._soft_close(hard=hard_close)
            return None
        return self._rowbuffer.popleft()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Pop and return up to ``size`` buffered rows.

        ``size=None`` behaves like ``fetchall()``.  An empty batch soft
        closes the result.
        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffered = self._rowbuffer
        take = min(size, len(buffered))
        batch = [buffered.popleft() for _ in range(take)]
        if not batch:
            result._soft_close()
        return batch

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Hand back the whole buffer and soft close the result.

        The buffer object itself (a deque) is returned; a fresh empty
        deque takes its place before the soft close.
        """
        drained, self._rowbuffer = self._rowbuffer, collections.deque()
        result._soft_close()
        return drained

1438 

1439 

class _NoResultMetaData(ResultMetaData):
    """Metadata stand-in for a result that does not return rows.

    Every accessor raises :class:`.ResourceClosedError` via
    ``_we_dont_return_rows()``.
    """

    __slots__ = ()

    returns_rows = False

    def _we_dont_return_rows(
        self, err: Optional[BaseException] = None
    ) -> NoReturn:
        """Raise ``ResourceClosedError``, chained from ``err``."""
        raise exc.ResourceClosedError(
            "This result object does not return rows. "
            "It has been closed automatically."
        ) from err

    def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
        """Always raises; there are no keys to look up."""
        self._we_dont_return_rows()

    def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; there are no keys to look up."""
        self._we_dont_return_rows()

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; there is nothing to reduce."""
        self._we_dont_return_rows()

    @property
    def _keymap(self) -> NoReturn:  # type: ignore[override]
        """Always raises on access."""
        self._we_dont_return_rows()

    @property
    def _key_to_index(self) -> NoReturn:  # type: ignore[override]
        """Always raises on access."""
        self._we_dont_return_rows()

    @property
    def _processors(self) -> NoReturn:  # type: ignore[override]
        """Always raises on access."""
        self._we_dont_return_rows()

    @property
    def keys(self) -> NoReturn:
        """Always raises on access."""
        self._we_dont_return_rows()

1477 

1478 

# Module-level _NoResultMetaData instance; CursorResult exposes it as its
# ``_no_result_metadata`` class attribute.
_NO_RESULT_METADATA = _NoResultMetaData()

1480 

1481 

def null_dml_result() -> IteratorResult[Any]:
    """Return a soft-closed ``IteratorResult`` with no rows.

    The result is built over an empty iterator and a fresh
    ``_NoResultMetaData``.
    """
    empty_result: IteratorResult[Any] = IteratorResult(
        _NoResultMetaData(), iter([])
    )
    empty_result._soft_close()
    return empty_result

1486 

1487 

1488class CursorResult(Result[_T]): 

1489 """A Result that is representing state from a DBAPI cursor. 

1490 

1491 .. versionchanged:: 1.4 The :class:`.CursorResult` 

1492 class replaces the previous :class:`.ResultProxy` interface. 

1493 This class is based on the :class:`.Result` calling API 

1494 which provides an updated usage model and calling facade for 

1495 SQLAlchemy Core and SQLAlchemy ORM. 

1496 

1497 Returns database rows via the :class:`.Row` class, which provides 

1498 additional API features and behaviors on top of the raw data returned by 

1499 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` 

1500 method, other kinds of objects may also be returned. 

1501 

1502 .. seealso:: 

1503 

1504 :ref:`tutorial_selecting_data` - introductory material for accessing 

1505 :class:`_engine.CursorResult` and :class:`.Row` objects. 

1506 

1507 """ 

1508 

1509 __slots__ = ( 

1510 "context", 

1511 "dialect", 

1512 "cursor", 

1513 "cursor_strategy", 

1514 "_echo", 

1515 "connection", 

1516 ) 

1517 

1518 _metadata: Union[CursorResultMetaData, _NoResultMetaData] 

1519 _no_result_metadata = _NO_RESULT_METADATA 

1520 _soft_closed: bool = False 

1521 closed: bool = False 

1522 _is_cursor = True 

1523 

1524 context: DefaultExecutionContext 

1525 dialect: Dialect 

1526 cursor_strategy: ResultFetchStrategy 

1527 connection: Connection 

1528 

def __init__(
    self,
    context: DefaultExecutionContext,
    cursor_strategy: ResultFetchStrategy,
    cursor_description: Optional[_DBAPICursorDescription],
):
    """Bind this result to an execution context and fetch strategy.

    :param context: execution context supplying the dialect, cursor
     and root connection.
    :param cursor_strategy: strategy used to fetch rows.
    :param cursor_description: DBAPI cursor description; ``None``
     means no row metadata is set up and the "no result" metadata is
     used instead.
    """
    self.context = context
    self.dialect = context.dialect
    self.cursor = context.cursor
    self.cursor_strategy = cursor_strategy
    self.connection = context.root_connection
    self._echo = echo = (
        self.connection._echo and context.engine._should_log_debug()
    )

    if cursor_description is None:
        assert context._num_sentinel_cols == 0
        self._metadata = self._no_result_metadata
        return

    # inline of Result._row_getter(), set up an initial row
    # getter assuming no transformations will be called as this
    # is the most common case

    metadata = self._init_metadata(context, cursor_description)

    build_row: Any = functools.partial(
        Row,
        metadata,
        metadata._effective_processors,
        metadata._key_to_index,
    )

    if context._num_sentinel_cols:
        # trailing sentinel columns are sliced off each raw row
        strip_sentinels = operator.itemgetter(
            slice(-context._num_sentinel_cols)
        )

        def _sliced_row(raw_data: Any) -> Any:
            return build_row(strip_sentinels(raw_data))

        row_factory = _sliced_row
    else:
        row_factory = build_row

    if echo:
        log = self.context.connection._log_debug

        def _log_row(row: Any) -> Any:
            log("Row %r", sql_util._repr_row(row))
            return row

        self._row_logging_fn = _log_row

        def _logged_row(row: Any) -> Any:
            return _log_row(row_factory(row))

        make_row = _logged_row
    else:
        make_row = row_factory  # type: ignore[assignment]
    self._set_memoized_attribute("_row_getter", make_row)

1591 

def _init_metadata(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
) -> CursorResultMetaData:
    """Create or reuse the metadata for this result and assign it.

    For a compiled statement, metadata cached on the compiled object
    is reused when present; newly built metadata is cached there if it
    is flagged safe for caching.  Without a compiled statement a new
    :class:`.CursorResultMetaData` is always built.  The column names
    are logged when echo is enabled.
    """
    compiled = context.compiled

    if not compiled:
        metadata = CursorResultMetaData(self, cursor_description)
    else:
        cached = compiled._cached_metadata
        if cached:
            metadata = cached
        else:
            metadata = CursorResultMetaData(self, cursor_description)
            if metadata._safe_for_cache:
                compiled._cached_metadata = metadata

        # result rewrite/ adapt step.  this is to suit the case
        # when we are invoked against a cached Compiled object, we want
        # to rewrite the ResultMetaData to reflect the Column objects
        # that are in our current SQL statement object, not the one
        # that is associated with the cached Compiled object.
        # the Compiled object may also tell us to not
        # actually do this step; this is to support the ORM where
        # it is to produce a new Result object in any case, and will
        # be using the cached Column objects against this database result
        # so we don't want to rewrite them.
        #
        # Basically this step suits the use case where the end user
        # is using Core SQL expressions and is accessing columns in the
        # result row using row._mapping[table.c.column].
        adapt_disabled = context.execution_options.get(
            "_result_disable_adapt_to_context", False
        )
        if (
            not adapt_disabled
            and compiled._result_columns
            and context.cache_hit is context.dialect.CACHE_HIT
            and compiled.statement is not context.invoked_statement  # type: ignore[comparison-overlap] # noqa: E501
        ):
            metadata = metadata._adapt_to_context(context)  # type: ignore[assignment] # noqa: E501

    self._metadata = metadata

    if self._echo:
        column_names = tuple(x[0] for x in cursor_description)
        context.connection._log_debug("Col %r", column_names)
    return metadata

1643 

def _soft_close(self, hard: bool = False) -> None:
    """Soft close this :class:`_engine.CursorResult`.

    This releases all DBAPI cursor resources, but leaves the
    CursorResult "open" from a semantic perspective, meaning the
    fetchXXX() methods will continue to return empty results.

    This method is called automatically when:

    * all result rows are exhausted using the fetchXXX() methods.
    * cursor.description is None.

    This method is **not public**, but is documented in order to clarify
    the "autoclose" process used.

    .. seealso::

        :meth:`_engine.CursorResult.close`


    """

    # nothing to do if the requested level of close already happened
    already_done = self.closed if hard else self._soft_closed
    if already_done:
        return

    strategy = self.cursor_strategy
    if hard:
        self.closed = True
        strategy.hard_close(self, self.cursor)
    else:
        strategy.soft_close(self, self.cursor)

    if not self._soft_closed:
        # detach the cursor from this result before closing it
        dbapi_cursor = self.cursor
        self.cursor = None  # type: ignore
        self.connection._safe_close_cursor(dbapi_cursor)
        self._soft_closed = True

1680 

@property
def inserted_primary_key_rows(self) -> List[Optional[Any]]:
    """Return the value of
    :attr:`_engine.CursorResult.inserted_primary_key`
    as a row contained within a list; some dialects may support a
    multiple row form as well.

    .. note:: As indicated below, in current SQLAlchemy versions this
       accessor is only useful beyond what's already supplied by
       :attr:`_engine.CursorResult.inserted_primary_key` when using the
       :ref:`postgresql_psycopg2` dialect.   Future versions hope to
       generalize this feature to more dialects.

    This accessor is added to support dialects that offer the feature
    that is currently implemented by the :ref:`psycopg2_executemany_mode`
    feature, currently **only the psycopg2 dialect**, which provides
    for many rows to be INSERTed at once while still retaining the
    behavior of being able to return server-generated primary key values.

    * **When using the psycopg2 dialect, or other dialects that may support
      "fast executemany" style inserts in upcoming releases** : When
      invoking an INSERT statement while passing a list of rows as the
      second argument to :meth:`_engine.Connection.execute`, this accessor
      will then provide a list of rows, where each row contains the primary
      key value for each row that was INSERTed.

    * **When using all other dialects / backends that don't yet support
      this feature**: This accessor is only useful for **single row INSERT
      statements**, and returns the same information as that of the
      :attr:`_engine.CursorResult.inserted_primary_key` within a
      single-element list. When an INSERT statement is executed in
      conjunction with a list of rows to be INSERTed, the list will contain
      one row per row inserted in the statement, however it will contain
      ``None`` for any server-generated values.

    Future releases of SQLAlchemy will further generalize the
    "fast execution helper" feature of psycopg2 to suit other dialects,
    thus allowing this accessor to be of more general use.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct, is not an
    insert() construct, or used an explicit returning().

    .. versionadded:: 1.4

    .. seealso::

        :attr:`_engine.CursorResult.inserted_primary_key`

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )
    if context._is_explicit_returning:
        raise exc.InvalidRequestError(
            "Can't call inserted_primary_key "
            "when returning() "
            "is used."
        )
    return context.inserted_primary_key_rows  # type: ignore[no-any-return]  # noqa: E501

1742 

@property
def inserted_primary_key(self) -> Optional[Any]:
    """Return the primary key for the row just inserted.

    The return value is a :class:`_result.Row` object representing
    a named tuple of primary key values in the order in which the
    primary key columns are configured in the source
    :class:`_schema.Table`.

    .. versionchanged:: 1.4.8 - the
       :attr:`_engine.CursorResult.inserted_primary_key`
       value is now a named tuple via the :class:`_result.Row` class,
       rather than a plain tuple.

    This accessor only applies to single row :func:`_expression.insert`
    constructs which did not explicitly specify
    :meth:`_expression.Insert.returning`.    Support for multirow inserts,
    while not yet available for most backends, would be accessed using
    the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.

    Note that primary key columns which specify a server_default clause, or
    otherwise do not qualify as "autoincrement" columns (see the notes at
    :class:`_schema.Column`), and were generated using the database-side
    default, will appear in this list as ``None`` unless the backend
    supports "returning" and the insert statement executed with the
    "implicit returning" enabled.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() construct.

    """

    if self.context.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if primary key "
            "returning is supported, please "
            "use .inserted_primary_key_rows."
        )

    # first row of the multi-row accessor, or None when it is empty
    pk_rows = self.inserted_primary_key_rows
    return pk_rows[0] if pk_rows else None

1788 

def last_updated_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of updated parameters from this
    execution.

    For an executemany call the full list of compiled parameter sets
    is returned; otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an update() construct.

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isupdate:
        raise exc.InvalidRequestError(
            "Statement is not an update() expression construct."
        )
    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1814 

def last_inserted_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of inserted parameters from this
    execution.

    For an executemany call the full list of compiled parameter sets
    is returned; otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() construct.

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )
    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1840 

@property
def returned_defaults_rows(
    self,
) -> Optional[Sequence[Row[Any]]]:
    """Return a list of rows each containing the values of default
    columns that were fetched using
    the :meth:`.ValuesBase.return_defaults` feature.

    The return value is a list of :class:`.Row` objects.

    .. versionadded:: 1.4

    """
    # read straight from the execution context
    context = self.context
    return context.returned_default_rows

1855 

def splice_horizontally(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "horizontally splices"
    together the rows of this :class:`.CursorResult` with that of another
    :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "horizontally splices" means that for each row in the first and second
    result sets, a new row that concatenates the two rows together is
    produced, which then becomes the new row.  The incoming
    :class:`.CursorResult` must have the identical number of rows.  It is
    typically expected that the two result sets come from the same sort
    order as well, as the result rows are spliced together based on their
    position in the result.

    The expected use case here is so that multiple INSERT..RETURNING
    statements (which definitely need to be sorted) against different
    tables can produce a single result that looks like a JOIN of those two
    tables.

    E.g.::

        r1 = connection.execute(
            users.insert().returning(
                users.c.user_name, users.c.user_id, sort_by_parameter_order=True
            ),
            user_values,
        )

        r2 = connection.execute(
            addresses.insert().returning(
                addresses.c.address_id,
                addresses.c.address,
                addresses.c.user_id,
                sort_by_parameter_order=True,
            ),
            address_values,
        )

        rows = r1.splice_horizontally(r2).all()
        assert rows == [
            ("john", 1, 1, "foo@bar.com", 1),
            ("jack", 2, 2, "bar@bat.com", 2),
        ]

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_vertically`


    """  # noqa: E501

    clone = self._generate()

    # both raw row sets are read in full before being paired
    left_rows = list(self._raw_row_iterator())
    right_rows = list(other._raw_row_iterator())
    total_rows = [
        tuple(left) + tuple(right)
        for left, right in zip(left_rows, right_rows)
    ]

    clone._metadata = clone._metadata._splice_horizontally(other._metadata)  # type: ignore[union-attr, arg-type] # noqa: E501

    clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=total_rows,
    )
    clone._reset_memoizations()
    return clone

1928 

def splice_vertically(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "vertically splices",
    i.e. "extends", the rows of this :class:`.CursorResult` with that of
    another :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "vertically splices" means the rows of the given result are appended to
    the rows of this cursor result.  The incoming :class:`.CursorResult`
    must have rows that represent the identical list of columns in the
    identical order as they are in this :class:`.CursorResult`.

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_horizontally`

    """
    clone = self._generate()

    # this result's raw rows first, followed by the other's
    own_rows = list(self._raw_row_iterator())
    other_rows = list(other._raw_row_iterator())
    total_rows = own_rows + other_rows

    clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=total_rows,
    )
    clone._reset_memoizations()
    return clone

1960 

def _rewind(self, rows: Any) -> Self:
    """Reset this result, in place, back to the given rowset.

    Used internally for the case where an :class:`.Insert` construct
    combines :meth:`.Insert.return_defaults` with the
    "supplemental columns" feature.

    """
    if self._echo:
        log_debug = self.context.connection._log_debug
        log_debug("CursorResult rewound %d row(s)", len(rows))

    # the incoming rows are expected to be Row objects; their processors
    # have already run, so the metadata must not apply them a second time
    current_metadata = cast(CursorResultMetaData, self._metadata)
    self._metadata = current_metadata._remove_processors()

    # TODO: if these are Row objects, can we avoid re-making new Row
    # objects out of them a second time?  is that what's actually
    # happening right now?  maybe look into this
    self.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None, initial_buffer=rows
    )
    self._reset_memoizations()
    return self

1992 

@property
def returned_defaults(self) -> Optional[Row[Any]]:
    """Return the default-column values that were fetched via the
    :meth:`.ValuesBase.return_defaults` feature.

    The value is a :class:`.Row` instance, or ``None`` when
    :meth:`.ValuesBase.return_defaults` was not used or the backend
    does not support RETURNING.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the statement
    was executed as an executemany.

    .. seealso::

        :meth:`.ValuesBase.return_defaults`

    """
    if self.context.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if return defaults "
            "is supported, please use .returned_defaults_rows."
        )

    # a missing or empty collection of rows both map to None
    default_rows = self.context.returned_default_rows
    return default_rows[0] if default_rows else None

2019 

def lastrow_has_defaults(self) -> bool:
    """Delegate to ``lastrow_has_defaults()`` of the underlying
    :class:`.ExecutionContext` and return its result.

    See :class:`.ExecutionContext` for details.

    """
    execution_context = self.context
    return execution_context.lastrow_has_defaults()

2029 

def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return ``postfetch_cols`` from the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """
    # guard 1: there has to be a compiled construct at all
    if not self.context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )

    # guard 2: only INSERT and UPDATE carry post-fetch columns
    if not (self.context.isinsert or self.context.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )

    return self.context.postfetch_cols

2052 

def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return ``prefetch_cols`` from the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """
    # guard 1: there has to be a compiled construct at all
    if not self.context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )

    # guard 2: only INSERT and UPDATE carry pre-fetch columns
    if not (self.context.isinsert or self.context.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )

    return self.context.prefetch_cols

2075 

def supports_sane_rowcount(self) -> bool:
    """Report the dialect's ``supports_sane_rowcount`` setting.

    See :attr:`_engine.CursorResult.rowcount` for background.

    """
    dialect = self.dialect
    return dialect.supports_sane_rowcount

2084 

def supports_sane_multi_rowcount(self) -> bool:
    """Report the dialect's ``supports_sane_multi_rowcount`` setting.

    See :attr:`_engine.CursorResult.rowcount` for background.

    """
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

2093 

@util.memoized_property
def rowcount(self) -> int:
    """Return the 'rowcount' for this result.

    The main use of 'rowcount' is to report how many rows were matched
    by the WHERE criterion of an UPDATE or DELETE statement executed once
    (i.e. for a single parameter set); that number may then be compared
    to the number of rows expected to be updated or deleted, as a means
    of asserting data integrity.

    The value is transferred from the ``cursor.rowcount`` attribute of
    the DBAPI before the cursor is closed, in order to support DBAPIs
    that don't make this value available after cursor close.  Some DBAPIs
    may offer meaningful values for other kinds of statements as well,
    such as INSERT and SELECT statements.  To retrieve
    ``cursor.rowcount`` for these statements, set the
    :paramref:`.Connection.execution_options.preserve_rowcount`
    execution option to True, which causes the ``cursor.rowcount`` value
    to be unconditionally memoized before any results are returned or the
    cursor is closed, regardless of statement type.

    Where the DBAPI does not support rowcount for a particular kind of
    statement and/or execution, the returned value will be ``-1``, which
    is delivered directly from the DBAPI and is part of :pep:`249`.
    All DBAPIs should support rowcount for single-parameter-set
    UPDATE and DELETE statements, however.

    .. note::

       Notes regarding :attr:`_engine.CursorResult.rowcount`:


       * This attribute returns the number of rows *matched*,
         which is not necessarily the same as the number of rows
         that were actually *modified*.  For example, an UPDATE statement
         may have no net change on a given row if the SET values
         given are the same as those present in the row already.
         Such a row would be matched but not modified.
         On backends that feature both styles, such as MySQL,
         rowcount is configured to return the match
         count in all cases.

       * :attr:`_engine.CursorResult.rowcount` in the default case is
         *only* useful in conjunction with an UPDATE or DELETE statement,
         and only with a single set of parameters.  For other kinds of
         statements, SQLAlchemy will not attempt to pre-memoize the value
         unless the
         :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is used.  Note that contrary to :pep:`249`, many
         DBAPIs do not support rowcount values for statements that are not
         UPDATE or DELETE, particularly when rows are being returned which
         are not fully pre-buffered.  DBAPIs that don't support rowcount
         for a particular kind of statement should return the value ``-1``
         for such statements.

       * :attr:`_engine.CursorResult.rowcount` may not be meaningful
         when executing a single statement with multiple parameter sets
         (i.e. an :term:`executemany`).  Most DBAPIs do not sum "rowcount"
         values across multiple parameter sets and will return ``-1``
         when accessed.

       * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
         a correct population of :attr:`_engine.CursorResult.rowcount`
         when the :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is set to True.

       * Statements that use RETURNING may not support rowcount, returning
         a ``-1`` value instead.

    .. seealso::

        :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`

        :paramref:`.Connection.execution_options.preserve_rowcount`

    """  # noqa: E501
    try:
        count = self.context.rowcount
    except BaseException as err:
        # any failure is routed through the cursor strategy's handler
        self.cursor_strategy.handle_exception(self, self.cursor, err)
        raise  # not called
    else:
        return count

2175 

@property
def lastrowid(self) -> int:
    """Return the 'lastrowid' accessor on the DBAPI cursor.

    This is a DBAPI specific method and is only functional
    for those backends which support it, for statements
    where it is appropriate.  Its behavior is not
    consistent across backends.

    Usage of this method is normally unnecessary when
    using insert() expression constructs; the
    :attr:`~CursorResult.inserted_primary_key` attribute provides a
    tuple of primary key values for a newly inserted row,
    regardless of database backend.

    Any exception raised while retrieving the value is passed to the
    cursor strategy's ``handle_exception()``; the exception always
    propagates to the caller.

    """
    try:
        return self.context.get_lastrowid()
    except BaseException as e:
        self.cursor_strategy.handle_exception(self, self.cursor, e)
        # handle_exception() is expected to raise on its own; re-raise
        # here so the error can never be swallowed into an implicit
        # ``None`` return, mirroring the ``rowcount`` accessor
        raise  # not called

2196 

@property
def returns_rows(self) -> bool:
    """True if this :class:`_engine.CursorResult` returns zero or more
    rows.

    I.e. if it is legal to call the methods
    :meth:`_engine.CursorResult.fetchone`,
    :meth:`_engine.CursorResult.fetchmany` and
    :meth:`_engine.CursorResult.fetchall`.

    Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
    always be synonymous with whether or not the DBAPI cursor had a
    ``.description`` attribute, indicating the presence of result columns;
    note that a cursor that returns zero rows still has a
    ``.description`` if a row-returning statement was emitted.

    This attribute should be True for all results that are against
    SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
    that use RETURNING.  For INSERT/UPDATE/DELETE statements that were
    not using RETURNING, the value will usually be False; however,
    there are some dialect-specific exceptions to this, such as when
    using the MSSQL / pyodbc dialect a SELECT is emitted inline in
    order to retrieve an inserted primary key value.

    """
    # the answer is read from the result metadata
    metadata = self._metadata
    return metadata.returns_rows

2224 

@property
def is_insert(self) -> bool:
    """True if this :class:`_engine.CursorResult` is the result
    of executing an expression language compiled
    :func:`_expression.insert` construct.

    When True, this implies that the
    :attr:`inserted_primary_key` attribute is accessible,
    assuming the statement did not include
    a user defined "returning" construct.

    """
    # the flag is read from the execution context
    execution_context = self.context
    return execution_context.isinsert

2238 

def _fetchiter_impl(self) -> Iterator[Any]:
    """Yield rows one at a time from the cursor strategy.

    Iteration ends as soon as the strategy's ``fetchone()`` returns
    ``None``.
    """
    # look the bound method up once, not once per row
    fetch_next = self.cursor_strategy.fetchone

    row = fetch_next(self, self.cursor)
    while row is not None:
        yield row
        row = fetch_next(self, self.cursor)

2247 

def _fetchone_impl(self, hard_close: bool = False) -> Any:
    """Fetch a single row via the cursor strategy's ``fetchone()``,
    forwarding the ``hard_close`` flag.
    """
    strategy = self.cursor_strategy
    return strategy.fetchone(self, self.cursor, hard_close)

2250 

def _fetchall_impl(self) -> Any:
    """Fetch all remaining rows via the cursor strategy's ``fetchall()``."""
    strategy = self.cursor_strategy
    return strategy.fetchall(self, self.cursor)

2253 

def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
    """Fetch a batch of rows via the cursor strategy's ``fetchmany()``,
    forwarding ``size`` unchanged (including ``None``).
    """
    strategy = self.cursor_strategy
    return strategy.fetchmany(self, self.cursor, size)

2256 

def _raw_row_iterator(self) -> Any:
    """Return the row iterator produced by ``_fetchiter_impl()``."""
    row_iterator = self._fetchiter_impl()
    return row_iterator

2259 

def merge(self, *others: Result[Any]) -> MergedResult[Any]:
    """Merge this result with ``others`` via the superclass ``merge()``.

    When the execution context reports ``_has_rowcount``, the merged
    result additionally receives a ``rowcount`` equal to the sum of the
    ``rowcount`` of this result and of every result in ``others``.
    """
    merged = super().merge(*others)
    if not self.context._has_rowcount:
        return merged

    # add up rowcount across this result and all the merged-in ones
    total = 0
    for result in (self,) + others:
        total = total + cast("CursorResult[Any]", result).rowcount
    merged.rowcount = total
    return merged

2268 

def close(self) -> None:
    """Close this :class:`_engine.CursorResult`.

    The underlying DBAPI cursor corresponding to the statement
    execution is closed out, if one is still present.  Note that the
    DBAPI cursor is released automatically once the
    :class:`_engine.CursorResult` exhausts all available rows, so
    :meth:`_engine.CursorResult.close` is generally optional, except
    when discarding a :class:`_engine.CursorResult` that still has
    additional rows pending for fetch.

    Once this method has been called, the fetch methods are no longer
    valid to call and will raise a :class:`.ResourceClosedError`
    on subsequent use.

    .. seealso::

        :ref:`connections_toplevel`

    """
    # request the "hard" variant of the soft-close routine
    self._soft_close(hard=True)

2290 

@_generative
def yield_per(self, num: int) -> Self:
    """Record ``num`` as the yield-per size and forward it to the cursor
    strategy's ``yield_per()``.

    Returns this result object.
    """
    self._yield_per = num

    strategy = self.cursor_strategy
    strategy.yield_per(self, self.cursor, num)
    return self

2296 

2297 

# Module-level alias: ``ResultProxy`` is bound to the very same class
# object as ``CursorResult``.
# NOTE(review): presumably kept as a legacy name for backwards
# compatibility -- confirm before removing.
ResultProxy = CursorResult