Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

750 statements  

1# engine/cursor.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Define cursor-specific result set constructs including 

9:class:`.CursorResult`.""" 

10 

11 

12from __future__ import annotations 

13 

14import collections 

15import operator 

16import typing 

17from typing import Any 

18from typing import cast 

19from typing import ClassVar 

20from typing import Deque 

21from typing import Dict 

22from typing import Final 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Literal 

27from typing import Mapping 

28from typing import NoReturn 

29from typing import Optional 

30from typing import Sequence 

31from typing import Tuple 

32from typing import TYPE_CHECKING 

33from typing import Union 

34 

35from .result import IteratorResult 

36from .result import MergedResult 

37from .result import Result 

38from .result import ResultMetaData 

39from .result import SimpleResultMetaData 

40from .result import tuplegetter 

41from .row import Row 

42from .. import exc 

43from .. import util 

44from ..sql import elements 

45from ..sql import sqltypes 

46from ..sql import util as sql_util 

47from ..sql.base import _generative 

48from ..sql.compiler import ResultColumnsEntry 

49from ..sql.compiler import RM_NAME 

50from ..sql.compiler import RM_OBJECTS 

51from ..sql.compiler import RM_RENDERED_NAME 

52from ..sql.compiler import RM_TYPE 

53from ..sql.type_api import TypeEngine 

54from ..util.typing import Self 

55from ..util.typing import TupleAny 

56from ..util.typing import TypeVarTuple 

57from ..util.typing import Unpack 

58 

59 

60if typing.TYPE_CHECKING: 

61 from .base import Connection 

62 from .default import DefaultExecutionContext 

63 from .interfaces import _DBAPICursorDescription 

64 from .interfaces import _MutableCoreSingleExecuteParams 

65 from .interfaces import CoreExecuteOptionsParameter 

66 from .interfaces import DBAPICursor 

67 from .interfaces import DBAPIType 

68 from .interfaces import Dialect 

69 from .interfaces import ExecutionContext 

70 from .result import _KeyIndexType 

71 from .result import _KeyMapRecType 

72 from .result import _KeyMapType 

73 from .result import _KeyType 

74 from .result import _ProcessorsType 

75 from .result import _TupleGetterType 

76 from ..sql.schema import Column 

77 from ..sql.type_api import _ResultProcessorType 

78 

79 

80_Ts = TypeVarTuple("_Ts") 

81 

82 

83# metadata entry tuple indexes. 

84# using raw tuple is faster than namedtuple. 

85# these match up to the positions in 

86# _CursorKeyMapRecType 

87MD_INDEX: Final[Literal[0]] = 0 

88"""integer index in cursor.description 

89 

90""" 

91 

92MD_RESULT_MAP_INDEX: Final[Literal[1]] = 1 

93"""integer index in compiled._result_columns""" 

94 

95MD_OBJECTS: Final[Literal[2]] = 2 

96"""other string keys and ColumnElement obj that can match. 

97 

98This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects 

99 

100""" 

101 

102MD_LOOKUP_KEY: Final[Literal[3]] = 3 

103"""string key we usually expect for key-based lookup 

104 

105this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name 

106""" 

107 

108 

109MD_RENDERED_NAME: Final[Literal[4]] = 4 

110"""name that is usually in cursor.description 

111 

112this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname 

113""" 

114 

115 

116MD_PROCESSOR: Final[Literal[5]] = 5 

117"""callable to process a result value into a row""" 

118 

119MD_UNTRANSLATED: Final[Literal[6]] = 6 

120"""raw name from cursor.description""" 

121 

122 

123_CursorKeyMapRecType = Tuple[ 

124 Optional[int], # MD_INDEX, None means the record is ambiguously named 

125 int, # MD_RESULT_MAP_INDEX, -1 if MD_INDEX is None 

126 TupleAny, # MD_OBJECTS 

127 str, # MD_LOOKUP_KEY 

128 str, # MD_RENDERED_NAME 

129 Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR 

130 Optional[str], # MD_UNTRANSLATED 

131] 

132 

133_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType] 

134 

135# same as _CursorKeyMapRecType except the MD_INDEX value is definitely 

136# not None 

137_NonAmbigCursorKeyMapRecType = Tuple[ 

138 int, 

139 int, 

140 List[Any], 

141 str, 

142 str, 

143 Optional["_ResultProcessorType[Any]"], 

144 str, 

145] 

146 

147_MergeColTuple = Tuple[ 

148 int, 

149 Optional[int], 

150 str, 

151 TypeEngine[Any], 

152 "DBAPIType", 

153 Optional[TupleAny], 

154 Optional[str], 

155] 

156 

157 

158class CursorResultMetaData(ResultMetaData): 

159 """Result metadata for DBAPI cursors.""" 

160 

161 __slots__ = ( 

162 "_keymap", 

163 "_processors", 

164 "_keys", 

165 "_keymap_by_result_column_idx", 

166 "_tuplefilter", 

167 "_translated_indexes", 

168 "_safe_for_cache", 

169 "_unpickled", 

170 "_key_to_index", 

171 # don't need _unique_filters support here for now. Can be added 

172 # if a need arises. 

173 ) 

174 

175 _keymap: _CursorKeyMapType 

176 _processors: _ProcessorsType 

177 _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]] 

178 _unpickled: bool 

179 _safe_for_cache: bool 

180 _translated_indexes: Optional[List[int]] 

181 

182 returns_rows: ClassVar[bool] = True 

183 

184 def _has_key(self, key: Any) -> bool: 

185 return key in self._keymap 

186 

187 def _for_freeze(self) -> ResultMetaData: 

188 return SimpleResultMetaData( 

189 self._keys, 

190 extra=[self._keymap[key][MD_OBJECTS] for key in self._keys], 

191 ) 

192 

193 def _make_new_metadata( 

194 self, 

195 *, 

196 unpickled: bool, 

197 processors: _ProcessorsType, 

198 keys: Sequence[str], 

199 keymap: _KeyMapType, 

200 tuplefilter: Optional[_TupleGetterType], 

201 translated_indexes: Optional[List[int]], 

202 safe_for_cache: bool, 

203 keymap_by_result_column_idx: Any, 

204 ) -> Self: 

205 new_obj = self.__class__.__new__(self.__class__) 

206 new_obj._unpickled = unpickled 

207 new_obj._processors = processors 

208 new_obj._keys = keys 

209 new_obj._keymap = keymap 

210 new_obj._tuplefilter = tuplefilter 

211 new_obj._translated_indexes = translated_indexes 

212 new_obj._safe_for_cache = safe_for_cache 

213 new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx 

214 new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX) 

215 return new_obj 

216 

217 def _remove_processors_and_tuple_filter(self) -> Self: 

218 if self._tuplefilter: 

219 proc = self._tuplefilter(self._processors) 

220 else: 

221 proc = self._processors 

222 return self._make_new_metadata( 

223 unpickled=self._unpickled, 

224 processors=[None] * len(proc), 

225 tuplefilter=None, 

226 translated_indexes=None, 

227 keymap={ 

228 key: value[0:5] + (None,) + value[6:] 

229 for key, value in self._keymap.items() 

230 }, 

231 keys=self._keys, 

232 safe_for_cache=self._safe_for_cache, 

233 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

234 ) 

235 

236 def _splice_horizontally(self, other: CursorResultMetaData) -> Self: 

237 keymap = dict(self._keymap) 

238 offset = len(self._keys) 

239 

240 for key, value in other._keymap.items(): 

241 # int index should be None for ambiguous key 

242 if value[MD_INDEX] is not None and key not in keymap: 

243 md_index = value[MD_INDEX] + offset 

244 md_object = value[MD_RESULT_MAP_INDEX] + offset 

245 else: 

246 md_index = None 

247 md_object = -1 

248 keymap[key] = (md_index, md_object, *value[2:]) 

249 

250 self_tf = self._tuplefilter 

251 other_tf = other._tuplefilter 

252 

253 proc: List[Any] = [] 

254 for pp, tf in [ 

255 (self._processors, self_tf), 

256 (other._processors, other_tf), 

257 ]: 

258 proc.extend(pp if tf is None else tf(pp)) 

259 

260 new_keys = [*self._keys, *other._keys] 

261 assert len(proc) == len(new_keys) 

262 

263 return self._make_new_metadata( 

264 unpickled=self._unpickled, 

265 processors=proc, 

266 tuplefilter=None, 

267 translated_indexes=None, 

268 keys=new_keys, 

269 keymap=keymap, 

270 safe_for_cache=self._safe_for_cache, 

271 keymap_by_result_column_idx={ 

272 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry 

273 for metadata_entry in keymap.values() 

274 }, 

275 ) 

276 

277 def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self: 

278 recs = list(self._metadata_for_keys(keys)) 

279 

280 indexes = [rec[MD_INDEX] for rec in recs] 

281 new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs] 

282 

283 if self._translated_indexes: 

284 indexes = [self._translated_indexes[idx] for idx in indexes] 

285 tup = tuplegetter(*indexes) 

286 new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)] 

287 

288 keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs} 

289 # TODO: need unit test for: 

290 # result = connection.execute("raw sql, no columns").scalars() 

291 # without the "or ()" it's failing because MD_OBJECTS is None 

292 keymap.update( 

293 (e, new_rec) 

294 for new_rec in new_recs 

295 for e in new_rec[MD_OBJECTS] or () 

296 ) 

297 

298 return self._make_new_metadata( 

299 unpickled=self._unpickled, 

300 processors=self._processors, 

301 keys=new_keys, 

302 tuplefilter=tup, 

303 translated_indexes=indexes, 

304 keymap=keymap, # type: ignore[arg-type] 

305 safe_for_cache=self._safe_for_cache, 

306 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

307 ) 

308 

309 def _adapt_to_context(self, context: ExecutionContext) -> Self: 

310 """When using a cached Compiled construct that has a _result_map, 

311 for a new statement that used the cached Compiled, we need to ensure 

312 the keymap has the Column objects from our new statement as keys. 

313 So here we rewrite keymap with new entries for the new columns 

314 as matched to those of the cached statement. 

315 

316 """ 

317 

318 if not context.compiled or not context.compiled._result_columns: 

319 return self 

320 

321 compiled_statement = context.compiled.statement 

322 invoked_statement = context.invoked_statement 

323 

324 if TYPE_CHECKING: 

325 assert isinstance(invoked_statement, elements.ClauseElement) 

326 

327 if compiled_statement is invoked_statement: 

328 return self 

329 

330 assert invoked_statement is not None 

331 

332 # this is the most common path for Core statements when 

333 # caching is used. In ORM use, this codepath is not really used 

334 # as the _result_disable_adapt_to_context execution option is 

335 # set by the ORM. 

336 

337 # make a copy and add the columns from the invoked statement 

338 # to the result map. 

339 

340 keymap_by_position = self._keymap_by_result_column_idx 

341 

342 if keymap_by_position is None: 

343 # first retrieval from cache, this map will not be set up yet, 

344 # initialize lazily 

345 keymap_by_position = self._keymap_by_result_column_idx = { 

346 metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry 

347 for metadata_entry in self._keymap.values() 

348 } 

349 

350 return self._make_new_metadata( 

351 keymap=self._keymap 

352 | { 

353 new: keymap_by_position[idx] 

354 for idx, new in enumerate( 

355 invoked_statement._all_selected_columns 

356 ) 

357 if idx in keymap_by_position 

358 }, 

359 unpickled=self._unpickled, 

360 processors=self._processors, 

361 tuplefilter=self._tuplefilter, 

362 translated_indexes=None, 

363 keys=self._keys, 

364 safe_for_cache=self._safe_for_cache, 

365 keymap_by_result_column_idx=self._keymap_by_result_column_idx, 

366 ) 

367 

368 def __init__( 

369 self, 

370 parent: CursorResult[Unpack[TupleAny]], 

371 cursor_description: _DBAPICursorDescription, 

372 *, 

373 driver_column_names: bool = False, 

374 num_sentinel_cols: int = 0, 

375 ): 

376 context = parent.context 

377 if num_sentinel_cols > 0: 

378 # this is slightly faster than letting tuplegetter use the indexes 

379 self._tuplefilter = tuplefilter = operator.itemgetter( 

380 slice(-num_sentinel_cols) 

381 ) 

382 cursor_description = tuplefilter(cursor_description) 

383 else: 

384 self._tuplefilter = tuplefilter = None 

385 self._translated_indexes = None 

386 self._safe_for_cache = self._unpickled = False 

387 

388 if context.result_column_struct: 

389 ( 

390 result_columns, 

391 cols_are_ordered, 

392 textual_ordered, 

393 ad_hoc_textual, 

394 loose_column_name_matching, 

395 ) = context.result_column_struct 

396 if tuplefilter is not None: 

397 result_columns = tuplefilter(result_columns) 

398 num_ctx_cols = len(result_columns) 

399 else: 

400 result_columns = cols_are_ordered = ( # type: ignore 

401 num_ctx_cols 

402 ) = ad_hoc_textual = loose_column_name_matching = ( 

403 textual_ordered 

404 ) = False 

405 

406 # merge cursor.description with the column info 

407 # present in the compiled structure, if any 

408 raw = self._merge_cursor_description( 

409 context, 

410 cursor_description, 

411 result_columns, 

412 num_ctx_cols, 

413 cols_are_ordered, 

414 textual_ordered, 

415 ad_hoc_textual, 

416 loose_column_name_matching, 

417 driver_column_names, 

418 ) 

419 

420 # processors in key order which are used when building up 

421 # a row 

422 self._processors = [ 

423 metadata_entry[MD_PROCESSOR] for metadata_entry in raw 

424 ] 

425 if num_sentinel_cols > 0: 

426 # add the number of sentinel columns since these are passed 

427 # to the tuplefilters before being used 

428 self._processors.extend([None] * num_sentinel_cols) 

429 

430 # this is used when using this ResultMetaData in a Core-only cache 

431 # retrieval context. it's initialized on first cache retrieval 

432 # when the _result_disable_adapt_to_context execution option 

433 # (which the ORM generally sets) is not set. 

434 self._keymap_by_result_column_idx = None 

435 

436 # for compiled SQL constructs, copy additional lookup keys into 

437 # the key lookup map, such as Column objects, labels, 

438 # column keys and other names 

439 if num_ctx_cols: 

440 # keymap by primary string... 

441 by_key: Dict[_KeyType, _CursorKeyMapRecType] = { 

442 metadata_entry[MD_LOOKUP_KEY]: metadata_entry 

443 for metadata_entry in raw 

444 } 

445 

446 if len(by_key) != num_ctx_cols: 

447 # if by-primary-string dictionary smaller than 

448 # number of columns, assume we have dupes; (this check 

449 # is also in place if string dictionary is bigger, as 

450 # can occur when '*' was used as one of the compiled columns, 

451 # which may or may not be suggestive of dupes), rewrite 

452 # dupe records with "None" for index which results in 

453 # ambiguous column exception when accessed. 

454 # 

455 # this is considered to be the less common case as it is not 

456 # common to have dupe column keys in a SELECT statement. 

457 # 

458 # new in 1.4: get the complete set of all possible keys, 

459 # strings, objects, whatever, that are dupes across two 

460 # different records, first. 

461 index_by_key: Dict[Any, Any] = {} 

462 dupes = set() 

463 for metadata_entry in raw: 

464 for key in (metadata_entry[MD_RENDERED_NAME],) + ( 

465 metadata_entry[MD_OBJECTS] or () 

466 ): 

467 idx = metadata_entry[MD_INDEX] 

468 # if this key has been associated with more than one 

469 # positional index, it's a dupe 

470 if index_by_key.setdefault(key, idx) != idx: 

471 dupes.add(key) 

472 

473 # then put everything we have into the keymap excluding only 

474 # those keys that are dupes. 

475 self._keymap = { 

476 obj_elem: metadata_entry 

477 for metadata_entry in raw 

478 if metadata_entry[MD_OBJECTS] 

479 for obj_elem in metadata_entry[MD_OBJECTS] 

480 if obj_elem not in dupes 

481 } 

482 

483 # then for the dupe keys, put the "ambiguous column" 

484 # record into by_key. 

485 by_key.update( 

486 { 

487 key: (None, -1, (), key, key, None, None) 

488 for key in dupes 

489 } 

490 ) 

491 

492 else: 

493 # no dupes - copy secondary elements from compiled 

494 # columns into self._keymap. this is the most common 

495 # codepath for Core / ORM statement executions before the 

496 # result metadata is cached 

497 self._keymap = { 

498 obj_elem: metadata_entry 

499 for metadata_entry in raw 

500 if metadata_entry[MD_OBJECTS] 

501 for obj_elem in metadata_entry[MD_OBJECTS] 

502 } 

503 # update keymap with primary string names taking 

504 # precedence 

505 self._keymap.update(by_key) 

506 else: 

507 # no compiled objects to map, just create keymap by primary string 

508 self._keymap = { 

509 metadata_entry[MD_LOOKUP_KEY]: metadata_entry 

510 for metadata_entry in raw 

511 } 

512 

513 # update keymap with "translated" names. 

514 # the "translated" name thing has a long history: 

515 # 1. originally, it was used to fix an issue in very old SQLite 

516 # versions prior to 3.10.0. This code is still there in the 

517 # sqlite dialect. 

518 # 2. Next, the pyhive third party dialect started using this hook 

519 # for some driver related issue on their end. 

520 # 3. Most recently, the "driver_column_names" execution option has 

521 # taken advantage of this hook to get raw DBAPI col names in the 

522 # result keys without disrupting the usual merge process. 

523 

524 if driver_column_names or ( 

525 not num_ctx_cols and context._translate_colname 

526 ): 

527 self._keymap.update( 

528 { 

529 metadata_entry[MD_UNTRANSLATED]: self._keymap[ 

530 metadata_entry[MD_LOOKUP_KEY] 

531 ] 

532 for metadata_entry in raw 

533 if metadata_entry[MD_UNTRANSLATED] 

534 } 

535 ) 

536 

537 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX) 

538 

539 def _merge_cursor_description( 

540 self, 

541 context: DefaultExecutionContext, 

542 cursor_description: _DBAPICursorDescription, 

543 result_columns: Sequence[ResultColumnsEntry], 

544 num_ctx_cols: int, 

545 cols_are_ordered: bool, 

546 textual_ordered: bool, 

547 ad_hoc_textual: bool, 

548 loose_column_name_matching: bool, 

549 driver_column_names: bool, 

550 ) -> List[_CursorKeyMapRecType]: 

551 """Merge a cursor.description with compiled result column information. 

552 

553 There are at least four separate strategies used here, selected 

554 depending on the type of SQL construct used to start with. 

555 

556 The most common case is that of the compiled SQL expression construct, 

557 which generated the column names present in the raw SQL string and 

558 which has the identical number of columns as were reported by 

559 cursor.description. In this case, we assume a 1-1 positional mapping 

560 between the entries in cursor.description and the compiled object. 

561 This is also the most performant case as we disregard extracting / 

562 decoding the column names present in cursor.description since we 

563 already have the desired name we generated in the compiled SQL 

564 construct. 

565 

566 The next common case is that of the completely raw string SQL, 

567 such as passed to connection.execute(). In this case we have no 

568 compiled construct to work with, so we extract and decode the 

569 names from cursor.description and index those as the primary 

570 result row target keys. 

571 

572 The remaining fairly common case is that of the textual SQL 

573 that includes at least partial column information; this is when 

574 we use a :class:`_expression.TextualSelect` construct. 

575 This construct may have 

576 unordered or ordered column information. In the ordered case, we 

577 merge the cursor.description and the compiled construct's information 

578 positionally, and warn if there are additional description names 

579 present, however we still decode the names in cursor.description 

580 as we don't have a guarantee that the names in the columns match 

581 on these. In the unordered case, we match names in cursor.description 

582 to that of the compiled construct based on name matching. 

583 In both of these cases, the cursor.description names and the column 

584 expression objects and names are indexed as result row target keys. 

585 

586 The final case is much less common, where we have a compiled 

587 non-textual SQL expression construct, but the number of columns 

588 in cursor.description doesn't match what's in the compiled 

589 construct. We make the guess here that there might be textual 

590 column expressions in the compiled construct that themselves include 

591 a comma in them causing them to split. We do the same name-matching 

592 as with textual non-ordered columns. 

593 

594 The name-matched system of merging is the same as that used by 

595 SQLAlchemy for all cases up through the 0.9 series. Positional 

596 matching for compiled SQL expressions was introduced in 1.0 as a 

597 major performance feature, and positional matching for textual 

598 :class:`_expression.TextualSelect` objects in 1.1. 

599 As name matching is no longer 

600 a common case, it was acceptable to factor it into smaller generator- 

601 oriented methods that are easier to understand, but incur slightly 

602 more performance overhead. 

603 

604 """ 

605 

606 if ( 

607 num_ctx_cols 

608 and cols_are_ordered 

609 and not textual_ordered 

610 and num_ctx_cols == len(cursor_description) 

611 and not driver_column_names 

612 ): 

613 self._keys = [elem[0] for elem in result_columns] 

614 # pure positional 1-1 case; doesn't need to read 

615 # the names from cursor.description 

616 

617 # most common case for Core and ORM 

618 

619 # this metadata is safe to 

620 # cache because we are guaranteed 

621 # to have the columns in the same order for new executions 

622 self._safe_for_cache = True 

623 

624 return [ 

625 ( 

626 idx, 

627 idx, 

628 rmap_entry[RM_OBJECTS], 

629 rmap_entry[RM_NAME], 

630 rmap_entry[RM_RENDERED_NAME], 

631 context.get_result_processor( 

632 rmap_entry[RM_TYPE], 

633 rmap_entry[RM_RENDERED_NAME], 

634 cursor_description[idx][1], 

635 ), 

636 None, 

637 ) 

638 for idx, rmap_entry in enumerate(result_columns) 

639 ] 

640 else: 

641 # name-based or text-positional cases, where we need 

642 # to read cursor.description names 

643 

644 if textual_ordered or ( 

645 ad_hoc_textual and len(cursor_description) == num_ctx_cols 

646 ): 

647 self._safe_for_cache = not driver_column_names 

648 # textual positional case 

649 raw_iterator = self._merge_textual_cols_by_position( 

650 context, 

651 cursor_description, 

652 result_columns, 

653 driver_column_names, 

654 ) 

655 elif num_ctx_cols: 

656 # compiled SQL with a mismatch of description cols 

657 # vs. compiled cols, or textual w/ unordered columns 

658 # the order of columns can change if the query is 

659 # against a "select *", so not safe to cache 

660 self._safe_for_cache = False 

661 raw_iterator = self._merge_cols_by_name( 

662 context, 

663 cursor_description, 

664 result_columns, 

665 loose_column_name_matching, 

666 driver_column_names, 

667 ) 

668 else: 

669 # no compiled SQL, just a raw string, order of columns 

670 # can change for "select *" 

671 self._safe_for_cache = False 

672 raw_iterator = self._merge_cols_by_none( 

673 context, cursor_description, driver_column_names 

674 ) 

675 

676 return [ 

677 ( 

678 idx, 

679 ridx, 

680 obj, 

681 cursor_colname, 

682 cursor_colname, 

683 context.get_result_processor( 

684 mapped_type, cursor_colname, coltype 

685 ), 

686 untranslated, 

687 ) # type: ignore[misc] 

688 for ( 

689 idx, 

690 ridx, 

691 cursor_colname, 

692 mapped_type, 

693 coltype, 

694 obj, 

695 untranslated, 

696 ) in raw_iterator 

697 ] 

698 

699 def _colnames_from_description( 

700 self, 

701 context: DefaultExecutionContext, 

702 cursor_description: _DBAPICursorDescription, 

703 driver_column_names: bool, 

704 ) -> Iterator[Tuple[int, str, str, Optional[str], DBAPIType]]: 

705 """Extract column names and data types from a cursor.description. 

706 

707 Applies unicode decoding, column translation, "normalization", 

708 and case sensitivity rules to the names based on the dialect. 

709 

710 """ 

711 dialect = context.dialect 

712 translate_colname = context._translate_colname 

713 normalize_name = ( 

714 dialect.normalize_name if dialect.requires_name_normalize else None 

715 ) 

716 

717 untranslated = None 

718 

719 for idx, rec in enumerate(cursor_description): 

720 colname = unnormalized = rec[0] 

721 coltype = rec[1] 

722 

723 if translate_colname: 

724 # a None here for "untranslated" means "the dialect did not 

725 # change the column name and the untranslated case can be 

726 # ignored". otherwise "untranslated" is expected to be the 

727 # original, unchanged colname (e.g. is == to "unnormalized") 

728 colname, untranslated = translate_colname(colname) 

729 

730 assert untranslated is None or untranslated == unnormalized 

731 

732 if normalize_name: 

733 colname = normalize_name(colname) 

734 

735 if driver_column_names: 

736 yield idx, colname, unnormalized, unnormalized, coltype 

737 

738 else: 

739 yield idx, colname, unnormalized, untranslated, coltype 

740 

741 def _merge_textual_cols_by_position( 

742 self, 

743 context: DefaultExecutionContext, 

744 cursor_description: _DBAPICursorDescription, 

745 result_columns: Sequence[ResultColumnsEntry], 

746 driver_column_names: bool, 

747 ) -> Iterator[_MergeColTuple]: 

748 num_ctx_cols = len(result_columns) 

749 

750 if num_ctx_cols > len(cursor_description): 

751 util.warn( 

752 "Number of columns in textual SQL (%d) is " 

753 "smaller than number of columns requested (%d)" 

754 % (num_ctx_cols, len(cursor_description)) 

755 ) 

756 seen = set() 

757 

758 self._keys = [] 

759 

760 uses_denormalize = context.dialect.requires_name_normalize 

761 for ( 

762 idx, 

763 colname, 

764 unnormalized, 

765 untranslated, 

766 coltype, 

767 ) in self._colnames_from_description( 

768 context, cursor_description, driver_column_names 

769 ): 

770 if idx < num_ctx_cols: 

771 ctx_rec = result_columns[idx] 

772 obj = ctx_rec[RM_OBJECTS] 

773 ridx = idx 

774 mapped_type = ctx_rec[RM_TYPE] 

775 if obj[0] in seen: 

776 raise exc.InvalidRequestError( 

777 "Duplicate column expression requested " 

778 "in textual SQL: %r" % obj[0] 

779 ) 

780 seen.add(obj[0]) 

781 

782 # special check for all uppercase unnormalized name; 

783 # use the unnormalized name as the key. 

784 # see #10788 

785 # if these names don't match, then we still honor the 

786 # cursor.description name as the key and not what the 

787 # Column has, see 

788 # test_resultset.py::PositionalTextTest::test_via_column 

789 if ( 

790 uses_denormalize 

791 and unnormalized == ctx_rec[RM_RENDERED_NAME] 

792 ): 

793 result_name = unnormalized 

794 else: 

795 result_name = colname 

796 else: 

797 mapped_type = sqltypes.NULLTYPE 

798 obj = None 

799 ridx = None 

800 

801 result_name = colname 

802 

803 if driver_column_names: 

804 assert untranslated is not None 

805 self._keys.append(untranslated) 

806 else: 

807 self._keys.append(result_name) 

808 

809 yield ( 

810 idx, 

811 ridx, 

812 result_name, 

813 mapped_type, 

814 coltype, 

815 obj, 

816 untranslated, 

817 ) 

818 

819 def _merge_cols_by_name( 

820 self, 

821 context: DefaultExecutionContext, 

822 cursor_description: _DBAPICursorDescription, 

823 result_columns: Sequence[ResultColumnsEntry], 

824 loose_column_name_matching: bool, 

825 driver_column_names: bool, 

826 ) -> Iterator[_MergeColTuple]: 

827 match_map = self._create_description_match_map( 

828 result_columns, loose_column_name_matching 

829 ) 

830 mapped_type: TypeEngine[Any] 

831 

832 self._keys = [] 

833 

834 for ( 

835 idx, 

836 colname, 

837 unnormalized, 

838 untranslated, 

839 coltype, 

840 ) in self._colnames_from_description( 

841 context, cursor_description, driver_column_names 

842 ): 

843 try: 

844 ctx_rec = match_map[colname] 

845 except KeyError: 

846 mapped_type = sqltypes.NULLTYPE 

847 obj = None 

848 result_columns_idx = None 

849 else: 

850 obj = ctx_rec[1] 

851 mapped_type = ctx_rec[2] 

852 result_columns_idx = ctx_rec[3] 

853 

854 if driver_column_names: 

855 assert untranslated is not None 

856 self._keys.append(untranslated) 

857 else: 

858 self._keys.append(colname) 

859 yield ( 

860 idx, 

861 result_columns_idx, 

862 colname, 

863 mapped_type, 

864 coltype, 

865 obj, 

866 untranslated, 

867 ) 

868 

869 @classmethod 

870 def _create_description_match_map( 

871 cls, 

872 result_columns: Sequence[ResultColumnsEntry], 

873 loose_column_name_matching: bool = False, 

874 ) -> Dict[Union[str, object], Tuple[str, TupleAny, TypeEngine[Any], int]]: 

875 """when matching cursor.description to a set of names that are present 

876 in a Compiled object, as is the case with TextualSelect, get all the 

877 names we expect might match those in cursor.description. 

878 """ 

879 

880 d: Dict[ 

881 Union[str, object], 

882 Tuple[str, TupleAny, TypeEngine[Any], int], 

883 ] = {} 

884 for ridx, elem in enumerate(result_columns): 

885 key = elem[RM_RENDERED_NAME] 

886 

887 if key in d: 

888 # conflicting keyname - just add the column-linked objects 

889 # to the existing record. if there is a duplicate column 

890 # name in the cursor description, this will allow all of those 

891 # objects to raise an ambiguous column error 

892 e_name, e_obj, e_type, e_ridx = d[key] 

893 d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx 

894 else: 

895 d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx) 

896 

897 if loose_column_name_matching: 

898 # when using a textual statement with an unordered set 

899 # of columns that line up, we are expecting the user 

900 # to be using label names in the SQL that match to the column 

901 # expressions. Enable more liberal matching for this case; 

902 # duplicate keys that are ambiguous will be fixed later. 

903 for r_key in elem[RM_OBJECTS]: 

904 d.setdefault( 

905 r_key, 

906 (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx), 

907 ) 

908 return d 

909 

910 def _merge_cols_by_none( 

911 self, 

912 context: DefaultExecutionContext, 

913 cursor_description: _DBAPICursorDescription, 

914 driver_column_names: bool, 

915 ) -> Iterator[_MergeColTuple]: 

916 self._keys = [] 

917 

918 for ( 

919 idx, 

920 colname, 

921 unnormalized, 

922 untranslated, 

923 coltype, 

924 ) in self._colnames_from_description( 

925 context, cursor_description, driver_column_names 

926 ): 

927 

928 if driver_column_names: 

929 assert untranslated is not None 

930 self._keys.append(untranslated) 

931 else: 

932 self._keys.append(colname) 

933 

934 yield ( 

935 idx, 

936 None, 

937 colname, 

938 sqltypes.NULLTYPE, 

939 coltype, 

940 None, 

941 untranslated, 

942 ) 

943 

944 if not TYPE_CHECKING: 

945 

946 def _key_fallback( 

947 self, key: Any, err: Optional[Exception], raiseerr: bool = True 

948 ) -> Optional[NoReturn]: 

949 if raiseerr: 

950 if self._unpickled and isinstance(key, elements.ColumnElement): 

951 raise exc.NoSuchColumnError( 

952 "Row was unpickled; lookup by ColumnElement " 

953 "is unsupported" 

954 ) from err 

955 else: 

956 raise exc.NoSuchColumnError( 

957 "Could not locate column in row for column '%s'" 

958 % util.string_or_unprintable(key) 

959 ) from err 

960 else: 

961 return None 

962 

963 def _raise_for_ambiguous_column_name( 

964 self, rec: _KeyMapRecType 

965 ) -> NoReturn: 

966 raise exc.InvalidRequestError( 

967 "Ambiguous column name '%s' in " 

968 "result set column descriptions" % rec[MD_LOOKUP_KEY] 

969 ) 

970 

971 def _index_for_key( 

972 self, key: _KeyIndexType, raiseerr: bool = True 

973 ) -> Optional[int]: 

974 # TODO: can consider pre-loading ints and negative ints 

975 # into _keymap - also no coverage here 

976 if isinstance(key, int): 

977 key = self._keys[key] 

978 

979 try: 

980 rec = self._keymap[key] 

981 except KeyError as ke: 

982 x = self._key_fallback(key, ke, raiseerr) 

983 assert x is None 

984 return None 

985 

986 index = rec[0] 

987 

988 if index is None: 

989 self._raise_for_ambiguous_column_name(rec) 

990 return index 

991 

992 def _indexes_for_keys( 

993 self, keys: Sequence[_KeyIndexType] 

994 ) -> Sequence[int]: 

995 try: 

996 return [self._keymap[key][0] for key in keys] # type: ignore[index,misc] # noqa: E501 

997 except KeyError as ke: 

998 # ensure it raises 

999 CursorResultMetaData._key_fallback(self, ke.args[0], ke) 

1000 

1001 def _metadata_for_keys( 

1002 self, keys: Sequence[_KeyIndexType] 

1003 ) -> Iterator[_NonAmbigCursorKeyMapRecType]: 

1004 for key in keys: 

1005 if isinstance(key, int): 

1006 key = self._keys[key] 

1007 

1008 try: 

1009 rec = self._keymap[key] 

1010 except KeyError as ke: 

1011 # ensure it raises 

1012 CursorResultMetaData._key_fallback(self, ke.args[0], ke) 

1013 

1014 index = rec[MD_INDEX] 

1015 

1016 if index is None: 

1017 self._raise_for_ambiguous_column_name(rec) 

1018 

1019 yield cast(_NonAmbigCursorKeyMapRecType, rec) 

1020 

1021 def __getstate__(self) -> Dict[str, Any]: 

1022 # TODO: consider serializing this as SimpleResultMetaData 

1023 return { 

1024 "_keymap": { 

1025 key: ( 

1026 rec[MD_INDEX], 

1027 rec[MD_RESULT_MAP_INDEX], 

1028 [], 

1029 key, 

1030 rec[MD_RENDERED_NAME], 

1031 None, 

1032 None, 

1033 ) 

1034 for key, rec in self._keymap.items() 

1035 if isinstance(key, (str, int)) 

1036 }, 

1037 "_keys": self._keys, 

1038 "_translated_indexes": self._translated_indexes, 

1039 } 

1040 

1041 def __setstate__(self, state: Dict[str, Any]) -> None: 

1042 self._processors = [None for _ in range(len(state["_keys"]))] 

1043 self._keymap = state["_keymap"] 

1044 self._keymap_by_result_column_idx = None 

1045 self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX) 

1046 self._keys = state["_keys"] 

1047 self._unpickled = True 

1048 if state["_translated_indexes"]: 

1049 translated_indexes: List[Any] 

1050 self._translated_indexes = translated_indexes = state[ 

1051 "_translated_indexes" 

1052 ] 

1053 self._tuplefilter = tuplegetter(*translated_indexes) 

1054 else: 

1055 self._translated_indexes = self._tuplefilter = None 

1056 

1057 

1058class ResultFetchStrategy: 

1059 """Define a fetching strategy for a result object. 

1060 

1061 

1062 .. versionadded:: 1.4 

1063 

1064 """ 

1065 

1066 __slots__ = () 

1067 

1068 alternate_cursor_description: Optional[_DBAPICursorDescription] = None 

1069 

1070 def soft_close( 

1071 self, 

1072 result: CursorResult[Unpack[TupleAny]], 

1073 dbapi_cursor: Optional[DBAPICursor], 

1074 ) -> None: 

1075 raise NotImplementedError() 

1076 

1077 def hard_close( 

1078 self, 

1079 result: CursorResult[Unpack[TupleAny]], 

1080 dbapi_cursor: Optional[DBAPICursor], 

1081 ) -> None: 

1082 raise NotImplementedError() 

1083 

1084 def yield_per( 

1085 self, 

1086 result: CursorResult[Unpack[TupleAny]], 

1087 dbapi_cursor: DBAPICursor, 

1088 num: int, 

1089 ) -> None: 

1090 return 

1091 

1092 def fetchone( 

1093 self, 

1094 result: CursorResult[Unpack[TupleAny]], 

1095 dbapi_cursor: DBAPICursor, 

1096 hard_close: bool = False, 

1097 ) -> Any: 

1098 raise NotImplementedError() 

1099 

1100 def fetchmany( 

1101 self, 

1102 result: CursorResult[Unpack[TupleAny]], 

1103 dbapi_cursor: DBAPICursor, 

1104 size: Optional[int] = None, 

1105 ) -> Any: 

1106 raise NotImplementedError() 

1107 

1108 def fetchall( 

1109 self, 

1110 result: CursorResult[Unpack[TupleAny]], 

1111 dbapi_cursor: DBAPICursor, 

1112 ) -> Any: 

1113 raise NotImplementedError() 

1114 

1115 def handle_exception( 

1116 self, 

1117 result: CursorResult[Unpack[TupleAny]], 

1118 dbapi_cursor: Optional[DBAPICursor], 

1119 err: BaseException, 

1120 ) -> NoReturn: 

1121 raise err 

1122 

1123 

1124class NoCursorFetchStrategy(ResultFetchStrategy): 

1125 """Cursor strategy for a result that has no open cursor. 

1126 

1127 There are two varieties of this strategy, one for DQL and one for 

1128 DML (and also DDL), each of which represent a result that had a cursor 

1129 but no longer has one. 

1130 

1131 """ 

1132 

1133 __slots__ = () 

1134 

1135 def soft_close( 

1136 self, 

1137 result: CursorResult[Unpack[TupleAny]], 

1138 dbapi_cursor: Optional[DBAPICursor], 

1139 ) -> None: 

1140 pass 

1141 

1142 def hard_close( 

1143 self, 

1144 result: CursorResult[Unpack[TupleAny]], 

1145 dbapi_cursor: Optional[DBAPICursor], 

1146 ) -> None: 

1147 pass 

1148 

1149 def fetchone( 

1150 self, 

1151 result: CursorResult[Unpack[TupleAny]], 

1152 dbapi_cursor: DBAPICursor, 

1153 hard_close: bool = False, 

1154 ) -> Any: 

1155 return self._non_result(result, None) 

1156 

1157 def fetchmany( 

1158 self, 

1159 result: CursorResult[Unpack[TupleAny]], 

1160 dbapi_cursor: DBAPICursor, 

1161 size: Optional[int] = None, 

1162 ) -> Any: 

1163 return self._non_result(result, []) 

1164 

1165 def fetchall( 

1166 self, result: CursorResult[Unpack[TupleAny]], dbapi_cursor: DBAPICursor 

1167 ) -> Any: 

1168 return self._non_result(result, []) 

1169 

1170 def _non_result( 

1171 self, 

1172 result: CursorResult[Unpack[TupleAny]], 

1173 default: Any, 

1174 err: Optional[BaseException] = None, 

1175 ) -> Any: 

1176 raise NotImplementedError() 

1177 

1178 

1179class NoCursorDQLFetchStrategy(NoCursorFetchStrategy): 

1180 """Cursor strategy for a DQL result that has no open cursor. 

1181 

1182 This is a result set that can return rows, i.e. for a SELECT, or for an 

1183 INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state 

1184 where the cursor is closed and no rows remain available. The owning result 

1185 object may or may not be "hard closed", which determines if the fetch 

1186 methods send empty results or raise for closed result. 

1187 

1188 """ 

1189 

1190 __slots__ = () 

1191 

1192 def _non_result( 

1193 self, 

1194 result: CursorResult[Unpack[TupleAny]], 

1195 default: Any, 

1196 err: Optional[BaseException] = None, 

1197 ) -> Any: 

1198 if result.closed: 

1199 raise exc.ResourceClosedError( 

1200 "This result object is closed." 

1201 ) from err 

1202 else: 

1203 return default 

1204 

1205 

1206_NO_CURSOR_DQL = NoCursorDQLFetchStrategy() 

1207 

1208 

1209class NoCursorDMLFetchStrategy(NoCursorFetchStrategy): 

1210 """Cursor strategy for a DML result that has no open cursor. 

1211 

1212 This is a result set that does not return rows, i.e. for an INSERT, 

1213 UPDATE, DELETE that does not include RETURNING. 

1214 

1215 """ 

1216 

1217 __slots__ = () 

1218 

1219 def _non_result( 

1220 self, 

1221 result: CursorResult[Unpack[TupleAny]], 

1222 default: Any, 

1223 err: Optional[BaseException] = None, 

1224 ) -> Any: 

1225 # we only expect to have a _NoResultMetaData() here right now. 

1226 assert not result._metadata.returns_rows 

1227 result._metadata._we_dont_return_rows(err) # type: ignore[union-attr] 

1228 

1229 

1230_NO_CURSOR_DML = NoCursorDMLFetchStrategy() 

1231 

1232 

1233class CursorFetchStrategy(ResultFetchStrategy): 

1234 """Call fetch methods from a DBAPI cursor. 

1235 

1236 Alternate versions of this class may instead buffer the rows from 

1237 cursors or not use cursors at all. 

1238 

1239 """ 

1240 

1241 __slots__ = () 

1242 

1243 def soft_close( 

1244 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] 

1245 ) -> None: 

1246 result.cursor_strategy = _NO_CURSOR_DQL 

1247 

1248 def hard_close( 

1249 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] 

1250 ) -> None: 

1251 result.cursor_strategy = _NO_CURSOR_DQL 

1252 

1253 def handle_exception( 

1254 self, 

1255 result: CursorResult[Any], 

1256 dbapi_cursor: Optional[DBAPICursor], 

1257 err: BaseException, 

1258 ) -> NoReturn: 

1259 result.connection._handle_dbapi_exception( 

1260 err, None, None, dbapi_cursor, result.context 

1261 ) 

1262 

1263 def yield_per( 

1264 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int 

1265 ) -> None: 

1266 result.cursor_strategy = BufferedRowCursorFetchStrategy( 

1267 dbapi_cursor, 

1268 {"max_row_buffer": num}, 

1269 initial_buffer=collections.deque(), 

1270 growth_factor=0, 

1271 ) 

1272 

1273 def fetchone( 

1274 self, 

1275 result: CursorResult[Any], 

1276 dbapi_cursor: DBAPICursor, 

1277 hard_close: bool = False, 

1278 ) -> Any: 

1279 try: 

1280 row = dbapi_cursor.fetchone() 

1281 if row is None: 

1282 result._soft_close(hard=hard_close) 

1283 return row 

1284 except BaseException as e: 

1285 self.handle_exception(result, dbapi_cursor, e) 

1286 

1287 def fetchmany( 

1288 self, 

1289 result: CursorResult[Any], 

1290 dbapi_cursor: DBAPICursor, 

1291 size: Optional[int] = None, 

1292 ) -> Any: 

1293 try: 

1294 if size is None: 

1295 l = dbapi_cursor.fetchmany() 

1296 else: 

1297 l = dbapi_cursor.fetchmany(size) 

1298 

1299 if not l: 

1300 result._soft_close() 

1301 return l 

1302 except BaseException as e: 

1303 self.handle_exception(result, dbapi_cursor, e) 

1304 

1305 def fetchall( 

1306 self, 

1307 result: CursorResult[Any], 

1308 dbapi_cursor: DBAPICursor, 

1309 ) -> Any: 

1310 try: 

1311 rows = dbapi_cursor.fetchall() 

1312 result._soft_close() 

1313 return rows 

1314 except BaseException as e: 

1315 self.handle_exception(result, dbapi_cursor, e) 

1316 

1317 

1318_DEFAULT_FETCH = CursorFetchStrategy() 

1319 

1320 

1321class BufferedRowCursorFetchStrategy(CursorFetchStrategy): 

1322 """A cursor fetch strategy with row buffering behavior. 

1323 

1324 This strategy buffers the contents of a selection of rows 

1325 before ``fetchone()`` is called. This is to allow the results of 

1326 ``cursor.description`` to be available immediately, when 

1327 interfacing with a DB-API that requires rows to be consumed before 

1328 this information is available (currently psycopg2, when used with 

1329 server-side cursors). 

1330 

1331 The pre-fetching behavior fetches only one row initially, and then 

1332 grows its buffer size by a fixed amount with each successive need 

1333 for additional rows up the ``max_row_buffer`` size, which defaults 

1334 to 1000:: 

1335 

1336 with psycopg2_engine.connect() as conn: 

1337 

1338 result = conn.execution_options( 

1339 stream_results=True, max_row_buffer=50 

1340 ).execute(text("select * from table")) 

1341 

1342 .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows. 

1343 

1344 .. seealso:: 

1345 

1346 :ref:`psycopg2_execution_options` 

1347 """ 

1348 

1349 __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor") 

1350 

1351 def __init__( 

1352 self, 

1353 dbapi_cursor: DBAPICursor, 

1354 execution_options: CoreExecuteOptionsParameter, 

1355 growth_factor: int = 5, 

1356 initial_buffer: Optional[Deque[Any]] = None, 

1357 ) -> None: 

1358 self._max_row_buffer = execution_options.get("max_row_buffer", 1000) 

1359 

1360 if initial_buffer is not None: 

1361 self._rowbuffer = initial_buffer 

1362 else: 

1363 self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1)) 

1364 self._growth_factor = growth_factor 

1365 

1366 if growth_factor: 

1367 self._bufsize = min(self._max_row_buffer, self._growth_factor) 

1368 else: 

1369 self._bufsize = self._max_row_buffer 

1370 

1371 @classmethod 

1372 def create( 

1373 cls, result: CursorResult[Any] 

1374 ) -> BufferedRowCursorFetchStrategy: 

1375 return BufferedRowCursorFetchStrategy( 

1376 result.cursor, 

1377 result.context.execution_options, 

1378 ) 

1379 

1380 def _buffer_rows( 

1381 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor 

1382 ) -> None: 

1383 """this is currently used only by fetchone().""" 

1384 

1385 size = self._bufsize 

1386 try: 

1387 if size < 1: 

1388 new_rows = dbapi_cursor.fetchall() 

1389 else: 

1390 new_rows = dbapi_cursor.fetchmany(size) 

1391 except BaseException as e: 

1392 self.handle_exception(result, dbapi_cursor, e) 

1393 

1394 if not new_rows: 

1395 return 

1396 self._rowbuffer = collections.deque(new_rows) 

1397 if self._growth_factor and size < self._max_row_buffer: 

1398 self._bufsize = min( 

1399 self._max_row_buffer, size * self._growth_factor 

1400 ) 

1401 

1402 def yield_per( 

1403 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int 

1404 ) -> None: 

1405 self._growth_factor = 0 

1406 self._max_row_buffer = self._bufsize = num 

1407 

1408 def soft_close( 

1409 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] 

1410 ) -> None: 

1411 self._rowbuffer.clear() 

1412 super().soft_close(result, dbapi_cursor) 

1413 

1414 def hard_close( 

1415 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] 

1416 ) -> None: 

1417 self._rowbuffer.clear() 

1418 super().hard_close(result, dbapi_cursor) 

1419 

1420 def fetchone( 

1421 self, 

1422 result: CursorResult[Any], 

1423 dbapi_cursor: DBAPICursor, 

1424 hard_close: bool = False, 

1425 ) -> Any: 

1426 if not self._rowbuffer: 

1427 self._buffer_rows(result, dbapi_cursor) 

1428 if not self._rowbuffer: 

1429 try: 

1430 result._soft_close(hard=hard_close) 

1431 except BaseException as e: 

1432 self.handle_exception(result, dbapi_cursor, e) 

1433 return None 

1434 return self._rowbuffer.popleft() 

1435 

1436 def fetchmany( 

1437 self, 

1438 result: CursorResult[Any], 

1439 dbapi_cursor: DBAPICursor, 

1440 size: Optional[int] = None, 

1441 ) -> Any: 

1442 if size is None: 

1443 return self.fetchall(result, dbapi_cursor) 

1444 

1445 rb = self._rowbuffer 

1446 lb = len(rb) 

1447 close = False 

1448 if size > lb: 

1449 try: 

1450 new = dbapi_cursor.fetchmany(size - lb) 

1451 except BaseException as e: 

1452 self.handle_exception(result, dbapi_cursor, e) 

1453 else: 

1454 if not new: 

1455 # defer closing since it may clear the row buffer 

1456 close = True 

1457 else: 

1458 rb.extend(new) 

1459 

1460 res = [rb.popleft() for _ in range(min(size, len(rb)))] 

1461 if close: 

1462 result._soft_close() 

1463 return res 

1464 

1465 def fetchall( 

1466 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor 

1467 ) -> Any: 

1468 try: 

1469 ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall()) 

1470 self._rowbuffer.clear() 

1471 result._soft_close() 

1472 return ret 

1473 except BaseException as e: 

1474 self.handle_exception(result, dbapi_cursor, e) 

1475 

1476 

1477class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): 

1478 """A cursor strategy that buffers rows fully upon creation. 

1479 

1480 Used for operations where a result is to be delivered 

1481 after the database conversation can not be continued, 

1482 such as MSSQL INSERT...OUTPUT after an autocommit. 

1483 

1484 """ 

1485 

1486 __slots__ = ("_rowbuffer", "alternate_cursor_description") 

1487 

1488 def __init__( 

1489 self, 

1490 dbapi_cursor: Optional[DBAPICursor], 

1491 alternate_description: Optional[_DBAPICursorDescription] = None, 

1492 initial_buffer: Optional[Iterable[Any]] = None, 

1493 ): 

1494 self.alternate_cursor_description = alternate_description 

1495 if initial_buffer is not None: 

1496 self._rowbuffer = collections.deque(initial_buffer) 

1497 else: 

1498 assert dbapi_cursor is not None 

1499 self._rowbuffer = collections.deque(dbapi_cursor.fetchall()) 

1500 

1501 def yield_per( 

1502 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor, num: int 

1503 ) -> Any: 

1504 pass 

1505 

1506 def soft_close( 

1507 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] 

1508 ) -> None: 

1509 self._rowbuffer.clear() 

1510 super().soft_close(result, dbapi_cursor) 

1511 

1512 def hard_close( 

1513 self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] 

1514 ) -> None: 

1515 self._rowbuffer.clear() 

1516 super().hard_close(result, dbapi_cursor) 

1517 

1518 def fetchone( 

1519 self, 

1520 result: CursorResult[Any], 

1521 dbapi_cursor: DBAPICursor, 

1522 hard_close: bool = False, 

1523 ) -> Any: 

1524 if self._rowbuffer: 

1525 return self._rowbuffer.popleft() 

1526 else: 

1527 result._soft_close(hard=hard_close) 

1528 return None 

1529 

1530 def fetchmany( 

1531 self, 

1532 result: CursorResult[Any], 

1533 dbapi_cursor: DBAPICursor, 

1534 size: Optional[int] = None, 

1535 ) -> Any: 

1536 if size is None: 

1537 return self.fetchall(result, dbapi_cursor) 

1538 

1539 rb = self._rowbuffer 

1540 rows = [rb.popleft() for _ in range(min(size, len(rb)))] 

1541 if not rows: 

1542 result._soft_close() 

1543 return rows 

1544 

1545 def fetchall( 

1546 self, result: CursorResult[Any], dbapi_cursor: DBAPICursor 

1547 ) -> Any: 

1548 ret = self._rowbuffer 

1549 self._rowbuffer = collections.deque() 

1550 result._soft_close() 

1551 return ret 

1552 

1553 

1554class _NoResultMetaData(ResultMetaData): 

1555 __slots__ = () 

1556 

1557 returns_rows = False 

1558 

1559 def _we_dont_return_rows( 

1560 self, err: Optional[BaseException] = None 

1561 ) -> NoReturn: 

1562 raise exc.ResourceClosedError( 

1563 "This result object does not return rows. " 

1564 "It has been closed automatically." 

1565 ) from err 

1566 

1567 def _index_for_key(self, keys: _KeyIndexType, raiseerr: bool) -> NoReturn: 

1568 self._we_dont_return_rows() 

1569 

1570 def _metadata_for_keys(self, keys: Sequence[_KeyIndexType]) -> NoReturn: 

1571 self._we_dont_return_rows() 

1572 

1573 def _reduce(self, keys: Sequence[_KeyIndexType]) -> NoReturn: 

1574 self._we_dont_return_rows() 

1575 

1576 @property 

1577 def _keymap(self) -> NoReturn: # type: ignore[override] 

1578 self._we_dont_return_rows() 

1579 

1580 @property 

1581 def _key_to_index(self) -> NoReturn: # type: ignore[override] 

1582 self._we_dont_return_rows() 

1583 

1584 @property 

1585 def _processors(self) -> NoReturn: # type: ignore[override] 

1586 self._we_dont_return_rows() 

1587 

1588 @property 

1589 def keys(self) -> NoReturn: 

1590 self._we_dont_return_rows() 

1591 

1592 

1593_NO_RESULT_METADATA = _NoResultMetaData() 

1594 

1595 

1596def null_dml_result() -> IteratorResult[Any]: 

1597 it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([])) 

1598 it._soft_close() 

1599 return it 

1600 

1601 

1602class CursorResult(Result[Unpack[_Ts]]): 

1603 """A Result that is representing state from a DBAPI cursor. 

1604 

1605 .. versionchanged:: 1.4 The :class:`.CursorResult`` 

1606 class replaces the previous :class:`.ResultProxy` interface. 

1607 This classes are based on the :class:`.Result` calling API 

1608 which provides an updated usage model and calling facade for 

1609 SQLAlchemy Core and SQLAlchemy ORM. 

1610 

1611 Returns database rows via the :class:`.Row` class, which provides 

1612 additional API features and behaviors on top of the raw data returned by 

1613 the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` 

1614 method, other kinds of objects may also be returned. 

1615 

1616 .. seealso:: 

1617 

1618 :ref:`tutorial_selecting_data` - introductory material for accessing 

1619 :class:`_engine.CursorResult` and :class:`.Row` objects. 

1620 

1621 """ 

1622 

1623 __slots__ = ( 

1624 "context", 

1625 "dialect", 

1626 "cursor", 

1627 "cursor_strategy", 

1628 "_echo", 

1629 "connection", 

1630 ) 

1631 

1632 _metadata: Union[CursorResultMetaData, _NoResultMetaData] 

1633 _no_result_metadata = _NO_RESULT_METADATA 

1634 _soft_closed: bool = False 

1635 closed: bool = False 

1636 _is_cursor = True 

1637 

1638 context: DefaultExecutionContext 

1639 dialect: Dialect 

1640 cursor_strategy: ResultFetchStrategy 

1641 connection: Connection 

1642 

1643 def __init__( 

1644 self, 

1645 context: DefaultExecutionContext, 

1646 cursor_strategy: ResultFetchStrategy, 

1647 cursor_description: Optional[_DBAPICursorDescription], 

1648 ): 

1649 self.context = context 

1650 self.dialect = context.dialect 

1651 self.cursor = context.cursor 

1652 self.cursor_strategy = cursor_strategy 

1653 self.connection = context.root_connection 

1654 self._echo = echo = ( 

1655 self.connection._echo and context.engine._should_log_debug() 

1656 ) 

1657 

1658 if cursor_description is not None: 

1659 self._init_metadata(context, cursor_description) 

1660 

1661 if echo: 

1662 log = self.context.connection._log_debug 

1663 

1664 def _log_row(row: Any) -> Any: 

1665 log("Row %r", sql_util._repr_row(row)) 

1666 return row 

1667 

1668 self._row_logging_fn = _log_row 

1669 

1670 # call Result._row_getter to set up the row factory 

1671 self._row_getter 

1672 

1673 else: 

1674 assert context._num_sentinel_cols == 0 

1675 self._metadata = self._no_result_metadata 

1676 

1677 def _init_metadata( 

1678 self, 

1679 context: DefaultExecutionContext, 

1680 cursor_description: _DBAPICursorDescription, 

1681 ) -> CursorResultMetaData: 

1682 driver_column_names = context.execution_options.get( 

1683 "driver_column_names", False 

1684 ) 

1685 if context.compiled: 

1686 compiled = context.compiled 

1687 

1688 metadata: CursorResultMetaData 

1689 

1690 if driver_column_names: 

1691 # TODO: test this case 

1692 metadata = CursorResultMetaData( 

1693 self, 

1694 cursor_description, 

1695 driver_column_names=True, 

1696 num_sentinel_cols=context._num_sentinel_cols, 

1697 ) 

1698 assert not metadata._safe_for_cache 

1699 elif compiled._cached_metadata: 

1700 metadata = compiled._cached_metadata 

1701 else: 

1702 metadata = CursorResultMetaData( 

1703 self, 

1704 cursor_description, 

1705 # the number of sentinel columns is stored on the context 

1706 # but it's a characteristic of the compiled object 

1707 # so it's ok to apply it to a cacheable metadata. 

1708 num_sentinel_cols=context._num_sentinel_cols, 

1709 ) 

1710 if metadata._safe_for_cache: 

1711 compiled._cached_metadata = metadata 

1712 

1713 # result rewrite/ adapt step. this is to suit the case 

1714 # when we are invoked against a cached Compiled object, we want 

1715 # to rewrite the ResultMetaData to reflect the Column objects 

1716 # that are in our current SQL statement object, not the one 

1717 # that is associated with the cached Compiled object. 

1718 # the Compiled object may also tell us to not 

1719 # actually do this step; this is to support the ORM where 

1720 # it is to produce a new Result object in any case, and will 

1721 # be using the cached Column objects against this database result 

1722 # so we don't want to rewrite them. 

1723 # 

1724 # Basically this step suits the use case where the end user 

1725 # is using Core SQL expressions and is accessing columns in the 

1726 # result row using row._mapping[table.c.column]. 

1727 if ( 

1728 not context.execution_options.get( 

1729 "_result_disable_adapt_to_context", False 

1730 ) 

1731 and compiled._result_columns 

1732 and context.cache_hit is context.dialect.CACHE_HIT 

1733 and compiled.statement is not context.invoked_statement # type: ignore[comparison-overlap] # noqa: E501 

1734 ): 

1735 metadata = metadata._adapt_to_context(context) 

1736 

1737 self._metadata = metadata 

1738 

1739 else: 

1740 self._metadata = metadata = CursorResultMetaData( 

1741 self, 

1742 cursor_description, 

1743 driver_column_names=driver_column_names, 

1744 ) 

1745 if self._echo: 

1746 context.connection._log_debug( 

1747 "Col %r", tuple(x[0] for x in cursor_description) 

1748 ) 

1749 return metadata 

1750 

1751 def _soft_close(self, hard: bool = False) -> None: 

1752 """Soft close this :class:`_engine.CursorResult`. 

1753 

1754 This releases all DBAPI cursor resources, but leaves the 

1755 CursorResult "open" from a semantic perspective, meaning the 

1756 fetchXXX() methods will continue to return empty results. 

1757 

1758 This method is called automatically when: 

1759 

1760 * all result rows are exhausted using the fetchXXX() methods. 

1761 * cursor.description is None. 

1762 

1763 This method is **not public**, but is documented in order to clarify 

1764 the "autoclose" process used. 

1765 

1766 .. seealso:: 

1767 

1768 :meth:`_engine.CursorResult.close` 

1769 

1770 

1771 """ 

1772 

1773 if (not hard and self._soft_closed) or (hard and self.closed): 

1774 return 

1775 

1776 if hard: 

1777 self.closed = True 

1778 self.cursor_strategy.hard_close(self, self.cursor) 

1779 else: 

1780 self.cursor_strategy.soft_close(self, self.cursor) 

1781 

1782 if not self._soft_closed: 

1783 cursor = self.cursor 

1784 self.cursor = None # type: ignore 

1785 self.connection._safe_close_cursor(cursor) 

1786 self._soft_closed = True 

1787 

1788 @property 

1789 def inserted_primary_key_rows(self) -> List[Optional[Any]]: 

1790 """Return the value of 

1791 :attr:`_engine.CursorResult.inserted_primary_key` 

1792 as a row contained within a list; some dialects may support a 

1793 multiple row form as well. 

1794 

1795 .. note:: As indicated below, in current SQLAlchemy versions this 

1796 accessor is only useful beyond what's already supplied by 

1797 :attr:`_engine.CursorResult.inserted_primary_key` when using the 

1798 :ref:`postgresql_psycopg2` dialect. Future versions hope to 

1799 generalize this feature to more dialects. 

1800 

1801 This accessor is added to support dialects that offer the feature 

1802 that is currently implemented by the :ref:`psycopg2_executemany_mode` 

1803 feature, currently **only the psycopg2 dialect**, which provides 

1804 for many rows to be INSERTed at once while still retaining the 

1805 behavior of being able to return server-generated primary key values. 

1806 

1807 * **When using the psycopg2 dialect, or other dialects that may support 

1808 "fast executemany" style inserts in upcoming releases** : When 

1809 invoking an INSERT statement while passing a list of rows as the 

1810 second argument to :meth:`_engine.Connection.execute`, this accessor 

1811 will then provide a list of rows, where each row contains the primary 

1812 key value for each row that was INSERTed. 

1813 

1814 * **When using all other dialects / backends that don't yet support 

1815 this feature**: This accessor is only useful for **single row INSERT 

1816 statements**, and returns the same information as that of the 

1817 :attr:`_engine.CursorResult.inserted_primary_key` within a 

1818 single-element list. When an INSERT statement is executed in 

1819 conjunction with a list of rows to be INSERTed, the list will contain 

1820 one row per row inserted in the statement, however it will contain 

1821 ``None`` for any server-generated values. 

1822 

1823 Future releases of SQLAlchemy will further generalize the 

1824 "fast execution helper" feature of psycopg2 to suit other dialects, 

1825 thus allowing this accessor to be of more general use. 

1826 

1827 .. versionadded:: 1.4 

1828 

1829 .. seealso:: 

1830 

1831 :attr:`_engine.CursorResult.inserted_primary_key` 

1832 

1833 """ 

1834 if not self.context.compiled: 

1835 raise exc.InvalidRequestError( 

1836 "Statement is not a compiled expression construct." 

1837 ) 

1838 elif not self.context.isinsert: 

1839 raise exc.InvalidRequestError( 

1840 "Statement is not an insert() expression construct." 

1841 ) 

1842 elif self.context._is_explicit_returning: 

1843 raise exc.InvalidRequestError( 

1844 "Can't call inserted_primary_key " 

1845 "when returning() " 

1846 "is used." 

1847 ) 

1848 return self.context.inserted_primary_key_rows # type: ignore[no-any-return] # noqa: E501 

1849 

1850 @property 

1851 def inserted_primary_key(self) -> Optional[Any]: 

1852 """Return the primary key for the row just inserted. 

1853 

1854 The return value is a :class:`_result.Row` object representing 

1855 a named tuple of primary key values in the order in which the 

1856 primary key columns are configured in the source 

1857 :class:`_schema.Table`. 

1858 

1859 .. versionchanged:: 1.4.8 - the 

1860 :attr:`_engine.CursorResult.inserted_primary_key` 

1861 value is now a named tuple via the :class:`_result.Row` class, 

1862 rather than a plain tuple. 

1863 

1864 This accessor only applies to single row :func:`_expression.insert` 

1865 constructs which did not explicitly specify 

1866 :meth:`_expression.Insert.returning`. Support for multirow inserts, 

1867 while not yet available for most backends, would be accessed using 

1868 the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor. 

1869 

1870 Note that primary key columns which specify a server_default clause, or 

1871 otherwise do not qualify as "autoincrement" columns (see the notes at 

1872 :class:`_schema.Column`), and were generated using the database-side 

1873 default, will appear in this list as ``None`` unless the backend 

1874 supports "returning" and the insert statement executed with the 

1875 "implicit returning" enabled. 

1876 

1877 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

1878 statement is not a compiled expression construct 

1879 or is not an insert() construct. 

1880 

1881 """ 

1882 

1883 if self.context.executemany: 

1884 raise exc.InvalidRequestError( 

1885 "This statement was an executemany call; if primary key " 

1886 "returning is supported, please " 

1887 "use .inserted_primary_key_rows." 

1888 ) 

1889 

1890 ikp = self.inserted_primary_key_rows 

1891 if ikp: 

1892 return ikp[0] 

1893 else: 

1894 return None 

1895 

1896 def last_updated_params( 

1897 self, 

1898 ) -> Union[ 

1899 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams 

1900 ]: 

1901 """Return the collection of updated parameters from this 

1902 execution. 

1903 

1904 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

1905 statement is not a compiled expression construct 

1906 or is not an update() construct. 

1907 

1908 """ 

1909 if not self.context.compiled: 

1910 raise exc.InvalidRequestError( 

1911 "Statement is not a compiled expression construct." 

1912 ) 

1913 elif not self.context.isupdate: 

1914 raise exc.InvalidRequestError( 

1915 "Statement is not an update() expression construct." 

1916 ) 

1917 elif self.context.executemany: 

1918 return self.context.compiled_parameters 

1919 else: 

1920 return self.context.compiled_parameters[0] 

1921 

1922 def last_inserted_params( 

1923 self, 

1924 ) -> Union[ 

1925 List[_MutableCoreSingleExecuteParams], _MutableCoreSingleExecuteParams 

1926 ]: 

1927 """Return the collection of inserted parameters from this 

1928 execution. 

1929 

1930 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

1931 statement is not a compiled expression construct 

1932 or is not an insert() construct. 

1933 

1934 """ 

1935 if not self.context.compiled: 

1936 raise exc.InvalidRequestError( 

1937 "Statement is not a compiled expression construct." 

1938 ) 

1939 elif not self.context.isinsert: 

1940 raise exc.InvalidRequestError( 

1941 "Statement is not an insert() expression construct." 

1942 ) 

1943 elif self.context.executemany: 

1944 return self.context.compiled_parameters 

1945 else: 

1946 return self.context.compiled_parameters[0] 

1947 

1948 @property 

1949 def returned_defaults_rows( 

1950 self, 

1951 ) -> Optional[Sequence[Row[Unpack[TupleAny]]]]: 

1952 """Return a list of rows each containing the values of default 

1953 columns that were fetched using 

1954 the :meth:`.ValuesBase.return_defaults` feature. 

1955 

1956 The return value is a list of :class:`.Row` objects. 

1957 

1958 .. versionadded:: 1.4 

1959 

1960 """ 

1961 return self.context.returned_default_rows 

1962 

1963 def splice_horizontally(self, other: CursorResult[Any]) -> Self: 

1964 """Return a new :class:`.CursorResult` that "horizontally splices" 

1965 together the rows of this :class:`.CursorResult` with that of another 

1966 :class:`.CursorResult`. 

1967 

1968 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is 

1969 not intended for general use. 

1970 

1971 "horizontally splices" means that for each row in the first and second 

1972 result sets, a new row that concatenates the two rows together is 

1973 produced, which then becomes the new row. The incoming 

1974 :class:`.CursorResult` must have the identical number of rows. It is 

1975 typically expected that the two result sets come from the same sort 

1976 order as well, as the result rows are spliced together based on their 

1977 position in the result. 

1978 

1979 The expected use case here is so that multiple INSERT..RETURNING 

1980 statements (which definitely need to be sorted) against different 

1981 tables can produce a single result that looks like a JOIN of those two 

1982 tables. 

1983 

1984 E.g.:: 

1985 

1986 r1 = connection.execute( 

1987 users.insert().returning( 

1988 users.c.user_name, users.c.user_id, sort_by_parameter_order=True 

1989 ), 

1990 user_values, 

1991 ) 

1992 

1993 r2 = connection.execute( 

1994 addresses.insert().returning( 

1995 addresses.c.address_id, 

1996 addresses.c.address, 

1997 addresses.c.user_id, 

1998 sort_by_parameter_order=True, 

1999 ), 

2000 address_values, 

2001 ) 

2002 

2003 rows = r1.splice_horizontally(r2).all() 

2004 assert rows == [ 

2005 ("john", 1, 1, "foo@bar.com", 1), 

2006 ("jack", 2, 2, "bar@bat.com", 2), 

2007 ] 

2008 

2009 .. versionadded:: 2.0 

2010 

2011 .. seealso:: 

2012 

2013 :meth:`.CursorResult.splice_vertically` 

2014 

2015 

2016 """ # noqa: E501 

2017 

2018 clone = self._generate() 

2019 assert clone is self # just to note 

2020 assert isinstance(other._metadata, CursorResultMetaData) 

2021 assert isinstance(self._metadata, CursorResultMetaData) 

2022 self_tf = self._metadata._tuplefilter 

2023 other_tf = other._metadata._tuplefilter 

2024 clone._metadata = self._metadata._splice_horizontally(other._metadata) 

2025 

2026 total_rows = [ 

2027 tuple(r1 if self_tf is None else self_tf(r1)) 

2028 + tuple(r2 if other_tf is None else other_tf(r2)) 

2029 for r1, r2 in zip( 

2030 list(self._raw_row_iterator()), 

2031 list(other._raw_row_iterator()), 

2032 ) 

2033 ] 

2034 

2035 clone.cursor_strategy = FullyBufferedCursorFetchStrategy( 

2036 None, 

2037 initial_buffer=total_rows, 

2038 ) 

2039 clone._reset_memoizations() 

2040 return clone 

2041 

2042 def splice_vertically(self, other: CursorResult[Any]) -> Self: 

2043 """Return a new :class:`.CursorResult` that "vertically splices", 

2044 i.e. "extends", the rows of this :class:`.CursorResult` with that of 

2045 another :class:`.CursorResult`. 

2046 

2047 .. tip:: This method is for the benefit of the SQLAlchemy ORM and is 

2048 not intended for general use. 

2049 

2050 "vertically splices" means the rows of the given result are appended to 

2051 the rows of this cursor result. The incoming :class:`.CursorResult` 

2052 must have rows that represent the identical list of columns in the 

2053 identical order as they are in this :class:`.CursorResult`. 

2054 

2055 .. versionadded:: 2.0 

2056 

2057 .. seealso:: 

2058 

2059 :meth:`.CursorResult.splice_horizontally` 

2060 

2061 """ 

2062 clone = self._generate() 

2063 total_rows = list(self._raw_row_iterator()) + list( 

2064 other._raw_row_iterator() 

2065 ) 

2066 

2067 clone.cursor_strategy = FullyBufferedCursorFetchStrategy( 

2068 None, 

2069 initial_buffer=total_rows, 

2070 ) 

2071 clone._reset_memoizations() 

2072 return clone 

2073 

2074 def _rewind(self, rows: Any) -> Self: 

2075 """rewind this result back to the given rowset. 

2076 

2077 this is used internally for the case where an :class:`.Insert` 

2078 construct combines the use of 

2079 :meth:`.Insert.return_defaults` along with the 

2080 "supplemental columns" feature. 

2081 

2082 NOTE: this method has not effect then an unique filter is applied 

2083 to the result, meaning that no row will be returned. 

2084 

2085 """ 

2086 

2087 if self._echo: 

2088 self.context.connection._log_debug( 

2089 "CursorResult rewound %d row(s)", len(rows) 

2090 ) 

2091 

2092 # the rows given are expected to be Row objects, so we 

2093 # have to clear out processors which have already run on these 

2094 # rows 

2095 self._metadata = cast( 

2096 CursorResultMetaData, self._metadata 

2097 )._remove_processors_and_tuple_filter() 

2098 

2099 self.cursor_strategy = FullyBufferedCursorFetchStrategy( 

2100 None, 

2101 # TODO: if these are Row objects, can we save on not having to 

2102 # re-make new Row objects out of them a second time? is that 

2103 # what's actually happening right now? maybe look into this 

2104 initial_buffer=rows, 

2105 ) 

2106 self._reset_memoizations() 

2107 return self 

2108 

2109 @property 

2110 def returned_defaults(self) -> Optional[Row[Unpack[TupleAny]]]: 

2111 """Return the values of default columns that were fetched using 

2112 the :meth:`.ValuesBase.return_defaults` feature. 

2113 

2114 The value is an instance of :class:`.Row`, or ``None`` 

2115 if :meth:`.ValuesBase.return_defaults` was not used or if the 

2116 backend does not support RETURNING. 

2117 

2118 .. seealso:: 

2119 

2120 :meth:`.ValuesBase.return_defaults` 

2121 

2122 """ 

2123 

2124 if self.context.executemany: 

2125 raise exc.InvalidRequestError( 

2126 "This statement was an executemany call; if return defaults " 

2127 "is supported, please use .returned_defaults_rows." 

2128 ) 

2129 

2130 rows = self.context.returned_default_rows 

2131 if rows: 

2132 return rows[0] 

2133 else: 

2134 return None 

2135 

2136 def lastrow_has_defaults(self) -> bool: 

2137 """Return ``lastrow_has_defaults()`` from the underlying 

2138 :class:`.ExecutionContext`. 

2139 

2140 See :class:`.ExecutionContext` for details. 

2141 

2142 """ 

2143 

2144 return self.context.lastrow_has_defaults() 

2145 

2146 def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

2147 """Return ``postfetch_cols()`` from the underlying 

2148 :class:`.ExecutionContext`. 

2149 

2150 See :class:`.ExecutionContext` for details. 

2151 

2152 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

2153 statement is not a compiled expression construct 

2154 or is not an insert() or update() construct. 

2155 

2156 """ 

2157 

2158 if not self.context.compiled: 

2159 raise exc.InvalidRequestError( 

2160 "Statement is not a compiled expression construct." 

2161 ) 

2162 elif not self.context.isinsert and not self.context.isupdate: 

2163 raise exc.InvalidRequestError( 

2164 "Statement is not an insert() or update() " 

2165 "expression construct." 

2166 ) 

2167 return self.context.postfetch_cols 

2168 

2169 def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 

2170 """Return ``prefetch_cols()`` from the underlying 

2171 :class:`.ExecutionContext`. 

2172 

2173 See :class:`.ExecutionContext` for details. 

2174 

2175 Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed 

2176 statement is not a compiled expression construct 

2177 or is not an insert() or update() construct. 

2178 

2179 """ 

2180 

2181 if not self.context.compiled: 

2182 raise exc.InvalidRequestError( 

2183 "Statement is not a compiled expression construct." 

2184 ) 

2185 elif not self.context.isinsert and not self.context.isupdate: 

2186 raise exc.InvalidRequestError( 

2187 "Statement is not an insert() or update() " 

2188 "expression construct." 

2189 ) 

2190 return self.context.prefetch_cols 

2191 

2192 def supports_sane_rowcount(self) -> bool: 

2193 """Return ``supports_sane_rowcount`` from the dialect. 

2194 

2195 See :attr:`_engine.CursorResult.rowcount` for background. 

2196 

2197 """ 

2198 

2199 return self.dialect.supports_sane_rowcount 

2200 

2201 def supports_sane_multi_rowcount(self) -> bool: 

2202 """Return ``supports_sane_multi_rowcount`` from the dialect. 

2203 

2204 See :attr:`_engine.CursorResult.rowcount` for background. 

2205 

2206 """ 

2207 

2208 return self.dialect.supports_sane_multi_rowcount 

2209 

2210 @util.memoized_property 

2211 def rowcount(self) -> int: 

2212 """Return the 'rowcount' for this result. 

2213 

2214 The primary purpose of 'rowcount' is to report the number of rows 

2215 matched by the WHERE criterion of an UPDATE or DELETE statement 

2216 executed once (i.e. for a single parameter set), which may then be 

2217 compared to the number of rows expected to be updated or deleted as a 

2218 means of asserting data integrity. 

2219 

2220 This attribute is transferred from the ``cursor.rowcount`` attribute 

2221 of the DBAPI before the cursor is closed, to support DBAPIs that 

2222 don't make this value available after cursor close. Some DBAPIs may 

2223 offer meaningful values for other kinds of statements, such as INSERT 

2224 and SELECT statements as well. In order to retrieve ``cursor.rowcount`` 

2225 for these statements, set the 

2226 :paramref:`.Connection.execution_options.preserve_rowcount` 

2227 execution option to True, which will cause the ``cursor.rowcount`` 

2228 value to be unconditionally memoized before any results are returned 

2229 or the cursor is closed, regardless of statement type. 

2230 

2231 For cases where the DBAPI does not support rowcount for a particular 

2232 kind of statement and/or execution, the returned value will be ``-1``, 

2233 which is delivered directly from the DBAPI and is part of :pep:`249`. 

2234 All DBAPIs should support rowcount for single-parameter-set 

2235 UPDATE and DELETE statements, however. 

2236 

2237 .. note:: 

2238 

2239 Notes regarding :attr:`_engine.CursorResult.rowcount`: 

2240 

2241 

2242 * This attribute returns the number of rows *matched*, 

2243 which is not necessarily the same as the number of rows 

2244 that were actually *modified*. For example, an UPDATE statement 

2245 may have no net change on a given row if the SET values 

2246 given are the same as those present in the row already. 

2247 Such a row would be matched but not modified. 

2248 On backends that feature both styles, such as MySQL, 

2249 rowcount is configured to return the match 

2250 count in all cases. 

2251 

2252 * :attr:`_engine.CursorResult.rowcount` in the default case is 

2253 *only* useful in conjunction with an UPDATE or DELETE statement, 

2254 and only with a single set of parameters. For other kinds of 

2255 statements, SQLAlchemy will not attempt to pre-memoize the value 

2256 unless the 

2257 :paramref:`.Connection.execution_options.preserve_rowcount` 

2258 execution option is used. Note that contrary to :pep:`249`, many 

2259 DBAPIs do not support rowcount values for statements that are not 

2260 UPDATE or DELETE, particularly when rows are being returned which 

2261 are not fully pre-buffered. DBAPIs that dont support rowcount 

2262 for a particular kind of statement should return the value ``-1`` 

2263 for such statements. 

2264 

2265 * :attr:`_engine.CursorResult.rowcount` may not be meaningful 

2266 when executing a single statement with multiple parameter sets 

2267 (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount" 

2268 values across multiple parameter sets and will return ``-1`` 

2269 when accessed. 

2270 

2271 * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support 

2272 a correct population of :attr:`_engine.CursorResult.rowcount` 

2273 when the :paramref:`.Connection.execution_options.preserve_rowcount` 

2274 execution option is set to True. 

2275 

2276 * Statements that use RETURNING may not support rowcount, returning 

2277 a ``-1`` value instead. 

2278 

2279 .. seealso:: 

2280 

2281 :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial` 

2282 

2283 :paramref:`.Connection.execution_options.preserve_rowcount` 

2284 

2285 """ # noqa: E501 

2286 try: 

2287 return self.context.rowcount 

2288 except BaseException as e: 

2289 self.cursor_strategy.handle_exception(self, self.cursor, e) 

2290 raise # not called 

2291 

2292 @property 

2293 def lastrowid(self) -> int: 

2294 """Return the 'lastrowid' accessor on the DBAPI cursor. 

2295 

2296 This is a DBAPI specific method and is only functional 

2297 for those backends which support it, for statements 

2298 where it is appropriate. It's behavior is not 

2299 consistent across backends. 

2300 

2301 Usage of this method is normally unnecessary when 

2302 using insert() expression constructs; the 

2303 :attr:`~CursorResult.inserted_primary_key` attribute provides a 

2304 tuple of primary key values for a newly inserted row, 

2305 regardless of database backend. 

2306 

2307 """ 

2308 try: 

2309 return self.context.get_lastrowid() 

2310 except BaseException as e: 

2311 self.cursor_strategy.handle_exception(self, self.cursor, e) 

2312 

2313 @property 

2314 def returns_rows(self) -> bool: 

2315 """True if this :class:`_engine.CursorResult` returns zero or more 

2316 rows. 

2317 

2318 I.e. if it is legal to call the methods 

2319 :meth:`_engine.CursorResult.fetchone`, 

2320 :meth:`_engine.CursorResult.fetchmany` 

2321 :meth:`_engine.CursorResult.fetchall`. 

2322 

2323 Overall, the value of :attr:`_engine.CursorResult.returns_rows` should 

2324 always be synonymous with whether or not the DBAPI cursor had a 

2325 ``.description`` attribute, indicating the presence of result columns, 

2326 noting that a cursor that returns zero rows still has a 

2327 ``.description`` if a row-returning statement was emitted. 

2328 

2329 This attribute should be True for all results that are against 

2330 SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE 

2331 that use RETURNING. For INSERT/UPDATE/DELETE statements that were 

2332 not using RETURNING, the value will usually be False, however 

2333 there are some dialect-specific exceptions to this, such as when 

2334 using the MSSQL / pyodbc dialect a SELECT is emitted inline in 

2335 order to retrieve an inserted primary key value. 

2336 

2337 

2338 """ 

2339 return self._metadata.returns_rows 

2340 

2341 @property 

2342 def is_insert(self) -> bool: 

2343 """True if this :class:`_engine.CursorResult` is the result 

2344 of a executing an expression language compiled 

2345 :func:`_expression.insert` construct. 

2346 

2347 When True, this implies that the 

2348 :attr:`inserted_primary_key` attribute is accessible, 

2349 assuming the statement did not include 

2350 a user defined "returning" construct. 

2351 

2352 """ 

2353 return self.context.isinsert 

2354 

2355 def _fetchiter_impl(self) -> Iterator[Any]: 

2356 fetchone = self.cursor_strategy.fetchone 

2357 

2358 while True: 

2359 row = fetchone(self, self.cursor) 

2360 if row is None: 

2361 break 

2362 yield row 

2363 

2364 def _fetchone_impl(self, hard_close: bool = False) -> Any: 

2365 return self.cursor_strategy.fetchone(self, self.cursor, hard_close) 

2366 

2367 def _fetchall_impl(self) -> Any: 

2368 return self.cursor_strategy.fetchall(self, self.cursor) 

2369 

2370 def _fetchmany_impl(self, size: Optional[int] = None) -> Any: 

2371 return self.cursor_strategy.fetchmany(self, self.cursor, size) 

2372 

2373 def _raw_row_iterator(self) -> Any: 

2374 return self._fetchiter_impl() 

2375 

2376 def merge( 

2377 self, *others: Result[Unpack[TupleAny]] 

2378 ) -> MergedResult[Unpack[TupleAny]]: 

2379 merged_result = super().merge(*others) 

2380 if self.context._has_rowcount: 

2381 merged_result.rowcount = sum( 

2382 cast("CursorResult[Any]", result).rowcount 

2383 for result in (self,) + others 

2384 ) 

2385 return merged_result 

2386 

2387 def close(self) -> None: 

2388 """Close this :class:`_engine.CursorResult`. 

2389 

2390 This closes out the underlying DBAPI cursor corresponding to the 

2391 statement execution, if one is still present. Note that the DBAPI 

2392 cursor is automatically released when the :class:`_engine.CursorResult` 

2393 exhausts all available rows. :meth:`_engine.CursorResult.close` is 

2394 generally an optional method except in the case when discarding a 

2395 :class:`_engine.CursorResult` that still has additional rows pending 

2396 for fetch. 

2397 

2398 After this method is called, it is no longer valid to call upon 

2399 the fetch methods, which will raise a :class:`.ResourceClosedError` 

2400 on subsequent use. 

2401 

2402 .. seealso:: 

2403 

2404 :ref:`connections_toplevel` 

2405 

2406 """ 

2407 self._soft_close(hard=True) 

2408 

2409 @_generative 

2410 def yield_per(self, num: int) -> Self: 

2411 self._yield_per = num 

2412 self.cursor_strategy.yield_per(self, self.cursor, num) 

2413 return self 

2414 

2415 

2416ResultProxy = CursorResult