Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

750 statements  

1# engine/cursor.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Define cursor-specific result set constructs including 

9:class:`.CursorResult`.""" 

10 

11from __future__ import annotations 

12 

13import collections 

14import operator 

15import typing 

16from typing import Any 

17from typing import cast 

18from typing import ClassVar 

19from typing import Deque 

20from typing import Dict 

21from typing import Final 

22from typing import Iterable 

23from typing import Iterator 

24from typing import List 

25from typing import Literal 

26from typing import Mapping 

27from typing import NoReturn 

28from typing import Optional 

29from typing import Sequence 

30from typing import Tuple 

31from typing import TYPE_CHECKING 

32from typing import Union 

33 

34from .result import IteratorResult 

35from .result import MergedResult 

36from .result import Result 

37from .result import ResultMetaData 

38from .result import SimpleResultMetaData 

39from .result import tuplegetter 

40from .row import Row 

41from .. import exc 

42from .. import util 

43from ..sql import elements 

44from ..sql import sqltypes 

45from ..sql import util as sql_util 

46from ..sql.base import _generative 

47from ..sql.compiler import ResultColumnsEntry 

48from ..sql.compiler import RM_NAME 

49from ..sql.compiler import RM_OBJECTS 

50from ..sql.compiler import RM_RENDERED_NAME 

51from ..sql.compiler import RM_TYPE 

52from ..sql.type_api import TypeEngine 

53from ..util.typing import Self 

54from ..util.typing import TupleAny 

55from ..util.typing import TypeVarTuple 

56from ..util.typing import Unpack 

57 

58if typing.TYPE_CHECKING: 

59 from .base import Connection 

60 from .default import DefaultExecutionContext 

61 from .interfaces import _DBAPICursorDescription 

62 from .interfaces import _MutableCoreSingleExecuteParams 

63 from .interfaces import CoreExecuteOptionsParameter 

64 from .interfaces import DBAPICursor 

65 from .interfaces import DBAPIType 

66 from .interfaces import Dialect 

67 from .interfaces import ExecutionContext 

68 from .result import _KeyIndexType 

69 from .result import _KeyMapRecType 

70 from .result import _KeyMapType 

71 from .result import _KeyType 

72 from .result import _ProcessorsType 

73 from .result import _TupleGetterType 

74 from ..sql.schema import Column 

75 from ..sql.type_api import _ResultProcessorType 

76 

77 

_Ts = TypeVarTuple("_Ts")


# metadata entry tuple indexes.
# using raw tuple is faster than namedtuple.
# these match up to the positions in
# _CursorKeyMapRecType
MD_INDEX: Final[Literal[0]] = 0
"""integer index in cursor.description

"""

MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
"""integer index in compiled._result_columns"""

MD_OBJECTS: Final[Literal[2]] = 2
"""other string keys and ColumnElement obj that can match.

This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects

"""

MD_LOOKUP_KEY: Final[Literal[3]] = 3
"""string key we usually expect for key-based lookup

this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
"""


MD_RENDERED_NAME: Final[Literal[4]] = 4
"""name that is usually in cursor.description

this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
"""


MD_PROCESSOR: Final[Literal[5]] = 5
"""callable to process a result value into a row"""

MD_UNTRANSLATED: Final[Literal[6]] = 6
"""raw name from cursor.description"""


# record type stored in CursorResultMetaData._keymap; a plain tuple
# indexed by the MD_* constants above
_CursorKeyMapRecType = Tuple[
    Optional[int],  # MD_INDEX, None means the record is ambiguously named
    int,  # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
    TupleAny,  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    Optional[str],  # MD_UNTRANSLATED
]

_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]

# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
# not None
_NonAmbigCursorKeyMapRecType = Tuple[
    int,
    int,
    List[Any],
    str,
    str,
    Optional["_ResultProcessorType[Any]"],
    str,
]

# intermediate tuple yielded by the _merge_* generator methods:
# (idx, ridx, cursor_colname, mapped_type, coltype, obj, untranslated)
_MergeColTuple = Tuple[
    int,
    Optional[int],
    str,
    TypeEngine[Any],
    "DBAPIType",
    Optional[TupleAny],
    Optional[str],
]

154 

155 

class CursorResultMetaData(ResultMetaData):
    """Result metadata for DBAPI cursors."""

    __slots__ = (
        "_keymap",
        "_processors",
        "_keys",
        "_keymap_by_result_column_idx",
        "_tuplefilter",
        "_translated_indexes",
        "_safe_for_cache",
        "_unpickled",
        "_key_to_index",
        # don't need _unique_filters support here for now.  Can be added
        # if a need arises.
    )

    # lookup of string / column-object keys to MD_* record tuples
    _keymap: _CursorKeyMapType
    # per-column result processors, in cursor.description order
    _processors: _ProcessorsType
    # lazily-built map keyed by compiled._result_columns position;
    # populated on first cache retrieval (see _adapt_to_context)
    _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
    # True if this metadata was restored via unpickling
    _unpickled: bool
    # whether this metadata may be reused for new executions of the
    # same cached compiled statement
    _safe_for_cache: bool
    # when a _reduce() has occurred, original cursor indexes for the
    # reduced key set
    _translated_indexes: Optional[List[int]]

    returns_rows: ClassVar[bool] = True
181 

182 def _has_key(self, key: Any) -> bool: 

183 return key in self._keymap 

184 

185 def _for_freeze(self) -> ResultMetaData: 

186 return SimpleResultMetaData( 

187 self._keys, 

188 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys], 

189 ) 

190 

191 def _make_new_metadata( 

192 self, 

193 *, 

194 unpickled: bool, 

195 processors: _ProcessorsType, 

196 keys: Sequence[str], 

197 keymap: _KeyMapType, 

198 tuplefilter: Optional[_TupleGetterType], 

199 translated_indexes: Optional[List[int]], 

200 safe_for_cache: bool, 

201 keymap_by_result_column_idx: Any, 

202 ) -> Self: 

203 new_obj = self.__class__.__new__(self.__class__) 

204 new_obj._unpickled = unpickled 

205 new_obj._processors = processors 

206 new_obj._keys = keys 

207 new_obj._keymap = keymap 

208 new_obj._tuplefilter = tuplefilter 

209 new_obj._translated_indexes = translated_indexes 

210 new_obj._safe_for_cache = safe_for_cache 

211 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx 

212 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX) 

213 return new_obj 

214 

215 def _remove_processors_and_tuple_filter(self) -> Self: 

216 if self._tuplefilter: 

217 proc = self._tuplefilter(self._processors) 

218 else: 

219 proc = self._processors 

220 return self._make_new_metadata( 

221 unpickled=self._unpickled, 

222 processors=[None] * len(proc), 

223 tuplefilter=None, 

224 translated_indexes=None, 

225 keymap={ 

226 key: value[0:5] + (None,) + value[6:] 

227 for key, value in self._keymap.items() 

228 }, 

229 keys=self._keys, 

230 safe_for_cache=self._safe_for_cache, 

231 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

232 ) 

233 

    def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
        """Join this metadata with ``other`` side-by-side, as though the
        two result sets' columns were concatenated into one row.

        ``other``'s column indexes are shifted by this result's column
        count; keys already present here, or ambiguous in ``other``,
        become ambiguous (MD_INDEX ``None``) in the merged keymap.
        """
        keymap = dict(self._keymap)
        offset = len(self._keys)

        for key, value in other._keymap.items():
            # int index should be None for ambiguous key
            if value[MD_INDEX] is not None and key not in keymap:
                md_index = value[MD_INDEX] + offset
                md_object = value[MD_RESULT_MAP_INDEX] + offset
            else:
                md_index = None
                md_object = -1
            keymap[key] = (md_index, md_object, *value[2:])

        self_tf = self._tuplefilter
        other_tf = other._tuplefilter

        # concatenate processors, applying each side's tuplefilter so
        # the combined list lines up with the combined keys
        proc: List[Any] = []
        for pp, tf in [
            (self._processors, self_tf),
            (other._processors, other_tf),
        ]:
            proc.extend(pp if tf is None else tf(pp))

        new_keys = [*self._keys, *other._keys]
        assert len(proc) == len(new_keys)

        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=proc,
            tuplefilter=None,
            translated_indexes=None,
            keys=new_keys,
            keymap=keymap,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx={
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in keymap.values()
            },
        )
274 

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
        """Return a new metadata restricted to the given subset of keys.

        The surviving records are renumbered 0..n-1 and a tuplegetter is
        installed to pull the corresponding values out of the original
        (wider) rows; previously-translated indexes are composed with
        the new selection.
        """
        recs = list(self._metadata_for_keys(keys))

        indexes = [rec[MD_INDEX] for rec in recs]
        new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]

        if self._translated_indexes:
            # map the selected positions back through the existing
            # translation so the tuplegetter addresses raw-row positions
            indexes = [self._translated_indexes[idx] for idx in indexes]
        tup = tuplegetter(*indexes)
        # renumber MD_INDEX to match the reduced row width
        new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]

        keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
        # TODO: need unit test for:
        # result = connection.execute("raw sql, no columns").scalars()
        # without the "or ()" it's failing because MD_OBJECTS is None
        keymap.update(
            (e, new_rec)
            for new_rec in new_recs
            for e in new_rec[MD_OBJECTS] or ()
        )

        return self._make_new_metadata(
            unpickled=self._unpickled,
            processors=self._processors,
            keys=new_keys,
            tuplefilter=tup,
            translated_indexes=indexes,
            keymap=keymap,  # type: ignore[arg-type]
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )
306 

    def _adapt_to_context(self, context: ExecutionContext) -> Self:
        """When using a cached Compiled construct that has a _result_map,
        for a new statement that used the cached Compiled, we need to ensure
        the keymap has the Column objects from our new statement as keys.
        So here we rewrite keymap with new entries for the new columns
        as matched to those of the cached statement.

        """

        if not context.compiled or not context.compiled._result_columns:
            return self

        compiled_statement = context.compiled.statement
        invoked_statement = context.invoked_statement

        if TYPE_CHECKING:
            assert isinstance(invoked_statement, elements.ClauseElement)

        # same statement object: keymap entries already point at the
        # right Column objects, nothing to do
        if compiled_statement is invoked_statement:
            return self

        assert invoked_statement is not None

        # this is the most common path for Core statements when
        # caching is used.  In ORM use, this codepath is not really used
        # as the _result_disable_adapt_to_context execution option is
        # set by the ORM.

        # make a copy and add the columns from the invoked statement
        # to the result map.

        keymap_by_position = self._keymap_by_result_column_idx

        if keymap_by_position is None:
            # first retrieval from cache, this map will not be set up yet,
            # initialize lazily
            keymap_by_position = self._keymap_by_result_column_idx = {
                metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
                for metadata_entry in self._keymap.values()
            }

        # overlay: each column of the invoked statement maps to the
        # record at the same result-map position in the cached metadata
        return self._make_new_metadata(
            keymap=self._keymap
            | {
                new: keymap_by_position[idx]
                for idx, new in enumerate(
                    invoked_statement._all_selected_columns
                )
                if idx in keymap_by_position
            },
            unpickled=self._unpickled,
            processors=self._processors,
            tuplefilter=self._tuplefilter,
            translated_indexes=None,
            keys=self._keys,
            safe_for_cache=self._safe_for_cache,
            keymap_by_result_column_idx=self._keymap_by_result_column_idx,
        )
365 

    def __init__(
        self,
        parent: CursorResult[Unpack[TupleAny]],
        cursor_description: _DBAPICursorDescription,
        *,
        driver_column_names: bool = False,
        num_sentinel_cols: int = 0,
    ):
        """Build result metadata by merging a DBAPI cursor.description
        with the compiled statement's result-column info, if any.

        :param parent: the owning :class:`.CursorResult`
        :param cursor_description: raw DBAPI cursor.description sequence
        :param driver_column_names: if True, key the result on the raw
         driver-reported column names rather than translated ones
        :param num_sentinel_cols: trailing sentinel columns to hide from
         the visible row via a tuplefilter
        """
        context = parent.context
        if num_sentinel_cols > 0:
            # this is slightly faster than letting tuplegetter use the indexes
            self._tuplefilter = tuplefilter = operator.itemgetter(
                slice(-num_sentinel_cols)
            )
            cursor_description = tuplefilter(cursor_description)
        else:
            self._tuplefilter = tuplefilter = None
        self._translated_indexes = None
        self._safe_for_cache = self._unpickled = False

        if context.result_column_struct:
            (
                result_columns,
                cols_are_ordered,
                textual_ordered,
                ad_hoc_textual,
                loose_column_name_matching,
            ) = context.result_column_struct
            if tuplefilter is not None:
                # sentinel columns are trimmed from the compiled column
                # list just as they were from cursor_description
                result_columns = tuplefilter(result_columns)
            num_ctx_cols = len(result_columns)
        else:
            result_columns = cols_are_ordered = (  # type: ignore
                num_ctx_cols
            ) = ad_hoc_textual = loose_column_name_matching = (
                textual_ordered
            ) = False

        # merge cursor.description with the column info
        # present in the compiled structure, if any
        raw = self._merge_cursor_description(
            context,
            cursor_description,
            result_columns,
            num_ctx_cols,
            cols_are_ordered,
            textual_ordered,
            ad_hoc_textual,
            loose_column_name_matching,
            driver_column_names,
        )

        # processors in key order which are used when building up
        # a row
        self._processors = [
            metadata_entry[MD_PROCESSOR] for metadata_entry in raw
        ]
        if num_sentinel_cols > 0:
            # add the number of sentinel columns since these are passed
            # to the tuplefilters before being used
            self._processors.extend([None] * num_sentinel_cols)

        # this is used when using this ResultMetaData in a Core-only cache
        # retrieval context.  it's initialized on first cache retrieval
        # when the _result_disable_adapt_to_context execution option
        # (which the ORM generally sets) is not set.
        self._keymap_by_result_column_idx = None

        # for compiled SQL constructs, copy additional lookup keys into
        # the key lookup map, such as Column objects, labels,
        # column keys and other names
        if num_ctx_cols:
            # keymap by primary string...
            by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

            if len(by_key) != num_ctx_cols:
                # if by-primary-string dictionary smaller than
                # number of columns, assume we have dupes; (this check
                # is also in place if string dictionary is bigger, as
                # can occur when '*' was used as one of the compiled columns,
                # which may or may not be suggestive of dupes), rewrite
                # dupe records with "None" for index which results in
                # ambiguous column exception when accessed.
                #
                # this is considered to be the less common case as it is not
                # common to have dupe column keys in a SELECT statement.
                #
                # new in 1.4: get the complete set of all possible keys,
                # strings, objects, whatever, that are dupes across two
                # different records, first.
                index_by_key: Dict[Any, Any] = {}
                dupes = set()
                for metadata_entry in raw:
                    for key in (metadata_entry[MD_RENDERED_NAME],) + (
                        metadata_entry[MD_OBJECTS] or ()
                    ):
                        idx = metadata_entry[MD_INDEX]
                        # if this key has been associated with more than one
                        # positional index, it's a dupe
                        if index_by_key.setdefault(key, idx) != idx:
                            dupes.add(key)

                # then put everything we have into the keymap excluding only
                # those keys that are dupes.
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                    if obj_elem not in dupes
                }

                # then for the dupe keys, put the "ambiguous column"
                # record into by_key.
                by_key.update(
                    {
                        key: (None, -1, (), key, key, None, None)
                        for key in dupes
                    }
                )

            else:
                # no dupes - copy secondary elements from compiled
                # columns into self._keymap. this is the most common
                # codepath for Core / ORM statement executions before the
                # result metadata is cached
                self._keymap = {
                    obj_elem: metadata_entry
                    for metadata_entry in raw
                    if metadata_entry[MD_OBJECTS]
                    for obj_elem in metadata_entry[MD_OBJECTS]
                }
            # update keymap with primary string names taking
            # precedence
            self._keymap.update(by_key)
        else:
            # no compiled objects to map, just create keymap by primary string
            self._keymap = {
                metadata_entry[MD_LOOKUP_KEY]: metadata_entry
                for metadata_entry in raw
            }

        # update keymap with "translated" names.
        # the "translated" name thing has a long history:
        # 1. originally, it was used to fix an issue in very old SQLite
        #    versions prior to 3.10.0.  This code is still there in the
        #    sqlite dialect.
        # 2. Next, the pyhive third party dialect started using this hook
        #    for some driver related issue on their end.
        # 3. Most recently, the "driver_column_names" execution option has
        #    taken advantage of this hook to get raw DBAPI col names in the
        #    result keys without disrupting the usual merge process.

        if driver_column_names or (
            not num_ctx_cols and context._translate_colname
        ):
            self._keymap.update(
                {
                    metadata_entry[MD_UNTRANSLATED]: self._keymap[
                        metadata_entry[MD_LOOKUP_KEY]
                    ]
                    for metadata_entry in raw
                    if metadata_entry[MD_UNTRANSLATED]
                }
            )

        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
536 

    def _merge_cursor_description(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        result_columns: Sequence[ResultColumnsEntry],
        num_ctx_cols: int,
        cols_are_ordered: bool,
        textual_ordered: bool,
        ad_hoc_textual: bool,
        loose_column_name_matching: bool,
        driver_column_names: bool,
    ) -> List[_CursorKeyMapRecType]:
        """Merge a cursor.description with compiled result column information.

        There are at least four separate strategies used here, selected
        depending on the type of SQL construct used to start with.

        The most common case is that of the compiled SQL expression construct,
        which generated the column names present in the raw SQL string and
        which has the identical number of columns as were reported by
        cursor.description.  In this case, we assume a 1-1 positional mapping
        between the entries in cursor.description and the compiled object.
        This is also the most performant case as we disregard extracting /
        decoding the column names present in cursor.description since we
        already have the desired name we generated in the compiled SQL
        construct.

        The next common case is that of the completely raw string SQL,
        such as passed to connection.execute().  In this case we have no
        compiled construct to work with, so we extract and decode the
        names from cursor.description and index those as the primary
        result row target keys.

        The remaining fairly common case is that of the textual SQL
        that includes at least partial column information; this is when
        we use a :class:`_expression.TextualSelect` construct.
        This construct may have
        unordered or ordered column information.  In the ordered case, we
        merge the cursor.description and the compiled construct's information
        positionally, and warn if there are additional description names
        present, however we still decode the names in cursor.description
        as we don't have a guarantee that the names in the columns match
        on these.  In the unordered case, we match names in cursor.description
        to that of the compiled construct based on name matching.
        In both of these cases, the cursor.description names and the column
        expression objects and names are indexed as result row target keys.

        The final case is much less common, where we have a compiled
        non-textual SQL expression construct, but the number of columns
        in cursor.description doesn't match what's in the compiled
        construct.  We make the guess here that there might be textual
        column expressions in the compiled construct that themselves include
        a comma in them causing them to split.  We do the same name-matching
        as with textual non-ordered columns.

        The name-matched system of merging is the same as that used by
        SQLAlchemy for all cases up through the 0.9 series.   Positional
        matching for compiled SQL expressions was introduced in 1.0 as a
        major performance feature, and positional matching for textual
        :class:`_expression.TextualSelect` objects in 1.1.  As name matching
        is no longer a common case, it was acceptable to factor it into
        smaller generator-oriented methods that are easier to understand,
        but incur slightly more performance overhead.

        """

        if (
            num_ctx_cols
            and cols_are_ordered
            and not textual_ordered
            and num_ctx_cols == len(cursor_description)
            and not driver_column_names
        ):
            self._keys = [elem[0] for elem in result_columns]
            # pure positional 1-1 case; doesn't need to read
            # the names from cursor.description

            # most common case for Core and ORM

            # this metadata is safe to
            # cache because we are guaranteed
            # to have the columns in the same order for new executions
            self._safe_for_cache = True

            return [
                (
                    idx,
                    idx,
                    rmap_entry[RM_OBJECTS],
                    rmap_entry[RM_NAME],
                    rmap_entry[RM_RENDERED_NAME],
                    context.get_result_processor(
                        rmap_entry[RM_TYPE],
                        rmap_entry[RM_RENDERED_NAME],
                        cursor_description[idx][1],
                    ),
                    None,
                )
                for idx, rmap_entry in enumerate(result_columns)
            ]
        else:
            # name-based or text-positional cases, where we need
            # to read cursor.description names

            if textual_ordered or (
                ad_hoc_textual and len(cursor_description) == num_ctx_cols
            ):
                self._safe_for_cache = not driver_column_names
                # textual positional case
                raw_iterator = self._merge_textual_cols_by_position(
                    context,
                    cursor_description,
                    result_columns,
                    driver_column_names,
                )
            elif num_ctx_cols:
                # compiled SQL with a mismatch of description cols
                # vs. compiled cols, or textual w/ unordered columns
                # the order of columns can change if the query is
                # against a "select *", so not safe to cache
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_name(
                    context,
                    cursor_description,
                    result_columns,
                    loose_column_name_matching,
                    driver_column_names,
                )
            else:
                # no compiled SQL, just a raw string, order of columns
                # can change for "select *"
                self._safe_for_cache = False
                raw_iterator = self._merge_cols_by_none(
                    context, cursor_description, driver_column_names
                )

            # expand the merge-tuples into full keymap records, building
            # a result processor per column as we go
            return [
                (
                    idx,
                    ridx,
                    obj,
                    cursor_colname,
                    cursor_colname,
                    context.get_result_processor(
                        mapped_type, cursor_colname, coltype
                    ),
                    untranslated,
                )  # type: ignore[misc]
                for (
                    idx,
                    ridx,
                    cursor_colname,
                    mapped_type,
                    coltype,
                    obj,
                    untranslated,
                ) in raw_iterator
            ]
696 

    def _colnames_from_description(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        driver_column_names: bool,
    ) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]:
        """Extract column names and data types from a cursor.description.

        Applies unicode decoding, column translation, "normalization",
        and case sensitivity rules to the names based on the dialect.

        Yields ``(idx, colname, unnormalized, untranslated, coltype)``
        per description entry.
        """
        dialect = context.dialect
        translate_colname = context._translate_colname
        normalize_name = (
            dialect.normalize_name if dialect.requires_name_normalize else None
        )

        untranslated = None

        for idx, rec in enumerate(cursor_description):
            colname = unnormalized = rec[0]
            coltype = rec[1]

            if translate_colname:
                # a None here for "untranslated" means "the dialect did not
                # change the column name and the untranslated case can be
                # ignored".  otherwise "untranslated" is expected to be the
                # original, unchanged colname (e.g. is == to "unnormalized")
                colname, untranslated = translate_colname(colname)

                assert untranslated is None or untranslated == unnormalized

            if normalize_name:
                colname = normalize_name(colname)

            if driver_column_names:
                # caller wants raw driver names: report the unnormalized
                # name in the "untranslated" slot unconditionally
                yield idx, colname, unnormalized, unnormalized, coltype

            else:
                yield idx, colname, unnormalized, untranslated, coltype
738 

    def _merge_textual_cols_by_position(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        result_columns: Sequence[ResultColumnsEntry],
        driver_column_names: bool,
    ) -> Iterator[_MergeColTuple]:
        """Positionally merge cursor.description with the columns declared
        on a textual construct (e.g. ``text().columns(...)``).

        Description entries beyond the declared columns fall back to
        NULLTYPE with no associated objects.
        """
        num_ctx_cols = len(result_columns)

        if num_ctx_cols > len(cursor_description):
            # NOTE(review): given the guard above, num_ctx_cols is the
            # *larger* value, yet it is interpolated as the "smaller"
            # number in the message — the %d arguments look swapped
            # relative to the wording; confirm intended message order.
            util.warn(
                "Number of columns in textual SQL (%d) is "
                "smaller than number of columns requested (%d)"
                % (num_ctx_cols, len(cursor_description))
            )
        seen = set()

        self._keys = []

        uses_denormalize = context.dialect.requires_name_normalize
        for (
            idx,
            colname,
            unnormalized,
            untranslated,
            coltype,
        ) in self._colnames_from_description(
            context, cursor_description, driver_column_names
        ):
            if idx < num_ctx_cols:
                ctx_rec = result_columns[idx]
                obj = ctx_rec[RM_OBJECTS]
                ridx = idx
                mapped_type = ctx_rec[RM_TYPE]
                if obj[0] in seen:
                    raise exc.InvalidRequestError(
                        "Duplicate column expression requested "
                        "in textual SQL: %r" % obj[0]
                    )
                seen.add(obj[0])

                # special check for all uppercase unnormalized name;
                # use the unnormalized name as the key.
                # see #10788
                # if these names don't match, then we still honor the
                # cursor.description name as the key and not what the
                # Column has, see
                # test_resultset.py::PositionalTextTest::test_via_column
                if (
                    uses_denormalize
                    and unnormalized == ctx_rec[RM_RENDERED_NAME]
                ):
                    result_name = unnormalized
                else:
                    result_name = colname
            else:
                # past the declared columns: untyped, object-less entry
                mapped_type = sqltypes.NULLTYPE
                obj = None
                ridx = None

                result_name = colname

            if driver_column_names:
                assert untranslated is not None
                self._keys.append(untranslated)
            else:
                self._keys.append(result_name)

            yield (
                idx,
                ridx,
                result_name,
                mapped_type,
                coltype,
                obj,
                untranslated,
            )
816 

    def _merge_cols_by_name(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
        result_columns: Sequence[ResultColumnsEntry],
        loose_column_name_matching: bool,
        driver_column_names: bool,
    ) -> Iterator[_MergeColTuple]:
        """Merge cursor.description with compiled columns by name match.

        Description names with no match in the compiled columns fall
        back to NULLTYPE with no associated objects.
        """
        match_map = self._create_description_match_map(
            result_columns, loose_column_name_matching
        )
        mapped_type: TypeEngine[Any]

        self._keys = []

        for (
            idx,
            colname,
            unnormalized,
            untranslated,
            coltype,
        ) in self._colnames_from_description(
            context, cursor_description, driver_column_names
        ):
            try:
                ctx_rec = match_map[colname]
            except KeyError:
                # no compiled column matched this description name
                mapped_type = sqltypes.NULLTYPE
                obj = None
                result_columns_idx = None
            else:
                obj = ctx_rec[1]
                mapped_type = ctx_rec[2]
                result_columns_idx = ctx_rec[3]

            if driver_column_names:
                assert untranslated is not None
                self._keys.append(untranslated)
            else:
                self._keys.append(colname)
            yield (
                idx,
                result_columns_idx,
                colname,
                mapped_type,
                coltype,
                obj,
                untranslated,
            )
866 

    @classmethod
    def _create_description_match_map(
        cls,
        result_columns: Sequence[ResultColumnsEntry],
        loose_column_name_matching: bool = False,
    ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
        """when matching cursor.description to a set of names that are present
        in a Compiled object, as is the case with TextualSelect, get all the
        names we expect might match those in cursor.description.

        Returns a dict keyed by rendered name (and, when
        ``loose_column_name_matching`` is set, also by each column-linked
        object) to ``(name, objects, type, result_columns index)``.
        """

        d: Dict[
            Union[str, object],
            Tuple[str, TupleAny, TypeEngine[Any], int],
        ] = {}
        for ridx, elem in enumerate(result_columns):
            key = elem[RM_RENDERED_NAME]

            if key in d:
                # conflicting keyname - just add the column-linked objects
                # to the existing record.  if there is a duplicate column
                # name in the cursor description, this will allow all of those
                # objects to raise an ambiguous column error
                e_name, e_obj, e_type, e_ridx = d[key]
                d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
            else:
                d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)

            if loose_column_name_matching:
                # when using a textual statement with an unordered set
                # of columns that line up, we are expecting the user
                # to be using label names in the SQL that match to the column
                # expressions.  Enable more liberal matching for this case;
                # duplicate keys that are ambiguous will be fixed later.
                for r_key in elem[RM_OBJECTS]:
                    d.setdefault(
                        r_key,
                        (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
                    )
        return d
907 

908 def _merge_cols_by_none( 

909 self, 

910 context: DefaultExecutionContext, 

911 cursor_description: _DBAPICursorDescription, 

912 driver_column_names: bool, 

913 ) -> Iterator[_MergeColTuple]: 

914 self._keys = [] 

915 

916 for ( 

917 idx, 

918 colname, 

919 unnormalized, 

920 untranslated, 

921 coltype, 

922 ) in self._colnames_from_description( 

923 context, cursor_description, driver_column_names 

924 ): 

925 

926 if driver_column_names: 

927 assert untranslated is not None 

928 self._keys.append(untranslated) 

929 else: 

930 self._keys.append(colname) 

931 

932 yield ( 

933 idx, 

934 None, 

935 colname, 

936 sqltypes.NULLTYPE, 

937 coltype, 

938 None, 

939 untranslated, 

940 ) 

941 

    # defined only at runtime; the TYPE_CHECKING guard keeps the
    # Optional[NoReturn] signature out of the type checker's view
    if not TYPE_CHECKING:

        def _key_fallback(
            self, key: Any, err: Optional[Exception], raiseerr: bool = True
        ) -> Optional[NoReturn]:
            # Handle a key that failed lookup in _keymap: raise
            # NoSuchColumnError (chained to the original error) when
            # raiseerr is set, else return None so the caller can
            # report "no index" itself.
            if raiseerr:
                if self._unpickled and isinstance(key, elements.ColumnElement):
                    raise exc.NoSuchColumnError(
                        "Row was unpickled; lookup by ColumnElement "
                        "is unsupported"
                    ) from err
                else:
                    raise exc.NoSuchColumnError(
                        "Could not locate column in row for column '%s'"
                        % util.string_or_unprintable(key)
                    ) from err
            else:
                return None
960 

    def _raise_for_ambiguous_column_name(
        self, rec: _KeyMapRecType
    ) -> NoReturn:
        """Raise for a keymap record whose index was nulled out because
        the same key matched more than one result column."""
        raise exc.InvalidRequestError(
            "Ambiguous column name '%s' in "
            "result set column descriptions" % rec[MD_LOOKUP_KEY]
        )

968 

969 def _index_for_key( 

970 self, key: _KeyIndexType, raiseerr: bool = True 

971 ) -> Optional[int]: 

972 # TODO: can consider pre-loading ints and negative ints 

973 # into _keymap - also no coverage here 

974 if isinstance(key, int): 

975 key = self._keys[key] 

976 

977 try: 

978 rec = self._keymap[key] 

979 except KeyError as ke: 

980 x = self._key_fallback(key, ke, raiseerr) 

981 assert x is None 

982 return None 

983 

984 index = rec[0] 

985 

986 if index is None: 

987 self._raise_for_ambiguous_column_name(rec) 

988 return index 

989 

    def _indexes_for_keys(
        self, keys: Sequence[_KeyIndexType]
    ) -> Sequence[int]:
        """Return the row indexes for a sequence of keys.

        The first unknown key raises NoSuchColumnError via the
        class-level fallback (hence no explicit return after it).
        """
        try:
            return [self._keymap[key][0] for key in keys]  # type: ignore[index,misc] # noqa: E501
        except KeyError as ke:
            # ensure it raises
            CursorResultMetaData._key_fallback(self, ke.args[0], ke)

998 

999 def _metadata_for_keys( 

1000 self, keys: Sequence[_KeyIndexType] 

1001 ) -> Iterator[_NonAmbigCursorKeyMapRecType]: 

1002 for key in keys: 

1003 if isinstance(key, int): 

1004 key = self._keys[key] 

1005 

1006 try: 

1007 rec = self._keymap[key] 

1008 except KeyError as ke: 

1009 # ensure it raises 

1010 CursorResultMetaData._key_fallback(self, ke.args[0], ke) 

1011 

1012 index = rec[MD_INDEX] 

1013 

1014 if index is None: 

1015 self._raise_for_ambiguous_column_name(rec) 

1016 

1017 yield cast(_NonAmbigCursorKeyMapRecType, rec) 

1018 

    def __getstate__(self) -> Dict[str, Any]:
        """Serialize for pickling; only string/int keyed keymap entries
        survive, and result processors are discarded."""
        # TODO: consider serializing this as SimpleResultMetaData
        return {
            "_keymap": {
                key: (
                    rec[MD_INDEX],
                    rec[MD_RESULT_MAP_INDEX],
                    [],  # MD_OBJECTS: object-based lookups don't survive
                    key,
                    rec[MD_RENDERED_NAME],
                    None,  # MD_PROCESSOR: rebuilt as None in __setstate__
                    None,  # MD_UNTRANSLATED
                )
                for key, rec in self._keymap.items()
                if isinstance(key, (str, int))
            },
            "_keys": self._keys,
            "_translated_indexes": self._translated_indexes,
        }

1038 

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore from pickle; processors become no-ops and the
        unpickled flag is set so object-based lookups are rejected."""
        # processors were not pickled; one None (pass-through) per key
        self._processors = [None for _ in range(len(state["_keys"]))]
        self._keymap = state["_keymap"]
        self._keymap_by_result_column_idx = None
        self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
        self._keys = state["_keys"]
        # consulted by _key_fallback to reject ColumnElement lookups
        self._unpickled = True
        if state["_translated_indexes"]:
            translated_indexes: List[Any]
            self._translated_indexes = translated_indexes = state[
                "_translated_indexes"
            ]
            self._tuplefilter = tuplegetter(*translated_indexes)
        else:
            self._translated_indexes = self._tuplefilter = None

1054 

1055 

class ResultFetchStrategy:
    """Strategy object defining how rows are fetched for a result.

    Concrete subclasses implement the fetch methods against a
    particular row source, e.g. a live DBAPI cursor or a
    pre-populated buffer.

    .. versionadded:: 1.4

    """

    __slots__ = ()

    # replacement for cursor.description, used when rows have been
    # buffered away from the original cursor
    alternate_cursor_description: Optional[_DBAPICursorDescription] = None

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        raise NotImplementedError()

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        raise NotImplementedError()

    def yield_per(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        num: int,
    ) -> None:
        # no-op by default; buffering subclasses reconfigure here
        return

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        raise NotImplementedError()

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        raise NotImplementedError()

    def fetchall(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        raise NotImplementedError()

    def handle_exception(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        # default: propagate the error unchanged
        raise err

1120 

1121 

class NoCursorFetchStrategy(ResultFetchStrategy):
    """Cursor strategy for a result that has no open cursor.

    There are two varieties of this strategy, one for DQL and one for
    DML (and also DDL), each of which represent a result that had a cursor
    but no longer has one.

    """

    __slots__ = ()

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        # nothing to release; the cursor is already gone
        pass

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        # nothing to release; the cursor is already gone
        pass

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        # delegate to the subclass-defined "no result" behavior;
        # default value mirrors DBAPI fetchone's exhausted return
        return self._non_result(result, None)

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        return self._non_result(result, [])

    def fetchall(
        self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor
    ) -> Any:
        return self._non_result(result, [])

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Subclass hook: either return ``default`` or raise, per the
        DQL/DML variant."""
        raise NotImplementedError()

1175 

1176 

class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a row-returning result whose cursor is gone.

    Applies to a SELECT, or an INSERT/UPDATE/DELETE with RETURNING,
    after the cursor has been closed with no rows remaining.  Whether
    the owning result was "hard closed" decides if fetches yield empty
    results or raise.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        # hard-closed results raise; soft-closed ones act exhausted
        if not result.closed:
            return default
        raise exc.ResourceClosedError(
            "This result object is closed."
        ) from err


_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()

1205 

1206 

class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a non-row-returning result with no cursor.

    Applies to an INSERT, UPDATE, or DELETE without RETURNING; any
    fetch attempt raises via the metadata.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        # we only expect to have a _NoResultMetaData() here right now.
        assert not result._metadata.returns_rows
        result._metadata._we_dont_return_rows(err)  # type: ignore[union-attr]


_NO_CURSOR_DML = NoCursorDMLFetchStrategy()

1229 

1230 

class CursorFetchStrategy(ResultFetchStrategy):
    """Call fetch methods from a DBAPI cursor.

    Alternate versions of this class may instead buffer the rows from
    cursors or not use cursors at all.

    """

    __slots__ = ()

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        # swap in the cursor-less strategy; further fetches return empty
        result.cursor_strategy = _NO_CURSOR_DQL

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        # same replacement strategy; _non_result distinguishes hard
        # close via result.closed
        result.cursor_strategy = _NO_CURSOR_DQL

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        # route DBAPI errors through the connection's wrapping/invalidation
        # machinery rather than raising them raw
        result.connection._handle_dbapi_exception(
            err, None, None, dbapi_cursor, result.context
        )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        # switch to a fixed-size buffered strategy (growth disabled)
        result.cursor_strategy = BufferedRowCursorFetchStrategy(
            dbapi_cursor,
            {"max_row_buffer": num},
            initial_buffer=collections.deque(),
            growth_factor=0,
        )

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        try:
            row = dbapi_cursor.fetchone()
            if row is None:
                # exhausted: release cursor resources automatically
                result._soft_close(hard=hard_close)
            return row
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        try:
            if size is None:
                # let the DBAPI apply its own default batch size
                l = dbapi_cursor.fetchmany()
            else:
                l = dbapi_cursor.fetchmany(size)

            if not l:
                result._soft_close()
            return l
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        try:
            rows = dbapi_cursor.fetchall()
            result._soft_close()
            return rows
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)


_DEFAULT_FETCH = CursorFetchStrategy()

1317 

1318 

class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
    """A cursor fetch strategy with row buffering behavior.

    This strategy buffers the contents of a selection of rows
    before ``fetchone()`` is called.  This is to allow the results of
    ``cursor.description`` to be available immediately, when
    interfacing with a DB-API that requires rows to be consumed before
    this information is available (currently psycopg2, when used with
    server-side cursors).

    The pre-fetching behavior fetches only one row initially, and then
    grows its buffer size by a fixed amount with each successive need
    for additional rows up the ``max_row_buffer`` size, which defaults
    to 1000::

        with psycopg2_engine.connect() as conn:

            result = conn.execution_options(
                stream_results=True, max_row_buffer=50
            ).execute(text("select * from table"))

    .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.

    .. seealso::

        :ref:`psycopg2_execution_options`
    """

    __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")

    def __init__(
        self,
        dbapi_cursor: DBAPICursor,
        execution_options: CoreExecuteOptionsParameter,
        growth_factor: int = 5,
        initial_buffer: Optional[Deque[Any]] = None,
    ) -> None:
        self._max_row_buffer = execution_options.get("max_row_buffer", 1000)

        if initial_buffer is not None:
            self._rowbuffer = initial_buffer
        else:
            # fetch a single row so cursor.description is populated
            self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
        self._growth_factor = growth_factor

        if growth_factor:
            self._bufsize = min(self._max_row_buffer, self._growth_factor)
        else:
            # growth disabled: buffer size stays fixed at the maximum
            self._bufsize = self._max_row_buffer

    @classmethod
    def create(
        cls, result: CursorResult[Any]
    ) -> BufferedRowCursorFetchStrategy:
        """Build a strategy from a result's cursor and execution options."""
        return BufferedRowCursorFetchStrategy(
            result.cursor,
            result.context.execution_options,
        )

    def _buffer_rows(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> None:
        """this is currently used only by fetchone()."""

        size = self._bufsize
        try:
            if size < 1:
                new_rows = dbapi_cursor.fetchall()
            else:
                new_rows = dbapi_cursor.fetchmany(size)
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

        if not new_rows:
            # cursor exhausted; leave the (empty) buffer as-is so
            # fetchone can soft-close
            return
        self._rowbuffer = collections.deque(new_rows)
        if self._growth_factor and size < self._max_row_buffer:
            # grow geometrically toward max_row_buffer
            self._bufsize = min(
                self._max_row_buffer, size * self._growth_factor
            )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        # pin the buffer to exactly ``num`` rows, no growth
        self._growth_factor = 0
        self._max_row_buffer = self._bufsize = num

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        if not self._rowbuffer:
            self._buffer_rows(result, dbapi_cursor)
            if not self._rowbuffer:
                # still empty after refill attempt: exhausted
                try:
                    result._soft_close(hard=hard_close)
                except BaseException as e:
                    self.handle_exception(result, dbapi_cursor, e)
                return None
        return self._rowbuffer.popleft()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        rb = self._rowbuffer
        lb = len(rb)
        close = False
        if size > lb:
            # buffer short of the request; top it up from the cursor
            try:
                new = dbapi_cursor.fetchmany(size - lb)
            except BaseException as e:
                self.handle_exception(result, dbapi_cursor, e)
            else:
                if not new:
                    # defer closing since it may clear the row buffer
                    close = True
                else:
                    rb.extend(new)

        res = [rb.popleft() for _ in range(min(size, len(rb)))]
        if close:
            result._soft_close()
        return res

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        try:
            # buffered rows first, then whatever the cursor still holds
            ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
            self._rowbuffer.clear()
            result._soft_close()
            return ret
        except BaseException as e:
            self.handle_exception(result, dbapi_cursor, e)

1473 

1474 

class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
    """A cursor strategy whose complete row set is buffered up front.

    Used when a result must be delivered after the database
    conversation can no longer continue, such as MSSQL
    INSERT...OUTPUT after an autocommit.

    """

    __slots__ = ("_rowbuffer", "alternate_cursor_description")

    def __init__(
        self,
        dbapi_cursor: Optional[DBAPICursor],
        alternate_description: Optional[_DBAPICursorDescription] = None,
        initial_buffer: Optional[Iterable[Any]] = None,
    ):
        self.alternate_cursor_description = alternate_description
        if initial_buffer is None:
            # consume everything from the cursor immediately
            assert dbapi_cursor is not None
            self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
        else:
            self._rowbuffer = collections.deque(initial_buffer)

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> Any:
        # all rows are already buffered; nothing to configure
        pass

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        buf = self._rowbuffer
        if not buf:
            result._soft_close(hard=hard_close)
            return None
        return buf.popleft()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buf = self._rowbuffer
        count = min(size, len(buf))
        fetched = [buf.popleft() for _ in range(count)]
        if not fetched:
            result._soft_close()
        return fetched

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        # hand back the deque itself and swap in a fresh empty one
        remaining = self._rowbuffer
        self._rowbuffer = collections.deque()
        result._soft_close()
        return remaining

1550 

1551 

class _NoResultMetaData(ResultMetaData):
    """Metadata placeholder for statements that return no rows.

    Every row-oriented accessor raises ResourceClosedError.
    """

    __slots__ = ()

    returns_rows = False

    def _we_dont_return_rows(
        self, err: Optional[BaseException] = None
    ) -> NoReturn:
        raise exc.ResourceClosedError(
            "This result object does not return rows. "
            "It has been closed automatically."
        ) from err

    def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
        self._we_dont_return_rows()

    def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        self._we_dont_return_rows()

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        self._we_dont_return_rows()

    # the following properties shadow plain attributes on the base
    # class so that mere attribute access raises

    @property
    def _keymap(self) -> NoReturn:  # type: ignore[override]
        self._we_dont_return_rows()

    @property
    def _key_to_index(self) -> NoReturn:  # type: ignore[override]
        self._we_dont_return_rows()

    @property
    def _processors(self) -> NoReturn:  # type: ignore[override]
        self._we_dont_return_rows()

    @property
    def keys(self) -> NoReturn:
        self._we_dont_return_rows()


_NO_RESULT_METADATA = _NoResultMetaData()

1592 

1593 

def null_dml_result() -> IteratorResult[Any]:
    """Return an already-soft-closed, row-less :class:`.IteratorResult`
    representing a DML statement that returns no rows."""
    result: IteratorResult[Any] = IteratorResult(
        _NoResultMetaData(), iter(())
    )
    result._soft_close()
    return result

1598 

1599 

1600class CursorResult(Result[Unpack[_Ts]]): 

1601 """A Result that is representing state from a DBAPI cursor. 

1602 

1603 .. versionchanged:: 1.4 The :class:`.CursorResult`` 

1604 class replaces the previous :class:`.ResultProxy` interface. 

1605 This classes are based on the :class:`.Result` calling API 

1606 which provides an updated usage model and calling facade for 

1607 SQLAlchemy Core and SQLAlchemy ORM. 

1608 

1609 Returns database rows via the :class:`.Row` class, which provides 

1610 additional API features and behaviors on top of the raw data returned by 

1611 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` 

1612 method, other kinds of objects may also be returned. 

1613 

1614 .. seealso:: 

1615 

1616 :ref:`tutorial_selecting_data` - introductory material for accessing 

1617 :class:`_engine.CursorResult` and :class:`.Row` objects. 

1618 

1619 """ 

1620 

1621 __slots__ = ( 

1622 "context", 

1623 "dialect", 

1624 "cursor", 

1625 "cursor_strategy", 

1626 "_echo", 

1627 "connection", 

1628 ) 

1629 

1630 _metadata: Union[CursorResultMetaData, _NoResultMetaData] 

1631 _no_result_metadata = _NO_RESULT_METADATA 

1632 _soft_closed: bool = False 

1633 closed: bool = False 

1634 _is_cursor = True 

1635 

1636 context: DefaultExecutionContext 

1637 dialect: Dialect 

1638 cursor_strategy: ResultFetchStrategy 

1639 connection: Connection 

1640 

    def __init__(
        self,
        context: DefaultExecutionContext,
        cursor_strategy: ResultFetchStrategy,
        cursor_description: Optional[_DBAPICursorDescription],
    ):
        """Link this result to its execution context and install either
        row metadata (when a description is present) or the no-result
        placeholder."""
        self.context = context
        self.dialect = context.dialect
        self.cursor = context.cursor
        self.cursor_strategy = cursor_strategy
        self.connection = context.root_connection
        # echo only when the engine logs at debug level
        self._echo = echo = (
            self.connection._echo and context.engine._should_log_debug()
        )

        if cursor_description is not None:
            self._init_metadata(context, cursor_description)

            if echo:
                log = self.context.connection._log_debug

                def _log_row(row: Any) -> Any:
                    log("Row %r", sql_util._repr_row(row))
                    return row

                self._row_logging_fn = _log_row

                # call Result._row_getter to set up the row factory
                self._row_getter

        else:
            # no description means no rows (DML/DDL without RETURNING)
            assert context._num_sentinel_cols == 0
            self._metadata = self._no_result_metadata

1674 

    def _init_metadata(
        self,
        context: DefaultExecutionContext,
        cursor_description: _DBAPICursorDescription,
    ) -> CursorResultMetaData:
        """Build (or reuse from the compiled cache) the
        :class:`.CursorResultMetaData` for this result and assign it to
        ``self._metadata``.
        """
        driver_column_names = context.execution_options.get(
            "driver_column_names", False
        )
        if context.compiled:
            compiled = context.compiled

            metadata: CursorResultMetaData

            if driver_column_names:
                # TODO: test this case
                # driver-name keyed metadata is never cacheable
                metadata = CursorResultMetaData(
                    self,
                    cursor_description,
                    driver_column_names=True,
                    num_sentinel_cols=context._num_sentinel_cols,
                )
                assert not metadata._safe_for_cache
            elif compiled._cached_metadata:
                metadata = compiled._cached_metadata
            else:
                metadata = CursorResultMetaData(
                    self,
                    cursor_description,
                    # the number of sentinel columns is stored on the context
                    # but it's a characteristic of the compiled object
                    # so it's ok to apply it to a cacheable metadata.
                    num_sentinel_cols=context._num_sentinel_cols,
                )
                if metadata._safe_for_cache:
                    compiled._cached_metadata = metadata

            # result rewrite/ adapt step.  this is to suit the case
            # when we are invoked against a cached Compiled object, we want
            # to rewrite the ResultMetaData to reflect the Column objects
            # that are in our current SQL statement object, not the one
            # that is associated with the cached Compiled object.
            # the Compiled object may also tell us to not
            # actually do this step; this is to support the ORM where
            # it is to produce a new Result object in any case, and will
            # be using the cached Column objects against this database result
            # so we don't want to rewrite them.
            #
            # Basically this step suits the use case where the end user
            # is using Core SQL expressions and is accessing columns in the
            # result row using row._mapping[table.c.column].
            if (
                not context.execution_options.get(
                    "_result_disable_adapt_to_context", False
                )
                and compiled._result_columns
                and context.cache_hit is context.dialect.CACHE_HIT
                and compiled.statement is not context.invoked_statement  # type: ignore[comparison-overlap] # noqa: E501
            ):
                metadata = metadata._adapt_to_context(context)

            self._metadata = metadata

        else:
            # textual / driver-level statement; metadata comes solely
            # from the cursor description
            self._metadata = metadata = CursorResultMetaData(
                self,
                cursor_description,
                driver_column_names=driver_column_names,
            )
        if self._echo:
            context.connection._log_debug(
                "Col %r", tuple(x[0] for x in cursor_description)
            )
        return metadata

1748 

    def _soft_close(self, hard: bool = False) -> None:
        """Soft close this :class:`_engine.CursorResult`.

        This releases all DBAPI cursor resources, but leaves the
        CursorResult "open" from a semantic perspective, meaning the
        fetchXXX() methods will continue to return empty results.

        This method is called automatically when:

        * all result rows are exhausted using the fetchXXX() methods.
        * cursor.description is None.

        This method is **not public**, but is documented in order to clarify
        the "autoclose" process used.

        .. seealso::

            :meth:`_engine.CursorResult.close`


        """

        # idempotent: bail if already closed at the requested level
        if (not hard and self._soft_closed) or (hard and self.closed):
            return

        if hard:
            self.closed = True
            self.cursor_strategy.hard_close(self, self.cursor)
        else:
            self.cursor_strategy.soft_close(self, self.cursor)

        # release the DBAPI cursor exactly once, on the first close of
        # either kind
        if not self._soft_closed:
            cursor = self.cursor
            self.cursor = None  # type: ignore
            self.connection._safe_close_cursor(cursor)
            self._soft_closed = True

1785 

    @property
    def inserted_primary_key_rows(self) -> List[Optional[Any]]:
        """Return the value of
        :attr:`_engine.CursorResult.inserted_primary_key`
        as a row contained within a list; some dialects may support a
        multiple row form as well.

        .. note:: As indicated below, in current SQLAlchemy versions this
           accessor is only useful beyond what's already supplied by
           :attr:`_engine.CursorResult.inserted_primary_key` when using the
           :ref:`postgresql_psycopg2` dialect. Future versions hope to
           generalize this feature to more dialects.

        This accessor is added to support dialects that offer the feature
        that is currently implemented by the :ref:`psycopg2_executemany_mode`
        feature, currently **only the psycopg2 dialect**, which provides
        for many rows to be INSERTed at once while still retaining the
        behavior of being able to return server-generated primary key values.

        * **When using the psycopg2 dialect, or other dialects that may support
          "fast executemany" style inserts in upcoming releases** : When
          invoking an INSERT statement while passing a list of rows as the
          second argument to :meth:`_engine.Connection.execute`, this accessor
          will then provide a list of rows, where each row contains the primary
          key value for each row that was INSERTed.

        * **When using all other dialects / backends that don't yet support
          this feature**: This accessor is only useful for **single row INSERT
          statements**, and returns the same information as that of the
          :attr:`_engine.CursorResult.inserted_primary_key` within a
          single-element list. When an INSERT statement is executed in
          conjunction with a list of rows to be INSERTed, the list will contain
          one row per row inserted in the statement, however it will contain
          ``None`` for any server-generated values.

        Future releases of SQLAlchemy will further generalize the
        "fast execution helper" feature of psycopg2 to suit other dialects,
        thus allowing this accessor to be of more general use.

        .. versionadded:: 1.4

        .. seealso::

            :attr:`_engine.CursorResult.inserted_primary_key`

        """
        # only meaningful for a compiled insert() without explicit
        # RETURNING; otherwise raise a descriptive error
        if not self.context.compiled:
            raise exc.InvalidRequestError(
                "Statement is not a compiled expression construct."
            )
        elif not self.context.isinsert:
            raise exc.InvalidRequestError(
                "Statement is not an insert() expression construct."
            )
        elif self.context._is_explicit_returning:
            raise exc.InvalidRequestError(
                "Can't call inserted_primary_key "
                "when returning() "
                "is used."
            )
        return self.context.inserted_primary_key_rows  # type: ignore[no-any-return] # noqa: E501

1847 

    @property
    def inserted_primary_key(self) -> Optional[Any]:
        """Return the primary key for the row just inserted.

        The return value is a :class:`_result.Row` object representing
        a named tuple of primary key values in the order in which the
        primary key columns are configured in the source
        :class:`_schema.Table`.

        .. versionchanged:: 1.4.8 - the
           :attr:`_engine.CursorResult.inserted_primary_key`
           value is now a named tuple via the :class:`_result.Row` class,
           rather than a plain tuple.

        This accessor only applies to single row :func:`_expression.insert`
        constructs which did not explicitly specify
        :meth:`_expression.Insert.returning`. Support for multirow inserts,
        while not yet available for most backends, would be accessed using
        the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.

        Note that primary key columns which specify a server_default clause, or
        otherwise do not qualify as "autoincrement" columns (see the notes at
        :class:`_schema.Column`), and were generated using the database-side
        default, will appear in this list as ``None`` unless the backend
        supports "returning" and the insert statement executed with the
        "implicit returning" enabled.

        Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
        statement is not a compiled expression construct
        or is not an insert() construct.

        """

        # executemany may produce multiple PK rows; direct callers to
        # the plural accessor instead
        if self.context.executemany:
            raise exc.InvalidRequestError(
                "This statement was an executemany call; if primary key "
                "returning is supported, please "
                "use .inserted_primary_key_rows."
            )

        # delegate validation (compiled / isinsert / returning) to the
        # plural accessor and unwrap its single row
        ikp = self.inserted_primary_key_rows
        if ikp:
            return ikp[0]
        else:
            return None

1893 

1894 def last_updated_params( 

1895 self, 

1896 ) -> Union[ 

1897 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams 

1898 ]: 

1899 """Return the collection of updated parameters from this 

1900 execution. 

1901 

1902 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

1903 statement is not a compiled expression construct 

1904 or is not an update() construct. 

1905 

1906 """ 

1907 if not self.context.compiled: 

1908 raise exc.InvalidRequestError( 

1909 "Statement is not a compiled expression construct." 

1910 ) 

1911 elif not self.context.isupdate: 

1912 raise exc.InvalidRequestError( 

1913 "Statement is not an update() expression construct." 

1914 ) 

1915 elif self.context.executemany: 

1916 return self.context.compiled_parameters 

1917 else: 

1918 return self.context.compiled_parameters[0] 

1919 

1920 def last_inserted_params( 

1921 self, 

1922 ) -> Union[ 

1923 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams 

1924 ]: 

1925 """Return the collection of inserted parameters from this 

1926 execution. 

1927 

1928 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

1929 statement is not a compiled expression construct 

1930 or is not an insert() construct. 

1931 

1932 """ 

1933 if not self.context.compiled: 

1934 raise exc.InvalidRequestError( 

1935 "Statement is not a compiled expression construct." 

1936 ) 

1937 elif not self.context.isinsert: 

1938 raise exc.InvalidRequestError( 

1939 "Statement is not an insert() expression construct." 

1940 ) 

1941 elif self.context.executemany: 

1942 return self.context.compiled_parameters 

1943 else: 

1944 return self.context.compiled_parameters[0] 

1945 

1946 @property 

1947 def returned_defaults_rows( 

1948 self, 

1949 ) -> Optional[Sequence[Row[Unpack[TupleAny]]]]: 

1950 """Return a list of rows each containing the values of default 

1951 columns that were fetched using 

1952 the :meth:`.ValuesBase.return_defaults` feature. 

1953 

1954 The return value is a list of :class:`.Row` objects. 

1955 

1956 .. versionadded:: 1.4 

1957 

1958 """ 

1959 return self.context.returned_default_rows 

1960 

1961 def splice_horizontally(self, other: CursorResult[Any]) -> Self: 

1962 """Return a new :class:`.CursorResult` that "horizontally splices" 

1963 together the rows of this :class:`.CursorResult` with that of another 

1964 :class:`.CursorResult`. 

1965 

1966 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is 

1967 not intended for general use. 

1968 

1969 "horizontally splices" means that for each row in the first and second 

1970 result sets, a new row that concatenates the two rows together is 

1971 produced, which then becomes the new row. The incoming 

1972 :class:`.CursorResult` must have the identical number of rows. It is 

1973 typically expected that the two result sets come from the same sort 

1974 order as well, as the result rows are spliced together based on their 

1975 position in the result. 

1976 

1977 The expected use case here is so that multiple INSERT..RETURNING 

1978 statements (which definitely need to be sorted) against different 

1979 tables can produce a single result that looks like a JOIN of those two 

1980 tables. 

1981 

1982 E.g.:: 

1983 

1984 r1 = connection.execute( 

1985 users.insert().returning( 

1986 users.c.user_name, users.c.user_id, sort_by_parameter_order=True 

1987 ), 

1988 user_values, 

1989 ) 

1990 

1991 r2 = connection.execute( 

1992 addresses.insert().returning( 

1993 addresses.c.address_id, 

1994 addresses.c.address, 

1995 addresses.c.user_id, 

1996 sort_by_parameter_order=True, 

1997 ), 

1998 address_values, 

1999 ) 

2000 

2001 rows = r1.splice_horizontally(r2).all() 

2002 assert rows == [ 

2003 ("john", 1, 1, "foo@bar.com", 1), 

2004 ("jack", 2, 2, "bar@bat.com", 2), 

2005 ] 

2006 

2007 .. versionadded:: 2.0 

2008 

2009 .. seealso:: 

2010 

2011 :meth:`.CursorResult.splice_vertically` 

2012 

2013 

2014 """ # noqa: E501 

2015 

2016 clone = self._generate() 

2017 assert clone is self # just to note 

2018 assert isinstance(other._metadata, CursorResultMetaData) 

2019 assert isinstance(self._metadata, CursorResultMetaData) 

2020 self_tf = self._metadata._tuplefilter 

2021 other_tf = other._metadata._tuplefilter 

2022 clone._metadata = self._metadata._splice_horizontally(other._metadata) 

2023 

2024 total_rows = [ 

2025 tuple(r1 if self_tf is None else self_tf(r1)) 

2026 + tuple(r2 if other_tf is None else other_tf(r2)) 

2027 for r1, r2 in zip( 

2028 list(self._raw_row_iterator()), 

2029 list(other._raw_row_iterator()), 

2030 ) 

2031 ] 

2032 

2033 clone.cursor_strategy = FullyBufferedCursorFetchStrategy( 

2034 None, 

2035 initial_buffer=total_rows, 

2036 ) 

2037 clone._reset_memoizations() 

2038 return clone 

2039 

2040 def splice_vertically(self, other: CursorResult[Any]) -> Self: 

2041 """Return a new :class:`.CursorResult` that "vertically splices", 

2042 i.e. "extends", the rows of this :class:`.CursorResult` with that of 

2043 another :class:`.CursorResult`. 

2044 

2045 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is 

2046 not intended for general use. 

2047 

2048 "vertically splices" means the rows of the given result are appended to 

2049 the rows of this cursor result. The incoming :class:`.CursorResult` 

2050 must have rows that represent the identical list of columns in the 

2051 identical order as they are in this :class:`.CursorResult`. 

2052 

2053 .. versionadded:: 2.0 

2054 

2055 .. seealso:: 

2056 

2057 :meth:`.CursorResult.splice_horizontally` 

2058 

2059 """ 

2060 clone = self._generate() 

2061 total_rows = list(self._raw_row_iterator()) + list( 

2062 other._raw_row_iterator() 

2063 ) 

2064 

2065 clone.cursor_strategy = FullyBufferedCursorFetchStrategy( 

2066 None, 

2067 initial_buffer=total_rows, 

2068 ) 

2069 clone._reset_memoizations() 

2070 return clone 

2071 

    def _rewind(self, rows: Any) -> Self:
        """rewind this result back to the given rowset.

        this is used internally for the case where an :class:`.Insert`
        construct combines the use of
        :meth:`.Insert.return_defaults` along with the
        "supplemental columns" feature.

        NOTE: this method has no effect when a unique filter is applied
        to the result, meaning that no row will be returned.

        """

        if self._echo:
            self.context.connection._log_debug(
                "CursorResult rewound %d row(s)", len(rows)
            )

        # the rows given are expected to be Row objects, so we
        # have to clear out processors which have already run on these
        # rows
        self._metadata = cast(
            CursorResultMetaData, self._metadata
        )._remove_processors_and_tuple_filter()

        self.cursor_strategy = FullyBufferedCursorFetchStrategy(
            None,
            # TODO: if these are Row objects, can we save on not having to
            # re-make new Row objects out of them a second time? is that
            # what's actually happening right now? maybe look into this
            initial_buffer=rows,
        )
        self._reset_memoizations()
        return self

2106 

2107 @property 

2108 def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]: 

2109 """Return the values of default columns that were fetched using 

2110 the :meth:`.ValuesBase.return_defaults` feature. 

2111 

2112 The value is an instance of :class:`.Row`, or ``None`` 

2113 if :meth:`.ValuesBase.return_defaults` was not used or if the 

2114 backend does not support RETURNING. 

2115 

2116 .. seealso:: 

2117 

2118 :meth:`.ValuesBase.return_defaults` 

2119 

2120 """ 

2121 

2122 if self.context.executemany: 

2123 raise exc.InvalidRequestError( 

2124 "This statement was an executemany call; if return defaults " 

2125 "is supported, please use .returned_defaults_rows." 

2126 ) 

2127 

2128 rows = self.context.returned_default_rows 

2129 if rows: 

2130 return rows[0] 

2131 else: 

2132 return None 

2133 

2134 def lastrow_has_defaults(self) -> bool: 

2135 """Return ``lastrow_has_defaults()`` from the underlying 

2136 :class:`.ExecutionContext`. 

2137 

2138 See :class:`.ExecutionContext` for details. 

2139 

2140 """ 

2141 

2142 return self.context.lastrow_has_defaults() 

2143 

2144 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

2145 """Return ``postfetch_cols()`` from the underlying 

2146 :class:`.ExecutionContext`. 

2147 

2148 See :class:`.ExecutionContext` for details. 

2149 

2150 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

2151 statement is not a compiled expression construct 

2152 or is not an insert() or update() construct. 

2153 

2154 """ 

2155 

2156 if not self.context.compiled: 

2157 raise exc.InvalidRequestError( 

2158 "Statement is not a compiled expression construct." 

2159 ) 

2160 elif not self.context.isinsert and not self.context.isupdate: 

2161 raise exc.InvalidRequestError( 

2162 "Statement is not an insert() or update() " 

2163 "expression construct." 

2164 ) 

2165 return self.context.postfetch_cols 

2166 

2167 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

2168 """Return ``prefetch_cols()`` from the underlying 

2169 :class:`.ExecutionContext`. 

2170 

2171 See :class:`.ExecutionContext` for details. 

2172 

2173 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

2174 statement is not a compiled expression construct 

2175 or is not an insert() or update() construct. 

2176 

2177 """ 

2178 

2179 if not self.context.compiled: 

2180 raise exc.InvalidRequestError( 

2181 "Statement is not a compiled expression construct." 

2182 ) 

2183 elif not self.context.isinsert and not self.context.isupdate: 

2184 raise exc.InvalidRequestError( 

2185 "Statement is not an insert() or update() " 

2186 "expression construct." 

2187 ) 

2188 return self.context.prefetch_cols 

2189 

2190 def supports_sane_rowcount(self) -> bool: 

2191 """Return ``supports_sane_rowcount`` from the dialect. 

2192 

2193 See :attr:`_engine.CursorResult.rowcount` for background. 

2194 

2195 """ 

2196 

2197 return self.dialect.supports_sane_rowcount 

2198 

2199 def supports_sane_multi_rowcount(self) -> bool: 

2200 """Return ``supports_sane_multi_rowcount`` from the dialect. 

2201 

2202 See :attr:`_engine.CursorResult.rowcount` for background. 

2203 

2204 """ 

2205 

2206 return self.dialect.supports_sane_multi_rowcount 

2207 

    @util.memoized_property
    def rowcount(self) -> int:
        """Return the 'rowcount' for this result.

        The primary purpose of 'rowcount' is to report the number of rows
        matched by the WHERE criterion of an UPDATE or DELETE statement
        executed once (i.e. for a single parameter set), which may then be
        compared to the number of rows expected to be updated or deleted as a
        means of asserting data integrity.

        This attribute is transferred from the ``cursor.rowcount`` attribute
        of the DBAPI before the cursor is closed, to support DBAPIs that
        don't make this value available after cursor close.  Some DBAPIs may
        offer meaningful values for other kinds of statements, such as INSERT
        and SELECT statements as well.  In order to retrieve
        ``cursor.rowcount`` for these statements, set the
        :paramref:`.Connection.execution_options.preserve_rowcount`
        execution option to True, which will cause the ``cursor.rowcount``
        value to be unconditionally memoized before any results are returned
        or the cursor is closed, regardless of statement type.

        For cases where the DBAPI does not support rowcount for a particular
        kind of statement and/or execution, the returned value will be ``-1``,
        which is delivered directly from the DBAPI and is part of :pep:`249`.
        All DBAPIs should support rowcount for single-parameter-set
        UPDATE and DELETE statements, however.

        .. note::

           Notes regarding :attr:`_engine.CursorResult.rowcount`:


           * This attribute returns the number of rows *matched*,
             which is not necessarily the same as the number of rows
             that were actually *modified*. For example, an UPDATE statement
             may have no net change on a given row if the SET values
             given are the same as those present in the row already.
             Such a row would be matched but not modified.
             On backends that feature both styles, such as MySQL,
             rowcount is configured to return the match
             count in all cases.

           * :attr:`_engine.CursorResult.rowcount` in the default case is
             *only* useful in conjunction with an UPDATE or DELETE statement,
             and only with a single set of parameters. For other kinds of
             statements, SQLAlchemy will not attempt to pre-memoize the value
             unless the
             :paramref:`.Connection.execution_options.preserve_rowcount`
             execution option is used.  Note that contrary to :pep:`249`, many
             DBAPIs do not support rowcount values for statements that are not
             UPDATE or DELETE, particularly when rows are being returned which
             are not fully pre-buffered.   DBAPIs that don't support rowcount
             for a particular kind of statement should return the value ``-1``
             for such statements.

           * :attr:`_engine.CursorResult.rowcount` may not be meaningful
             when executing a single statement with multiple parameter sets
             (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
             values across multiple parameter sets and will return ``-1``
             when accessed.

           * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
             a correct population of :attr:`_engine.CursorResult.rowcount`
             when the
             :paramref:`.Connection.execution_options.preserve_rowcount`
             execution option is set to True.

           * Statements that use RETURNING may not support rowcount, returning
             a ``-1`` value instead.

        .. seealso::

            :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`

            :paramref:`.Connection.execution_options.preserve_rowcount`

        """  # noqa: E501
        try:
            return self.context.rowcount
        except BaseException as e:
            # delegate DBAPI errors to the cursor strategy, which is
            # expected to raise; the bare ``raise`` is never reached
            self.cursor_strategy.handle_exception(self, self.cursor, e)
            raise  # not called

2289 

2290 @property 

2291 def lastrowid(self) -> int: 

2292 """Return the 'lastrowid' accessor on the DBAPI cursor. 

2293 

2294 This is a DBAPI specific method and is only functional 

2295 for those backends which support it, for statements 

2296 where it is appropriate. It's behavior is not 

2297 consistent across backends. 

2298 

2299 Usage of this method is normally unnecessary when 

2300 using insert() expression constructs; the 

2301 :attr:`~CursorResult.inserted_primary_key` attribute provides a 

2302 tuple of primary key values for a newly inserted row, 

2303 regardless of database backend. 

2304 

2305 """ 

2306 try: 

2307 return self.context.get_lastrowid() 

2308 except BaseException as e: 

2309 self.cursor_strategy.handle_exception(self, self.cursor, e) 

2310 

2311 @property 

2312 def returns_rows(self) -> bool: 

2313 """True if this :class:`_engine.CursorResult` returns zero or more 

2314 rows. 

2315 

2316 I.e. if it is legal to call the methods 

2317 :meth:`_engine.CursorResult.fetchone`, 

2318 :meth:`_engine.CursorResult.fetchmany` 

2319 :meth:`_engine.CursorResult.fetchall`. 

2320 

2321 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should 

2322 always be synonymous with whether or not the DBAPI cursor had a 

2323 ``.description`` attribute, indicating the presence of result columns, 

2324 noting that a cursor that returns zero rows still has a 

2325 ``.description`` if a row-returning statement was emitted. 

2326 

2327 This attribute should be True for all results that are against 

2328 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE 

2329 that use RETURNING. For INSERT/UPDATE/DELETE statements that were 

2330 not using RETURNING, the value will usually be False, however 

2331 there are some dialect-specific exceptions to this, such as when 

2332 using the MSSQL / pyodbc dialect a SELECT is emitted inline in 

2333 order to retrieve an inserted primary key value. 

2334 

2335 .. seealso:: 

2336 

2337 :meth:`.Result.close` 

2338 

2339 :attr:`.Result.closed` 

2340 

2341 """ 

2342 return self._metadata.returns_rows 

2343 

2344 @property 

2345 def is_insert(self) -> bool: 

2346 """True if this :class:`_engine.CursorResult` is the result 

2347 of a executing an expression language compiled 

2348 :func:`_expression.insert` construct. 

2349 

2350 When True, this implies that the 

2351 :attr:`inserted_primary_key` attribute is accessible, 

2352 assuming the statement did not include 

2353 a user defined "returning" construct. 

2354 

2355 """ 

2356 return self.context.isinsert 

2357 

2358 def _fetchiter_impl(self) -> Iterator[Any]: 

2359 fetchone = self.cursor_strategy.fetchone 

2360 

2361 while True: 

2362 row = fetchone(self, self.cursor) 

2363 if row is None: 

2364 break 

2365 yield row 

2366 

2367 def _fetchone_impl(self, hard_close: bool = False) -> Any: 

2368 return self.cursor_strategy.fetchone(self, self.cursor, hard_close) 

2369 

2370 def _fetchall_impl(self) -> Any: 

2371 return self.cursor_strategy.fetchall(self, self.cursor) 

2372 

2373 def _fetchmany_impl(self, size: Optional[int] = None) -> Any: 

2374 return self.cursor_strategy.fetchmany(self, self.cursor, size) 

2375 

2376 def _raw_row_iterator(self) -> Any: 

2377 return self._fetchiter_impl() 

2378 

2379 def merge( 

2380 self, *others: Result[Unpack[TupleAny]] 

2381 ) -> MergedResult[Unpack[TupleAny]]: 

2382 merged_result = super().merge(*others) 

2383 if self.context._has_rowcount: 

2384 merged_result.rowcount = sum( 

2385 cast("CursorResult[Any]", result).rowcount 

2386 for result in (self,) + others 

2387 ) 

2388 return merged_result 

2389 

2390 def close(self) -> None: 

2391 """Close this :class:`_engine.CursorResult`. 

2392 

2393 This closes out the underlying DBAPI cursor corresponding to the 

2394 statement execution, if one is still present. Note that the DBAPI 

2395 cursor is automatically released when the :class:`_engine.CursorResult` 

2396 exhausts all available rows. :meth:`_engine.CursorResult.close` is 

2397 generally an optional method except in the case when discarding a 

2398 :class:`_engine.CursorResult` that still has additional rows pending 

2399 for fetch. 

2400 

2401 After this method is called, it is no longer valid to call upon 

2402 the fetch methods, which will raise a :class:`.ResourceClosedError` 

2403 on subsequent use. 

2404 

2405 .. seealso:: 

2406 

2407 :ref:`connections_toplevel` 

2408 

2409 """ 

2410 self._soft_close(hard=True) 

2411 

    @_generative
    def yield_per(self, num: int) -> Self:
        """Configure the result to fetch ``num`` rows at a time from the
        DBAPI cursor.

        Records the batch size on the result and notifies the current
        cursor strategy so that it may adjust its buffering behavior
        accordingly.

        NOTE(review): presumably mirrors the public ``Result.yield_per``
        contract defined on the base class — confirm against that method.
        """
        self._yield_per = num
        self.cursor_strategy.yield_per(self, self.cursor, num)
        return self

2417 

2418 

# Legacy alias for CursorResult, retained so that older code referring
# to the previous class name continues to work.
ResultProxy = CursorResult