Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

764 statements  

1# engine/cursor.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Define cursor-specific result set constructs including 

9:class:`.CursorResult`.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15import functools 

16import operator 

17import typing 

18from typing import Any 

19from typing import cast 

20from typing import ClassVar 

21from typing import Deque 

22from typing import Dict 

23from typing import Final 

24from typing import Iterable 

25from typing import Iterator 

26from typing import List 

27from typing import Literal 

28from typing import Mapping 

29from typing import NoReturn 

30from typing import Optional 

31from typing import Sequence 

32from typing import Tuple 

33from typing import TYPE_CHECKING 

34from typing import Union 

35 

36from .result import IteratorResult 

37from .result import MergedResult 

38from .result import Result 

39from .result import ResultMetaData 

40from .result import SimpleResultMetaData 

41from .result import tuplegetter 

42from .row import Row 

43from .. import exc 

44from .. import util 

45from ..sql import elements 

46from ..sql import sqltypes 

47from ..sql import util as sql_util 

48from ..sql.base import _generative 

49from ..sql.compiler import ResultColumnsEntry 

50from ..sql.compiler import RM_NAME 

51from ..sql.compiler import RM_OBJECTS 

52from ..sql.compiler import RM_RENDERED_NAME 

53from ..sql.compiler import RM_TYPE 

54from ..sql.type_api import TypeEngine 

55from ..util.typing import Self 

56from ..util.typing import TupleAny 

57from ..util.typing import TypeVarTuple 

58from ..util.typing import Unpack 

59 

60 

61if typing.TYPE_CHECKING: 

62 from .base import Connection 

63 from .default import DefaultExecutionContext 

64 from .interfaces import _DBAPICursorDescription 

65 from .interfaces import _MutableCoreSingleExecuteParams 

66 from .interfaces import CoreExecuteOptionsParameter 

67 from .interfaces import DBAPICursor 

68 from .interfaces import DBAPIType 

69 from .interfaces import Dialect 

70 from .interfaces import ExecutionContext 

71 from .result import _KeyIndexType 

72 from .result import _KeyMapRecType 

73 from .result import _KeyMapType 

74 from .result import _KeyType 

75 from .result import _ProcessorsType 

76 from .result import _TupleGetterType 

77 from ..sql.schema import Column 

78 from ..sql.type_api import _ResultProcessorType 

79 

80 

81_Ts = TypeVarTuple("_Ts") 

82 

83 

# metadata entry tuple indexes.
# using raw tuple is faster than namedtuple.
# these match up to the positions in
# _CursorKeyMapRecType
#
# Each constant below is the position of one field inside a keymap record
# tuple; records are indexed as ``rec[MD_INDEX]``, ``rec[MD_OBJECTS]`` etc.

# slot 0; a value of None in this slot marks an ambiguously named record
MD_INDEX: Final[Literal[0]] = 0
"""integer index in cursor.description

"""

# slot 1; holds -1 when the MD_INDEX slot is None
MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1
"""integer index in compiled._result_columns"""

# slot 2; may be empty / None when there is no compiled construct
MD_OBJECTS: Final[Literal[2]] = 2
"""other string keys and ColumnElement obj that can match.

This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects

"""

# slot 3
MD_LOOKUP_KEY: Final[Literal[3]] = 3
"""string key we usually expect for key-based lookup

this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
"""


# slot 4
# NOTE: the constant this module imports from ..sql.compiler for this slot
# is named RM_RENDERED_NAME (the docstring below abbreviates it).
MD_RENDERED_NAME: Final[Literal[4]] = 4
"""name that is usually in cursor.description

this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
"""


# slot 5; the stored processor may be None
MD_PROCESSOR: Final[Literal[5]] = 5
"""callable to process a result value into a row"""

# slot 6; may be None
MD_UNTRANSLATED: Final[Literal[6]] = 6
"""raw name from cursor.description"""

122 

123 

# One keymap record.  The positions correspond to the MD_* index constants
# defined above in this module.
_CursorKeyMapRecType = Tuple[
    Optional[int],  # MD_INDEX, None means the record is ambiguously named
    int,  # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None
    TupleAny,  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    Optional[str],  # MD_UNTRANSLATED
]

# Mapping of any lookup key (string name, column object, ...) to its record.
_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]

# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
# not None
_NonAmbigCursorKeyMapRecType = Tuple[
    int,  # MD_INDEX
    int,  # MD_RESULT_MAP_INDEX
    List[Any],  # MD_OBJECTS
    str,  # MD_LOOKUP_KEY
    str,  # MD_RENDERED_NAME
    Optional["_ResultProcessorType[Any]"],  # MD_PROCESSOR
    str,  # MD_UNTRANSLATED
]

# Intermediate tuple yielded by the ``_merge_*`` generator methods of
# CursorResultMetaData before result processors are resolved.
_MergeColTuple = Tuple[
    int,  # position in cursor.description
    Optional[int],  # position in the compiled result columns, if matched
    str,  # column name used as the lookup key
    TypeEngine[Any],  # mapped SQL type (NULLTYPE when nothing matched)
    "DBAPIType",  # type object taken from cursor.description
    Optional[TupleAny],  # compiled objects for the column, if matched
    Optional[str],  # untranslated name, if any
]

157 

158 

159class CursorResultMetaData(ResultMetaData): 

160 """Result metadata for DBAPI cursors.""" 

161 

162 __slots__ = ( 

163 "_keymap", 

164 "_processors", 

165 "_keys", 

166 "_keymap_by_result_column_idx", 

167 "_tuplefilter", 

168 "_translated_indexes", 

169 "_safe_for_cache", 

170 "_unpickled", 

171 "_key_to_index", 

172 # don't need _unique_filters support here for now. Can be added 

173 # if a need arises. 

174 ) 

175 

176 _keymap: _CursorKeyMapType 

177 _processors: _ProcessorsType 

178 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]] 

179 _unpickled: bool 

180 _safe_for_cache: bool 

181 _translated_indexes: Optional[List[int]] 

182 

183 returns_rows: ClassVar[bool] = True 

184 

def _has_key(self, key: Any) -> bool:
    """Return True when ``key`` is present in this metadata's keymap."""
    keymap = self._keymap
    return key in keymap

187 

def _for_freeze(self) -> ResultMetaData:
    """Build a :class:`.SimpleResultMetaData` carrying this object's keys.

    The ``extra`` argument receives, for each key in order, the
    MD_OBJECTS entry of that key's keymap record.
    """
    extra = []
    for key in self._keys:
        extra.append(self._keymap[key][MD_OBJECTS])
    return SimpleResultMetaData(self._keys, extra=extra)

193 

def _make_new_metadata(
    self,
    *,
    unpickled: bool,
    processors: _ProcessorsType,
    keys: Sequence[str],
    keymap: _KeyMapType,
    tuplefilter: Optional[_TupleGetterType],
    translated_indexes: Optional[List[int]],
    safe_for_cache: bool,
    keymap_by_result_column_idx: Any,
) -> Self:
    """Create a new instance of this class without running ``__init__``.

    Every slot is populated directly from the keyword arguments; the
    key-to-index map is derived from ``keymap`` using the MD_INDEX slot.
    """
    klass = self.__class__
    fresh = klass.__new__(klass)

    # lookup structures
    fresh._keys = keys
    fresh._keymap = keymap
    fresh._keymap_by_result_column_idx = keymap_by_result_column_idx

    # row processing
    fresh._processors = processors
    fresh._tuplefilter = tuplefilter
    fresh._translated_indexes = translated_indexes

    # flags
    fresh._unpickled = unpickled
    fresh._safe_for_cache = safe_for_cache

    fresh._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
    return fresh

217 

def _remove_processors_and_tuple_filter(self) -> Self:
    """Return a copy of this metadata with no processors and no filter.

    The processor list is replaced by a list of ``None`` whose length
    equals the processor list after the tuple filter (if any) has been
    applied to it, and the MD_PROCESSOR slot of every keymap record is
    set to ``None``.
    """
    tuplefilter = self._tuplefilter
    effective = (
        tuplefilter(self._processors) if tuplefilter else self._processors
    )

    stripped_keymap = {}
    for key, rec in self._keymap.items():
        # rebuild the record with the processor slot blanked out
        stripped_keymap[key] = (
            rec[:MD_PROCESSOR] + (None,) + rec[MD_PROCESSOR + 1 :]
        )

    return self._make_new_metadata(
        unpickled=self._unpickled,
        processors=[None] * len(effective),
        tuplefilter=None,
        translated_indexes=None,
        keymap=stripped_keymap,
        keys=self._keys,
        safe_for_cache=self._safe_for_cache,
        keymap_by_result_column_idx=self._keymap_by_result_column_idx,
    )

236 

def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
    """Return new metadata with ``other``'s columns appended after ours.

    Records coming from ``other`` have their MD_INDEX and
    MD_RESULT_MAP_INDEX shifted by the number of keys in ``self``.  A key
    of ``other`` that already exists in our keymap, or whose record is
    already ambiguous, is stored as an ambiguous record (index ``None``,
    result map index ``-1``).
    """
    shift = len(self._keys)
    merged = dict(self._keymap)

    for key, rec in other._keymap.items():
        # int index should be None for ambiguous key
        if rec[MD_INDEX] is None or key in merged:
            merged[key] = (None, -1, *rec[2:])
        else:
            merged[key] = (
                rec[MD_INDEX] + shift,
                rec[MD_RESULT_MAP_INDEX] + shift,
                *rec[2:],
            )

    # concatenate both processor lists, each passed through its own
    # tuple filter when one is present
    combined: List[Any] = []
    for source in (self, other):
        source_filter = source._tuplefilter
        source_procs = source._processors
        if source_filter is None:
            combined.extend(source_procs)
        else:
            combined.extend(source_filter(source_procs))

    new_keys = [*self._keys, *other._keys]
    assert len(combined) == len(new_keys)

    by_result_idx = {
        rec[MD_RESULT_MAP_INDEX]: rec for rec in merged.values()
    }

    return self._make_new_metadata(
        unpickled=self._unpickled,
        processors=combined,
        tuplefilter=None,
        translated_indexes=None,
        keys=new_keys,
        keymap=merged,
        safe_for_cache=self._safe_for_cache,
        keymap_by_result_column_idx=by_result_idx,
    )

277 

def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
    """Return new metadata restricted to ``keys``, in the order given.

    The selected records are renumbered 0..n-1 in their MD_INDEX slot and
    a tuplegetter over the original (translated, when applicable)
    positions is installed as the tuple filter.
    """
    selected = list(self._metadata_for_keys(keys))

    positions = []
    new_keys: List[str] = []
    for rec in selected:
        positions.append(rec[MD_INDEX])
        new_keys.append(rec[MD_LOOKUP_KEY])

    already_translated = self._translated_indexes
    if already_translated:
        positions = [already_translated[pos] for pos in positions]

    getter = tuplegetter(*positions)
    renumbered = [
        (new_pos,) + rec[1:] for new_pos, rec in enumerate(selected)
    ]

    # primary string keys first ...
    reduced_keymap = {rec[MD_LOOKUP_KEY]: rec for rec in renumbered}
    # ... then every secondary object key.
    # TODO: need unit test for:
    # result = connection.execute("raw sql, no columns").scalars()
    # without the "or ()" it's failing because MD_OBJECTS is None
    for rec in renumbered:
        for obj in rec[MD_OBJECTS] or ():
            reduced_keymap[obj] = rec

    return self._make_new_metadata(
        unpickled=self._unpickled,
        processors=self._processors,
        keys=new_keys,
        tuplefilter=getter,
        translated_indexes=positions,
        keymap=reduced_keymap,  # type: ignore[arg-type]
        safe_for_cache=self._safe_for_cache,
        keymap_by_result_column_idx=self._keymap_by_result_column_idx,
    )

309 

def _adapt_to_context(self, context: ExecutionContext) -> Self:
    """Return metadata whose keymap also accepts the invoked statement's
    columns.

    When a cached Compiled construct with result columns is re-used for a
    new statement, the keymap only knows the Column objects of the
    statement that was originally compiled.  This method adds, by
    position, the selected columns of the statement actually invoked so
    that they resolve to the same records.

    ``self`` is returned unchanged when there is no compiled construct,
    when it has no result columns, or when the invoked statement is the
    compiled statement itself.
    """
    compiled = context.compiled
    if not compiled or not compiled._result_columns:
        return self

    compiled_statement = context.compiled.statement
    invoked_statement = context.invoked_statement

    if TYPE_CHECKING:
        assert isinstance(invoked_statement, elements.ClauseElement)

    if compiled_statement is invoked_statement:
        return self

    assert invoked_statement is not None

    # this is the most common path for Core statements when
    # caching is used.  In ORM use, this codepath is not really used
    # as the _result_disable_adapt_to_context execution option is
    # set by the ORM.

    by_position = self._keymap_by_result_column_idx
    if by_position is None:
        # first retrieval from cache: the positional map has not been
        # built yet, so build it now and remember it on this object
        by_position = {}
        for rec in self._keymap.values():
            by_position[rec[MD_RESULT_MAP_INDEX]] = rec
        self._keymap_by_result_column_idx = by_position

    # map each selected column of the invoked statement onto the record
    # found at the same position
    additions = {}
    for position, column in enumerate(
        invoked_statement._all_selected_columns
    ):
        if position in by_position:
            additions[column] = by_position[position]

    return self._make_new_metadata(
        keymap=self._keymap | additions,
        unpickled=self._unpickled,
        processors=self._processors,
        tuplefilter=self._tuplefilter,
        translated_indexes=None,
        keys=self._keys,
        safe_for_cache=self._safe_for_cache,
        keymap_by_result_column_idx=by_position,
    )

368 

def __init__(
    self,
    parent: CursorResult[Unpack[TupleAny]],
    cursor_description: _DBAPICursorDescription,
    *,
    driver_column_names: bool = False,
    num_sentinel_cols: int = 0,
):
    """Build keys, processors and the key lookup map for a cursor result.

    :param parent: result object; its ``context`` attribute supplies the
     compiled column structure (``result_column_struct``) and the
     dialect-level hooks used while merging.
    :param cursor_description: the DBAPI ``cursor.description`` sequence.
    :param driver_column_names: forwarded to the merge step; when true,
     the untranslated names from the description are also added to the
     keymap.
    :param num_sentinel_cols: number of trailing columns that are sliced
     off ``cursor_description`` and the compiled result columns before
     merging.
    """
    context = parent.context
    if num_sentinel_cols > 0:
        # this is slightly faster than letting tuplegetter use the indexes
        self._tuplefilter = tuplefilter = operator.itemgetter(
            slice(-num_sentinel_cols)
        )
        cursor_description = tuplefilter(cursor_description)
    else:
        self._tuplefilter = tuplefilter = None
    self._translated_indexes = None
    self._safe_for_cache = self._unpickled = False

    if context.result_column_struct:
        (
            result_columns,
            cols_are_ordered,
            textual_ordered,
            ad_hoc_textual,
            loose_column_name_matching,
        ) = context.result_column_struct
        if tuplefilter is not None:
            # drop the sentinel columns from the compiled columns too
            result_columns = tuplefilter(result_columns)
        num_ctx_cols = len(result_columns)
    else:
        # no compiled column information: every flag, the column list
        # and the column count are all set to False
        result_columns = cols_are_ordered = (  # type: ignore
            num_ctx_cols
        ) = ad_hoc_textual = loose_column_name_matching = (
            textual_ordered
        ) = False

    # merge cursor.description with the column info
    # present in the compiled structure, if any
    raw = self._merge_cursor_description(
        context,
        cursor_description,
        result_columns,
        num_ctx_cols,
        cols_are_ordered,
        textual_ordered,
        ad_hoc_textual,
        loose_column_name_matching,
        driver_column_names,
    )

    # processors in key order which are used when building up
    # a row
    self._processors = [
        metadata_entry[MD_PROCESSOR] for metadata_entry in raw
    ]
    if num_sentinel_cols > 0:
        # add the number of sentinel columns since these are passed
        # to the tuplefilters before being used
        self._processors.extend([None] * num_sentinel_cols)

    # this is used when using this ResultMetaData in a Core-only cache
    # retrieval context.  it's initialized on first cache retrieval
    # when the _result_disable_adapt_to_context execution option
    # (which the ORM generally sets) is not set.
    self._keymap_by_result_column_idx = None

    # for compiled SQL constructs, copy additional lookup keys into
    # the key lookup map, such as Column objects, labels,
    # column keys and other names
    if num_ctx_cols:
        # keymap by primary string...
        by_key: Dict[_KeyType, _CursorKeyMapRecType] = {
            metadata_entry[MD_LOOKUP_KEY]: metadata_entry
            for metadata_entry in raw
        }

        if len(by_key) != num_ctx_cols:
            # if by-primary-string dictionary smaller than
            # number of columns, assume we have dupes; (this check
            # is also in place if string dictionary is bigger, as
            # can occur when '*' was used as one of the compiled columns,
            # which may or may not be suggestive of dupes), rewrite
            # dupe records with "None" for index which results in
            # ambiguous column exception when accessed.
            #
            # this is considered to be the less common case as it is not
            # common to have dupe column keys in a SELECT statement.
            #
            # new in 1.4: get the complete set of all possible keys,
            # strings, objects, whatever, that are dupes across two
            # different records, first.
            index_by_key: Dict[Any, Any] = {}
            dupes = set()
            for metadata_entry in raw:
                for key in (metadata_entry[MD_RENDERED_NAME],) + (
                    metadata_entry[MD_OBJECTS] or ()
                ):
                    idx = metadata_entry[MD_INDEX]
                    # if this key has been associated with more than one
                    # positional index, it's a dupe
                    if index_by_key.setdefault(key, idx) != idx:
                        dupes.add(key)

            # then put everything we have into the keymap excluding only
            # those keys that are dupes.
            self._keymap = {
                obj_elem: metadata_entry
                for metadata_entry in raw
                if metadata_entry[MD_OBJECTS]
                for obj_elem in metadata_entry[MD_OBJECTS]
                if obj_elem not in dupes
            }

            # then for the dupe keys, put the "ambiguous column"
            # record into by_key.
            by_key.update(
                {
                    key: (None, -1, (), key, key, None, None)
                    for key in dupes
                }
            )

        else:
            # no dupes - copy secondary elements from compiled
            # columns into self._keymap.   this is the most common
            # codepath for Core / ORM statement executions before the
            # result metadata is cached
            self._keymap = {
                obj_elem: metadata_entry
                for metadata_entry in raw
                if metadata_entry[MD_OBJECTS]
                for obj_elem in metadata_entry[MD_OBJECTS]
            }
        # update keymap with primary string names taking
        # precedence
        self._keymap.update(by_key)
    else:
        # no compiled objects to map, just create keymap by primary string
        self._keymap = {
            metadata_entry[MD_LOOKUP_KEY]: metadata_entry
            for metadata_entry in raw
        }

    # update keymap with "translated" names.
    # the "translated" name thing has a long history:
    # 1. originally, it was used to fix an issue in very old SQLite
    #    versions prior to 3.10.0.   This code is still there in the
    #    sqlite dialect.
    # 2. Next, the pyhive third party dialect started using this hook
    #    for some driver related issue on their end.
    # 3. Most recently, the "driver_column_names" execution option has
    #    taken advantage of this hook to get raw DBAPI col names in the
    #    result keys without disrupting the usual merge process.

    if driver_column_names or (
        not num_ctx_cols and context._translate_colname
    ):
        # each untranslated name points at the record already stored
        # under the entry's primary lookup key
        self._keymap.update(
            {
                metadata_entry[MD_UNTRANSLATED]: self._keymap[
                    metadata_entry[MD_LOOKUP_KEY]
                ]
                for metadata_entry in raw
                if metadata_entry[MD_UNTRANSLATED]
            }
        )

    self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)

539 

def _merge_cursor_description(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    result_columns: Sequence[ResultColumnsEntry],
    num_ctx_cols: int,
    cols_are_ordered: bool,
    textual_ordered: bool,
    ad_hoc_textual: bool,
    loose_column_name_matching: bool,
    driver_column_names: bool,
) -> List[_CursorKeyMapRecType]:
    """Merge a cursor.description with compiled result column information.

    One of four strategies is chosen, depending on the kind of SQL
    construct that was executed:

    1. Pure positional (most common, fastest).  A compiled SQL expression
       generated the column names itself and reports exactly as many
       columns as cursor.description.  Entries are paired 1-1 by position
       and the names in cursor.description are never read or decoded,
       since the names generated at compile time are already known.

    2. Textual positional.  A :class:`_expression.TextualSelect` with
       ordered column information.  Entries are still paired by position,
       with a warning when the counts disagree, but the names in
       cursor.description are decoded because there is no guarantee that
       they match the names on the columns.

    3. Name matching.  Either a :class:`_expression.TextualSelect` with
       unordered columns, or a compiled non-textual construct whose column
       count does not match cursor.description (the guess being that a
       textual column expression contained a comma and was split).  Names
       from cursor.description are matched against the names in the
       compiled construct.

    4. No compiled information at all, i.e. a raw SQL string such as one
       passed to connection.execute().  Names are extracted and decoded
       from cursor.description and become the primary result row keys.

    For strategies 2 and 3 both the cursor.description names and the
    column expression objects and names are indexed as row target keys.

    Name matching is how SQLAlchemy merged all cases up through the 0.9
    series.  Positional matching for compiled SQL expressions arrived in
    1.0 as a major performance feature, and for
    :class:`_expression.TextualSelect` objects in 1.1.  Since name
    matching is no longer the common case, it lives in smaller
    generator-oriented methods that are easier to follow at the cost of
    slightly more overhead.

    As side effects this method assigns ``self._keys`` (directly, or
    through the generator it consumes) and ``self._safe_for_cache``.
    """
    pure_positional = (
        num_ctx_cols
        and cols_are_ordered
        and not textual_ordered
        and num_ctx_cols == len(cursor_description)
        and not driver_column_names
    )

    if pure_positional:
        # most common case for Core and ORM: strict 1-1 positional
        # pairing; cursor.description names are not consulted
        self._keys = [elem[0] for elem in result_columns]

        # safe to cache because the columns are guaranteed to come back
        # in the same order for new executions
        self._safe_for_cache = True

        positional: List[_CursorKeyMapRecType] = []
        for idx, rmap_entry in enumerate(result_columns):
            rendered_name = rmap_entry[RM_RENDERED_NAME]
            processor = context.get_result_processor(
                rmap_entry[RM_TYPE],
                rendered_name,
                cursor_description[idx][1],
            )
            positional.append(
                (
                    idx,
                    idx,
                    rmap_entry[RM_OBJECTS],
                    rmap_entry[RM_NAME],
                    rendered_name,
                    processor,
                    None,
                )
            )
        return positional

    # name-based or text-positional cases, where we need
    # to read cursor.description names
    if textual_ordered or (
        ad_hoc_textual and len(cursor_description) == num_ctx_cols
    ):
        # textual positional case
        self._safe_for_cache = not driver_column_names
        raw_iterator = self._merge_textual_cols_by_position(
            context,
            cursor_description,
            result_columns,
            driver_column_names,
        )
    elif num_ctx_cols:
        # compiled SQL with a mismatch of description cols
        # vs. compiled cols, or textual w/ unordered columns
        # the order of columns can change if the query is
        # against a "select *", so not safe to cache
        self._safe_for_cache = False
        raw_iterator = self._merge_cols_by_name(
            context,
            cursor_description,
            result_columns,
            loose_column_name_matching,
            driver_column_names,
        )
    else:
        # no compiled SQL, just a raw string, order of columns
        # can change for "select *"
        self._safe_for_cache = False
        raw_iterator = self._merge_cols_by_none(
            context, cursor_description, driver_column_names
        )

    merged: List[_CursorKeyMapRecType] = []
    for (
        idx,
        ridx,
        cursor_colname,
        mapped_type,
        coltype,
        obj,
        untranslated,
    ) in raw_iterator:
        processor = context.get_result_processor(
            mapped_type, cursor_colname, coltype
        )
        # the cursor name serves as both lookup key and rendered name
        merged.append(
            (
                idx,
                ridx,
                obj,
                cursor_colname,
                cursor_colname,
                processor,
                untranslated,
            )  # type: ignore[misc]
        )
    return merged

699 

def _colnames_from_description(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    driver_column_names: bool,
) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]:
    """Yield name and type information for each cursor.description entry.

    Each yielded tuple is ``(idx, colname, unnormalized, untranslated,
    coltype)`` where ``colname`` has had the context's column translation
    and the dialect's name normalization applied when those are enabled,
    and ``unnormalized`` is the name exactly as found in the description.
    With ``driver_column_names`` the fourth element is the unnormalized
    name rather than whatever the translation hook reported.
    """
    dialect = context.dialect
    translate_colname = context._translate_colname
    normalize_name = None
    if dialect.requires_name_normalize:
        normalize_name = dialect.normalize_name

    untranslated = None

    for idx, rec in enumerate(cursor_description):
        unnormalized = rec[0]
        coltype = rec[1]
        colname = unnormalized

        if translate_colname:
            # a None here for "untranslated" means "the dialect did not
            # change the column name and the untranslated case can be
            # ignored".  otherwise "untranslated" is expected to be the
            # original, unchanged colname (e.g. is == to "unnormalized")
            colname, untranslated = translate_colname(colname)

            assert untranslated is None or untranslated == unnormalized

        if normalize_name:
            colname = normalize_name(colname)

        reported = unnormalized if driver_column_names else untranslated
        yield idx, colname, unnormalized, reported, coltype

741 

def _merge_textual_cols_by_position(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    result_columns: Sequence[ResultColumnsEntry],
    driver_column_names: bool,
) -> Iterator[_MergeColTuple]:
    """Pair cursor.description entries with compiled columns by position.

    Yields one ``(idx, ridx, result_name, mapped_type, coltype, obj,
    untranslated)`` tuple per cursor.description entry and appends the
    corresponding key to ``self._keys`` (which is reset to an empty list
    when iteration starts).

    Description entries beyond the number of compiled columns are yielded
    with ``NULLTYPE`` and no objects.  A warning is emitted when the
    compiled construct declares more columns than the cursor delivered.

    :raises InvalidRequestError: if the same column expression is
     requested at two different positions.
    """
    num_ctx_cols = len(result_columns)
    num_description_cols = len(cursor_description)

    if num_ctx_cols > num_description_cols:
        # the statement delivered fewer columns than were requested on
        # the construct; report the delivered count first so that the
        # numbers agree with the wording of the message
        util.warn(
            "Number of columns in textual SQL (%d) is "
            "smaller than number of columns requested (%d)"
            % (num_description_cols, num_ctx_cols)
        )
    seen = set()

    self._keys = []

    uses_denormalize = context.dialect.requires_name_normalize
    for (
        idx,
        colname,
        unnormalized,
        untranslated,
        coltype,
    ) in self._colnames_from_description(
        context, cursor_description, driver_column_names
    ):
        if idx < num_ctx_cols:
            ctx_rec = result_columns[idx]
            obj = ctx_rec[RM_OBJECTS]
            ridx = idx
            mapped_type = ctx_rec[RM_TYPE]
            if obj[0] in seen:
                raise exc.InvalidRequestError(
                    "Duplicate column expression requested "
                    "in textual SQL: %r" % obj[0]
                )
            seen.add(obj[0])

            # special check for all uppercase unnormalized name;
            # use the unnormalized name as the key.
            # see #10788
            # if these names don't match, then we still honor the
            # cursor.description name as the key and not what the
            # Column has, see
            # test_resultset.py::PositionalTextTest::test_via_column
            if (
                uses_denormalize
                and unnormalized == ctx_rec[RM_RENDERED_NAME]
            ):
                result_name = unnormalized
            else:
                result_name = colname
        else:
            # description entry with no compiled column at this position
            mapped_type = sqltypes.NULLTYPE
            obj = None
            ridx = None

            result_name = colname

        if driver_column_names:
            assert untranslated is not None
            self._keys.append(untranslated)
        else:
            self._keys.append(result_name)

        yield (
            idx,
            ridx,
            result_name,
            mapped_type,
            coltype,
            obj,
            untranslated,
        )

819 

def _merge_cols_by_name(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    result_columns: Sequence[ResultColumnsEntry],
    loose_column_name_matching: bool,
    driver_column_names: bool,
) -> Iterator[_MergeColTuple]:
    """Pair cursor.description entries with compiled columns by name.

    Each description name is looked up in the match map built from
    ``result_columns``; names with no match are yielded with ``NULLTYPE``
    and no objects.  ``self._keys`` is reset when iteration starts and
    receives one key per description entry.
    """
    lookup = self._create_description_match_map(
        result_columns, loose_column_name_matching
    )
    mapped_type: TypeEngine[Any]

    self._keys = []

    for (
        idx,
        colname,
        _unnormalized,
        untranslated,
        coltype,
    ) in self._colnames_from_description(
        context, cursor_description, driver_column_names
    ):
        matched = lookup.get(colname)
        if matched is None:
            # nothing in the compiled construct goes by this name
            mapped_type = sqltypes.NULLTYPE
            obj = None
            result_columns_idx = None
        else:
            obj = matched[1]
            mapped_type = matched[2]
            result_columns_idx = matched[3]

        if driver_column_names:
            assert untranslated is not None
            key_name = untranslated
        else:
            key_name = colname
        self._keys.append(key_name)

        yield (
            idx,
            result_columns_idx,
            colname,
            mapped_type,
            coltype,
            obj,
            untranslated,
        )

869 

@classmethod
def _create_description_match_map(
    cls,
    result_columns: Sequence[ResultColumnsEntry],
    loose_column_name_matching: bool = False,
) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]:
    """Collect every name that may match an entry in cursor.description.

    Used when cursor.description has to be matched against the names
    present in a Compiled object, as is the case with TextualSelect.
    The returned dictionary maps each candidate name to a
    ``(name, objects, type, result_columns_index)`` tuple.
    """
    match_map: Dict[
        Union[str, object],
        Tuple[str, TupleAny, TypeEngine[Any], int],
    ] = {}

    for ridx, elem in enumerate(result_columns):
        rendered = elem[RM_RENDERED_NAME]
        entry = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)

        previous = match_map.get(rendered)
        if previous is None:
            match_map[rendered] = entry
        else:
            # conflicting keyname - just add the column-linked objects
            # to the existing record.  if there is a duplicate column
            # name in the cursor description, this will allow all of those
            # objects to raise an ambiguous column error
            prev_name, prev_objects, prev_type, _prev_ridx = previous
            match_map[rendered] = (
                prev_name,
                prev_objects + elem[RM_OBJECTS],
                prev_type,
                ridx,
            )

        if not loose_column_name_matching:
            continue

        # when using a textual statement with an unordered set
        # of columns that line up, we are expecting the user
        # to be using label names in the SQL that match to the column
        # expressions.  Enable more liberal matching for this case;
        # duplicate keys that are ambiguous will be fixed later.
        for r_key in elem[RM_OBJECTS]:
            match_map.setdefault(r_key, entry)

    return match_map

910 

def _merge_cols_by_none(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
    driver_column_names: bool,
) -> Iterator[_MergeColTuple]:
    """Yield merge tuples built from cursor.description alone.

    No compiled column information is consulted: every entry is yielded
    with ``NULLTYPE``, no result-column index and no objects.
    ``self._keys`` is reset when iteration starts and receives one key per
    description entry.
    """
    self._keys = []

    name_source = self._colnames_from_description(
        context, cursor_description, driver_column_names
    )
    for idx, colname, _unnormalized, untranslated, coltype in name_source:
        if driver_column_names:
            assert untranslated is not None
            key_name = untranslated
        else:
            key_name = colname
        self._keys.append(key_name)

        yield (
            idx,
            None,
            colname,
            sqltypes.NULLTYPE,
            coltype,
            None,
            untranslated,
        )

944 

if not TYPE_CHECKING:

    def _key_fallback(
        self, key: Any, err: Optional[Exception], raiseerr: bool = True
    ) -> Optional[NoReturn]:
        """Handle a key that was not found in the keymap.

        Returns ``None`` when ``raiseerr`` is false; otherwise raises
        :class:`.NoSuchColumnError` chained from ``err``.
        """
        if not raiseerr:
            return None

        if self._unpickled and isinstance(key, elements.ColumnElement):
            raise exc.NoSuchColumnError(
                "Row was unpickled; lookup by ColumnElement "
                "is unsupported"
            ) from err

        raise exc.NoSuchColumnError(
            "Could not locate column in row for column '%s'"
            % util.string_or_unprintable(key)
        ) from err

963 

def _raise_for_ambiguous_column_name(
    self, rec: _KeyMapRecType
) -> NoReturn:
    """Raise :class:`.InvalidRequestError` naming the record's lookup key."""
    lookup_key = rec[MD_LOOKUP_KEY]
    message = (
        "Ambiguous column name '%s' in "
        "result set column descriptions" % lookup_key
    )
    raise exc.InvalidRequestError(message)

971 

def _index_for_key(
    self, key: _KeyIndexType, raiseerr: bool = True
) -> Optional[int]:
    """Return the row index recorded in the keymap for ``key``.

    An integer ``key`` is first translated to the key stored at that
    position of ``self._keys``.  A key missing from the keymap is handed
    to ``_key_fallback()``, after which ``None`` is returned; a record
    whose index is ``None`` is reported through
    ``_raise_for_ambiguous_column_name()``.

    """
    # TODO: can consider pre-loading ints and negative ints
    # into _keymap - also no coverage here
    lookup = self._keys[key] if isinstance(key, int) else key

    try:
        record = self._keymap[lookup]
    except KeyError as missing:
        fallback = self._key_fallback(lookup, missing, raiseerr)
        assert fallback is None
        return None
    else:
        position = record[0]
        if position is None:
            self._raise_for_ambiguous_column_name(record)
        return position

992 

def _indexes_for_keys(
    self, keys: Sequence[_KeyIndexType]
) -> Sequence[int]:
    """Return the first element of the keymap record for each key, in
    the order given.

    A key missing from the keymap is passed to
    ``CursorResultMetaData._key_fallback()`` together with the
    originating ``KeyError``.

    """
    indexes = []
    try:
        for key in keys:
            indexes.append(self._keymap[key][0])  # type: ignore[index,misc]  # noqa: E501
        return indexes
    except KeyError as missing:
        # ensure it raises
        CursorResultMetaData._key_fallback(self, missing.args[0], missing)

1001 

def _metadata_for_keys(
    self, keys: Sequence[_KeyIndexType]
) -> Iterator[_NonAmbigCursorKeyMapRecType]:
    """Yield the keymap record for each of the given keys.

    Integer keys are translated through ``self._keys`` first.  Missing
    keys are passed to ``CursorResultMetaData._key_fallback()``; records
    whose ``MD_INDEX`` entry is ``None`` are reported through
    ``_raise_for_ambiguous_column_name()``.

    """
    for key in keys:
        lookup = self._keys[key] if isinstance(key, int) else key

        try:
            rec = self._keymap[lookup]
        except KeyError as missing:
            # ensure it raises
            CursorResultMetaData._key_fallback(
                self, missing.args[0], missing
            )

        if rec[MD_INDEX] is None:
            self._raise_for_ambiguous_column_name(rec)

        yield cast(_NonAmbigCursorKeyMapRecType, rec)

1021 

def __getstate__(self) -> Dict[str, Any]:
    """Return the picklable state of this metadata.

    Only keymap entries keyed by ``str`` or ``int`` are kept.  Each kept
    record is rebuilt with an empty list in its third position, the key
    itself in its fourth, and ``None`` in its last two positions.

    """
    # TODO: consider serializing this as SimpleResultMetaData
    picklable_keymap = {}
    for key, rec in self._keymap.items():
        if not isinstance(key, (str, int)):
            continue
        picklable_keymap[key] = (
            rec[MD_INDEX],
            rec[MD_RESULT_MAP_INDEX],
            [],
            key,
            rec[MD_RENDERED_NAME],
            None,
            None,
        )

    return {
        "_keymap": picklable_keymap,
        "_keys": self._keys,
        "_translated_indexes": self._translated_indexes,
    }

1041 

def __setstate__(self, state: Dict[str, Any]) -> None:
    """Restore this metadata from the dictionary produced by
    ``__getstate__()``.

    Processors are reset to ``None`` for every key, the key-to-index
    mapping is rebuilt from the restored keymap, and the instance is
    flagged as unpickled.

    """
    restored_keys = state["_keys"]
    self._processors = [None] * len(restored_keys)
    self._keymap = state["_keymap"]
    self._keymap_by_result_column_idx = None
    self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
    self._keys = state["_keys"]
    self._unpickled = True

    if not state["_translated_indexes"]:
        self._translated_indexes = self._tuplefilter = None
    else:
        translated_indexes: List[Any] = state["_translated_indexes"]
        self._translated_indexes = translated_indexes
        self._tuplefilter = tuplegetter(*translated_indexes)

1057 

1058 

class ResultFetchStrategy:
    """Base interface for the fetching strategy used by a result object.

    The close and fetch methods raise ``NotImplementedError`` here;
    ``yield_per()`` does nothing and ``handle_exception()`` re-raises
    the exception it is given.

    .. versionadded:: 1.4

    """

    __slots__ = ()

    alternate_cursor_description: Optional[_DBAPICursorDescription] = None

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Soft-close hook; not implemented on the base class."""
        raise NotImplementedError()

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Hard-close hook; not implemented on the base class."""
        raise NotImplementedError()

    def yield_per(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        num: int,
    ) -> None:
        """Do nothing; the base implementation ignores ``num``."""
        return None

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Fetch a single row; not implemented on the base class."""
        raise NotImplementedError()

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Fetch several rows; not implemented on the base class."""
        raise NotImplementedError()

    def fetchall(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        """Fetch all rows; not implemented on the base class."""
        raise NotImplementedError()

    def handle_exception(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Re-raise ``err`` unchanged."""
        raise err

1123 

1124 

class NoCursorFetchStrategy(ResultFetchStrategy):
    """Cursor strategy for a result that has no open cursor.

    There are two varieties of this strategy, one for DQL and one for
    DML (and also DDL), each of which represent a result that had a cursor
    but no longer has one.

    Closing is a no-op, and every fetch method delegates to
    ``_non_result()`` with the value to hand back for "no rows".

    """

    __slots__ = ()

    def soft_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Nothing to release; there is no cursor."""

    def hard_close(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: Optional[DBAPICursor],
    ) -> None:
        """Nothing to release; there is no cursor."""

    def fetchone(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Delegate to ``_non_result()`` with a default of ``None``."""
        return self._non_result(result, None)

    def fetchmany(
        self,
        result: CursorResult[Unpack[TupleAny]],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Delegate to ``_non_result()`` with an empty list default."""
        return self._non_result(result, [])

    def fetchall(
        self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Delegate to ``_non_result()`` with an empty list default."""
        return self._non_result(result, [])

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Produce the "no rows" outcome; subclasses implement this."""
        raise NotImplementedError()

1178 

1179 

class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DQL result that has no open cursor.

    This is a result set that can return rows, i.e. for a SELECT, or for
    an INSERT, UPDATE, DELETE that includes RETURNING, but whose cursor is
    closed with no rows remaining.  Whether the owning result object has
    been "hard closed" decides if the fetch methods hand back empty
    results or raise for a closed result.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Return ``default`` unless ``result.closed`` is set, in which
        case raise :class:`.ResourceClosedError` chained from ``err``.

        """
        if not result.closed:
            return default

        raise exc.ResourceClosedError(
            "This result object is closed."
        ) from err

1205 

1206 

# Shared instance of NoCursorDQLFetchStrategy (the class declares empty
# ``__slots__``, so it holds no per-instance state).  It is installed as a
# result's ``cursor_strategy`` by CursorFetchStrategy.soft_close() and
# hard_close().
_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()

1208 

1209 

class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
    """Cursor strategy for a DML result that has no open cursor.

    This is a result set that does not return rows, i.e. for an INSERT,
    UPDATE, DELETE that does not include RETURNING.

    """

    __slots__ = ()

    def _non_result(
        self,
        result: CursorResult[Unpack[TupleAny]],
        default: Any,
        err: Optional[BaseException] = None,
    ) -> Any:
        """Delegate to the result metadata's ``_we_dont_return_rows()``,
        passing ``err`` along; ``default`` is not used.

        """
        # we only expect to have a _NoResultMetaData() here right now.
        metadata = result._metadata
        assert not metadata.returns_rows
        metadata._we_dont_return_rows(err)  # type: ignore[union-attr]

1229 

1230 

# Shared instance of NoCursorDMLFetchStrategy; the class declares empty
# ``__slots__``, so the instance holds no per-instance state.
_NO_CURSOR_DML = NoCursorDMLFetchStrategy()

1232 

1233 

class CursorFetchStrategy(ResultFetchStrategy):
    """Call fetch methods from a DBAPI cursor.

    Alternate versions of this class may instead buffer the rows from
    cursors or not use cursors at all.

    """

    __slots__ = ()

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the no-cursor DQL strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Switch the result over to the no-cursor DQL strategy."""
        result.cursor_strategy = _NO_CURSOR_DQL

    def handle_exception(
        self,
        result: CursorResult[Any],
        dbapi_cursor: Optional[DBAPICursor],
        err: BaseException,
    ) -> NoReturn:
        """Hand ``err`` to the result's connection for DBAPI exception
        handling, along with the cursor and execution context.

        """
        result.connection._handle_dbapi_exception(
            err, None, None, dbapi_cursor, result.context
        )

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Replace the result's strategy with a buffered one whose
        buffer starts empty, is capped at ``num`` rows and does not grow.

        """
        buffered = BufferedRowCursorFetchStrategy(
            dbapi_cursor,
            {"max_row_buffer": num},
            initial_buffer=collections.deque(),
            growth_factor=0,
        )
        result.cursor_strategy = buffered

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Fetch one row from the cursor, closing the result (softly, or
        hard when ``hard_close`` is set) once the cursor returns ``None``.

        """
        try:
            fetched = dbapi_cursor.fetchone()
            if fetched is None:
                result._soft_close(hard=hard_close)
        except BaseException as err:
            self.handle_exception(result, dbapi_cursor, err)
        else:
            return fetched

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Fetch a batch of rows; ``size`` is only forwarded to the
        cursor when given.  An empty batch soft-closes the result.

        """
        try:
            batch = (
                dbapi_cursor.fetchmany()
                if size is None
                else dbapi_cursor.fetchmany(size)
            )
            if not batch:
                result._soft_close()
        except BaseException as err:
            self.handle_exception(result, dbapi_cursor, err)
        else:
            return batch

    def fetchall(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
    ) -> Any:
        """Fetch every remaining row, then soft-close the result."""
        try:
            remaining = dbapi_cursor.fetchall()
            result._soft_close()
        except BaseException as err:
            self.handle_exception(result, dbapi_cursor, err)
        else:
            return remaining

1317 

1318 

# Shared instance of CursorFetchStrategy; the class declares empty
# ``__slots__``, so the instance holds no per-instance state.
_DEFAULT_FETCH = CursorFetchStrategy()

1320 

1321 

class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
    """A cursor fetch strategy that keeps a read-ahead buffer of rows.

    A selection of rows is buffered before ``fetchone()`` is called, so
    that the results of ``cursor.description`` are available immediately
    when interfacing with a DB-API that requires rows to be consumed
    before this information is available (currently psycopg2, when used
    with server-side cursors).

    Only one row is pre-fetched initially; each time more rows are
    needed the buffer size is multiplied by the growth factor, up to the
    ``max_row_buffer`` size, which defaults to 1000::

        with psycopg2_engine.connect() as conn:

            result = conn.execution_options(
                stream_results=True, max_row_buffer=50
            ).execute(text("select * from table"))

    .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.

    .. seealso::

        :ref:`psycopg2_execution_options`
    """

    __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")

    def __init__(
        self,
        dbapi_cursor: DBAPICursor,
        execution_options: CoreExecuteOptionsParameter,
        growth_factor: int = 5,
        initial_buffer: Optional[Deque[Any]] = None,
    ) -> None:
        """Set up the buffer.

        Without an ``initial_buffer`` a single row is fetched from the
        cursor right away.  A falsy ``growth_factor`` pins the buffer
        size to ``max_row_buffer``.

        """
        self._max_row_buffer = execution_options.get("max_row_buffer", 1000)

        if initial_buffer is None:
            initial_buffer = collections.deque(dbapi_cursor.fetchmany(1))
        self._rowbuffer = initial_buffer
        self._growth_factor = growth_factor

        self._bufsize = (
            min(self._max_row_buffer, self._growth_factor)
            if growth_factor
            else self._max_row_buffer
        )

    @classmethod
    def create(
        cls, result: CursorResult[Any]
    ) -> BufferedRowCursorFetchStrategy:
        """Build a strategy from a result's cursor and the execution
        options of its context.

        """
        options = result.context.execution_options
        return BufferedRowCursorFetchStrategy(result.cursor, options)

    def _buffer_rows(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> None:
        """this is currently used only by fetchone()."""

        wanted = self._bufsize
        try:
            # a buffer size below one means "take everything"
            fetched = (
                dbapi_cursor.fetchall()
                if wanted < 1
                else dbapi_cursor.fetchmany(wanted)
            )
        except BaseException as err:
            self.handle_exception(result, dbapi_cursor, err)

        if not fetched:
            return

        self._rowbuffer = collections.deque(fetched)
        if self._growth_factor and wanted < self._max_row_buffer:
            grown = wanted * self._growth_factor
            self._bufsize = min(self._max_row_buffer, grown)

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> None:
        """Fix the buffer size at ``num`` rows and disable growth."""
        self._growth_factor = 0
        self._bufsize = num
        self._max_row_buffer = num

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Discard buffered rows, then apply the parent soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Discard buffered rows, then apply the parent hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Pop the next buffered row, refilling the buffer when empty.

        Returns ``None`` after closing the result when the refill yields
        no rows.

        """
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        self._buffer_rows(result, dbapi_cursor)
        if self._rowbuffer:
            return self._rowbuffer.popleft()

        try:
            result._soft_close(hard=hard_close)
        except BaseException as err:
            self.handle_exception(result, dbapi_cursor, err)
        return None

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Return up to ``size`` rows, topping the buffer up from the
        cursor when it holds fewer than requested.  With no ``size``
        this is the same as ``fetchall()``.

        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffer = self._rowbuffer
        buffered = len(buffer)
        exhausted = False
        if size > buffered:
            try:
                extra = dbapi_cursor.fetchmany(size - buffered)
            except BaseException as err:
                self.handle_exception(result, dbapi_cursor, err)
            else:
                if extra:
                    buffer.extend(extra)
                else:
                    # defer closing since it may clear the row buffer
                    exhausted = True

        take = min(size, len(buffer))
        rows = [buffer.popleft() for _ in range(take)]
        if exhausted:
            result._soft_close()
        return rows

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Return the buffered rows followed by everything left on the
        cursor, then empty the buffer and soft-close the result.

        """
        try:
            everything = [*self._rowbuffer, *dbapi_cursor.fetchall()]
            self._rowbuffer.clear()
            result._soft_close()
        except BaseException as err:
            self.handle_exception(result, dbapi_cursor, err)
        else:
            return everything

1476 

1477 

class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
    """A cursor strategy that buffers rows fully upon creation.

    Used for operations where a result is to be delivered
    after the database conversation can not be continued,
    such as MSSQL INSERT...OUTPUT after an autocommit.

    """

    __slots__ = ("_rowbuffer", "alternate_cursor_description")

    def __init__(
        self,
        dbapi_cursor: Optional[DBAPICursor],
        alternate_description: Optional[_DBAPICursorDescription] = None,
        initial_buffer: Optional[Iterable[Any]] = None,
    ):
        """Fill the buffer from ``initial_buffer`` when one is given,
        otherwise from ``dbapi_cursor.fetchall()``.

        """
        self.alternate_cursor_description = alternate_description
        if initial_buffer is None:
            assert dbapi_cursor is not None
            initial_buffer = dbapi_cursor.fetchall()
        self._rowbuffer = collections.deque(initial_buffer)

    def yield_per(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int
    ) -> Any:
        """Do nothing; all rows are already buffered."""

    def soft_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Discard buffered rows, then apply the parent soft close."""
        self._rowbuffer.clear()
        super().soft_close(result, dbapi_cursor)

    def hard_close(
        self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
    ) -> None:
        """Discard buffered rows, then apply the parent hard close."""
        self._rowbuffer.clear()
        super().hard_close(result, dbapi_cursor)

    def fetchone(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        hard_close: bool = False,
    ) -> Any:
        """Pop the next buffered row, or close the result and return
        ``None`` once the buffer is empty.

        """
        if not self._rowbuffer:
            result._soft_close(hard=hard_close)
            return None
        return self._rowbuffer.popleft()

    def fetchmany(
        self,
        result: CursorResult[Any],
        dbapi_cursor: DBAPICursor,
        size: Optional[int] = None,
    ) -> Any:
        """Pop up to ``size`` rows from the buffer; an empty batch
        soft-closes the result.  With no ``size`` this is the same as
        ``fetchall()``.

        """
        if size is None:
            return self.fetchall(result, dbapi_cursor)

        buffer = self._rowbuffer
        take = min(size, len(buffer))
        batch = [buffer.popleft() for _ in range(take)]
        if not batch:
            result._soft_close()
        return batch

    def fetchall(
        self, result: CursorResult[Any], dbapi_cursor: DBAPICursor
    ) -> Any:
        """Hand back the whole buffer, swap in a fresh empty one and
        soft-close the result.

        """
        drained = self._rowbuffer
        self._rowbuffer = collections.deque()
        result._soft_close()
        return drained

1553 

1554 

class _NoResultMetaData(ResultMetaData):
    """Metadata for a result that does not return rows.

    Every accessor defined here raises :class:`.ResourceClosedError`
    through ``_we_dont_return_rows()``.

    """

    __slots__ = ()

    returns_rows = False

    def _we_dont_return_rows(
        self, err: Optional[BaseException] = None
    ) -> NoReturn:
        """Raise :class:`.ResourceClosedError`, chained from ``err``."""
        raise exc.ResourceClosedError(
            "This result object does not return rows. "
            "It has been closed automatically."
        ) from err

    def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn:
        """Always raises; there are no rows."""
        self._we_dont_return_rows()

    def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; there are no rows."""
        self._we_dont_return_rows()

    def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn:
        """Always raises; there are no rows."""
        self._we_dont_return_rows()

    @property
    def _keymap(self) -> NoReturn:  # type: ignore[override]
        """Always raises; there are no rows."""
        self._we_dont_return_rows()

    @property
    def _key_to_index(self) -> NoReturn:  # type: ignore[override]
        """Always raises; there are no rows."""
        self._we_dont_return_rows()

    @property
    def _processors(self) -> NoReturn:  # type: ignore[override]
        """Always raises; there are no rows."""
        self._we_dont_return_rows()

    @property
    def keys(self) -> NoReturn:
        """Always raises; there are no rows."""
        self._we_dont_return_rows()

1592 

1593 

# Shared _NoResultMetaData instance; CursorResult refers to it through its
# ``_no_result_metadata`` class attribute.
_NO_RESULT_METADATA = _NoResultMetaData()

1595 

1596 

def null_dml_result() -> IteratorResult[Any]:
    """Return a soft-closed :class:`.IteratorResult` over an empty
    iterator, backed by a new ``_NoResultMetaData``.

    """
    empty_result: IteratorResult[Any] = IteratorResult(
        _NoResultMetaData(), iter([])
    )
    empty_result._soft_close()
    return empty_result

1601 

1602 

1603class CursorResult(Result[Unpack[_Ts]]): 

1604 """A Result that is representing state from a DBAPI cursor. 

1605 

1606 .. versionchanged:: 1.4 The :class:`.CursorResult` 

1607 class replaces the previous :class:`.ResultProxy` interface. 

1608 This class is based on the :class:`.Result` calling API 

1609 which provides an updated usage model and calling facade for 

1610 SQLAlchemy Core and SQLAlchemy ORM. 

1611 

1612 Returns database rows via the :class:`.Row` class, which provides 

1613 additional API features and behaviors on top of the raw data returned by 

1614 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` 

1615 method, other kinds of objects may also be returned. 

1616 

1617 .. seealso:: 

1618 

1619 :ref:`tutorial_selecting_data` - introductory material for accessing 

1620 :class:`_engine.CursorResult` and :class:`.Row` objects. 

1621 

1622 """ 

1623 

1624 __slots__ = ( 

1625 "context", 

1626 "dialect", 

1627 "cursor", 

1628 "cursor_strategy", 

1629 "_echo", 

1630 "connection", 

1631 ) 

1632 

1633 _metadata: Union[CursorResultMetaData, _NoResultMetaData] 

1634 _no_result_metadata = _NO_RESULT_METADATA 

1635 _soft_closed: bool = False 

1636 closed: bool = False 

1637 _is_cursor = True 

1638 

1639 context: DefaultExecutionContext 

1640 dialect: Dialect 

1641 cursor_strategy: ResultFetchStrategy 

1642 connection: Connection 

1643 

def __init__(
    self,
    context: DefaultExecutionContext,
    cursor_strategy: ResultFetchStrategy,
    cursor_description: Optional[_DBAPICursorDescription],
):
    """Bind this result to an execution context and fetch strategy.

    With a ``cursor_description`` the metadata is initialised and a row
    getter is memoized; without one the no-result metadata is used.

    """
    self.context = context
    self.dialect = context.dialect
    self.cursor = context.cursor
    self.cursor_strategy = cursor_strategy
    self.connection = context.root_connection
    echo = self.connection._echo and context.engine._should_log_debug()
    self._echo = echo

    if cursor_description is None:
        assert context._num_sentinel_cols == 0
        self._metadata = self._no_result_metadata
        return

    # inline of Result._row_getter(), set up an initial row
    # getter assuming no transformations will be called as this
    # is the most common case

    metadata = self._init_metadata(context, cursor_description)

    processors = metadata._effective_processors
    tuplefilter = metadata._tuplefilter
    if tuplefilter is None or processors is None:
        row_processors = processors
    else:
        row_processors = tuplefilter(processors)

    build_row: Any = functools.partial(
        Row,
        metadata,
        row_processors,
        metadata._key_to_index,
    )

    if tuplefilter is None:
        filtered_row = build_row
    else:
        bound_filter = tuplefilter  # needed to make mypy happy...

        def _sliced_row(raw_data: Any) -> Any:
            return build_row(bound_filter(raw_data))

        filtered_row = _sliced_row

    if not echo:
        make_row = filtered_row
    else:
        log_debug = self.context.connection._log_debug

        def _log_row(row: Any) -> Any:
            log_debug("Row %r", sql_util._repr_row(row))
            return row

        self._row_logging_fn = _log_row

        def _logged_row(row: Any) -> Any:
            return _log_row(filtered_row(row))

        make_row = _logged_row  # type: ignore[assignment]

    self._set_memoized_attribute("_row_getter", make_row)

1705 

def _init_metadata(
    self,
    context: DefaultExecutionContext,
    cursor_description: _DBAPICursorDescription,
) -> CursorResultMetaData:
    """Build or reuse the metadata for this result, store it on
    ``self._metadata`` and return it.

    For a compiled statement, metadata cached on the compiled object is
    reused unless the ``driver_column_names`` execution option is set;
    newly built metadata is cached there when it reports itself safe
    for caching.

    """
    options = context.execution_options
    driver_column_names = options.get("driver_column_names", False)
    compiled = context.compiled

    metadata: CursorResultMetaData

    if not compiled:
        metadata = CursorResultMetaData(
            self,
            cursor_description,
            driver_column_names=driver_column_names,
        )
    else:
        if driver_column_names:
            # TODO: test this case
            metadata = CursorResultMetaData(
                self,
                cursor_description,
                driver_column_names=True,
                num_sentinel_cols=context._num_sentinel_cols,
            )
            assert not metadata._safe_for_cache
        elif compiled._cached_metadata:
            metadata = compiled._cached_metadata
        else:
            # the number of sentinel columns is stored on the context
            # but it's a characteristic of the compiled object
            # so it's ok to apply it to a cacheable metadata.
            metadata = CursorResultMetaData(
                self,
                cursor_description,
                num_sentinel_cols=context._num_sentinel_cols,
            )
            if metadata._safe_for_cache:
                compiled._cached_metadata = metadata

        # result rewrite/ adapt step.  When invoked against a cached
        # Compiled object, the ResultMetaData is rewritten to reflect the
        # Column objects of the current SQL statement object rather than
        # those associated with the cached Compiled object.  The Compiled
        # object may also ask for this step to be skipped; this supports
        # the ORM, which produces a new Result object in any case and
        # uses the cached Column objects against this database result.
        #
        # Basically this step suits the use case where the end user
        # is using Core SQL expressions and is accessing columns in the
        # result row using row._mapping[table.c.column].
        needs_adapt = (
            not options.get("_result_disable_adapt_to_context", False)
            and compiled._result_columns
            and context.cache_hit is context.dialect.CACHE_HIT
            and compiled.statement is not context.invoked_statement  # type: ignore[comparison-overlap] # noqa: E501
        )
        if needs_adapt:
            metadata = metadata._adapt_to_context(context)

    self._metadata = metadata

    if self._echo:
        column_names = tuple(x[0] for x in cursor_description)
        context.connection._log_debug("Col %r", column_names)
    return metadata

1779 

def _soft_close(self, hard: bool = False) -> None:
    """Soft close this :class:`_engine.CursorResult`.

    All DBAPI cursor resources are released, while the CursorResult
    stays "open" from a semantic perspective: the fetchXXX() methods
    keep returning empty results.

    This method is called automatically when:

    * all result rows are exhausted using the fetchXXX() methods.
    * cursor.description is None.

    This method is **not public**, but is documented in order to clarify
    the "autoclose" process used.

    .. seealso::

        :meth:`_engine.CursorResult.close`


    """

    if hard:
        if self.closed:
            return
        self.closed = True
        self.cursor_strategy.hard_close(self, self.cursor)
    else:
        if self._soft_closed:
            return
        self.cursor_strategy.soft_close(self, self.cursor)

    if self._soft_closed:
        # the cursor was already released by an earlier soft close
        return

    dbapi_cursor = self.cursor
    self.cursor = None  # type: ignore
    self.connection._safe_close_cursor(dbapi_cursor)
    self._soft_closed = True

1816 

@property
def inserted_primary_key_rows(self) -> List[Optional[Any]]:
    """Return the value of
    :attr:`_engine.CursorResult.inserted_primary_key`
    as a row contained within a list; some dialects may support a
    multiple row form as well.

    .. note:: As indicated below, in current SQLAlchemy versions this
       accessor is only useful beyond what's already supplied by
       :attr:`_engine.CursorResult.inserted_primary_key` when using the
       :ref:`postgresql_psycopg2` dialect.   Future versions hope to
       generalize this feature to more dialects.

    The accessor exists for dialects offering what the
    :ref:`psycopg2_executemany_mode` feature currently implements,
    at present **only the psycopg2 dialect**: many rows INSERTed at once
    while server-generated primary key values can still be returned.

    * **When using the psycopg2 dialect, or other dialects that may support
      "fast executemany" style inserts in upcoming releases** : When
      invoking an INSERT statement while passing a list of rows as the
      second argument to :meth:`_engine.Connection.execute`, this accessor
      provides a list of rows, each containing the primary key value of
      one row that was INSERTed.

    * **When using all other dialects / backends that don't yet support
      this feature**: This accessor is only useful for **single row INSERT
      statements**, and returns the same information as
      :attr:`_engine.CursorResult.inserted_primary_key` within a
      single-element list.  When an INSERT statement is executed with a
      list of rows to be INSERTed, the list contains one row per row
      inserted, however with ``None`` for any server-generated values.

    Future releases of SQLAlchemy will further generalize the
    "fast execution helper" feature of psycopg2 to suit other dialects,
    thus allowing this accessor to be of more general use.

    .. versionadded:: 1.4

    .. seealso::

        :attr:`_engine.CursorResult.inserted_primary_key`

    """
    if not self.context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not self.context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )
    if self.context._is_explicit_returning:
        raise exc.InvalidRequestError(
            "Can't call inserted_primary_key "
            "when returning() "
            "is used."
        )

    pk_rows = self.context.inserted_primary_key_rows
    return pk_rows  # type: ignore[no-any-return]

1878 

@property
def inserted_primary_key(self) -> Optional[Any]:
    """Return the primary key for the row just inserted.

    The value is a :class:`_result.Row` object: a named tuple of primary
    key values, ordered as the primary key columns are configured in the
    source :class:`_schema.Table`.

    .. versionchanged:: 1.4.8 - the
       :attr:`_engine.CursorResult.inserted_primary_key`
       value is now a named tuple via the :class:`_result.Row` class,
       rather than a plain tuple.

    This accessor only applies to single row :func:`_expression.insert`
    constructs which did not explicitly specify
    :meth:`_expression.Insert.returning`.  Support for multirow inserts,
    while not yet available for most backends, would be accessed using
    the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.

    Note that primary key columns which specify a server_default clause, or
    otherwise do not qualify as "autoincrement" columns (see the notes at
    :class:`_schema.Column`), and were generated using the database-side
    default, will appear in this list as ``None`` unless the backend
    supports "returning" and the insert statement executed with the
    "implicit returning" enabled.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() construct.

    """

    if self.context.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if primary key "
            "returning is supported, please "
            "use .inserted_primary_key_rows."
        )

    pk_rows = self.inserted_primary_key_rows
    return pk_rows[0] if pk_rows else None

1924 

def last_updated_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of updated parameters from this
    execution.

    For an executemany call the whole list of compiled parameter sets is
    returned; otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an update() construct.

    """
    context = self.context
    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isupdate:
        raise exc.InvalidRequestError(
            "Statement is not an update() expression construct."
        )
    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1950 

def last_inserted_params(
    self,
) -> Union[
    List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams
]:
    """Return the collection of inserted parameters from this
    execution.

    For an executemany call the whole list of compiled parameter sets is
    returned; otherwise only the first set.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() construct.

    """
    context = self.context
    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    if not context.isinsert:
        raise exc.InvalidRequestError(
            "Statement is not an insert() expression construct."
        )
    if context.executemany:
        return context.compiled_parameters
    return context.compiled_parameters[0]

1976 

@property
def returned_defaults_rows(
    self,
) -> Optional[Sequence[Row[Unpack[TupleAny]]]]:
    """Return a list of rows each containing the values of default
    columns that were fetched using
    the :meth:`.ValuesBase.return_defaults` feature.

    The return value is a list of :class:`.Row` objects, taken
    directly from the execution context.

    .. versionadded:: 1.4

    """
    default_rows = self.context.returned_default_rows
    return default_rows

1991 

def splice_horizontally(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "horizontally splices"
    together the rows of this :class:`.CursorResult` with that of another
    :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "horizontally splices" means that for each row in the first and second
    result sets, a new row that concatenates the two rows together is
    produced, which then becomes the new row.  The incoming
    :class:`.CursorResult` must have the identical number of rows.  It is
    typically expected that the two result sets come from the same sort
    order as well, as the result rows are spliced together based on their
    position in the result.

    The expected use case here is so that multiple INSERT..RETURNING
    statements (which definitely need to be sorted) against different
    tables can produce a single result that looks like a JOIN of those two
    tables.

    E.g.::

        r1 = connection.execute(
            users.insert().returning(
                users.c.user_name, users.c.user_id, sort_by_parameter_order=True
            ),
            user_values,
        )

        r2 = connection.execute(
            addresses.insert().returning(
                addresses.c.address_id,
                addresses.c.address,
                addresses.c.user_id,
                sort_by_parameter_order=True,
            ),
            address_values,
        )

        rows = r1.splice_horizontally(r2).all()
        assert rows == [
            ("john", 1, 1, "foo@bar.com", 1),
            ("jack", 2, 2, "bar@bat.com", 2),
        ]

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_vertically`


    """  # noqa: E501

    def _as_tuple(raw_row: Any, tuple_filter: Any) -> Tuple[Any, ...]:
        # rows pass through unchanged when no tuple filter is configured
        filtered = raw_row if tuple_filter is None else tuple_filter(raw_row)
        return tuple(filtered)

    clone = self._generate()
    assert clone is self  # just to note

    other_metadata = other._metadata
    own_metadata = self._metadata
    assert isinstance(other_metadata, CursorResultMetaData)
    assert isinstance(own_metadata, CursorResultMetaData)

    # capture both tuple filters before the metadata is replaced below
    self_tf = own_metadata._tuplefilter
    other_tf = other_metadata._tuplefilter
    clone._metadata = own_metadata._splice_horizontally(other_metadata)

    # fully drain both results; rows are then paired up by position
    left_rows = list(self._raw_row_iterator())
    right_rows = list(other._raw_row_iterator())
    total_rows = [
        _as_tuple(left, self_tf) + _as_tuple(right, other_tf)
        for left, right in zip(left_rows, right_rows)
    ]

    clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=total_rows,
    )
    clone._reset_memoizations()
    return clone

2070 

def splice_vertically(self, other: CursorResult[Any]) -> Self:
    """Return a new :class:`.CursorResult` that "vertically splices",
    i.e. "extends", the rows of this :class:`.CursorResult` with that of
    another :class:`.CursorResult`.

    .. tip::  This method is for the benefit of the SQLAlchemy ORM and is
       not intended for general use.

    "vertically splices" means the rows of the given result are appended to
    the rows of this cursor result.  The incoming :class:`.CursorResult`
    must have rows that represent the identical list of columns in the
    identical order as they are in this :class:`.CursorResult`.

    .. versionadded:: 2.0

    .. seealso::

        :meth:`.CursorResult.splice_horizontally`

    """
    clone = self._generate()

    # drain this result first, then append every row of the other one
    total_rows = list(self._raw_row_iterator())
    total_rows.extend(other._raw_row_iterator())

    clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=total_rows,
    )
    clone._reset_memoizations()
    return clone

2102 

def _rewind(self, rows: Any) -> Self:
    """Rewind this result back to the given rowset.

    This is used internally for the case where an :class:`.Insert`
    construct combines the use of
    :meth:`.Insert.return_defaults` along with the
    "supplemental columns" feature.

    NOTE: this method has no effect when a unique filter is applied
    to the result, meaning that no row will be returned.

    """

    if self._echo:
        self.context.connection._log_debug(
            "CursorResult rewound %d row(s)", len(rows)
        )

    # the rows given are expected to be Row objects, so we
    # have to clear out processors which have already run on these
    # rows
    current_metadata = cast(CursorResultMetaData, self._metadata)
    self._metadata = current_metadata._remove_processors_and_tuple_filter()

    # TODO: if these are Row objects, can we save on not having to
    # re-make new Row objects out of them a second time?  is that
    # what's actually happening right now?  maybe look into this
    self.cursor_strategy = FullyBufferedCursorFetchStrategy(
        None,
        initial_buffer=rows,
    )
    self._reset_memoizations()
    return self

2137 

@property
def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]:
    """Return the values of default columns that were fetched using
    the :meth:`.ValuesBase.return_defaults` feature.

    The value is an instance of :class:`.Row`, or ``None``
    if :meth:`.ValuesBase.return_defaults` was not used or if the
    backend does not support RETURNING.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the statement
    was an "executemany" call; use
    :attr:`.CursorResult.returned_defaults_rows` in that case.

    .. seealso::

        :meth:`.ValuesBase.return_defaults`

    """

    context = self.context
    if context.executemany:
        raise exc.InvalidRequestError(
            "This statement was an executemany call; if return defaults "
            "is supported, please use .returned_defaults_rows."
        )

    # an empty or missing collection of rows means there is nothing to report
    default_rows = context.returned_default_rows
    return default_rows[0] if default_rows else None

2164 

def lastrow_has_defaults(self) -> bool:
    """Return ``lastrow_has_defaults()`` from the underlying
    :class:`.ExecutionContext`.

    The call is delegated unchanged to ``self.context``; see
    :class:`.ExecutionContext` for details.

    """

    context = self.context
    return context.lastrow_has_defaults()

2174 

def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return ``postfetch_cols()`` from the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """

    context = self.context
    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    # only INSERT and UPDATE statements carry post-fetch columns
    if not (context.isinsert or context.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )
    return context.postfetch_cols

2197 

def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
    """Return ``prefetch_cols()`` from the underlying
    :class:`.ExecutionContext`.

    See :class:`.ExecutionContext` for details.

    Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
    statement is not a compiled expression construct
    or is not an insert() or update() construct.

    """

    context = self.context
    if not context.compiled:
        raise exc.InvalidRequestError(
            "Statement is not a compiled expression construct."
        )
    # only INSERT and UPDATE statements carry pre-fetch columns
    if not (context.isinsert or context.isupdate):
        raise exc.InvalidRequestError(
            "Statement is not an insert() or update() "
            "expression construct."
        )
    return context.prefetch_cols

2220 

def supports_sane_rowcount(self) -> bool:
    """Return ``supports_sane_rowcount`` from the dialect.

    The flag is read directly from ``self.dialect``.  See
    :attr:`_engine.CursorResult.rowcount` for background.

    """

    dialect = self.dialect
    return dialect.supports_sane_rowcount

2229 

def supports_sane_multi_rowcount(self) -> bool:
    """Return ``supports_sane_multi_rowcount`` from the dialect.

    The flag is read directly from ``self.dialect``.  See
    :attr:`_engine.CursorResult.rowcount` for background.

    """

    dialect = self.dialect
    return dialect.supports_sane_multi_rowcount

2238 

@util.memoized_property
def rowcount(self) -> int:
    """Return the 'rowcount' for this result.

    The primary purpose of 'rowcount' is to report the number of rows
    matched by the WHERE criterion of an UPDATE or DELETE statement
    executed once (i.e. for a single parameter set), which may then be
    compared to the number of rows expected to be updated or deleted as a
    means of asserting data integrity.

    This attribute is transferred from the ``cursor.rowcount`` attribute
    of the DBAPI before the cursor is closed, to support DBAPIs that
    don't make this value available after cursor close.   Some DBAPIs may
    offer meaningful values for other kinds of statements, such as INSERT
    and SELECT statements as well.  In order to retrieve ``cursor.rowcount``
    for these statements, set the
    :paramref:`.Connection.execution_options.preserve_rowcount`
    execution option to True, which will cause the ``cursor.rowcount``
    value to be unconditionally memoized before any results are returned
    or the cursor is closed, regardless of statement type.

    For cases where the DBAPI does not support rowcount for a particular
    kind of statement and/or execution, the returned value will be ``-1``,
    which is delivered directly from the DBAPI and is part of :pep:`249`.
    All DBAPIs should support rowcount for single-parameter-set
    UPDATE and DELETE statements, however.

    .. note::

       Notes regarding :attr:`_engine.CursorResult.rowcount`:


       * This attribute returns the number of rows *matched*,
         which is not necessarily the same as the number of rows
         that were actually *modified*. For example, an UPDATE statement
         may have no net change on a given row if the SET values
         given are the same as those present in the row already.
         Such a row would be matched but not modified.
         On backends that feature both styles, such as MySQL,
         rowcount is configured to return the match
         count in all cases.

       * :attr:`_engine.CursorResult.rowcount` in the default case is
         *only* useful in conjunction with an UPDATE or DELETE statement,
         and only with a single set of parameters. For other kinds of
         statements, SQLAlchemy will not attempt to pre-memoize the value
         unless the
         :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is used.  Note that contrary to :pep:`249`, many
         DBAPIs do not support rowcount values for statements that are not
         UPDATE or DELETE, particularly when rows are being returned which
         are not fully pre-buffered.   DBAPIs that don't support rowcount
         for a particular kind of statement should return the value ``-1``
         for such statements.

       * :attr:`_engine.CursorResult.rowcount` may not be meaningful
         when executing a single statement with multiple parameter sets
         (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
         values across multiple parameter sets and will return ``-1``
         when accessed.

       * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
         a correct population of :attr:`_engine.CursorResult.rowcount`
         when the :paramref:`.Connection.execution_options.preserve_rowcount`
         execution option is set to True.

       * Statements that use RETURNING may not support rowcount, returning
         a ``-1`` value instead.

    .. seealso::

        :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`

        :paramref:`.Connection.execution_options.preserve_rowcount`

    """  # noqa: E501
    try:
        count = self.context.rowcount
    except BaseException as err:
        # any failure is routed through the cursor strategy's handler
        self.cursor_strategy.handle_exception(self, self.cursor, err)
        raise  # not called
    else:
        return count

2320 

@property
def lastrowid(self) -> int:
    """Return the 'lastrowid' accessor on the DBAPI cursor.

    This is a DBAPI specific method and is only functional
    for those backends which support it, for statements
    where it is appropriate.  Its behavior is not
    consistent across backends.

    Usage of this method is normally unnecessary when
    using insert() expression constructs; the
    :attr:`~CursorResult.inserted_primary_key` attribute provides a
    tuple of primary key values for a newly inserted row,
    regardless of database backend.

    The value is obtained from ``self.context.get_lastrowid()``.  Any
    exception raised by that call is passed to the cursor strategy's
    ``handle_exception()`` hook and is never swallowed.

    """
    try:
        return self.context.get_lastrowid()
    except BaseException as e:
        self.cursor_strategy.handle_exception(self, self.cursor, e)
        # same pattern as ``rowcount``: if the handler ever returns, the
        # original error is re-raised instead of silently yielding None
        raise  # not called

2341 

@property
def returns_rows(self) -> bool:
    """True if this :class:`_engine.CursorResult` returns zero or more
    rows.

    I.e. if it is legal to call the methods
    :meth:`_engine.CursorResult.fetchone`,
    :meth:`_engine.CursorResult.fetchmany` and
    :meth:`_engine.CursorResult.fetchall`.

    Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
    always be synonymous with whether or not the DBAPI cursor had a
    ``.description`` attribute, indicating the presence of result columns,
    noting that a cursor that returns zero rows still has a
    ``.description`` if a row-returning statement was emitted.

    This attribute should be True for all results that are against
    SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
    that use RETURNING.   For INSERT/UPDATE/DELETE statements that were
    not using RETURNING, the value will usually be False, however
    there are some dialect-specific exceptions to this, such as when
    using the MSSQL / pyodbc dialect a SELECT is emitted inline in
    order to retrieve an inserted primary key value.


    """
    # the flag lives on the result metadata
    metadata = self._metadata
    return metadata.returns_rows

2369 

@property
def is_insert(self) -> bool:
    """True if this :class:`_engine.CursorResult` is the result
    of executing an expression language compiled
    :func:`_expression.insert` construct.

    When True, this implies that the
    :attr:`inserted_primary_key` attribute is accessible,
    assuming the statement did not include
    a user defined "returning" construct.

    """
    # the flag is read from the execution context
    context = self.context
    return context.isinsert

2383 

def _fetchiter_impl(self) -> Iterator[Any]:
    """Yield rows one at a time from the cursor strategy.

    The strategy's ``fetchone`` is called repeatedly with this result and
    its cursor; iteration stops at the first ``None`` it returns.

    """
    next_row = self.cursor_strategy.fetchone

    # ``self.cursor`` is looked up again on every call, as rows are pulled
    while (fetched := next_row(self, self.cursor)) is not None:
        yield fetched

2392 

def _fetchone_impl(self, hard_close: bool = False) -> Any:
    """Delegate to the cursor strategy's ``fetchone``, passing this
    result, its cursor and the *hard_close* flag."""
    strategy = self.cursor_strategy
    return strategy.fetchone(self, self.cursor, hard_close)

2395 

def _fetchall_impl(self) -> Any:
    """Delegate to the cursor strategy's ``fetchall``, passing this
    result and its cursor."""
    strategy = self.cursor_strategy
    return strategy.fetchall(self, self.cursor)

2398 

def _fetchmany_impl(self, size: Optional[int] = None) -> Any:
    """Delegate to the cursor strategy's ``fetchmany``, passing this
    result, its cursor and the requested *size*."""
    strategy = self.cursor_strategy
    return strategy.fetchmany(self, self.cursor, size)

2401 

def _raw_row_iterator(self) -> Any:
    """Return the iterator produced by :meth:`_fetchiter_impl`."""
    row_iterator = self._fetchiter_impl()
    return row_iterator

2404 

def merge(
    self, *others: Result[Unpack[TupleAny]]
) -> MergedResult[Unpack[TupleAny]]:
    # the superclass performs the actual merge
    merged_result = super().merge(*others)

    # when this result's context reports a rowcount, the merged result
    # gets the sum of the rowcounts of this result and all the others
    if self.context._has_rowcount:
        every_result = (self, *others)
        merged_result.rowcount = sum(
            cast("CursorResult[Any]", each).rowcount
            for each in every_result
        )
    return merged_result

2415 

def close(self) -> None:
    """Close this :class:`_engine.CursorResult`.

    This closes out the underlying DBAPI cursor corresponding to the
    statement execution, if one is still present.  Note that the DBAPI
    cursor is automatically released when the :class:`_engine.CursorResult`
    exhausts all available rows.  :meth:`_engine.CursorResult.close` is
    generally an optional method except in the case when discarding a
    :class:`_engine.CursorResult` that still has additional rows pending
    for fetch.

    After this method is called, it is no longer valid to call upon
    the fetch methods, which will raise a :class:`.ResourceClosedError`
    on subsequent use.

    .. seealso::

        :ref:`connections_toplevel`

    """
    # an explicit close always requests the "hard" variant of soft-close
    hard_close = True
    self._soft_close(hard=hard_close)

2437 

@_generative
def yield_per(self, num: int) -> Self:
    # record the batch size on the result first, then hand the same value
    # to the cursor strategy's ``yield_per`` hook
    self._yield_per = num
    strategy = self.cursor_strategy
    strategy.yield_per(self, self.cursor, num)
    return self

2443 

2444 

# Module-level alias: ``ResultProxy`` is bound to the very same class object
# as ``CursorResult``.
# NOTE(review): presumably kept so that code importing the older
# ``ResultProxy`` name keeps working -- confirm before removing.
ResultProxy = CursorResult