Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

751 statements  

1# engine/cursor.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Define cursor-specific result set constructs including 

9:class:`.CursorResult`.""" 

10 

11from __future__ import annotations 

12 

13import collections 

14import operator 

15import typing 

16from typing import Any 

17from typing import cast 

18from typing import ClassVar 

19from typing import Deque 

20from typing import Dict 

21from typing import Final 

22from typing import Iterable 

23from typing import Iterator 

24from typing import List 

25from typing import Literal 

26from typing import Mapping 

27from typing import NoReturn 

28from typing import Optional 

29from typing import Sequence 

30from typing import Tuple 

31from typing import TYPE_CHECKING 

32from typing import Union 

33 

34from .result import IteratorResult 

35from .result import MergedResult 

36from .result import Result 

37from .result import ResultMetaData 

38from .result import SimpleResultMetaData 

39from .result import tuplegetter 

40from .row import Row 

41from .. import exc 

42from .. import util 

43from ..sql import elements 

44from ..sql import sqltypes 

45from ..sql import util as sql_util 

46from ..sql.base import _generative 

47from ..sql.compiler import ResultColumnsEntry 

48from ..sql.compiler import RM_NAME 

49from ..sql.compiler import RM_OBJECTS 

50from ..sql.compiler import RM_RENDERED_NAME 

51from ..sql.compiler import RM_TYPE 

52from ..sql.type_api import TypeEngine 

53from ..util.typing import Self 

54from ..util.typing import TupleAny 

55from ..util.typing import TypeVarTuple 

56from ..util.typing import Unpack 

57 

58if typing.TYPE_CHECKING: 

59 from .base import Connection 

60 from .default import DefaultExecutionContext 

61 from .interfaces import _DBAPICursorDescription 

62 from .interfaces import _MutableCoreSingleExecuteParams 

63 from .interfaces import CoreExecuteOptionsParameter 

64 from .interfaces import DBAPICursor 

65 from .interfaces import DBAPIType 

66 from .interfaces import Dialect 

67 from .interfaces import ExecutionContext 

68 from .result import _KeyIndexType 

69 from .result import _KeyMapRecType 

70 from .result import _KeyMapType 

71 from .result import _KeyType 

72 from .result import _ProcessorsType 

73 from .result import _TupleGetterType 

74 from ..sql.schema import Column 

75 from ..sql.type_api import _ResultProcessorType 

76 

77 

78_Ts = TypeVarTuple("_Ts") 

79 

80 

# metadata entry tuple indexes.
# using raw tuple is faster than namedtuple.
# these match up to the positions in
# _CursorKeyMapRecType
MD_INDEX: Final[Literal[0]] = 0
"""integer index in cursor.description

"""

MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
"""integer index in compiled._result_columns"""

MD_OBJECTS: Final[Literal[2]] = 2
"""other string keys and ColumnElement obj that can match.

This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects

"""

MD_LOOKUP_KEY: Final[Literal[3]] = 3
"""string key we usually expect for key-based lookup

this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
"""


MD_RENDERED_NAME: Final[Literal[4]] = 4
"""name that is usually in cursor.description

this comes from compiler.RM_RENDERED_NAME /
compiler.ResultColumnsEntry.keyname
"""


MD_PROCESSOR: Final[Literal[5]] = 5
"""callable to process a result value into a row"""

MD_UNTRANSLATED: Final[Literal[6]] = 6
"""raw name from cursor.description"""

119 

120 

# one keymap record; each position corresponds to the MD_* index constant
# named in the trailing comment.
_CursorKeyMapRecType = Tuple[
    Optional[int],  # MD_INDEX, None means the record is ambiguously named
    int,  # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
    TupleAny,  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    Optional[str],  # MD_UNTRANSLATED
]

# the keymap itself: any lookup key mapped to its record.
_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]

# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
# not None
# NOTE(review): positions 2 and 6 are typed List[Any] / str here but
# TupleAny / Optional[str] in _CursorKeyMapRecType - confirm whether the
# difference is intentional.
_NonAmbigCursorKeyMapRecType = Tuple[
    int,  # MD_INDEX
    int,  # MD_RESULT_MAP_INDEX
    List[Any],  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    str,  # MD_UNTRANSLATED
]

# intermediate tuple yielded by the _merge_* generator methods and unpacked
# by _merge_cursor_description() into a keymap record.
_MergeColTuple = Tuple[
    int,  # index of the column in cursor.description
    Optional[int],  # index in the compiled result columns, None if unmatched
    str,  # column name used as the lookup / rendered name
    TypeEngine[Any],  # type passed to context.get_result_processor()
    "DBAPIType",  # type code taken from cursor.description
    Optional[TupleAny],  # matching objects, None if unmatched
    Optional[str],  # untranslated name, if any
]

154 

155 

156class CursorResultMetaData(ResultMetaData): 

157 """Result metadata for DBAPI cursors.""" 

158 

159 __slots__ = ( 

160 "_keymap", 

161 "_processors", 

162 "_keys", 

163 "_keymap_by_result_column_idx", 

164 "_tuplefilter", 

165 "_translated_indexes", 

166 "_safe_for_cache", 

167 "_unpickled", 

168 "_key_to_index", 

169 # don't need _create_unique_filters here for now. Can be added 

170 # if a need arises. 

171 ) 

172 

173 _keymap: _CursorKeyMapType 

174 _processors: _ProcessorsType 

175 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]] 

176 _unpickled: bool 

177 _safe_for_cache: bool 

178 _translated_indexes: Optional[List[int]] 

179 

180 returns_rows: ClassVar[bool] = True 

181 

182 def _has_key(self, key: Any) -> bool: 

183 return key in self._keymap 

184 

def _for_freeze(self) -> ResultMetaData:
    """Return a ``SimpleResultMetaData`` built from this metadata.

    Lookup keys of records whose MD_INDEX is None (ambiguous names) are
    passed along as a frozenset, or None when there are no such records.
    """
    keymap = self._keymap

    ambiguous_names = set()
    for entry in keymap.values():
        if entry[MD_INDEX] is None:
            ambiguous_names.add(entry[MD_LOOKUP_KEY])

    if ambiguous_names:
        ambiguous_keys = frozenset(ambiguous_names)
    else:
        ambiguous_keys = None

    extra_objects = [keymap[name][MD_OBJECTS] for name in self._keys]

    return SimpleResultMetaData(
        self._keys,
        extra=extra_objects,
        _ambiguous_keys=ambiguous_keys,
    )

196 

def _make_new_metadata(
    self,
    *,
    unpickled: bool,
    processors: _ProcessorsType,
    keys: Sequence[str],
    keymap: _KeyMapType,
    tuplefilter: Optional[_TupleGetterType],
    translated_indexes: Optional[List[int]],
    safe_for_cache: bool,
    keymap_by_result_column_idx: Any,
) -> Self:
    """Create a new metadata object of the same class from explicit state.

    ``__init__`` is bypassed; every slot is populated directly from the
    keyword arguments, and ``_key_to_index`` is derived from ``keymap``.
    """
    cls = self.__class__
    clone = cls.__new__(cls)

    # key / keymap related state
    clone._keys = keys
    clone._keymap = keymap
    clone._keymap_by_result_column_idx = keymap_by_result_column_idx

    # row processing state
    clone._processors = processors
    clone._tuplefilter = tuplefilter
    clone._translated_indexes = translated_indexes

    # flags
    clone._unpickled = unpickled
    clone._safe_for_cache = safe_for_cache

    clone._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
    return clone

220 

221 def _remove_processors_and_tuple_filter(self) -> Self: 

222 if self._tuplefilter: 

223 proc = self._tuplefilter(self._processors) 

224 else: 

225 proc = self._processors 

226 return self._make_new_metadata( 

227 unpickled=self._unpickled, 

228 processors=[None] * len(proc), 

229 tuplefilter=None, 

230 translated_indexes=None, 

231 keymap={ 

232 key: value[0:5] + (None,) + value[6:] 

233 for key, value in self._keymap.items() 

234 }, 

235 keys=self._keys, 

236 safe_for_cache=self._safe_for_cache, 

237 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

238 ) 

239 

def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
    """Return new metadata with the columns of ``other`` appended to ours.

    Records coming from ``other`` have their positional indexes shifted by
    the number of keys in ``self``.  A key of ``other`` that is already
    ambiguous, or that collides with a key already present, is stored as
    an ambiguous record (index ``None``, result map index ``-1``).
    """

    def _effective_processors(meta: Any) -> Any:
        # processors as seen after the metadata's own tuple filter
        tf = meta._tuplefilter
        return meta._processors if tf is None else tf(meta._processors)

    merged = dict(self._keymap)
    shift = len(self._keys)

    for key, rec in other._keymap.items():
        position = rec[MD_INDEX]
        # int index should be None for ambiguous key
        if position is None or key in merged:
            head = (None, -1)
        else:
            head = (position + shift, rec[MD_RESULT_MAP_INDEX] + shift)
        merged[key] = head + tuple(rec[2:])

    combined_processors: List[Any] = [
        *_effective_processors(self),
        *_effective_processors(other),
    ]

    combined_keys = list(self._keys)
    combined_keys.extend(other._keys)
    assert len(combined_processors) == len(combined_keys)

    by_result_index = {}
    for entry in merged.values():
        by_result_index[entry[MD_RESULT_MAP_INDEX]] = entry

    return self._make_new_metadata(
        unpickled=self._unpickled,
        processors=combined_processors,
        tuplefilter=None,
        translated_indexes=None,
        keys=combined_keys,
        keymap=merged,
        safe_for_cache=self._safe_for_cache,
        keymap_by_result_column_idx=by_result_index,
    )

280 

def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
    """Return new metadata limited to ``keys``, in the order given.

    The selected records are renumbered from zero, and a tuple getter over
    their original (or translated) positions is installed as the tuple
    filter of the new metadata.
    """
    selected = [*self._metadata_for_keys(keys)]

    positions = []
    reduced_keys: List[str] = []
    for rec in selected:
        positions.append(rec[MD_INDEX])
        reduced_keys.append(rec[MD_LOOKUP_KEY])

    translation = self._translated_indexes
    if translation:
        positions = [translation[pos] for pos in positions]

    getter = tuplegetter(*positions)

    renumbered = [
        (new_position,) + rec[1:]
        for new_position, rec in enumerate(selected)
    ]

    reduced_keymap = {}
    for rec in renumbered:
        reduced_keymap[rec[MD_LOOKUP_KEY]] = rec

    # TODO: need unit test for:
    # result = connection.execute("raw sql, no columns").scalars()
    # without the "or ()" it's failing because MD_OBJECTS is None
    for rec in renumbered:
        for alias in rec[MD_OBJECTS] or ():
            reduced_keymap[alias] = rec

    return self._make_new_metadata(
        unpickled=self._unpickled,
        processors=self._processors,
        keys=reduced_keys,
        tuplefilter=getter,
        translated_indexes=positions,
        keymap=reduced_keymap,  # type: ignore[arg-type]
        safe_for_cache=self._safe_for_cache,
        keymap_by_result_column_idx=self._keymap_by_result_column_idx,
    )

312 

def _adapt_to_context(self, context: ExecutionContext) -> Self:
    """Return metadata whose keymap also accepts the invoked statement's
    columns.

    When a cached Compiled construct that has a _result_map is reused for
    a new statement, the keymap must also contain the Column objects of
    that new statement.  Each selected column of the invoked statement is
    added as a key pointing at the record found at the same position in
    the cached statement.  ``self`` is returned unchanged when there is no
    compiled construct, no result columns, or the invoked statement is the
    compiled statement itself.

    """
    compiled = context.compiled
    if not compiled or not compiled._result_columns:
        return self

    compiled_statement = compiled.statement
    invoked_statement = context.invoked_statement

    if TYPE_CHECKING:
        assert isinstance(invoked_statement, elements.ClauseElement)

    if compiled_statement is invoked_statement:
        return self

    assert invoked_statement is not None

    # this is the most common path for Core statements when
    # caching is used.  In ORM use, this codepath is not really used
    # as the _result_disable_adapt_to_context execution option is
    # set by the ORM.

    by_position = self._keymap_by_result_column_idx
    if by_position is None:
        # first retrieval from cache, this map will not be set up yet,
        # initialize lazily and remember it on this object
        by_position = {
            entry[MD_RESULT_MAP_INDEX]: entry
            for entry in self._keymap.values()
        }
        self._keymap_by_result_column_idx = by_position

    # make a copy and add the columns from the invoked statement
    # to the result map.
    adapted = {}
    for position, column in enumerate(
        invoked_statement._all_selected_columns
    ):
        if position in by_position:
            adapted[column] = by_position[position]

    return self._make_new_metadata(
        keymap=self._keymap | adapted,
        unpickled=self._unpickled,
        processors=self._processors,
        tuplefilter=self._tuplefilter,
        translated_indexes=None,
        keys=self._keys,
        safe_for_cache=self._safe_for_cache,
        keymap_by_result_column_idx=by_position,
    )

371 

def __init__(
    self,
    parent: CursorResult[Unpack[TupleAny]],
    cursor_description: _DBAPICursorDescription,
    *,
    driver_column_names: bool = False,
    num_sentinel_cols: int = 0,
):
    """Build the keymap, keys and processors for a cursor result.

    :param parent: result object; its ``context`` attribute supplies the
     execution context consulted for compiled result column information
     and result processors.
    :param cursor_description: the DBAPI ``cursor.description`` sequence.
    :param driver_column_names: passed through to the merge step; when
     true, the untranslated names from ``cursor.description`` are also
     added to the keymap.
    :param num_sentinel_cols: number of trailing columns that are sliced
     off ``cursor_description`` and the compiled result columns before
     merging; a matching number of ``None`` processors is appended.
    """
    context = parent.context
    if num_sentinel_cols > 0:
        # this is slightly faster than letting tuplegetter use the indexes
        self._tuplefilter = tuplefilter = operator.itemgetter(
            slice(-num_sentinel_cols)
        )
        cursor_description = tuplefilter(cursor_description)
    else:
        self._tuplefilter = tuplefilter = None
    self._translated_indexes = None
    self._safe_for_cache = self._unpickled = False

    if context.result_column_struct:
        (
            result_columns,
            cols_are_ordered,
            textual_ordered,
            ad_hoc_textual,
            loose_column_name_matching,
        ) = context.result_column_struct
        if tuplefilter is not None:
            result_columns = tuplefilter(result_columns)
        num_ctx_cols = len(result_columns)
    else:
        # no compiled column structure: every flag, and the column count,
        # is set to False
        result_columns = cols_are_ordered = (  # type: ignore
            num_ctx_cols
        ) = ad_hoc_textual = loose_column_name_matching = (
            textual_ordered
        ) = False

    # merge cursor.description with the column info
    # present in the compiled structure, if any
    raw = self._merge_cursor_description(
        context,
        cursor_description,
        result_columns,
        num_ctx_cols,
        cols_are_ordered,
        textual_ordered,
        ad_hoc_textual,
        loose_column_name_matching,
        driver_column_names,
    )

    # processors in key order which are used when building up
    # a row
    self._processors = [
        metadata_entry[MD_PROCESSOR] for metadata_entry in raw
    ]
    if num_sentinel_cols > 0:
        # add the number of sentinel columns since these are passed
        # to the tuplefilters before being used
        self._processors.extend([None] * num_sentinel_cols)

    # this is used when using this ResultMetaData in a Core-only cache
    # retrieval context.  it's initialized on first cache retrieval
    # when the _result_disable_adapt_to_context execution option
    # (which the ORM generally sets) is not set.
    self._keymap_by_result_column_idx = None

    # for compiled SQL constructs, copy additional lookup keys into
    # the key lookup map, such as Column objects, labels,
    # column keys and other names
    if num_ctx_cols:
        # keymap by primary string...
        by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
            metadata_entry[MD_LOOKUP_KEY]: metadata_entry
            for metadata_entry in raw
        }

        if len(by_key) != num_ctx_cols:
            # if by-primary-string dictionary smaller than
            # number of columns, assume we have dupes; (this check
            # is also in place if string dictionary is bigger, as
            # can occur when '*' was used as one of the compiled columns,
            # which may or may not be suggestive of dupes), rewrite
            # dupe records with "None" for index which results in
            # ambiguous column exception when accessed.
            #
            # this is considered to be the less common case as it is not
            # common to have dupe column keys in a SELECT statement.
            #
            # new in 1.4: get the complete set of all possible keys,
            # strings, objects, whatever, that are dupes across two
            # different records, first.
            index_by_key: Dict[Any, Any] = {}
            dupes = set()
            for metadata_entry in raw:
                for key in (metadata_entry[MD_RENDERED_NAME],) + (
                    metadata_entry[MD_OBJECTS] or ()
                ):
                    idx = metadata_entry[MD_INDEX]
                    # if this key has been associated with more than one
                    # positional index, it's a dupe
                    if index_by_key.setdefault(key, idx) != idx:
                        dupes.add(key)

            # then put everything we have into the keymap excluding only
            # those keys that are dupes.
            self._keymap = {
                obj_elem: metadata_entry
                for metadata_entry in raw
                if metadata_entry[MD_OBJECTS]
                for obj_elem in metadata_entry[MD_OBJECTS]
                if obj_elem not in dupes
            }

            # then for the dupe keys, put the "ambiguous column"
            # record into by_key.
            by_key.update(
                {
                    key: (None, -1, (), key, key, None, None)
                    for key in dupes
                }
            )

        else:
            # no dupes - copy secondary elements from compiled
            # columns into self._keymap.  this is the most common
            # codepath for Core / ORM statement executions before the
            # result metadata is cached
            self._keymap = {
                obj_elem: metadata_entry
                for metadata_entry in raw
                if metadata_entry[MD_OBJECTS]
                for obj_elem in metadata_entry[MD_OBJECTS]
            }
        # update keymap with primary string names taking
        # precedence
        self._keymap.update(by_key)
    else:
        # no compiled objects to map, just create keymap by primary string
        self._keymap = {
            metadata_entry[MD_LOOKUP_KEY]: metadata_entry
            for metadata_entry in raw
        }

    # update keymap with "translated" names.
    # the "translated" name thing has a long history:
    # 1. originally, it was used to fix an issue in very old SQLite
    #    versions prior to 3.10.0.   This code is still there in the
    #    sqlite dialect.
    # 2. Next, the pyhive third party dialect started using this hook
    #    for some driver related issue on their end.
    # 3. Most recently, the "driver_column_names" execution option has
    #    taken advantage of this hook to get raw DBAPI col names in the
    #    result keys without disrupting the usual merge process.

    if driver_column_names or (
        not num_ctx_cols and context._translate_colname
    ):
        # each untranslated name becomes an extra key that points at the
        # record already stored under the entry's primary lookup key
        self._keymap.update(
            {
                metadata_entry[MD_UNTRANSLATED]: self._keymap[
                    metadata_entry[MD_LOOKUP_KEY]
                ]
                for metadata_entry in raw
                if metadata_entry[MD_UNTRANSLATED]
            }
        )

    self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)

542 

def _merge_cursor_description(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    result_columns: Sequence[ResultColumnsEntry],
    num_ctx_cols: int,
    cols_are_ordered: bool,
    textual_ordered: bool,
    ad_hoc_textual: bool,
    loose_column_name_matching: bool,
    driver_column_names: bool,
) -> List[_CursorKeyMapRecType]:
    """Merge a cursor.description with compiled result column information.

    There are at least four separate strategies used here, selected
    depending on the type of SQL construct used to start with.

    The most common case is that of the compiled SQL expression construct,
    which generated the column names present in the raw SQL string and
    which has the identical number of columns as were reported by
    cursor.description.  In this case, we assume a 1-1 positional mapping
    between the entries in cursor.description and the compiled object.
    This is also the most performant case as we disregard extracting /
    decoding the column names present in cursor.description since we
    already have the desired name we generated in the compiled SQL
    construct.

    The next common case is that of the completely raw string SQL,
    such as passed to connection.execute().  In this case we have no
    compiled construct to work with, so we extract and decode the
    names from cursor.description and index those as the primary
    result row target keys.

    The remaining fairly common case is that of the textual SQL
    that includes at least partial column information; this is when
    we use a :class:`_expression.TextualSelect` construct.
    This construct may have
    unordered or ordered column information.  In the ordered case, we
    merge the cursor.description and the compiled construct's information
    positionally, and warn if there are additional description names
    present, however we still decode the names in cursor.description
    as we don't have a guarantee that the names in the columns match
    on these.  In the unordered case, we match names in cursor.description
    to that of the compiled construct based on name matching.
    In both of these cases, the cursor.description names and the column
    expression objects and names are indexed as result row target keys.

    The final case is much less common, where we have a compiled
    non-textual SQL expression construct, but the number of columns
    in cursor.description doesn't match what's in the compiled
    construct.  We make the guess here that there might be textual
    column expressions in the compiled construct that themselves include
    a comma in them causing them to split.  We do the same name-matching
    as with textual non-ordered columns.

    The name-matched system of merging is the same as that used by
    SQLAlchemy for all cases up through the 0.9 series.   Positional
    matching for compiled SQL expressions was introduced in 1.0 as a
    major performance feature, and positional matching for textual
    :class:`_expression.TextualSelect` objects in 1.1.
    As name matching is no longer
    a common case, it was acceptable to factor it into smaller generator-
    oriented methods that are easier to understand, but incur slightly
    more performance overhead.

    The return value is a list of records laid out according to the
    ``MD_*`` index constants.  As a side effect, ``self._keys`` and
    ``self._safe_for_cache`` are assigned.

    """

    if (
        num_ctx_cols
        and cols_are_ordered
        and not textual_ordered
        and num_ctx_cols == len(cursor_description)
        and not driver_column_names
    ):
        self._keys = [elem[0] for elem in result_columns]
        # pure positional 1-1 case; doesn't need to read
        # the names from cursor.description

        # most common case for Core and ORM

        # this metadata is safe to
        # cache because we are guaranteed
        # to have the columns in the same order for new executions
        self._safe_for_cache = True

        return [
            (
                idx,
                idx,
                rmap_entry[RM_OBJECTS],
                rmap_entry[RM_NAME],
                rmap_entry[RM_RENDERED_NAME],
                context.get_result_processor(
                    rmap_entry[RM_TYPE],
                    rmap_entry[RM_RENDERED_NAME],
                    cursor_description[idx][1],
                ),
                None,
            )
            for idx, rmap_entry in enumerate(result_columns)
        ]
    else:
        # name-based or text-positional cases, where we need
        # to read cursor.description names

        if textual_ordered or (
            ad_hoc_textual and len(cursor_description) == num_ctx_cols
        ):
            self._safe_for_cache = not driver_column_names
            # textual positional case
            raw_iterator = self._merge_textual_cols_by_position(
                context,
                cursor_description,
                result_columns,
                driver_column_names,
            )
        elif num_ctx_cols:
            # compiled SQL with a mismatch of description cols
            # vs. compiled cols, or textual w/ unordered columns
            # the order of columns can change if the query is
            # against a "select *", so not safe to cache
            self._safe_for_cache = False
            raw_iterator = self._merge_cols_by_name(
                context,
                cursor_description,
                result_columns,
                loose_column_name_matching,
                driver_column_names,
            )
        else:
            # no compiled SQL, just a raw string, order of columns
            # can change for "select *"
            self._safe_for_cache = False
            raw_iterator = self._merge_cols_by_none(
                context, cursor_description, driver_column_names
            )

        # the helpers above are generators; self._keys is populated
        # only as raw_iterator is consumed by this comprehension.  the
        # cursor column name is used for both the lookup key and the
        # rendered name positions of each record.
        return [
            (
                idx,
                ridx,
                obj,
                cursor_colname,
                cursor_colname,
                context.get_result_processor(
                    mapped_type, cursor_colname, coltype
                ),
                untranslated,
            )  # type: ignore[misc]
            for (
                idx,
                ridx,
                cursor_colname,
                mapped_type,
                coltype,
                obj,
                untranslated,
            ) in raw_iterator
        ]

702 

703 def _colnames_from_description( 

704 self, 

705 context: DefaultExecutionContext, 

706 cursor_description: _DBAPICursorDescription, 

707 driver_column_names: bool, 

708 ) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]: 

709 """Extract column names and data types from a cursor.description. 

710 

711 Applies unicode decoding, column translation, "normalization", 

712 and case sensitivity rules to the names based on the dialect. 

713 

714 """ 

715 dialect = context.dialect 

716 translate_colname = context._translate_colname 

717 normalize_name = ( 

718 dialect.normalize_name if dialect.requires_name_normalize else None 

719 ) 

720 

721 untranslated = None 

722 

723 for idx, rec in enumerate(cursor_description): 

724 colname = unnormalized = rec[0] 

725 coltype = rec[1] 

726 

727 if translate_colname: 

728 # a None here for "untranslated" means "the dialect did not 

729 # change the column name and the untranslated case can be 

730 # ignored". otherwise "untranslated" is expected to be the 

731 # original, unchanged colname (e.g. is == to "unnormalized") 

732 colname, untranslated = translate_colname(colname) 

733 

734 assert untranslated is None or untranslated == unnormalized 

735 

736 if normalize_name: 

737 colname = normalize_name(colname) 

738 

739 if driver_column_names: 

740 yield idx, colname, unnormalized, unnormalized, coltype 

741 

742 else: 

743 yield idx, colname, unnormalized, untranslated, coltype 

744 

def _merge_textual_cols_by_position(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    result_columns: Sequence[ResultColumnsEntry],
    driver_column_names: bool,
) -> Iterator[_MergeColTuple]:
    """Merge cursor.description with compiled columns by position.

    Entry ``n`` of ``cursor_description`` is paired with entry ``n`` of
    ``result_columns``.  Description entries beyond the compiled columns
    are yielded with ``NULLTYPE`` and no matching objects.  ``self._keys``
    is reset and filled as the generator is consumed.

    A warning is emitted when more columns were requested than the
    cursor returned.

    :raises InvalidRequestError: if the same column expression appears
     more than once among the compiled columns that are consumed.
    """
    num_ctx_cols = len(result_columns)
    num_cursor_cols = len(cursor_description)

    if num_ctx_cols > num_cursor_cols:
        # the statement returned fewer columns than were requested; the
        # counts are reported in that order so the message reads
        # truthfully (the original passed them swapped).
        util.warn(
            "Number of columns in textual SQL (%d) is "
            "smaller than number of columns requested (%d)"
            % (num_cursor_cols, num_ctx_cols)
        )
    seen = set()

    self._keys = []

    uses_denormalize = context.dialect.requires_name_normalize
    for (
        idx,
        colname,
        unnormalized,
        untranslated,
        coltype,
    ) in self._colnames_from_description(
        context, cursor_description, driver_column_names
    ):
        if idx < num_ctx_cols:
            ctx_rec = result_columns[idx]
            obj = ctx_rec[RM_OBJECTS]
            ridx = idx
            mapped_type = ctx_rec[RM_TYPE]
            if obj[0] in seen:
                raise exc.InvalidRequestError(
                    "Duplicate column expression requested "
                    "in textual SQL: %r" % obj[0]
                )
            seen.add(obj[0])

            # special check for all uppercase unnormalized name;
            # use the unnormalized name as the key.
            # see #10788
            # if these names don't match, then we still honor the
            # cursor.description name as the key and not what the
            # Column has, see
            # test_resultset.py::PositionalTextTest::test_via_column
            if (
                uses_denormalize
                and unnormalized == ctx_rec[RM_RENDERED_NAME]
            ):
                result_name = unnormalized
            else:
                result_name = colname
        else:
            # more columns in the cursor than in the compiled construct
            mapped_type = sqltypes.NULLTYPE
            obj = None
            ridx = None

            result_name = colname

        if driver_column_names:
            assert untranslated is not None
            self._keys.append(untranslated)
        else:
            self._keys.append(result_name)

        yield (
            idx,
            ridx,
            result_name,
            mapped_type,
            coltype,
            obj,
            untranslated,
        )

822 

def _merge_cols_by_name(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    result_columns: Sequence[ResultColumnsEntry],
    loose_column_name_matching: bool,
    driver_column_names: bool,
) -> Iterator[_MergeColTuple]:
    """Merge cursor.description with compiled columns by matching names.

    Each cursor column name is looked up in the description match map
    built from ``result_columns``; names with no match are yielded with
    ``NULLTYPE`` and no objects.  ``self._keys`` is reset and filled as
    the generator is consumed.
    """
    candidates = self._create_description_match_map(
        result_columns, loose_column_name_matching
    )
    mapped_type: TypeEngine[Any]

    self._keys = []

    description_names = self._colnames_from_description(
        context, cursor_description, driver_column_names
    )
    for idx, colname, unnormalized, untranslated, coltype in description_names:
        matched = candidates.get(colname)
        if matched is None:
            obj = None
            mapped_type = sqltypes.NULLTYPE
            result_columns_idx = None
        else:
            _, obj, mapped_type, result_columns_idx = matched

        if driver_column_names:
            assert untranslated is not None
            result_key = untranslated
        else:
            result_key = colname
        self._keys.append(result_key)

        yield (
            idx,
            result_columns_idx,
            colname,
            mapped_type,
            coltype,
            obj,
            untranslated,
        )

872 

@classmethod
def _create_description_match_map(
    cls,
    result_columns: Sequence[ResultColumnsEntry],
    loose_column_name_matching: bool = False,
) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
    """Collect the names expected to match those in cursor.description.

    Used when cursor.description is matched against names present in a
    Compiled object, as is the case with TextualSelect.  The result maps
    each rendered name to ``(name, objects, type, result column index)``.
    """

    matches: Dict[
        Union[str, object],
        Tuple[str, TupleAny, TypeEngine[Any], int],
    ] = {}

    for position, entry in enumerate(result_columns):
        rendered_name = entry[RM_RENDERED_NAME]

        if rendered_name not in matches:
            matches[rendered_name] = (
                entry[RM_NAME],
                entry[RM_OBJECTS],
                entry[RM_TYPE],
                position,
            )
        else:
            # conflicting keyname - just add the column-linked objects
            # to the existing record.  if there is a duplicate column
            # name in the cursor description, this will allow all of those
            # objects to raise an ambiguous column error
            prior_name, prior_objects, prior_type, _ = matches[rendered_name]
            matches[rendered_name] = (
                prior_name,
                prior_objects + entry[RM_OBJECTS],
                prior_type,
                position,
            )

        if not loose_column_name_matching:
            continue

        # when using a textual statement with an unordered set
        # of columns that line up, we are expecting the user
        # to be using label names in the SQL that match to the column
        # expressions.  Enable more liberal matching for this case;
        # duplicate keys that are ambiguous will be fixed later.
        for alias in entry[RM_OBJECTS]:
            matches.setdefault(
                alias,
                (entry[RM_NAME], entry[RM_OBJECTS], entry[RM_TYPE], position),
            )

    return matches

913 

def _merge_cols_by_none(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    driver_column_names: bool,
) -> Iterator[_MergeColTuple]:
    """Yield merge tuples built from cursor.description alone.

    No compiled column information is consulted: every column gets
    ``NULLTYPE``, no result column index and no matching objects.
    ``self._keys`` is reset and filled as the generator is consumed.
    """
    self._keys = []

    description_names = self._colnames_from_description(
        context, cursor_description, driver_column_names
    )
    for idx, colname, unnormalized, untranslated, coltype in description_names:
        if driver_column_names:
            assert untranslated is not None
            result_key = untranslated
        else:
            result_key = colname
        self._keys.append(result_key)

        yield (
            idx,
            None,
            colname,
            sqltypes.NULLTYPE,
            coltype,
            None,
            untranslated,
        )

947 

if not TYPE_CHECKING:

    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: bool = True
    ) -> Optional[NoReturn]:
        """Handle a key that was not found in the keymap.

        Returns ``None`` when ``raiseerr`` is false; otherwise raises
        ``NoSuchColumnError`` chained from ``err``.
        """
        if not raiseerr:
            return None

        if self._unpickled and isinstance(key, elements.ColumnElement):
            message = (
                "Row was unpickled; lookup by ColumnElement "
                "is unsupported"
            )
        else:
            message = (
                "Could not locate column in row for column '%s'"
                % util.string_or_unprintable(key)
            )
        raise exc.NoSuchColumnError(message) from err

966 

def _raise_for_ambiguous_column_name(self, rec: _KeyMapRecType) -> NoReturn:
    """Raise ``InvalidRequestError`` naming the ambiguous lookup key.

    :param rec: keymap record; its ``MD_LOOKUP_KEY`` entry is interpolated
     into the error message.

    """
    template = (
        "Ambiguous column name '%s' in " "result set column descriptions"
    )
    raise exc.InvalidRequestError(template % rec[MD_LOOKUP_KEY])

974 

975 def _index_for_key( 

976 self, key: _KeyIndexType, raiseerr: bool = True 

977 ) -> Optional[int]: 

978 # TODO: can consider pre-loading ints and negative ints 

979 # into _keymap - also no coverage here 

980 if isinstance(key, int): 

981 key = self._keys[key] 

982 

983 try: 

984 rec = self._keymap[key] 

985 except KeyError as ke: 

986 x = self._key_fallback(key, ke, raiseerr) 

987 assert x is None 

988 return None 

989 

990 index = rec[0] 

991 

992 if index is None: 

993 self._raise_for_ambiguous_column_name(rec) 

994 return index 

995 

996 def _indexes_for_keys( 

997 self, keys: Sequence[_KeyIndexType] 

998 ) -> Sequence[int]: 

999 try: 

1000 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501 

1001 except KeyError as ke: 

1002 # ensure it raises 

1003 CursorResultMetaData._key_fallback(self, ke.args[0], ke) 

1004 

def _metadata_for_keys(
    self, keys: Sequence[_KeyIndexType]
) -> Iterator[_NonAmbigCursorKeyMapRecType]:
    """Yield the keymap record for each of ``keys``.

    Integer keys are converted through ``self._keys`` first.  A missing
    key is handed to ``CursorResultMetaData._key_fallback``; a record
    whose ``MD_INDEX`` entry is ``None`` is handed to
    ``_raise_for_ambiguous_column_name``.

    """
    for requested in keys:
        lookup = (
            self._keys[requested]
            if isinstance(requested, int)
            else requested
        )

        try:
            rec = self._keymap[lookup]
        except KeyError as missing:
            # ensure it raises
            CursorResultMetaData._key_fallback(
                self, missing.args[0], missing
            )

        if rec[MD_INDEX] is None:
            self._raise_for_ambiguous_column_name(rec)

        yield cast(_NonAmbigCursorKeyMapRecType, rec)

1024 

def __getstate__(self) -> Dict[str, Any]:
    """Return the picklable state of this metadata.

    Only keymap entries keyed by ``str`` or ``int`` are kept.  Each kept
    record is rebuilt from its index, result-map index and rendered name,
    with the key itself in the fourth slot, an empty list in the third,
    and ``None`` in the last two.  ``_keys`` and ``_translated_indexes``
    are stored unchanged.

    """
    # TODO: consider serializing this as SimpleResultMetaData
    picklable_keymap = {}
    for key, rec in self._keymap.items():
        if not isinstance(key, (str, int)):
            continue
        picklable_keymap[key] = (
            rec[MD_INDEX],
            rec[MD_RESULT_MAP_INDEX],
            [],
            key,
            rec[MD_RENDERED_NAME],
            None,
            None,
        )

    return {
        "_keymap": picklable_keymap,
        "_keys": self._keys,
        "_translated_indexes": self._translated_indexes,
    }

1044 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore this metadata from the dict built by ``__getstate__``.

    Processors are reset to ``None`` for every key, the result-column
    keymap is cleared, and the object is flagged as unpickled.  The
    tuple filter is rebuilt only when translated indexes were stored.

    """
    restored_keys = state["_keys"]

    self._processors = [None] * len(restored_keys)
    self._keymap = state["_keymap"]
    self._keymap_by_result_column_idx = None
    self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
    self._keys = restored_keys
    self._unpickled = True

    if not state["_translated_indexes"]:
        self._translated_indexes = None
        self._tuplefilter = None
        return

    translated_indexes: List[Any] = state["_translated_indexes"]
    self._translated_indexes = translated_indexes
    self._tuplefilter = tuplegetter(*translated_indexes)

1060 

1061 

class ResultFetchStrategy:
    """Base interface for the object that fetches rows for a result.

    The close and fetch methods raise ``NotImplementedError`` here;
    ``yield_per`` does nothing and ``handle_exception`` re-raises the
    error it is given.

    .. versionadded:: 1.4

    """

    __slots__ = ()

    # cursor description to use in place of the cursor's own, if any
    alternate_cursor_description: Optional[_DBAPICursorDescription] = None

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Soft-close hook; must be provided by subclasses."""
        raise NotImplementedError()

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Hard-close hook; must be provided by subclasses."""
        raise NotImplementedError()

    def yield_per(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        num: int,
    ) -> None:
        """Accept a ``yield_per`` request; the base strategy ignores it."""
        return None

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Fetch a single row; must be provided by subclasses."""
        raise NotImplementedError()

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Fetch up to ``size`` rows; must be provided by subclasses."""
        raise NotImplementedError()

    def fetchall(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        """Fetch all remaining rows; must be provided by subclasses."""
        raise NotImplementedError()

    def handle_exception(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Re-raise ``err`` unchanged."""
        raise err

1126 

1127 

class NoCursorFetchStrategy(ResultFetchStrategy):
    """Fetch strategy for a result that has no open cursor.

    Closing does nothing, and every fetch method defers to
    :meth:`_non_result`, handing it the value that stands for "no rows"
    for that call.  There are two varieties of this strategy, one for DQL
    and one for DML (and also DDL), each representing a result that had a
    cursor but no longer has one.

    """

    __slots__ = ()

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """No-op."""
        return None

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """No-op."""
        return None

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Return whatever :meth:`_non_result` gives for ``None``."""
        return self._non_result(result, None)

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Return whatever :meth:`_non_result` gives for an empty list."""
        return self._non_result(result, [])

    def fetchall(
        self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Return whatever :meth:`_non_result` gives for an empty list."""
        return self._non_result(result, [])

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Produce the no-rows outcome; must be provided by subclasses."""
        raise NotImplementedError()

1181 

1182 

class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
    """Cursor-less strategy for a DQL result.

    This covers a result set that can return rows, i.e. a SELECT, or an
    INSERT, UPDATE, DELETE that includes RETURNING, once the cursor is
    closed and no rows remain.  Whether the owning result has been "hard
    closed" decides the outcome: a closed result raises, an open one
    yields the empty default.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Return ``default`` unless ``result.closed`` is set.

        :raises ResourceClosedError: if the result is closed; ``err`` is
         chained as the cause.

        """
        if not result.closed:
            return default
        raise exc.ResourceClosedError(
            "This result object is closed."
        ) from err


# shared instance; the class defines no per-instance state
_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()

1211 

1212 

class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
    """Cursor-less strategy for a DML result.

    This covers a result set that does not return rows, i.e. an INSERT,
    UPDATE, DELETE that does not include RETURNING.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Delegate to the metadata's ``_we_dont_return_rows``.

        ``default`` is not used; ``err`` is passed through.

        """
        metadata = result._metadata
        # we only expect to have a _NoResultMetaData() here right now.
        assert not metadata.returns_rows
        metadata._we_dont_return_rows(err)  # type: ignore[union-attr]


# shared instance; the class defines no per-instance state
_NO_CURSOR_DML = NoCursorDMLFetchStrategy()

1235 

1236 

class CursorFetchStrategy(ResultFetchStrategy):
    """Fetch rows by calling the DBAPI cursor's own fetch methods.

    Alternate versions of this class may instead buffer the rows from
    cursors or not use cursors at all.

    """

    __slots__ = ()

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the cursor-less DQL strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the cursor-less DQL strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Pass ``err`` to the connection's DBAPI exception handler."""
        connection = result.connection
        connection._handle_dbapi_exception(
            err, None, None, dbapi_cursor, result.context
        )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Replace this strategy with a buffered one sized to ``num``.

        The replacement starts with an empty buffer and a growth factor
        of zero.

        """
        buffered = BufferedRowCursorFetchStrategy(
            dbapi_cursor,
            {"max_row_buffer": num},
            initial_buffer=collections.deque(),
            growth_factor=0,
        )
        result.cursor_strategy = buffered

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Fetch one row, soft-closing the result when none is left.

        ``hard_close`` is forwarded to ``result._soft_close``.  Any
        exception, including one raised while closing, goes to
        :meth:`handle_exception`.

        """
        try:
            fetched = dbapi_cursor.fetchone()
            if fetched is None:
                result._soft_close(hard=hard_close)
            return fetched
        except BaseException as failure:
            self.handle_exception(result, dbapi_cursor, failure)

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Fetch a batch of rows, soft-closing the result on an empty batch.

        When ``size`` is ``None`` the cursor's ``fetchmany()`` is called
        without an argument.

        """
        try:
            batch = (
                dbapi_cursor.fetchmany()
                if size is None
                else dbapi_cursor.fetchmany(size)
            )

            if not batch:
                result._soft_close()
            return batch
        except BaseException as failure:
            self.handle_exception(result, dbapi_cursor, failure)

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        """Fetch every remaining row and soft-close the result."""
        try:
            remaining = dbapi_cursor.fetchall()
            result._soft_close()
            return remaining
        except BaseException as failure:
            self.handle_exception(result, dbapi_cursor, failure)


# shared instance; the class defines no per-instance state
_DEFAULT_FETCH = CursorFetchStrategy()

1323 

1324 

class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
    """A cursor fetch strategy with row buffering behavior.

    This strategy buffers the contents of a selection of rows
    before ``fetchone()`` is called.  This is to allow the results of
    ``cursor.description`` to be available immediately, when
    interfacing with a DB-API that requires rows to be consumed before
    this information is available (currently psycopg2, when used with
    server-side cursors).

    Unless an initial buffer is supplied, only one row is pre-fetched.
    Each later refill performed by ``fetchone()`` asks for the current
    buffer size, which is then multiplied by the growth factor until it
    reaches the ``max_row_buffer`` size, which defaults to 1000::

        with psycopg2_engine.connect() as conn:

            result = conn.execution_options(
                stream_results=True, max_row_buffer=50
            ).execute(text("select * from table"))

    .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.

    .. seealso::

        :ref:`psycopg2_execution_options`
    """

    __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")

    def __init__(
        self,
        dbapi_cursor: DBAPICursor,
        execution_options: CoreExecuteOptionsParameter,
        growth_factor: int = 5,
        initial_buffer: Optional[Deque[Any]] = None,
    ) -> None:
        """Set up the buffer and its sizing parameters.

        :param dbapi_cursor: cursor to pre-fetch one row from when no
         ``initial_buffer`` is given.
        :param execution_options: read for ``max_row_buffer``
         (default 1000).
        :param growth_factor: multiplier applied to the buffer size on
         refill; a false value pins the size to ``max_row_buffer``.
        :param initial_buffer: deque used directly as the row buffer; it
         is not copied.

        """
        self._max_row_buffer = execution_options.get("max_row_buffer", 1000)

        if initial_buffer is None:
            self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
        else:
            self._rowbuffer = initial_buffer
        self._growth_factor = growth_factor

        if not growth_factor:
            self._bufsize = self._max_row_buffer
        else:
            self._bufsize = min(self._max_row_buffer, self._growth_factor)

    @classmethod
    def create(
        cls, result: CursorResult[Any]
    ) -> BufferedRowCursorFetchStrategy:
        """Build a strategy from a result's cursor and execution options."""
        return BufferedRowCursorFetchStrategy(
            result.cursor,
            result.context.execution_options,
        )

    def _buffer_rows(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> None:
        """this is currently used only by fetchone()."""

        batch = self._bufsize
        try:
            # a buffer size below one means "take everything"
            fetched = (
                dbapi_cursor.fetchall()
                if batch < 1
                else dbapi_cursor.fetchmany(batch)
            )
        except BaseException as failure:
            self.handle_exception(result, dbapi_cursor, failure)

        if fetched:
            self._rowbuffer = collections.deque(fetched)
            if self._growth_factor and batch < self._max_row_buffer:
                self._bufsize = min(
                    self._max_row_buffer, batch * self._growth_factor
                )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Fix the buffer size at ``num`` and turn growth off."""
        self._growth_factor = 0
        self._max_row_buffer = self._bufsize = num

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then defer to the parent soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then defer to the parent hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Pop the next buffered row, refilling the buffer when empty.

        If the refill brings nothing, the result is soft-closed (with
        ``hard_close`` forwarded) and ``None`` is returned.

        """
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        self._buffer_rows(result, dbapi_cursor)
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        try:
            result._soft_close(hard=hard_close)
        except BaseException as failure:
            self.handle_exception(result, dbapi_cursor, failure)
        return None

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Return up to ``size`` rows, topping up from the cursor if needed.

        With ``size`` of ``None`` this behaves like :meth:`fetchall`.

        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffer = self._rowbuffer
        buffered = len(buffer)
        exhausted = False
        if size > buffered:
            try:
                extra = dbapi_cursor.fetchmany(size - buffered)
            except BaseException as failure:
                self.handle_exception(result, dbapi_cursor, failure)
            else:
                if extra:
                    buffer.extend(extra)
                else:
                    # defer closing since it may clear the row buffer
                    exhausted = True

        taken = [buffer.popleft() for _ in range(min(size, len(buffer)))]
        if exhausted:
            result._soft_close()
        return taken

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Return buffered rows followed by all rows left on the cursor.

        The buffer is emptied and the result soft-closed afterwards.

        """
        try:
            everything = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
            self._rowbuffer.clear()
            result._soft_close()
            return everything
        except BaseException as failure:
            self.handle_exception(result, dbapi_cursor, failure)

1479 

1480 

class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
    """A cursor strategy that buffers rows fully upon creation.

    Used for operations where a result is to be delivered
    after the database conversation can not be continued,
    such as MSSQL INSERT...OUTPUT after an autocommit.

    """

    __slots__ = ("_rowbuffer", "alternate_cursor_description")

    def __init__(
        self,
        dbapi_cursor: Optional[DBAPICursor],
        alternate_description: Optional[_DBAPICursorDescription] = None,
        initial_buffer: Optional[Iterable[Any]] = None,
    ):
        """Load every row into an in-memory deque.

        :param dbapi_cursor: cursor to drain with ``fetchall()``; required
         when ``initial_buffer`` is not given.
        :param alternate_description: stored as
         ``alternate_cursor_description``.
        :param initial_buffer: rows to buffer instead of reading the
         cursor; they are copied into a new deque.

        """
        self.alternate_cursor_description = alternate_description
        if initial_buffer is None:
            assert dbapi_cursor is not None
            rows = dbapi_cursor.fetchall()
        else:
            rows = initial_buffer
        self._rowbuffer = collections.deque(rows)

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> Any:
        """No-op."""
        return None

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then defer to the parent soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Drop buffered rows, then defer to the parent hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Pop the next buffered row, or soft-close and return ``None``."""
        if not self._rowbuffer:
            result._soft_close(hard=hard_close)
            return None
        return self._rowbuffer.popleft()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Pop up to ``size`` buffered rows; soft-close when none came out.

        With ``size`` of ``None`` this behaves like :meth:`fetchall`.

        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffer = self._rowbuffer
        taken = [buffer.popleft() for _ in range(min(size, len(buffer)))]
        if not taken:
            result._soft_close()
        return taken

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Hand over the whole buffer and soft-close the result.

        The buffer object itself (a deque) is returned and replaced by a
        fresh empty one before closing.

        """
        handed_over = self._rowbuffer
        self._rowbuffer = collections.deque()
        result._soft_close()
        return handed_over

1556 

1557 

class _NoResultMetaData(ResultMetaData):
    """Metadata for a result that does not return rows.

    Every lookup method and attribute raises ``ResourceClosedError``
    through :meth:`_we_dont_return_rows`.

    """

    __slots__ = ()

    returns_rows = False

    def _we_dont_return_rows(
        self, err: Optional[BaseException] = None
    ) -> NoReturn:
        """Raise ``ResourceClosedError``, chaining ``err`` as the cause."""
        message = (
            "This result object does not return rows. "
            "It has been closed automatically."
        )
        raise exc.ResourceClosedError(message) from err

    def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
        """Always raises; see :meth:`_we_dont_return_rows`."""
        self._we_dont_return_rows()

    def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; see :meth:`_we_dont_return_rows`."""
        self._we_dont_return_rows()

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; see :meth:`_we_dont_return_rows`."""
        self._we_dont_return_rows()

    @property
    def _keymap(self) -> NoReturn:  # type: ignore[override]
        """Always raises; see :meth:`_we_dont_return_rows`."""
        self._we_dont_return_rows()

    @property
    def _key_to_index(self) -> NoReturn:  # type: ignore[override]
        """Always raises; see :meth:`_we_dont_return_rows`."""
        self._we_dont_return_rows()

    @property
    def _processors(self) -> NoReturn:  # type: ignore[override]
        """Always raises; see :meth:`_we_dont_return_rows`."""
        self._we_dont_return_rows()

    @property
    def keys(self) -> NoReturn:
        """Always raises; see :meth:`_we_dont_return_rows`."""
        self._we_dont_return_rows()


# shared instance; the class defines no per-instance state
_NO_RESULT_METADATA = _NoResultMetaData()

1598 

1599 

def null_dml_result() -> IteratorResult[Any]:
    """Return a soft-closed, empty result that does not return rows.

    The result wraps a new ``_NoResultMetaData`` and an empty iterator.

    """
    empty: IteratorResult[Any] = IteratorResult(
        _NoResultMetaData(), iter([])
    )
    empty._soft_close()
    return empty

1604 

1605 

1606class CursorResult(Result[Unpack[_Ts]]): 

1607 """A Result that is representing state from a DBAPI cursor. 

1608 

1609 .. versionchanged:: 1.4 The :class:`.CursorResult`

1610 class replaces the previous :class:`.ResultProxy` interface. 

1611 This class is based on the :class:`.Result` calling API

1612 which provides an updated usage model and calling facade for 

1613 SQLAlchemy Core and SQLAlchemy ORM. 

1614 

1615 Returns database rows via the :class:`.Row` class, which provides 

1616 additional API features and behaviors on top of the raw data returned by 

1617 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` 

1618 method, other kinds of objects may also be returned. 

1619 

1620 .. seealso:: 

1621 

1622 :ref:`tutorial_selecting_data` - introductory material for accessing 

1623 :class:`_engine.CursorResult` and :class:`.Row` objects. 

1624 

1625 """ 

1626 

1627 __slots__ = ( 

1628 "context", 

1629 "dialect", 

1630 "cursor", 

1631 "cursor_strategy", 

1632 "_echo", 

1633 "connection", 

1634 ) 

1635 

1636 _metadata: Union[CursorResultMetaData, _NoResultMetaData] 

1637 _no_result_metadata = _NO_RESULT_METADATA 

1638 _soft_closed: bool = False 

1639 closed: bool = False 

1640 _is_cursor = True 

1641 

1642 context: DefaultExecutionContext 

1643 dialect: Dialect 

1644 cursor_strategy: ResultFetchStrategy 

1645 connection: Connection 

1646 

def __init__(
    self,
    context: DefaultExecutionContext,
    cursor_strategy: ResultFetchStrategy,
    cursor_description: Optional[_DBAPICursorDescription],
):
    """Bind this result to an execution context and fetch strategy.

    :param context: execution context supplying the dialect, cursor and
     root connection.
    :param cursor_strategy: strategy used by the fetch methods.
    :param cursor_description: DBAPI cursor description.  When ``None``
     the result is given the "no result" metadata and no row handling is
     set up.

    """
    self.context = context
    self.dialect = context.dialect
    self.cursor = context.cursor
    self.cursor_strategy = cursor_strategy
    self.connection = context.root_connection
    echo = self.connection._echo and context.engine._should_log_debug()
    self._echo = echo

    if cursor_description is None:
        assert context._num_sentinel_cols == 0
        self._metadata = self._no_result_metadata
        return

    self._init_metadata(context, cursor_description)

    if echo:
        debug = self.context.connection._log_debug

        def _log_row(row: Any) -> Any:
            debug("Row %r", sql_util._repr_row(row))
            return row

        self._row_logging_fn = _log_row

    # call Result._row_getter to set up the row factory
    self._row_getter

1680 

def _init_metadata(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
) -> CursorResultMetaData:
    """Create or reuse the metadata for this result and store it.

    Without a compiled statement, new metadata is always built.  With
    one, metadata cached on the compiled object is reused when present
    (unless the ``driver_column_names`` execution option is set), and
    newly built metadata is cached there when it reports itself safe
    for caching.  The metadata may then be adapted to the statement
    actually invoked.

    :return: the metadata object that was assigned to ``self._metadata``.

    """
    use_driver_names = context.execution_options.get(
        "driver_column_names", False
    )

    metadata: CursorResultMetaData

    if not context.compiled:
        metadata = CursorResultMetaData(
            self,
            cursor_description,
            driver_column_names=use_driver_names,
        )
        self._metadata = metadata
    else:
        compiled = context.compiled

        if use_driver_names:
            # TODO: test this case
            metadata = CursorResultMetaData(
                self,
                cursor_description,
                driver_column_names=True,
                num_sentinel_cols=context._num_sentinel_cols,
            )
            assert not metadata._safe_for_cache
        elif compiled._cached_metadata:
            metadata = compiled._cached_metadata
        else:
            # the number of sentinel columns is stored on the context
            # but it's a characteristic of the compiled object
            # so it's ok to apply it to a cacheable metadata.
            metadata = CursorResultMetaData(
                self,
                cursor_description,
                num_sentinel_cols=context._num_sentinel_cols,
            )
            if metadata._safe_for_cache:
                compiled._cached_metadata = metadata

        # Rewrite / adapt step.  When running against a cached Compiled
        # object, the metadata should refer to the Column objects of the
        # statement being invoked now rather than those of the statement
        # held by the cached Compiled.  The Compiled object may opt out
        # of this; the ORM does so because it builds its own Result and
        # works with the cached Column objects.
        #
        # This serves Core users who access result columns with
        # row._mapping[table.c.column].
        needs_adapt = (
            not context.execution_options.get(
                "_result_disable_adapt_to_context", False
            )
            and compiled._result_columns
            and context.cache_hit is context.dialect.CACHE_HIT
            and compiled.statement is not context.invoked_statement  # type: ignore[comparison-overlap] # noqa: E501
        )
        if needs_adapt:
            metadata = metadata._adapt_to_context(context)

        self._metadata = metadata

    if self._echo:
        column_names = tuple(x[0] for x in cursor_description)
        context.connection._log_debug("Col %r", column_names)
    return metadata

1754 

1755 def _soft_close(self, hard: bool = False) -> None: 

1756 """Soft close this :class:`_engine.CursorResult`. 

1757 

1758 This releases all DBAPI cursor resources, but leaves the 

1759 CursorResult "open" from a semantic perspective, meaning the 

1760 fetchXXX() methods will continue to return empty results. 

1761 

1762 This method is called automatically when: 

1763 

1764 * all result rows are exhausted using the fetchXXX() methods. 

1765 * cursor.description is None. 

1766 

1767 This method is **not public**, but is documented in order to clarify 

1768 the "autoclose" process used. 

1769 

1770 .. seealso:: 

1771 

1772 :meth:`_engine.CursorResult.close` 

1773 

1774 

1775 """ 

1776 

1777 if (not hard and self._soft_closed) or (hard and self.closed): 

1778 return 

1779 

1780 if hard: 

1781 self.closed = True 

1782 self.cursor_strategy.hard_close(self, self.cursor) 

1783 else: 

1784 self.cursor_strategy.soft_close(self, self.cursor) 

1785 

1786 if not self._soft_closed: 

1787 cursor = self.cursor 

1788 self.cursor = None # type: ignore 

1789 self.connection._safe_close_cursor(cursor) 

1790 self._soft_closed = True 

1791 

@property
def inserted_primary_key_rows(self) -> List[Optional[Any]]:
    """Return the value of
    :attr:`_engine.CursorResult.inserted_primary_key`
    as a row contained within a list; some dialects may support a
    multiple row form as well.

    .. note:: As indicated below, in current SQLAlchemy versions this
       accessor is only useful beyond what's already supplied by
       :attr:`_engine.CursorResult.inserted_primary_key` when using the
       :ref:`postgresql_psycopg2` dialect.  Future versions hope to
       generalize this feature to more dialects.

    The accessor exists for dialects offering the feature currently
    implemented by :ref:`psycopg2_executemany_mode`, currently **only
    the psycopg2 dialect**, where many rows can be INSERTed at once
    while server-generated primary key values are still returned.

    * **With the psycopg2 dialect, or other dialects that may support
      "fast executemany" style inserts in upcoming releases**: when an
      INSERT is invoked with a list of rows as the second argument to
      :meth:`_engine.Connection.execute`, this accessor provides a list
      of rows, each holding the primary key value of one INSERTed row.

    * **With all other dialects / backends that don't yet support this
      feature**: the accessor is only useful for **single row INSERT
      statements**, returning the same information as
      :attr:`_engine.CursorResult.inserted_primary_key` inside a
      single-element list.  When an INSERT is executed with a list of
      rows, the list has one row per inserted row, but contains
      ``None`` for any server-generated values.

    Future releases of SQLAlchemy will further generalize the
    "fast execution helper" feature of psycopg2 to suit other dialects,
    thus allowing this accessor to be of more general use.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct, is not an
    insert() construct, or used explicit returning().

    .. versionadded:: 1.4

    .. seealso::

        :attr:`_engine.CursorResult.inserted_primary_key`

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )
    if context._is_explicit_returning:
        raise exc.InvalidRequestError(
            "Can't call inserted_primary_key "
            "when returning() "
            "is used."
        )
    return context.inserted_primary_key_rows  # type: ignore[no-any-return] # noqa: E501

1853 

@property
def inserted_primary_key(self) -> Optional[Any]:
    """Return the primary key for the row just inserted.

    The return value is a :class:`_result.Row` object representing
    a named tuple of primary key values in the order in which the
    primary key columns are configured in the source
    :class:`_schema.Table`.

    .. versionchanged:: 1.4.8 - the
       :attr:`_engine.CursorResult.inserted_primary_key`
       value is now a named tuple via the :class:`_result.Row` class,
       rather than a plain tuple.

    This accessor only applies to single row :func:`_expression.insert`
    constructs which did not explicitly specify
    :meth:`_expression.Insert.returning`.  Support for multirow inserts,
    while not yet available for most backends, would be accessed using
    the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.

    Note that primary key columns which specify a server_default clause,
    or otherwise do not qualify as "autoincrement" columns (see the notes
    at :class:`_schema.Column`), and were generated using the
    database-side default, will appear in this list as ``None`` unless
    the backend supports "returning" and the insert statement executed
    with the "implicit returning" enabled.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the statement
    was an executemany call, is not a compiled expression construct,
    or is not an insert() construct.

    """

    if self.context.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if primary key "
            "returning is supported, please "
            "use .inserted_primary_key_rows."
        )

    rows = self.inserted_primary_key_rows
    # an empty collection of rows means there is no key to report
    return rows[0] if rows else None

1899 

def last_updated_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of updated parameters from this
    execution.

    For an executemany call the full list of compiled parameter sets is
    returned; otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an update() construct.

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isupdate:
        raise exc.InvalidRequestError(
            "Statement is not an update() expression construct."
        )

    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1925 

def last_inserted_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of inserted parameters from this
    execution.

    For an executemany call the full list of compiled parameter sets is
    returned; otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() construct.

    """
    context = self.context

    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )

    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1951 

@property
def returned_defaults_rows(
    self,
) -> Optional[Sequence[Row[Unpack[TupleAny]]]]:
    """Return the rows of default-column values fetched via the
    :meth:`.ValuesBase.return_defaults` feature.

    The return value is a list of :class:`.Row` objects, taken directly
    from the execution context.

    .. versionadded:: 1.4

    """
    default_rows = self.context.returned_default_rows
    return default_rows

1966 

def splice_horizontally(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "horizontally splices"
    together the rows of this :class:`.CursorResult` with that of another
    :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "horizontally splices" means that for each row in the first and second
    result sets, a new row that concatenates the two rows together is
    produced, which then becomes the new row.  The incoming
    :class:`.CursorResult` must have the identical number of rows.  It is
    typically expected that the two result sets come from the same sort
    order as well, as the result rows are spliced together based on their
    position in the result.

    The expected use case here is so that multiple INSERT..RETURNING
    statements (which definitely need to be sorted) against different
    tables can produce a single result that looks like a JOIN of those two
    tables.

    E.g.::

        r1 = connection.execute(
            users.insert().returning(
                users.c.user_name, users.c.user_id, sort_by_parameter_order=True
            ),
            user_values,
        )

        r2 = connection.execute(
            addresses.insert().returning(
                addresses.c.address_id,
                addresses.c.address,
                addresses.c.user_id,
                sort_by_parameter_order=True,
            ),
            address_values,
        )

        rows = r1.splice_horizontally(r2).all()
        assert rows == [
            ("john", 1, 1, "foo@bar.com", 1),
            ("jack", 2, 2, "bar@bat.com", 2),
        ]

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_vertically`


    """  # noqa: E501

    clone = self._generate()
    assert clone is self  # just to note

    own_meta = self._metadata
    other_meta = other._metadata
    assert isinstance(other_meta, CursorResultMetaData)
    assert isinstance(own_meta, CursorResultMetaData)

    # capture both tuple filters before the metadata is swapped out below
    left_filter = own_meta._tuplefilter
    right_filter = other_meta._tuplefilter
    clone._metadata = own_meta._splice_horizontally(other_meta)

    # both raw row streams are fully drained, this result first
    left_rows = list(self._raw_row_iterator())
    right_rows = list(other._raw_row_iterator())

    total_rows = []
    for left, right in zip(left_rows, right_rows):
        if left_filter is not None:
            left = left_filter(left)
        if right_filter is not None:
            right = right_filter(right)
        total_rows.append(tuple(left) + tuple(right))

    clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=total_rows,
    )
    clone._reset_memoizations()
    return clone

2045 

def splice_vertically(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "vertically splices",
    i.e. "extends", the rows of this :class:`.CursorResult` with that of
    another :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "vertically splices" means the rows of the given result are appended to
    the rows of this cursor result.  The incoming :class:`.CursorResult`
    must have rows that represent the identical list of columns in the
    identical order as they are in this :class:`.CursorResult`.

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_horizontally`

    """
    clone = self._generate()

    # drain this result's raw rows first, then append the other result's
    total_rows = list(self._raw_row_iterator())
    total_rows.extend(other._raw_row_iterator())

    clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=total_rows,
    )
    clone._reset_memoizations()
    return clone

2077 

def _rewind(self, rows: Any) -> Self:
    """Rewind this result back to the given rowset.

    This is used internally for the case where an :class:`.Insert`
    construct combines the use of
    :meth:`.Insert.return_defaults` along with the
    "supplemental columns" feature.

    NOTE: this method has no effect when a unique filter is applied
    to the result, meaning that no row will be returned.

    """

    if self._echo:
        self.context.connection._log_debug(
            "CursorResult rewound %d row(s)", len(rows)
        )

    # the rows given are expected to be Row objects, so we
    # have to clear out processors which have already run on these
    # rows
    current_metadata = cast(CursorResultMetaData, self._metadata)
    self._metadata = current_metadata._remove_processors_and_tuple_filter()

    # TODO: if these are Row objects, can we save on not having to
    # re-make new Row objects out of them a second time?  is that
    # what's actually happening right now?  maybe look into this
    rewound_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=rows,
    )
    self.cursor_strategy = rewound_strategy
    self._reset_memoizations()
    return self

2112 

@property
def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]:
    """Return the values of default columns that were fetched using
    the :meth:`.ValuesBase.return_defaults` feature.

    The value is an instance of :class:`.Row`, or ``None``
    if :meth:`.ValuesBase.return_defaults` was not used or if the
    backend does not support RETURNING.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the statement
    was an executemany call.

    .. seealso::

        :meth:`.ValuesBase.return_defaults`

    """
    ctx = self.context

    if ctx.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if return defaults "
            "is supported, please use .returned_defaults_rows."
        )

    # only the first row is exposed; an empty or missing rowset yields None
    default_rows = ctx.returned_default_rows
    return default_rows[0] if default_rows else None

2139 

def lastrow_has_defaults(self) -> bool:
    """Return the result of calling ``lastrow_has_defaults()`` on the
    underlying :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    """
    execution_context = self.context
    return execution_context.lastrow_has_defaults()

2149 

def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return ``postfetch_cols`` from the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """
    ctx = self.context

    if not ctx.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    # only DML that can carry defaults (INSERT / UPDATE) is accepted
    if not (ctx.isinsert or ctx.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )
    return ctx.postfetch_cols

2172 

def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return ``prefetch_cols`` from the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """
    ctx = self.context

    if not ctx.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    # only DML that can carry defaults (INSERT / UPDATE) is accepted
    if not (ctx.isinsert or ctx.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )
    return ctx.prefetch_cols

2195 

def supports_sane_rowcount(self) -> bool:
    """Return the ``supports_sane_rowcount`` flag of this result's dialect.

    See :attr:`_engine.CursorResult.rowcount` for background.

    """
    dialect = self.dialect
    return dialect.supports_sane_rowcount

2204 

def supports_sane_multi_rowcount(self) -> bool:
    """Return the ``supports_sane_multi_rowcount`` flag of this result's
    dialect.

    See :attr:`_engine.CursorResult.rowcount` for background.

    """
    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

2213 

@util.memoized_property
def rowcount(self) -> int:
    """Return the 'rowcount' for this result.

    The primary purpose of 'rowcount' is to report the number of rows
    matched by the WHERE criterion of an UPDATE or DELETE statement
    executed once (i.e. for a single parameter set), which may then be
    compared to the number of rows expected to be updated or deleted as a
    means of asserting data integrity.

    This attribute is transferred from the ``cursor.rowcount`` attribute
    of the DBAPI before the cursor is closed, to support DBAPIs that
    don't make this value available after cursor close.   Some DBAPIs may
    offer meaningful values for other kinds of statements, such as INSERT
    and SELECT statements as well.  In order to retrieve
    ``cursor.rowcount`` for these statements, set the
    :paramref:`.Connection.execution_options.preserve_rowcount`
    execution option to True, which will cause the ``cursor.rowcount``
    value to be unconditionally memoized before any results are returned
    or the cursor is closed, regardless of statement type.

    For cases where the DBAPI does not support rowcount for a particular
    kind of statement and/or execution, the returned value will be ``-1``,
    which is delivered directly from the DBAPI and is part of :pep:`249`.
    All DBAPIs should support rowcount for single-parameter-set
    UPDATE and DELETE statements, however.

    .. note::

       Notes regarding :attr:`_engine.CursorResult.rowcount`:


       * This attribute returns the number of rows *matched*,
         which is not necessarily the same as the number of rows
         that were actually *modified*. For example, an UPDATE statement
         may have no net change on a given row if the SET values
         given are the same as those present in the row already.
         Such a row would be matched but not modified.
         On backends that feature both styles, such as MySQL,
         rowcount is configured to return the match
         count in all cases.

       * :attr:`_engine.CursorResult.rowcount` in the default case is
         *only* useful in conjunction with an UPDATE or DELETE statement,
         and only with a single set of parameters. For other kinds of
         statements, SQLAlchemy will not attempt to pre-memoize the value
         unless the
         :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is used.  Note that contrary to :pep:`249`, many
         DBAPIs do not support rowcount values for statements that are not
         UPDATE or DELETE, particularly when rows are being returned which
         are not fully pre-buffered.   DBAPIs that don't support rowcount
         for a particular kind of statement should return the value ``-1``
         for such statements.

       * :attr:`_engine.CursorResult.rowcount` may not be meaningful
         when executing a single statement with multiple parameter sets
         (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
         values across multiple parameter sets and will return ``-1``
         when accessed.

       * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
         a correct population of :attr:`_engine.CursorResult.rowcount`
         when the :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is set to True.

       * Statements that use RETURNING may not support rowcount, returning
         a ``-1`` value instead.

    .. seealso::

        :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`

        :paramref:`.Connection.execution_options.preserve_rowcount`

    """  # noqa: E501
    try:
        count = self.context.rowcount
    except BaseException as err:
        # hand the failure to the cursor strategy's exception handler; the
        # trailing raise only matters if that handler returns
        self.cursor_strategy.handle_exception(self, self.cursor, err)
        raise  # not called
    else:
        return count

2295 

@property
def lastrowid(self) -> int:
    """Return the 'lastrowid' accessor on the DBAPI cursor.

    This is a DBAPI specific method and is only functional
    for those backends which support it, for statements
    where it is appropriate.  Its behavior is not
    consistent across backends.

    Usage of this method is normally unnecessary when
    using insert() expression constructs; the
    :attr:`~CursorResult.inserted_primary_key` attribute provides a
    tuple of primary key values for a newly inserted row,
    regardless of database backend.

    Any exception raised while retrieving the value is passed to the
    cursor strategy's ``handle_exception()`` and, should that handler
    return, re-raised unchanged.

    """
    try:
        return self.context.get_lastrowid()
    except BaseException as e:
        # route the failure through the cursor strategy's handler, in the
        # same way the ``rowcount`` accessor does
        self.cursor_strategy.handle_exception(self, self.cursor, e)
        # never fall through and implicitly return None from an accessor
        # annotated ``-> int`` if the handler happens to return
        raise  # not called

2316 

@property
def returns_rows(self) -> bool:
    """True if this :class:`_engine.CursorResult` returns zero or more
    rows.

    I.e. if it is legal to call the methods
    :meth:`_engine.CursorResult.fetchone`,
    :meth:`_engine.CursorResult.fetchmany`,
    :meth:`_engine.CursorResult.fetchall`.

    Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
    always be synonymous with whether or not the DBAPI cursor had a
    ``.description`` attribute, indicating the presence of result columns,
    noting that a cursor that returns zero rows still has a
    ``.description`` if a row-returning statement was emitted.

    This attribute should be True for all results that are against
    SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
    that use RETURNING.   For INSERT/UPDATE/DELETE statements that were
    not using RETURNING, the value will usually be False, however
    there are some dialect-specific exceptions to this, such as when
    using the MSSQL / pyodbc dialect a SELECT is emitted inline in
    order to retrieve an inserted primary key value.

    .. seealso::

        :meth:`.Result.close`

        :attr:`.Result.closed`

    """
    # the flag is stored on the result metadata
    metadata = self._metadata
    return metadata.returns_rows

2349 

@property
def is_insert(self) -> bool:
    """True if this :class:`_engine.CursorResult` is the result
    of executing an expression language compiled
    :func:`_expression.insert` construct.

    When True, this implies that the
    :attr:`inserted_primary_key` attribute is accessible,
    assuming the statement did not include
    a user defined "returning" construct.

    """
    execution_context = self.context
    return execution_context.isinsert

2363 

2364 def _fetchiter_impl(self) -> Iterator[Any]: 

2365 fetchone = self.cursor_strategy.fetchone 

2366 

2367 while True: 

2368 row = fetchone(self, self.cursor) 

2369 if row is None: 

2370 break 

2371 yield row 

2372 

2373 def _fetchone_impl(self, hard_close: bool = False) -> Any: 

2374 return self.cursor_strategy.fetchone(self, self.cursor, hard_close) 

2375 

2376 def _fetchall_impl(self) -> Any: 

2377 return self.cursor_strategy.fetchall(self, self.cursor) 

2378 

2379 def _fetchmany_impl(self, size: Optional[int] = None) -> Any: 

2380 return self.cursor_strategy.fetchmany(self, self.cursor, size) 

2381 

2382 def _raw_row_iterator(self) -> Any: 

2383 return self._fetchiter_impl() 

2384 

def merge(
    self, *others: Result[Unpack[TupleAny]]
) -> MergedResult[Unpack[TupleAny]]:
    # Build the merged result with the superclass implementation, then,
    # when this result's context reports a rowcount, set the merged
    # rowcount to the total across this result and all of ``others``.
    merged_result = super().merge(*others)
    if not self.context._has_rowcount:
        return merged_result

    total_rowcount = 0
    for result in (self, *others):
        total_rowcount += cast("CursorResult[Any]", result).rowcount
    merged_result.rowcount = total_rowcount
    return merged_result

2395 

def close(self) -> None:
    """Close this :class:`_engine.CursorResult`.

    This closes out the underlying DBAPI cursor corresponding to the
    statement execution, if one is still present.  Note that the DBAPI
    cursor is automatically released when the :class:`_engine.CursorResult`
    exhausts all available rows.  :meth:`_engine.CursorResult.close` is
    generally an optional method except in the case when discarding a
    :class:`_engine.CursorResult` that still has additional rows pending
    for fetch.

    After this method is called, it is no longer valid to call upon
    the fetch methods, which will raise a :class:`.ResourceClosedError`
    on subsequent use.

    .. seealso::

        :ref:`connections_toplevel`

    """
    # implemented as a "hard" soft-close
    hard_close = True
    self._soft_close(hard=hard_close)

2417 

@_generative
def yield_per(self, num: int) -> Self:
    # Record the requested batch size on the result first, then forward it
    # to the cursor strategy together with this result and its cursor.
    # NOTE(review): ``_generative`` is defined outside this view; presumably
    # it governs which object ``self`` is here -- confirm.
    batch_size = num
    self._yield_per = batch_size
    strategy = self.cursor_strategy
    strategy.yield_per(self, self.cursor, batch_size)
    return self

2423 

2424 

# Module-level alias: ``ResultProxy`` refers to the same class object as
# ``CursorResult``.
# NOTE(review): presumably kept as a backwards-compatible name -- confirm.
ResultProxy = CursorResult