Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

3138 statements  

1# sql/compiler.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Base SQL and DDL compiler implementations. 

10 

11Classes provided include: 

12 

13:class:`.compiler.SQLCompiler` - renders SQL 

14strings 

15 

16:class:`.compiler.DDLCompiler` - renders DDL 

17(data definition language) strings 

18 

19:class:`.compiler.GenericTypeCompiler` - renders 

20type specification strings. 

21 

22To generate user-defined SQL strings, see 

23:doc:`/ext/compiler`. 

24 

25""" 

26from __future__ import annotations 

27 

28import collections 

29import collections.abc as collections_abc 

30import contextlib 

31from enum import IntEnum 

32import functools 

33import itertools 

34import operator 

35import re 

36from time import perf_counter 

37import typing 

38from typing import Any 

39from typing import Callable 

40from typing import cast 

41from typing import ClassVar 

42from typing import Dict 

43from typing import FrozenSet 

44from typing import Iterable 

45from typing import Iterator 

46from typing import List 

47from typing import Literal 

48from typing import Mapping 

49from typing import MutableMapping 

50from typing import NamedTuple 

51from typing import NoReturn 

52from typing import Optional 

53from typing import Pattern 

54from typing import Protocol 

55from typing import Sequence 

56from typing import Set 

57from typing import Tuple 

58from typing import Type 

59from typing import TYPE_CHECKING 

60from typing import TypedDict 

61from typing import Union 

62 

63from . import base 

64from . import coercions 

65from . import crud 

66from . import elements 

67from . import functions 

68from . import operators 

69from . import roles 

70from . import schema 

71from . import selectable 

72from . import sqltypes 

73from . import util as sql_util 

74from ._typing import is_column_element 

75from ._typing import is_dml 

76from .base import _de_clone 

77from .base import _from_objects 

78from .base import _NONE_NAME 

79from .base import _SentinelDefaultCharacterization 

80from .base import NO_ARG 

81from .elements import quoted_name 

82from .sqltypes import TupleType 

83from .visitors import prefix_anon_map 

84from .. import exc 

85from .. import util 

86from ..util import FastIntFlag 

87from ..util.typing import Self 

88from ..util.typing import TupleAny 

89from ..util.typing import Unpack 

90 

91if typing.TYPE_CHECKING: 

92 from .annotation import _AnnotationDict 

93 from .base import _AmbiguousTableNameMap 

94 from .base import CompileState 

95 from .base import Executable 

96 from .cache_key import CacheKey 

97 from .ddl import ExecutableDDLElement 

98 from .dml import Delete 

99 from .dml import Insert 

100 from .dml import Update 

101 from .dml import UpdateBase 

102 from .dml import UpdateDMLState 

103 from .dml import ValuesBase 

104 from .elements import _truncated_label 

105 from .elements import BinaryExpression 

106 from .elements import BindParameter 

107 from .elements import ClauseElement 

108 from .elements import ColumnClause 

109 from .elements import ColumnElement 

110 from .elements import False_ 

111 from .elements import Label 

112 from .elements import Null 

113 from .elements import True_ 

114 from .functions import Function 

115 from .schema import Column 

116 from .schema import Constraint 

117 from .schema import ForeignKeyConstraint 

118 from .schema import Index 

119 from .schema import PrimaryKeyConstraint 

120 from .schema import Table 

121 from .schema import UniqueConstraint 

122 from .selectable import _ColumnsClauseElement 

123 from .selectable import AliasedReturnsRows 

124 from .selectable import CompoundSelectState 

125 from .selectable import CTE 

126 from .selectable import FromClause 

127 from .selectable import NamedFromClause 

128 from .selectable import ReturnsRows 

129 from .selectable import Select 

130 from .selectable import SelectState 

131 from .type_api import _BindProcessorType 

132 from .type_api import TypeDecorator 

133 from .type_api import TypeEngine 

134 from .type_api import UserDefinedType 

135 from .visitors import Visitable 

136 from ..engine.cursor import CursorResultMetaData 

137 from ..engine.interfaces import _CoreSingleExecuteParams 

138 from ..engine.interfaces import _DBAPIAnyExecuteParams 

139 from ..engine.interfaces import _DBAPIMultiExecuteParams 

140 from ..engine.interfaces import _DBAPISingleExecuteParams 

141 from ..engine.interfaces import _ExecuteOptions 

142 from ..engine.interfaces import _GenericSetInputSizesType 

143 from ..engine.interfaces import _MutableCoreSingleExecuteParams 

144 from ..engine.interfaces import Dialect 

145 from ..engine.interfaces import SchemaTranslateMapType 

146 

147 

148_FromHintsType = Dict["FromClause", str] 

149 

150RESERVED_WORDS = { 

151 "all", 

152 "analyse", 

153 "analyze", 

154 "and", 

155 "any", 

156 "array", 

157 "as", 

158 "asc", 

159 "asymmetric", 

160 "authorization", 

161 "between", 

162 "binary", 

163 "both", 

164 "case", 

165 "cast", 

166 "check", 

167 "collate", 

168 "column", 

169 "constraint", 

170 "create", 

171 "cross", 

172 "current_date", 

173 "current_role", 

174 "current_time", 

175 "current_timestamp", 

176 "current_user", 

177 "default", 

178 "deferrable", 

179 "desc", 

180 "distinct", 

181 "do", 

182 "else", 

183 "end", 

184 "except", 

185 "false", 

186 "for", 

187 "foreign", 

188 "freeze", 

189 "from", 

190 "full", 

191 "grant", 

192 "group", 

193 "having", 

194 "ilike", 

195 "in", 

196 "initially", 

197 "inner", 

198 "intersect", 

199 "into", 

200 "is", 

201 "isnull", 

202 "join", 

203 "leading", 

204 "left", 

205 "like", 

206 "limit", 

207 "localtime", 

208 "localtimestamp", 

209 "natural", 

210 "new", 

211 "not", 

212 "notnull", 

213 "null", 

214 "off", 

215 "offset", 

216 "old", 

217 "on", 

218 "only", 

219 "or", 

220 "order", 

221 "outer", 

222 "overlaps", 

223 "placing", 

224 "primary", 

225 "references", 

226 "right", 

227 "select", 

228 "session_user", 

229 "set", 

230 "similar", 

231 "some", 

232 "symmetric", 

233 "table", 

234 "then", 

235 "to", 

236 "trailing", 

237 "true", 

238 "union", 

239 "unique", 

240 "user", 

241 "using", 

242 "verbose", 

243 "when", 

244 "where", 

245} 

246 

247LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.I) 

248LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.I) 

249ILLEGAL_INITIAL_CHARACTERS = {str(x) for x in range(0, 10)}.union(["$"]) 

250 

251FK_ON_DELETE = re.compile( 

252 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I 

253) 

254FK_ON_UPDATE = re.compile( 

255 r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.I 

256) 

257FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.I) 

258BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE) 

259BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE) 

260 

261_pyformat_template = "%%(%(name)s)s" 

262BIND_TEMPLATES = { 

263 "pyformat": _pyformat_template, 

264 "qmark": "?", 

265 "format": "%%s", 

266 "numeric": ":[_POSITION]", 

267 "numeric_dollar": "$[_POSITION]", 

268 "named": ":%(name)s", 

269} 

270 

271 

# operator functions mapped to their SQL text; surrounding spaces are
# embedded so the strings can be joined directly between operands
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}

319 

# fixed textual rendering for known generic function classes
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}

336 

337 

# EXTRACT() field names accepted by the generic compiler; at this level
# every field renders as itself (an identity mapping)
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}

355 

# SQL keyword text for each compound-SELECT operation
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}

364 

365 

class ResultColumnsEntry(NamedTuple):
    """Tracks a column expression that is expected to be represented
    in the result rows for this statement.

    This normally refers to the columns clause of a SELECT statement
    but may also refer to a RETURNING clause, as well as for
    dialect-specific emulations.

    """

    keyname: str
    """string name that's expected in cursor.description"""

    name: str
    """column name, may be labeled"""

    objects: Tuple[Any, ...]
    """sequence of objects that should be able to locate this column
    in a RowMapping.  This is typically string names and aliases
    as well as Column objects.

    """

    type: TypeEngine[Any]
    """Datatype to be associated with this column.   This is where
    the "result processing" logic directly links the compiled statement
    to the rows that come back from the cursor.

    """

395 

396 

397class _ResultMapAppender(Protocol): 

398 def __call__( 

399 self, 

400 keyname: str, 

401 name: str, 

402 objects: Sequence[Any], 

403 type_: TypeEngine[Any], 

404 ) -> None: ... 

405 

406 

# integer indexes into ResultColumnsEntry used by cursor.py.
# some profiling showed integer access faster than named tuple
RM_RENDERED_NAME: Literal[0] = 0
RM_NAME: Literal[1] = 1
RM_OBJECTS: Literal[2] = 2
RM_TYPE: Literal[3] = 3

413 

414 

415class _BaseCompilerStackEntry(TypedDict): 

416 asfrom_froms: Set[FromClause] 

417 correlate_froms: Set[FromClause] 

418 selectable: ReturnsRows 

419 

420 

class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    # optional keys that particular statement types add to a stack frame
    compile_state: CompileState
    need_result_map_for_nested: bool
    need_result_map_for_compound: bool
    select_0: ReturnsRows
    insert_from_select: Select[Unpack[TupleAny]]

427 

428 

class ExpandedState(NamedTuple):
    """represents state to use when producing "expanded" and
    "post compile" bound parameters for a statement.

    "expanded" parameters are parameters that are generated at
    statement execution time to suit a number of parameters passed, the most
    prominent example being the individual elements inside of an IN
    expression.

    "post compile" parameters are parameters where the SQL literal value
    will be rendered into the SQL statement at execution time, rather than
    being passed as separate parameters to the driver.

    To create an :class:`.ExpandedState` instance, use the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement that uses named parameters, this dictionary will map
    exactly to the names in the statement.   For a statement that uses
    positional parameters, the :attr:`.ExpandedState.positional_parameters`
    will yield a tuple with the positional parameter set.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names indicating the order of positional
    parameters"""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping representing the intermediary link from original parameter
    name to list of "expanded" parameter names, for those parameters that
    were expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements that were compiled
        using a positional paramstyle.

        """
        positiontup = self.positiontup
        if positiontup is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        return tuple(self.parameters[key] for key in positiontup)

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters

488 

489 

490class _InsertManyValues(NamedTuple): 

491 """represents state to use for executing an "insertmanyvalues" statement. 

492 

493 The primary consumers of this object are the 

494 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and 

495 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods. 

496 

497 .. versionadded:: 2.0 

498 

499 """ 

500 

501 is_default_expr: bool 

502 """if True, the statement is of the form 

503 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch" 

504 

505 """ 

506 

507 single_values_expr: str 

508 """The rendered "values" clause of the INSERT statement. 

509 

510 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar. 

511 The insertmanyvalues logic uses this string as a search and replace 

512 target. 

513 

514 """ 

515 

516 insert_crud_params: List[crud._CrudParamElementStr] 

517 """List of Column / bind names etc. used while rewriting the statement""" 

518 

519 num_positional_params_counted: int 

520 """the number of bound parameters in a single-row statement. 

521 

522 This count may be larger or smaller than the actual number of columns 

523 targeted in the INSERT, as it accommodates for SQL expressions 

524 in the values list that may have zero or more parameters embedded 

525 within them. 

526 

527 This count is part of what's used to organize rewritten parameter lists 

528 when batching. 

529 

530 """ 

531 

532 sort_by_parameter_order: bool = False 

533 """if the deterministic_returnined_order parameter were used on the 

534 insert. 

535 

536 All of the attributes following this will only be used if this is True. 

537 

538 """ 

539 

540 includes_upsert_behaviors: bool = False 

541 """if True, we have to accommodate for upsert behaviors. 

542 

543 This will in some cases downgrade "insertmanyvalues" that requests 

544 deterministic ordering. 

545 

546 """ 

547 

548 sentinel_columns: Optional[Sequence[Column[Any]]] = None 

549 """List of sentinel columns that were located. 

550 

551 This list is only here if the INSERT asked for 

552 sort_by_parameter_order=True, 

553 and dialect-appropriate sentinel columns were located. 

554 

555 .. versionadded:: 2.0.10 

556 

557 """ 

558 

559 num_sentinel_columns: int = 0 

560 """how many sentinel columns are in the above list, if any. 

561 

562 This is the same as 

563 ``len(sentinel_columns) if sentinel_columns is not None else 0`` 

564 

565 """ 

566 

567 sentinel_param_keys: Optional[Sequence[str]] = None 

568 """parameter str keys in each param dictionary / tuple 

569 that would link to the client side "sentinel" values for that row, which 

570 we can use to match up parameter sets to result rows. 

571 

572 This is only present if sentinel_columns is present and the INSERT 

573 statement actually refers to client side values for these sentinel 

574 columns. 

575 

576 .. versionadded:: 2.0.10 

577 

578 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys 

579 only, used against the "compiled parameteters" collection before 

580 the parameters were converted by bound parameter processors 

581 

582 """ 

583 

584 implicit_sentinel: bool = False 

585 """if True, we have exactly one sentinel column and it uses a server side 

586 value, currently has to generate an incrementing integer value. 

587 

588 The dialect in question would have asserted that it supports receiving 

589 these values back and sorting on that value as a means of guaranteeing 

590 correlation with the incoming parameter list. 

591 

592 .. versionadded:: 2.0.10 

593 

594 """ 

595 

596 embed_values_counter: bool = False 

597 """Whether to embed an incrementing integer counter in each parameter 

598 set within the VALUES clause as parameters are batched over. 

599 

600 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax 

601 where a subquery is used to produce value tuples. Current support 

602 includes PostgreSQL, Microsoft SQL Server. 

603 

604 .. versionadded:: 2.0.10 

605 

606 """ 

607 

608 

609class _InsertManyValuesBatch(NamedTuple): 

610 """represents an individual batch SQL statement for insertmanyvalues. 

611 

612 This is passed through the 

613 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and 

614 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out 

615 to the :class:`.Connection` within the 

616 :meth:`.Connection._exec_insertmany_context` method. 

617 

618 .. versionadded:: 2.0.10 

619 

620 """ 

621 

622 replaced_statement: str 

623 replaced_parameters: _DBAPIAnyExecuteParams 

624 processed_setinputsizes: Optional[_GenericSetInputSizesType] 

625 batch: Sequence[_DBAPISingleExecuteParams] 

626 sentinel_values: Sequence[Tuple[Any, ...]] 

627 current_batch_size: int 

628 batchnum: int 

629 total_batches: int 

630 rows_sorted: bool 

631 is_downgraded: bool 

632 

633 

class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """bitflag enum indicating styles of PK defaults
    which can work as implicit sentinel columns

    """

    NOT_SUPPORTED = 1
    AUTOINCREMENT = 2
    IDENTITY = 4
    SEQUENCE = 8

    # composite masks derived from the flags above
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    USE_INSERT_FROM_SELECT = 16
    RENDER_SELECT_COL_CASTS = 64

650 

651 

class AggregateOrderByStyle(IntEnum):
    """Describes backend database's capabilities with ORDER BY for aggregate
    functions

    .. versionadded:: 2.1

    """

    NONE = 0
    """database has no ORDER BY for aggregate functions"""

    INLINE = 1
    """ORDER BY is rendered inside the function's argument list, typically as
    the last element"""

    WITHIN_GROUP = 2
    """the WITHIN GROUP (ORDER BY ...) phrase is used for all aggregate
    functions (not just the ordered set ones)"""

670 

671 

class CompilerState(IntEnum):
    """tracks where a :class:`.Compiled` object is in its lifecycle."""

    COMPILING = 0
    """statement is present, compilation phase in progress"""

    STRING_APPLIED = 1
    """statement is present, string form of the statement has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """compiler does not have a statement to compile, is used
    for method access"""

686 

687 

class Linting(IntEnum):
    """represent preferences for the 'SQL linting' feature.

    this feature currently includes support for flagging cartesian products
    in SQL statements.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; combines COLLECT_CARTESIAN_PRODUCTS
    and WARN_LINTING"""

709 

710 

# re-export the Linting members as module-level names; iterating the
# enum class yields its four members in definition order
(
    NO_LINTING,
    COLLECT_CARTESIAN_PRODUCTS,
    WARN_LINTING,
    FROM_LINTING,
) = Linting

714 

715 

class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """represents current state for the "cartesian product" detection
    feature."""

    def lint(self, start=None):
        """Graph-walk the collected FROM elements; return the set of
        elements unreachable from the starting point (or ``None, None``
        if everything is connected)."""
        froms = self.froms
        if not froms:
            return None, None

        remaining_edges = set(self.edges)
        unvisited = set(froms)

        if start is not None:
            origin = start
            unvisited.remove(origin)
        else:
            origin = unvisited.pop()

        queue = collections.deque([origin])

        while queue and unvisited:
            current = queue.popleft()
            unvisited.discard(current)

            # comparison of nodes in edges here is based on hash equality, as
            # there are "annotated" elements that match the non-annotated
            # ones.  to remove the need for in-python hash() calls, use native
            # containment routines (e.g. "node in edge", "edge.index(node)")
            touching = {edge for edge in remaining_edges if current in edge}

            # push the opposite endpoint of each touching edge
            queue.extendleft(
                edge[1 - edge.index(current)] for edge in touching
            )
            remaining_edges.difference_update(touching)

        # FROMS left over?  boom
        if unvisited:
            return unvisited, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Run :meth:`.lint` and emit a warning if disconnected FROM
        elements (a cartesian product) were found."""
        leftover, origin = self.lint()

        if leftover:
            froms_str = ", ".join(
                f'"{self.froms[from_]}"' for from_ in leftover
            )
            message = (
                "{stmt_type} statement has a cartesian product between "
                "FROM element(s) {froms} and "
                'FROM element "{start}".  Apply join condition(s) '
                "between each element to resolve.".format(
                    stmt_type=stmt_type,
                    froms=froms_str,
                    start=self.froms[origin],
                )
            )
            util.warn(message)

780 

781 

class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement.  ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters.  In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."

    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    is_sql = False
    is_ddl = False

    # cached result metadata, populated by subclass machinery
    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.   In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object.   For the top level object that is to be executed, the state can
    be stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer

        # when a schema translate map is given, switch to a preparer that
        # applies the translations
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is not None:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                # rendering schema translations requires a map to be present
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED
        else:
            self.state = CompilerState.NO_STATEMENT

        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        # hook for subclasses to run class-level setup
        pass

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        if self.can_execute:
            return connection._execute_compiled(
                self, distilled_params, execution_options
            )
        else:
            raise exc.ObjectNotExecutableError(self.statement)

    def visit_unsupported_compilation(self, element, err, **kw):
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL."""

        if self.state is CompilerState.STRING_APPLIED:
            return self.string
        else:
            return ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        :param params: a dict of string/object pairs whose values will
         override bind values compiled in to the statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()

984 

985 

class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        # a type may carry per-dialect variants; swap in the variant
        # registered under this dialect's name, if any, before dispatching
        variants = type_._variant_mapping
        if variants and self.dialect.name in variants:
            type_ = variants[self.dialect.name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        raise exc.UnsupportedCompilationError(self, element) from err

1006 

1007 

1008# this was a Visitable, but to allow accurate detection of 

1009# column elements this is actually a column element 

class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """lightweight label object which acts as an expression.Label."""

    __visit_name__ = "label"
    __slots__ = "element", "name", "_alt_names"

    def __init__(self, col, name, alt_names=()):
        self.element = col
        self.name = name
        # the wrapped column itself always counts as an alternate name
        self._alt_names = (col, *alt_names)

    def self_group(self, **kw):
        # already atomic; no parenthesization ever required
        return self

    @property
    def type(self):
        # type comes from the wrapped expression
        return self.element.type

    @property
    def proxy_set(self):
        # proxy membership delegates to the wrapped expression
        return self.element.proxy_set

1033 

1034 

class aggregate_orderby_inline(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce ORDER BY inside of function argument lists"""

    __visit_name__ = "aggregate_orderby_inline"
    __slots__ = "element", "aggregate_order_by"

    def __init__(self, element, orderby):
        self.element = element
        self.aggregate_order_by = orderby

    def __iter__(self):
        # iterate the wrapped element directly
        return iter(self.element)

    def self_group(self, **kw):
        # grouping is a no-op for this wrapper
        return self

    def _with_binary_element_type(self, type_):
        # rebuild the wrapper around a retyped copy of the element,
        # carrying the ORDER BY criteria along unchanged
        retyped = self.element._with_binary_element_type(type_)
        return aggregate_orderby_inline(retyped, self.aggregate_order_by)

    @property
    def type(self):
        return self.element.type

    @property
    def proxy_set(self):
        return self.element.proxy_set

1066 

1067 

class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce a wrapping element for a case-insensitive portion of
    an ILIKE construct.

    The construct usually renders the ``lower()`` function, but on
    PostgreSQL will pass silently with the assumption that "ILIKE"
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = "element", "comparator"

    def __init__(self, element):
        self.element = element
        # keep the wrapped element's comparator so operator behavior
        # passes through unchanged
        self.comparator = element.comparator

    def self_group(self, **kw):
        return self

    def _with_binary_element_type(self, type_):
        retyped = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(retyped)

    @property
    def type(self):
        return self.element.type

    @property
    def proxy_set(self):
        return self.element.proxy_set

1104 

1105 

1106class SQLCompiler(Compiled): 

1107 """Default implementation of :class:`.Compiled`. 

1108 

1109 Compiles :class:`_expression.ClauseElement` objects into SQL strings. 

1110 

1111 """ 

1112 

1113 extract_map = EXTRACT_MAP 

1114 

1115 bindname_escape_characters: ClassVar[Mapping[str, str]] = ( 

1116 util.immutabledict( 

1117 { 

1118 "%": "P", 

1119 "(": "A", 

1120 ")": "Z", 

1121 ":": "C", 

1122 ".": "_", 

1123 "[": "_", 

1124 "]": "_", 

1125 " ": "_", 

1126 } 

1127 ) 

1128 ) 

1129 """A mapping (e.g. dict or similar) containing a lookup of 

1130 characters keyed to replacement characters which will be applied to all 

1131 'bind names' used in SQL statements as a form of 'escaping'; the given 

1132 characters are replaced entirely with the 'replacement' character when 

1133 rendered in the SQL statement, and a similar translation is performed 

1134 on the incoming names used in parameter dictionaries passed to methods 

1135 like :meth:`_engine.Connection.execute`. 

1136 

1137 This allows bound parameter names used in :func:`_sql.bindparam` and 

1138 other constructs to have any arbitrary characters present without any 

1139 concern for characters that aren't allowed at all on the target database. 

1140 

1141 Third party dialects can establish their own dictionary here to replace the 

1142 default mapping, which will ensure that the particular characters in the 

1143 mapping will never appear in a bound parameter name. 

1144 

1145 The dictionary is evaluated at **class creation time**, so cannot be 

1146 modified at runtime; it must be present on the class when the class 

1147 is first declared. 

1148 

1149 Note that for dialects that have additional bound parameter rules such 

1150 as additional restrictions on leading characters, the 

1151 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented. 

1152 See the cx_Oracle compiler for an example of this. 

1153 

1154 .. versionadded:: 2.0.0rc1 

1155 

1156 """ 

1157 

1158 _bind_translate_re: ClassVar[Pattern[str]] 

1159 _bind_translate_chars: ClassVar[Mapping[str, str]] 

1160 

1161 is_sql = True 

1162 

1163 compound_keywords = COMPOUND_KEYWORDS 

1164 

1165 isdelete: bool = False 

1166 isinsert: bool = False 

1167 isupdate: bool = False 

1168 """class-level defaults which can be set at the instance 

1169 level to define if this Compiled instance represents 

1170 INSERT/UPDATE/DELETE 

1171 """ 

1172 

1173 postfetch: Optional[List[Column[Any]]] 

1174 """list of columns that can be post-fetched after INSERT or UPDATE to 

1175 receive server-updated values""" 

1176 

1177 insert_prefetch: Sequence[Column[Any]] = () 

1178 """list of columns for which default values should be evaluated before 

1179 an INSERT takes place""" 

1180 

1181 update_prefetch: Sequence[Column[Any]] = () 

1182 """list of columns for which onupdate default values should be evaluated 

1183 before an UPDATE takes place""" 

1184 

1185 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None 

1186 """list of "implicit" returning columns for a toplevel INSERT or UPDATE 

1187 statement, used to receive newly generated values of columns. 

1188 

1189 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous 

1190 ``returning`` collection, which was not a generalized RETURNING 

1191 collection and instead was in fact specific to the "implicit returning" 

1192 feature. 

1193 

1194 """ 

1195 

1196 isplaintext: bool = False 

1197 

1198 binds: Dict[str, BindParameter[Any]] 

1199 """a dictionary of bind parameter keys to BindParameter instances.""" 

1200 

1201 bind_names: Dict[BindParameter[Any], str] 

1202 """a dictionary of BindParameter instances to "compiled" names 

1203 that are actually present in the generated SQL""" 

1204 

1205 stack: List[_CompilerStackEntry] 

1206 """major statements such as SELECT, INSERT, UPDATE, DELETE are 

1207 tracked in this stack using an entry format.""" 

1208 

1209 returning_precedes_values: bool = False 

1210 """set to True classwide to generate RETURNING 

1211 clauses before the VALUES or WHERE clause (i.e. MSSQL) 

1212 """ 

1213 

1214 render_table_with_column_in_update_from: bool = False 

1215 """set to True classwide to indicate the SET clause 

1216 in a multi-table UPDATE statement should qualify 

1217 columns with the table name (i.e. MySQL only) 

1218 """ 

1219 

1220 ansi_bind_rules: bool = False 

1221 """SQL 92 doesn't allow bind parameters to be used 

1222 in the columns clause of a SELECT, nor does it allow 

1223 ambiguous expressions like "? = ?". A compiler 

1224 subclass can set this flag to False if the target 

1225 driver/DB enforces this 

1226 """ 

1227 

1228 bindtemplate: str 

1229 """template to render bound parameters based on paramstyle.""" 

1230 

1231 compilation_bindtemplate: str 

1232 """template used by compiler to render parameters before positional 

1233 paramstyle application""" 

1234 

1235 _numeric_binds_identifier_char: str 

1236 """Character that's used to as the identifier of a numerical bind param. 

1237 For example if this char is set to ``$``, numerical binds will be rendered 

1238 in the form ``$1, $2, $3``. 

1239 """ 

1240 

1241 _result_columns: List[ResultColumnsEntry] 

1242 """relates label names in the final SQL to a tuple of local 

1243 column/label name, ColumnElement object (if any) and 

1244 TypeEngine. CursorResult uses this for type processing and 

1245 column targeting""" 

1246 

1247 _textual_ordered_columns: bool = False 

1248 """tell the result object that the column names as rendered are important, 

1249 but they are also "ordered" vs. what is in the compiled object here. 

1250 

1251 As of 1.4.42 this condition is only present when the statement is a 

1252 TextualSelect, e.g. text("....").columns(...), where it is required 

1253 that the columns are considered positionally and not by name. 

1254 

1255 """ 

1256 

1257 _ad_hoc_textual: bool = False 

1258 """tell the result that we encountered text() or '*' constructs in the 

1259 middle of the result columns, but we also have compiled columns, so 

1260 if the number of columns in cursor.description does not match how many 

1261 expressions we have, that means we can't rely on positional at all and 

1262 should match on name. 

1263 

1264 """ 

1265 

1266 _ordered_columns: bool = True 

1267 """ 

1268 if False, means we can't be sure the list of entries 

1269 in _result_columns is actually the rendered order. Usually 

1270 True unless using an unordered TextualSelect. 

1271 """ 

1272 

1273 _loose_column_name_matching: bool = False 

1274 """tell the result object that the SQL statement is textual, wants to match 

1275 up to Column objects, and may be using the ._tq_label in the SELECT rather 

1276 than the base name. 

1277 

1278 """ 

1279 

1280 _numeric_binds: bool = False 

1281 """ 

1282 True if paramstyle is "numeric". This paramstyle is trickier than 

1283 all the others. 

1284 

1285 """ 

1286 

1287 _render_postcompile: bool = False 

1288 """ 

1289 whether to render out POSTCOMPILE params during the compile phase. 

1290 

1291 This attribute is used only for end-user invocation of stmt.compile(); 

1292 it's never used for actual statement execution, where instead the 

1293 dialect internals access and render the internal postcompile structure 

1294 directly. 

1295 

1296 """ 

1297 

1298 _post_compile_expanded_state: Optional[ExpandedState] = None 

1299 """When render_postcompile is used, the ``ExpandedState`` used to create 

1300 the "expanded" SQL is assigned here, and then used by the ``.params`` 

1301 accessor and ``.construct_params()`` methods for their return values. 

1302 

1303 .. versionadded:: 2.0.0rc1 

1304 

1305 """ 

1306 

1307 _pre_expanded_string: Optional[str] = None 

1308 """Stores the original string SQL before 'post_compile' is applied, 

1309 for cases where 'post_compile' were used. 

1310 

1311 """ 

1312 

1313 _pre_expanded_positiontup: Optional[List[str]] = None 

1314 

1315 _insertmanyvalues: Optional[_InsertManyValues] = None 

1316 

1317 _insert_crud_params: Optional[crud._CrudParamSequence] = None 

1318 

1319 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset() 

1320 """bindparameter objects that are rendered as literal values at statement 

1321 execution time. 

1322 

1323 """ 

1324 

1325 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset() 

1326 """bindparameter objects that are rendered as bound parameter placeholders 

1327 at statement execution time. 

1328 

1329 """ 

1330 

1331 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT 

1332 """Late escaping of bound parameter names that has to be converted 

1333 to the original name when looking in the parameter dictionary. 

1334 

1335 """ 

1336 

1337 has_out_parameters = False 

1338 """if True, there are bindparam() objects that have the isoutparam 

1339 flag set.""" 

1340 

1341 postfetch_lastrowid = False 

1342 """if True, and this in insert, use cursor.lastrowid to populate 

1343 result.inserted_primary_key. """ 

1344 

1345 _cache_key_bind_match: Optional[ 

1346 Tuple[ 

1347 Dict[ 

1348 BindParameter[Any], 

1349 List[BindParameter[Any]], 

1350 ], 

1351 Dict[ 

1352 str, 

1353 BindParameter[Any], 

1354 ], 

1355 ] 

1356 ] = None 

1357 """a mapping that will relate the BindParameter object we compile 

1358 to those that are part of the extracted collection of parameters 

1359 in the cache key, if we were given a cache key. 

1360 

1361 """ 

1362 

1363 positiontup: Optional[List[str]] = None 

1364 """for a compiled construct that uses a positional paramstyle, will be 

1365 a sequence of strings, indicating the names of bound parameters in order. 

1366 

1367 This is used in order to render bound parameters in their correct order, 

1368 and is combined with the :attr:`_sql.Compiled.params` dictionary to 

1369 render parameters. 

1370 

1371 This sequence always contains the unescaped name of the parameters. 

1372 

1373 .. seealso:: 

1374 

1375 :ref:`faq_sql_expression_string` - includes a usage example for 

1376 debugging use cases. 

1377 

1378 """ 

1379 _values_bindparam: Optional[List[str]] = None 

1380 

1381 _visited_bindparam: Optional[List[str]] = None 

1382 

1383 inline: bool = False 

1384 

1385 ctes: Optional[MutableMapping[CTE, str]] 

1386 

1387 # Detect same CTE references - Dict[(level, name), cte] 

1388 # Level is required for supporting nesting 

1389 ctes_by_level_name: Dict[Tuple[int, str], CTE] 

1390 

1391 # To retrieve key/level in ctes_by_level_name - 

1392 # Dict[cte_reference, (level, cte_name, cte_opts)] 

1393 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]] 

1394 

1395 ctes_recursive: bool 

1396 

1397 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]") 

1398 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s") 

1399 _positional_pattern = re.compile( 

1400 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}" 

1401 ) 

1402 

    @classmethod
    def _init_compiler_cls(cls):
        # per-class setup hook: compiles the regex / translation table
        # used to escape bound parameter names
        cls._init_bind_translate()

1406 

1407 @classmethod 

1408 def _init_bind_translate(cls): 

1409 reg = re.escape("".join(cls.bindname_escape_characters)) 

1410 cls._bind_translate_re = re.compile(f"[{reg}]") 

1411 cls._bind_translate_chars = cls.bindname_escape_characters 

1412 

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        cache_key: Optional[CacheKey] = None,
        column_keys: Optional[Sequence[str]] = None,
        for_executemany: bool = False,
        linting: Linting = NO_LINTING,
        _supporting_against: Optional[SQLCompiler] = None,
        **kwargs: Any,
    ):
        """Construct a new :class:`.SQLCompiler` object.

        :param dialect: :class:`.Dialect` to be used

        :param statement: :class:`_expression.ClauseElement` to be compiled

        :param column_keys: a list of column names to be compiled into an
         INSERT or UPDATE statement.

        :param for_executemany: whether INSERT / UPDATE statements should
         expect that they are to be invoked in an "executemany" style,
         which may impact how the statement will be expected to return the
         values of defaults and autoincrement / sequences and similar.
         Depending on the backend and driver in use, support for retrieving
         these values may be disabled which means SQL expressions may
         be rendered inline, RETURNING may not be rendered, etc.

        :param kwargs: additional keyword arguments to be consumed by the
         superclass.

        """
        self.column_keys = column_keys

        self.cache_key = cache_key

        if cache_key:
            # map original cache-key bindparams both by .key and by
            # identity, for later correlation in construct_params()
            cksm = {b.key: b for b in cache_key[1]}
            ckbm = {b: [b] for b in cache_key[1]}
            self._cache_key_bind_match = (ckbm, cksm)

        # compile INSERT/UPDATE defaults/sequences to expect executemany
        # style execution, which may mean no pre-execute of defaults,
        # or no RETURNING
        self.for_executemany = for_executemany

        self.linting = linting

        # a dictionary of bind parameter keys to BindParameter
        # instances.
        self.binds = {}

        # a dictionary of BindParameter instances to "compiled" names
        # that are actually present in the generated SQL
        self.bind_names = util.column_dict()

        # stack which keeps track of nested SELECT statements
        self.stack = []

        self._result_columns = []

        # true if the paramstyle is positional
        self.positional = dialect.positional
        if self.positional:
            self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
            if nb:
                self._numeric_binds_identifier_char = (
                    "$" if dialect.paramstyle == "numeric_dollar" else ":"
                )

            # positional styles first render pyformat, then are
            # post-processed by _process_positional / _process_numeric
            self.compilation_bindtemplate = _pyformat_template
        else:
            self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        self.ctes = None

        self.label_length = (
            dialect.label_length or dialect.max_identifier_length
        )

        # a map which tracks "anonymous" identifiers that are created on
        # the fly here
        self.anon_map = prefix_anon_map()

        # a map which tracks "truncated" names based on
        # dialect.label_length or dialect.max_identifier_length
        self.truncated_names: Dict[Tuple[str, str], str] = {}
        self._truncated_counters: Dict[str, int] = {}

        # superclass __init__ performs the actual compilation and sets
        # self.state / self.string as well as isinsert/isupdate/isdelete
        Compiled.__init__(self, dialect, statement, **kwargs)

        if self.isinsert or self.isupdate or self.isdelete:
            if TYPE_CHECKING:
                assert isinstance(statement, UpdateBase)

            if self.isinsert or self.isupdate:
                if TYPE_CHECKING:
                    assert isinstance(statement, ValuesBase)
                # "inline" means defaults are rendered into the statement
                # rather than pre-executed / fetched via RETURNING
                if statement._inline:
                    self.inline = True
                elif self.for_executemany and (
                    not self.isinsert
                    or (
                        self.dialect.insert_executemany_returning
                        and statement._return_defaults
                    )
                ):
                    self.inline = True

        self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

        if _supporting_against:
            # clone compiled state from a "supporting" compiler, keeping
            # only this compiler's dialect/paramstyle-specific attributes
            self.__dict__.update(
                {
                    k: v
                    for k, v in _supporting_against.__dict__.items()
                    if k
                    not in {
                        "state",
                        "dialect",
                        "preparer",
                        "positional",
                        "_numeric_binds",
                        "compilation_bindtemplate",
                        "bindtemplate",
                    }
                }
            )

        if self.state is CompilerState.STRING_APPLIED:
            # post-process the rendered string for positional paramstyles
            if self.positional:
                if self._numeric_binds:
                    self._process_numeric()
                else:
                    self._process_positional()

            if self._render_postcompile:
                parameters = self.construct_params(
                    escape_names=False,
                    _no_postcompile=True,
                )

                self._process_parameters_for_postcompile(
                    parameters, _populate_self=True
                )

1558 

1559 @property 

1560 def insert_single_values_expr(self) -> Optional[str]: 

1561 """When an INSERT is compiled with a single set of parameters inside 

1562 a VALUES expression, the string is assigned here, where it can be 

1563 used for insert batching schemes to rewrite the VALUES expression. 

1564 

1565 .. versionchanged:: 2.0 This collection is no longer used by 

1566 SQLAlchemy's built-in dialects, in favor of the currently 

1567 internal ``_insertmanyvalues`` collection that is used only by 

1568 :class:`.SQLCompiler`. 

1569 

1570 """ 

1571 if self._insertmanyvalues is None: 

1572 return None 

1573 else: 

1574 return self._insertmanyvalues.single_values_expr 

1575 

1576 @util.ro_memoized_property 

1577 def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]: 

1578 """The effective "returning" columns for INSERT, UPDATE or DELETE. 

1579 

1580 This is either the so-called "implicit returning" columns which are 

1581 calculated by the compiler on the fly, or those present based on what's 

1582 present in ``self.statement._returning`` (expanded into individual 

1583 columns using the ``._all_selected_columns`` attribute) i.e. those set 

1584 explicitly using the :meth:`.UpdateBase.returning` method. 

1585 

1586 .. versionadded:: 2.0 

1587 

1588 """ 

1589 if self.implicit_returning: 

1590 return self.implicit_returning 

1591 elif self.statement is not None and is_dml(self.statement): 

1592 return [ 

1593 c 

1594 for c in self.statement._all_selected_columns 

1595 if is_column_element(c) 

1596 ] 

1597 

1598 else: 

1599 return None 

1600 

    @property
    def returning(self):
        """backwards compatibility; returns the
        effective_returning collection.

        """
        return self.effective_returning

1608 

    @property
    def current_executable(self):
        """Return the current 'executable' that is being compiled.

        This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
        :class:`_sql.Update`, :class:`_sql.Delete`,
        :class:`_sql.CompoundSelect` object that is being compiled.
        Specifically it's assigned to the ``self.stack`` list of elements.

        When a statement like the above is being compiled, it normally
        is also assigned to the ``.statement`` attribute of the
        :class:`_sql.Compiler` object. However, all SQL constructs are
        ultimately nestable, and this attribute should never be consulted
        by a ``visit_`` method, as it is not guaranteed to be assigned
        nor guaranteed to correspond to the current statement being compiled.

        """
        try:
            # top of the compiler stack is the innermost statement
            return self.stack[-1]["selectable"]
        except IndexError as ie:
            raise IndexError("Compiler does not have a stack entry") from ie

1630 

1631 @property 

1632 def prefetch(self): 

1633 return list(self.insert_prefetch) + list(self.update_prefetch) 

1634 

    @util.memoized_property
    def _global_attributes(self) -> Dict[Any, Any]:
        # lazily-created scratch dictionary shared across a compilation;
        # memoized so the same dict is returned on every access
        return {}

1638 

1639 @util.memoized_instancemethod 

1640 def _init_cte_state(self) -> MutableMapping[CTE, str]: 

1641 """Initialize collections related to CTEs only if 

1642 a CTE is located, to save on the overhead of 

1643 these collections otherwise. 

1644 

1645 """ 

1646 # collect CTEs to tack on top of a SELECT 

1647 # To store the query to print - Dict[cte, text_query] 

1648 ctes: MutableMapping[CTE, str] = util.OrderedDict() 

1649 self.ctes = ctes 

1650 

1651 # Detect same CTE references - Dict[(level, name), cte] 

1652 # Level is required for supporting nesting 

1653 self.ctes_by_level_name = {} 

1654 

1655 # To retrieve key/level in ctes_by_level_name - 

1656 # Dict[cte_reference, (level, cte_name, cte_opts)] 

1657 self.level_name_by_cte = {} 

1658 

1659 self.ctes_recursive = False 

1660 

1661 return ctes 

1662 

1663 @contextlib.contextmanager 

1664 def _nested_result(self): 

1665 """special API to support the use case of 'nested result sets'""" 

1666 result_columns, ordered_columns = ( 

1667 self._result_columns, 

1668 self._ordered_columns, 

1669 ) 

1670 self._result_columns, self._ordered_columns = [], False 

1671 

1672 try: 

1673 if self.stack: 

1674 entry = self.stack[-1] 

1675 entry["need_result_map_for_nested"] = True 

1676 else: 

1677 entry = None 

1678 yield self._result_columns, self._ordered_columns 

1679 finally: 

1680 if entry: 

1681 entry.pop("need_result_map_for_nested") 

1682 self._result_columns, self._ordered_columns = ( 

1683 result_columns, 

1684 ordered_columns, 

1685 ) 

1686 

    def _process_positional(self):
        # Rewrite the pyformat-rendered SQL string into "format" (%s) or
        # "qmark" (?) positional style, recording the encounter order of
        # bind names into self.positiontup.
        assert not self.positiontup
        assert self.state is CompilerState.STRING_APPLIED
        assert not self._numeric_binds

        if self.dialect.paramstyle == "format":
            placeholder = "%s"
        else:
            assert self.dialect.paramstyle == "qmark"
            placeholder = "?"

        positions = []

        def find_position(m: re.Match[str]) -> str:
            # regex-substitution callback: record each bind name in the
            # order encountered; regular binds are replaced with the
            # positional placeholder, post-compile binds remain as-is
            normal_bind = m.group(1)
            if normal_bind:
                positions.append(normal_bind)
                return placeholder
            else:
                # this a post-compile bind
                positions.append(m.group(2))
                return m.group(0)

        self.string = re.sub(
            self._positional_pattern, find_position, self.string
        )

        if self.escaped_bind_names:
            # positiontup always holds the *unescaped* names; map back
            reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
            assert len(self.escaped_bind_names) == len(reverse_escape)
            self.positiontup = [
                reverse_escape.get(name, name) for name in positions
            ]
        else:
            self.positiontup = positions

        if self._insertmanyvalues:
            # rebinding 'positions' here intentionally resets the list
            # seen by the find_position closure for the VALUES rewrite
            positions = []

            single_values_expr = re.sub(
                self._positional_pattern,
                find_position,
                self._insertmanyvalues.single_values_expr,
            )
            insert_crud_params = [
                (
                    v[0],
                    v[1],
                    re.sub(self._positional_pattern, find_position, v[2]),
                    v[3],
                )
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                single_values_expr=single_values_expr,
                insert_crud_params=insert_crud_params,
            )

1745 

    def _process_numeric(self):
        # Rewrite the pyformat-rendered SQL string into "numeric" style,
        # e.g. :1 / $1 placeholders numbered by first occurrence of each
        # distinct bind name.
        assert self._numeric_binds
        assert self.state is CompilerState.STRING_APPLIED

        num = 1
        param_pos: Dict[str, str] = {}
        order: Iterable[str]
        if self._insertmanyvalues and self._values_bindparam is not None:
            # bindparams that are not in values are always placed first.
            # this avoids the need of changing them when using executemany
            # values () ()
            order = itertools.chain(
                (
                    name
                    for name in self.bind_names.values()
                    if name not in self._values_bindparam
                ),
                self.bind_names.values(),
            )
        else:
            order = self.bind_names.values()

        for bind_name in order:
            if bind_name in param_pos:
                continue
            bind = self.binds[bind_name]
            if (
                bind in self.post_compile_params
                or bind in self.literal_execute_params
            ):
                # set to None to just mark the in positiontup, it will not
                # be replaced below.
                param_pos[bind_name] = None  # type: ignore
            else:
                ph = f"{self._numeric_binds_identifier_char}{num}"
                num += 1
                param_pos[bind_name] = ph

        self.next_numeric_pos = num

        # dict preserves insertion order, so its keys give positiontup
        self.positiontup = list(param_pos)
        if self.escaped_bind_names:
            len_before = len(param_pos)
            param_pos = {
                self.escaped_bind_names.get(name, name): pos
                for name, pos in param_pos.items()
            }
            assert len(param_pos) == len_before

        # Can't use format here since % chars are not escaped.
        self.string = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], self.string
        )

        if self._insertmanyvalues:
            single_values_expr = (
                # format is ok here since single_values_expr includes only
                # place-holders
                self._insertmanyvalues.single_values_expr
                % param_pos
            )
            insert_crud_params = [
                (v[0], v[1], "%s", v[3])
                for v in self._insertmanyvalues.insert_crud_params
            ]

            self._insertmanyvalues = self._insertmanyvalues._replace(
                # This has the numbers (:1, :2)
                single_values_expr=single_values_expr,
                # The single binds are instead %s so they can be formatted
                insert_crud_params=insert_crud_params,
            )

1818 

1819 @util.memoized_property 

1820 def _bind_processors( 

1821 self, 

1822 ) -> MutableMapping[ 

1823 str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]] 

1824 ]: 

1825 # mypy is not able to see the two value types as the above Union, 

1826 # it just sees "object". don't know how to resolve 

1827 return { 

1828 key: value # type: ignore 

1829 for key, value in ( 

1830 ( 

1831 self.bind_names[bindparam], 

1832 ( 

1833 bindparam.type._cached_bind_processor(self.dialect) 

1834 if not bindparam.type._is_tuple_type 

1835 else tuple( 

1836 elem_type._cached_bind_processor(self.dialect) 

1837 for elem_type in cast( 

1838 TupleType, bindparam.type 

1839 ).types 

1840 ) 

1841 ), 

1842 ) 

1843 for bindparam in self.bind_names 

1844 ) 

1845 if value is not None 

1846 } 

1847 

    def is_subquery(self):
        # more than one entry on the compiler stack means the statement
        # currently being rendered is nested inside an enclosing one
        return len(self.stack) > 1

1850 

    @property
    def sql_compiler(self) -> Self:
        """A SQLCompiler serves as its own SQL expression compiler."""
        return self

1854 

1855 def construct_expanded_state( 

1856 self, 

1857 params: Optional[_CoreSingleExecuteParams] = None, 

1858 escape_names: bool = True, 

1859 ) -> ExpandedState: 

1860 """Return a new :class:`.ExpandedState` for a given parameter set. 

1861 

1862 For queries that use "expanding" or other late-rendered parameters, 

1863 this method will provide for both the finalized SQL string as well 

1864 as the parameters that would be used for a particular parameter set. 

1865 

1866 .. versionadded:: 2.0.0rc1 

1867 

1868 """ 

1869 parameters = self.construct_params( 

1870 params, 

1871 escape_names=escape_names, 

1872 _no_postcompile=True, 

1873 ) 

1874 return self._process_parameters_for_postcompile( 

1875 parameters, 

1876 ) 

1877 

1878 def construct_params( 

1879 self, 

1880 params: Optional[_CoreSingleExecuteParams] = None, 

1881 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None, 

1882 escape_names: bool = True, 

1883 _group_number: Optional[int] = None, 

1884 _check: bool = True, 

1885 _no_postcompile: bool = False, 

1886 ) -> _MutableCoreSingleExecuteParams: 

1887 """return a dictionary of bind parameter keys and values""" 

1888 

1889 if self._render_postcompile and not _no_postcompile: 

1890 assert self._post_compile_expanded_state is not None 

1891 if not params: 

1892 return dict(self._post_compile_expanded_state.parameters) 

1893 else: 

1894 raise exc.InvalidRequestError( 

1895 "can't construct new parameters when render_postcompile " 

1896 "is used; the statement is hard-linked to the original " 

1897 "parameters. Use construct_expanded_state to generate a " 

1898 "new statement and parameters." 

1899 ) 

1900 

1901 has_escaped_names = escape_names and bool(self.escaped_bind_names) 

1902 

1903 if extracted_parameters: 

1904 # related the bound parameters collected in the original cache key 

1905 # to those collected in the incoming cache key. They will not have 

1906 # matching names but they will line up positionally in the same 

1907 # way. The parameters present in self.bind_names may be clones of 

1908 # these original cache key params in the case of DML but the .key 

1909 # will be guaranteed to match. 

1910 if self.cache_key is None: 

1911 raise exc.CompileError( 

1912 "This compiled object has no original cache key; " 

1913 "can't pass extracted_parameters to construct_params" 

1914 ) 

1915 else: 

1916 orig_extracted = self.cache_key[1] 

1917 

1918 ckbm_tuple = self._cache_key_bind_match 

1919 assert ckbm_tuple is not None 

1920 ckbm, _ = ckbm_tuple 

1921 resolved_extracted = { 

1922 bind: extracted 

1923 for b, extracted in zip(orig_extracted, extracted_parameters) 

1924 for bind in ckbm[b] 

1925 } 

1926 else: 

1927 resolved_extracted = None 

1928 

1929 if params: 

1930 pd = {} 

1931 for bindparam, name in self.bind_names.items(): 

1932 escaped_name = ( 

1933 self.escaped_bind_names.get(name, name) 

1934 if has_escaped_names 

1935 else name 

1936 ) 

1937 

1938 if bindparam.key in params: 

1939 pd[escaped_name] = params[bindparam.key] 

1940 elif name in params: 

1941 pd[escaped_name] = params[name] 

1942 

1943 elif _check and bindparam.required: 

1944 if _group_number: 

1945 raise exc.InvalidRequestError( 

1946 "A value is required for bind parameter %r, " 

1947 "in parameter group %d" 

1948 % (bindparam.key, _group_number), 

1949 code="cd3x", 

1950 ) 

1951 else: 

1952 raise exc.InvalidRequestError( 

1953 "A value is required for bind parameter %r" 

1954 % bindparam.key, 

1955 code="cd3x", 

1956 ) 

1957 else: 

1958 if resolved_extracted: 

1959 value_param = resolved_extracted.get( 

1960 bindparam, bindparam 

1961 ) 

1962 else: 

1963 value_param = bindparam 

1964 

1965 if bindparam.callable: 

1966 pd[escaped_name] = value_param.effective_value 

1967 else: 

1968 pd[escaped_name] = value_param.value 

1969 return pd 

1970 else: 

1971 pd = {} 

1972 for bindparam, name in self.bind_names.items(): 

1973 escaped_name = ( 

1974 self.escaped_bind_names.get(name, name) 

1975 if has_escaped_names 

1976 else name 

1977 ) 

1978 

1979 if _check and bindparam.required: 

1980 if _group_number: 

1981 raise exc.InvalidRequestError( 

1982 "A value is required for bind parameter %r, " 

1983 "in parameter group %d" 

1984 % (bindparam.key, _group_number), 

1985 code="cd3x", 

1986 ) 

1987 else: 

1988 raise exc.InvalidRequestError( 

1989 "A value is required for bind parameter %r" 

1990 % bindparam.key, 

1991 code="cd3x", 

1992 ) 

1993 

1994 if resolved_extracted: 

1995 value_param = resolved_extracted.get(bindparam, bindparam) 

1996 else: 

1997 value_param = bindparam 

1998 

1999 if bindparam.callable: 

2000 pd[escaped_name] = value_param.effective_value 

2001 else: 

2002 pd[escaped_name] = value_param.value 

2003 

2004 return pd 

2005 

@util.memoized_instancemethod
def _get_set_input_sizes_lookup(self):
    """Build and memoize a mapping of bound parameters to DBAPI types
    for use with ``cursor.setinputsizes()``-style hooks.

    Parameters whose DBAPI type is filtered out by the dialect's
    include/exclude sets map to ``None``; tuple-typed parameters map to a
    list with one entry per tuple element.  Parameters that are rendered
    as literals (``literal_execute``) are omitted entirely.
    """
    dialect = self.dialect
    dbapi = dialect.dbapi
    include_types = dialect.include_set_input_sizes
    exclude_types = dialect.exclude_set_input_sizes

    def resolve_dbapi_type(typ):
        # unwrap any dialect-level type decoration before asking for
        # the DBAPI-level type object
        dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)
        if dbtype is None:
            return None
        if exclude_types is not None and dbtype in exclude_types:
            return None
        if include_types is not None and dbtype not in include_types:
            return None
        return dbtype

    literal_execute_params = self.literal_execute_params
    inputsizes = {}
    for bindparam in self.bind_names:
        # literal-execute params never reach the cursor as bound values
        if bindparam in literal_execute_params:
            continue

        if bindparam.type._is_tuple_type:
            inputsizes[bindparam] = [
                resolve_dbapi_type(element_type)
                for element_type in cast(TupleType, bindparam.type).types
            ]
        else:
            inputsizes[bindparam] = resolve_dbapi_type(bindparam.type)

    return inputsizes

2044 

@property
def params(self):
    """The dictionary of bind parameter values embedded in this compiled
    object, for those parameters that have a value present.

    Required parameters are not validated here (``_check=False``), so
    this accessor is safe to use for debugging / string inspection.

    .. seealso::

        :ref:`faq_sql_expression_string` - includes a usage example for
        debugging use cases.

    """
    return self.construct_params(_check=False)

2057 

def _process_parameters_for_postcompile(
    self,
    parameters: _MutableCoreSingleExecuteParams,
    _populate_self: bool = False,
) -> ExpandedState:
    """handle special post compile parameters.

    These include:

    * "expanding" parameters -typically IN tuples that are rendered
      on a per-parameter basis for an otherwise fixed SQL statement string.

    * literal_binds compiled with the literal_execute flag.  Used for
      things like SQL Server "TOP N" where the driver does not accommodate
      N as a bound parameter.

    :param parameters: the incoming parameter dictionary; NOTE this dict
     is mutated in place - post-compile entries are popped and replaced
     with their per-value expansions.
    :param _populate_self: if True, also store the expanded statement and
     positiontup back onto this compiled object (used by the
     "render_postcompile" debugging feature).
    :return: an :class:`.ExpandedState` carrying the final statement
     string, the mutated parameters, new bind processors, the new
     positional tuple, and the expansion key mapping.
    """

    expanded_parameters = {}
    new_positiontup: Optional[List[str]]

    # if a previous expansion already ran with _populate_self, start
    # from the saved pre-expansion string rather than self.string
    pre_expanded_string = self._pre_expanded_string
    if pre_expanded_string is None:
        pre_expanded_string = self.string

    if self.positional:
        new_positiontup = []

        pre_expanded_positiontup = self._pre_expanded_positiontup
        if pre_expanded_positiontup is None:
            pre_expanded_positiontup = self.positiontup

    else:
        new_positiontup = pre_expanded_positiontup = None

    # the same processors mapping is viewed under two typing aliases;
    # tuple-typed params store a sequence of processors per name
    processors = self._bind_processors
    single_processors = cast(
        "Mapping[str, _BindProcessorType[Any]]", processors
    )
    tuple_processors = cast(
        "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
    )

    new_processors: Dict[str, _BindProcessorType[Any]] = {}

    replacement_expressions: Dict[str, Any] = {}
    to_update_sets: Dict[str, Any] = {}

    # notes:
    # *unescaped* parameter names in:
    # self.bind_names, self.binds, self._bind_processors, self.positiontup
    #
    # *escaped* parameter names in:
    # construct_params(), replacement_expressions

    numeric_positiontup: Optional[List[str]] = None

    # positional dialects iterate in statement order; named dialects
    # iterate the bind_names collection
    if self.positional and pre_expanded_positiontup is not None:
        names: Iterable[str] = pre_expanded_positiontup
        if self._numeric_binds:
            numeric_positiontup = []
    else:
        names = self.bind_names.values()

    ebn = self.escaped_bind_names
    for name in names:
        escaped_name = ebn.get(name, name) if ebn else name
        parameter = self.binds[name]

        if parameter in self.literal_execute_params:
            # render the value inline; remove it from the params dict
            if escaped_name not in replacement_expressions:
                replacement_expressions[escaped_name] = (
                    self.render_literal_bindparam(
                        parameter,
                        render_literal_value=parameters.pop(escaped_name),
                    )
                )
            continue

        if parameter in self.post_compile_params:
            if escaped_name in replacement_expressions:
                # the same name was seen already (possible for
                # positional dialects); reuse the prior expansion
                to_update = to_update_sets[escaped_name]
                values = None
            else:
                # we are removing the parameter from parameters
                # because it is a list value, which is not expected by
                # TypeEngine objects that would otherwise be asked to
                # process it. the single name is being replaced with
                # individual numbered parameters for each value in the
                # param.
                #
                # note we are also inserting *escaped* parameter names
                # into the given dictionary. default dialect will
                # use these param names directly as they will not be
                # in the escaped_bind_names dictionary.
                values = parameters.pop(name)

                leep_res = self._literal_execute_expanding_parameter(
                    escaped_name, parameter, values
                )
                (to_update, replacement_expr) = leep_res

                to_update_sets[escaped_name] = to_update
                replacement_expressions[escaped_name] = replacement_expr

            if not parameter.literal_execute:
                parameters.update(to_update)
                if parameter.type._is_tuple_type:
                    assert values is not None
                    # one processor per tuple element, keyed on the
                    # expanded "<name>_<i>_<j>" parameter names
                    new_processors.update(
                        (
                            "%s_%s_%s" % (name, i, j),
                            tuple_processors[name][j - 1],
                        )
                        for i, tuple_element in enumerate(values, 1)
                        for j, _ in enumerate(tuple_element, 1)
                        if name in tuple_processors
                        and tuple_processors[name][j - 1] is not None
                    )
                else:
                    new_processors.update(
                        (key, single_processors[name])
                        for key, _ in to_update
                        if name in single_processors
                    )
                if numeric_positiontup is not None:
                    numeric_positiontup.extend(
                        name for name, _ in to_update
                    )
                elif new_positiontup is not None:
                    # to_update has escaped names, but that's ok since
                    # these are new names, that aren't in the
                    # escaped_bind_names dict.
                    new_positiontup.extend(name for name, _ in to_update)
                expanded_parameters[name] = [
                    expand_key for expand_key, _ in to_update
                ]
        elif new_positiontup is not None:
            new_positiontup.append(name)

    def process_expanding(m):
        # regex callback: swap each __[POSTCOMPILE_...] token for its
        # rendered replacement expression
        key = m.group(1)
        expr = replacement_expressions[key]

        # if POSTCOMPILE included a bind_expression, render that
        # around each element
        if m.group(2):
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            expr = ", ".join(
                "%s%s%s" % (be_left, exp, be_right)
                for exp in expr.split(", ")
            )
        return expr

    statement = re.sub(
        self._post_compile_pattern, process_expanding, pre_expanded_string
    )

    if numeric_positiontup is not None:
        assert new_positiontup is not None
        # numeric dialects (e.g. ":1" style): assign positions to the
        # newly expanded parameters, continuing after the last
        # pre-existing numeric position
        param_pos = {
            key: f"{self._numeric_binds_identifier_char}{num}"
            for num, key in enumerate(
                numeric_positiontup, self.next_numeric_pos
            )
        }
        # Can't use format here since % chars are not escaped.
        statement = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], statement
        )
        new_positiontup.extend(numeric_positiontup)

    expanded_state = ExpandedState(
        statement,
        parameters,
        new_processors,
        new_positiontup,
        expanded_parameters,
    )

    if _populate_self:
        # this is for the "render_postcompile" flag, which is not
        # otherwise used internally and is for end-user debugging and
        # special use cases.
        self._pre_expanded_string = pre_expanded_string
        self._pre_expanded_positiontup = pre_expanded_positiontup
        self.string = expanded_state.statement
        self.positiontup = (
            list(expanded_state.positiontup or ())
            if self.positional
            else None
        )
        self._post_compile_expanded_state = expanded_state

    return expanded_state

2254 

@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """utility method used for unit tests only."""
    cursor_mod = util.preloaded.engine_cursor
    metadata_cls = cursor_mod.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)

2262 

2263 # assigned by crud.py for insert/update statements 

2264 _get_bind_name_for_col: _BindNameForColProtocol 

2265 

@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Memoized accessor for the column-to-bind-name getter that
    crud.py assigns onto this compiled object for INSERT/UPDATE."""
    return self._get_bind_name_for_col

2270 

@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    # Build and memoize a callable ``get(lastrowid, parameters)`` that
    # assembles the inserted primary key "row" for an INSERT statement,
    # combining cursor.lastrowid (for the autoincrement column) with
    # values pulled from the INSERT parameters for the other PK columns.
    result = util.preloaded.engine_result

    param_key_getter = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # per-PK-column getters that pull the column's value out of the
    # execution parameters dict (None if absent)
    getters = [
        (operator.methodcaller("get", param_key_getter(col), None), col)
        for col in table.primary_key
    ]

    autoinc_getter = None
    autoinc_col = table._autoincrement_column
    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = param_key_getter(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                # prefer an explicit, non-None user-supplied value for
                # the autoincrement column over cursor.lastrowid
                param_value = parameters.get(autoinc_key, lastrowid)
                if param_value is not None:
                    # they supplied non-None parameter, use that.
                    # SQLite at least is observed to return the wrong
                    # cursor.lastrowid for INSERT..ON CONFLICT so it
                    # can't be used in all cases
                    return param_value
                else:
                    # use lastrowid
                    return lastrowid

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    else:
        lastrowid_processor = None

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            # no lastrowid available; every PK value must come from
            # the parameters
            return row_fn(getter(parameters) for getter, col in getters)
        else:
            return row_fn(
                (
                    (
                        autoinc_getter(lastrowid, parameters)
                        if autoinc_getter is not None
                        else lastrowid
                    )
                    if col is autoinc_col
                    else getter(parameters)
                )
                for getter, col in getters
            )

    return get

2354 

@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    # Build and memoize a callable ``get(row, parameters)`` that
    # assembles the inserted primary key for an INSERT that used
    # implicit RETURNING: PK columns present in the RETURNING clause
    # are read from the returned row; any remaining PK columns are
    # read from the execution parameters.
    result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    param_key_getter = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None
    # map each RETURNING column to its positional index in the row
    ret = {col: idx for idx, col in enumerate(returning)}

    # per-PK-column (getter, use_row) pairs: use_row=True means the
    # getter indexes into the RETURNING row; False means it pulls
    # from the parameters dict
    getters = cast(
        "List[Tuple[Callable[[Any], Any], bool]]",
        [
            (
                (operator.itemgetter(ret[col]), True)
                if col in ret
                else (
                    operator.methodcaller(
                        "get", param_key_getter(col), None
                    ),
                    False,
                )
            )
            for col in table.primary_key
        ],
    )

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(row, parameters):
        return row_fn(
            getter(row) if use_row else getter(parameters)
            for getter, use_row in getters
        )

    return get

2399 

def default_from(self) -> str:
    """Hook invoked when a SELECT statement has no FROM elements and no
    FROM clause would otherwise be rendered.

    The base implementation appends nothing; dialects such as Oracle
    Database override this to tack on a ``FROM DUAL``.

    :return: string appended to the SELECT (empty by default).
    """
    return ""

2409 

def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exericise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact. note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    # test_selectinload_local_criteria_subquery
    for k in override_binds.translate:
        if k not in self.binds:
            continue
        bp = self.binds[k]

        # so this would work, just change the value of bp in place.
        # but we dont want to mutate things outside.
        # bp.value = override_binds.translate[bp.key]
        # continue

        # instead, need to replace bp with new_bp or otherwise accommodate
        # in all internal collections
        new_bp = bp._with_value(
            override_binds.translate[bp.key],
            maintain_key=True,
            required=False,
        )

        # re-point every internal collection from the old bindparam
        # to the replacement
        name = self.bind_names[bp]
        self.binds[k] = self.binds[name] = new_bp
        self.bind_names[new_bp] = name
        self.bind_names.pop(bp, None)

        if bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        # keep the cache-key bind-match structure consistent so that
        # extracted_parameters resolution still lines up
        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            for bp in bp._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(new_bp)

    return sqltext

2473 

def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render a Grouping construct as its inner element wrapped in
    parenthesis."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(%s)" % inner

2476 

def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render a parenthesized SELECT statement grouping."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(%s)" % inner

2479 

def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    # Render a _label_reference (ORDER BY / GROUP BY reference to a
    # labeled column).  When the dialect supports ordering by a simple
    # label name and the label resolves against the enclosing SELECT's
    # label-resolution dictionary, arrange for the element to render as
    # its bare label; otherwise the full expression is rendered.
    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        (
            with_cols,
            only_froms,
            only_cols,
        ) = compile_state._label_resolve_dict
        if within_columns_clause:
            resolve_dict = only_froms
        else:
            resolve_dict = only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        order_by_elem = element.element._order_by_label_element

        if (
            order_by_elem is not None
            and order_by_elem.name in resolve_dict
            and order_by_elem.shares_lineage(
                resolve_dict[order_by_elem.name]
            )
        ):
            # the label resolves within this SELECT; render just
            # the label name
            kwargs["render_label_as_label"] = (
                element.element._order_by_label_element
            )
    return self.process(
        element.element,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )

2526 

def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    # Render a textual label reference (e.g. order_by("some_label"))
    # by resolving the string against the enclosing SELECT's
    # label-resolution dictionaries; raises CompileError if there is
    # no enclosing SELECT context or the name does not resolve.
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, only_cols = compile_state._label_resolve_dict
    try:
        # look up the string name in the appropriate resolution dict
        # for the current clause position
        if within_columns_clause:
            col = only_froms[element.element]
        else:
            col = with_cols[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ),
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        # resolved; render the matched column as its bare label
        kwargs["render_label_as_label"] = col
        return self.process(
            col, within_columns_clause=within_columns_clause, **kwargs
        )

2571 

def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    **kw,
):
    # Render a Label construct, in one of three forms: the inner
    # expression plus "AS <label>" (columns clause), the bare label
    # name (ORDER BY-style reference), or just the inner expression.
    #
    # only render labels within the columns clause
    # or ORDER BY clause of a select. dialect-specific compilers
    # can modify this behavior.
    render_label_with_as = (
        within_columns_clause and not within_label_clause
    )
    render_label_only = render_label_as_label is label

    if render_label_only or render_label_with_as:
        # truncate anonymous / overlong label names per dialect rules
        if isinstance(label.name, elements._truncated_label):
            labelname = self._truncated_identifier("colident", label.name)
        else:
            labelname = label.name

    if render_label_with_as:
        if add_to_result_map is not None:
            add_to_result_map(
                labelname,
                label.name,
                (label, labelname) + label._alt_names + result_map_targets,
                label.type,
            )
        return (
            label.element._compiler_dispatch(
                self,
                within_columns_clause=True,
                within_label_clause=True,
                **kw,
            )
            + OPERATORS[operators.as_]
            + self.preparer.format_label(label, labelname)
        )
    elif render_label_only:
        return self.preparer.format_label(label, labelname)
    else:
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

2620 

def _fallback_column_name(self, column):
    """Hook invoked when a Column is compiled before its ``name`` is set.

    The base implementation always raises; dialects may override to
    supply a substitute name.

    :raises exc.CompileError: unconditionally.
    """
    raise exc.CompileError(
        "Cannot compile Column object until its 'name' is assigned."
    )

2625 

def visit_lambda_element(self, element, **kw):
    """Render a lambda element by compiling its resolved SQL expression."""
    return self.process(element._resolved, **kw)

2629 

def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    # Render a ColumnClause, optionally qualified with its table name
    # (and schema), quoting or literal-escaping the name as appropriate
    # and registering the column in the result map when requested.
    name = orig_name = column.name
    if name is None:
        # delegates to the dialect; base implementation raises
        name = self._fallback_column_name(column)

    is_literal = column.is_literal
    if not is_literal and isinstance(name, elements._truncated_label):
        name = self._truncated_identifier("colident", name)

    if add_to_result_map is not None:
        targets = (column, name, column.key) + result_map_targets
        if column._tq_label:
            targets += (column._tq_label,)

        add_to_result_map(name, orig_name, targets, column.type)

    if is_literal:
        # note we are not currently accommodating for
        # literal_column(quoted_name('ident', True)) here
        name = self.escape_literal_column(name)
    else:
        name = self.preparer.quote(name)
    table = column.table
    if table is None or not include_table or not table.named_with_column:
        # unqualified column name
        return name
    else:
        effective_schema = self.preparer.schema_for_object(table)

        if effective_schema:
            schema_prefix = (
                self.preparer.quote_schema(effective_schema) + "."
            )
        else:
            schema_prefix = ""

        if TYPE_CHECKING:
            assert isinstance(table, NamedFromClause)
        tablename = table.name

        # substitute a disambiguated table name if this name collides
        # within the enclosing statement
        if (
            not effective_schema
            and ambiguous_table_name_map
            and tablename in ambiguous_table_name_map
        ):
            tablename = ambiguous_table_name_map[tablename]

        if isinstance(tablename, elements._truncated_label):
            tablename = self._truncated_identifier("alias", tablename)

        return schema_prefix + self.preparer.quote(tablename) + "." + name

2688 

def visit_collation(self, element, **kw):
    """Render a COLLATE expression via the identifier preparer."""
    return self.preparer.format_collation(element.collation)

2691 

def visit_fromclause(self, fromclause, **kwargs):
    """Render a generic FromClause as its plain name."""
    return fromclause.name

2694 

def visit_index(self, index, **kwargs):
    """Render an Index construct as its plain name."""
    return index.name

2697 

def visit_typeclause(self, typeclause, **kw):
    """Render the datatype portion of an expression (e.g. for CAST) by
    delegating to the dialect's type compiler, passing along the
    originating TypeClause and this compiler's identifier preparer."""
    kw["type_expression"] = typeclause
    kw["identifier_preparer"] = self.preparer
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)

2704 

def post_process_text(self, text):
    """Apply final fixups to raw SQL text; doubles ``%`` characters when
    the preparer requires percent-escaping (pyformat-style dialects)."""
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")

2709 

def escape_literal_column(self, text):
    """Escape a literal column string; doubles ``%`` characters when the
    preparer requires percent-escaping (pyformat-style dialects)."""
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")

2714 

def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    # Render a text() construct: substitute each :name token with
    # either its associated bindparam rendering or a plain bound
    # parameter placeholder, then un-escape any backslash-escaped
    # \:name sequences.
    def do_bindparam(m):
        # regex callback for each :name token found by BIND_PARAMS
        name = m.group(1)
        if name in textclause._bindparams:
            return self.process(textclause._bindparams[name], **kw)
        else:
            return self.bindparam_string(name, **kw)

    if not self.stack:
        # a bare text() compiled standalone, not inside a SELECT
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select(). Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(
        lambda m: m.group(1),
        BIND_PARAMS.sub(
            do_bindparam, self.post_process_text(textclause.text)
        ),
    )

2739 

def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    # Render a TextualSelect (text().columns(...)): push a fresh
    # compiler stack entry, optionally populate the result map from
    # the declared column args, compile the inner text element, and
    # prepend any accumulated CTEs.
    toplevel = not self.stack
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    # the result map is populated only for the outermost statement, or
    # for the first element of a compound / a nested context that
    # requested it
    populate_result_map = (
        toplevel
        or (
            compound_index == 0
            and entry.get("need_result_map_for_compound", False)
        )
        or entry.get("need_result_map_for_nested", False)
    )

    if populate_result_map:
        self._ordered_columns = self._textual_ordered_columns = (
            taf.positional
        )

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        for c in taf.column_args:
            self.process(
                c,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        # nesting_level is None at the top level so the WITH clause
        # renders unindented / unnested
        nesting_level = len(self.stack) if not toplevel else None
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    self.stack.pop(-1)

    return text

2791 

def visit_null(self, expr: Null, **kw: Any) -> str:
    """Render the SQL NULL literal."""
    return "NULL"

2794 

def visit_true(self, expr: True_, **kw: Any) -> str:
    """Render the boolean TRUE literal: ``true`` on dialects with native
    boolean support, otherwise the integer ``1``."""
    return "true" if self.dialect.supports_native_boolean else "1"

2800 

def visit_false(self, expr: False_, **kw: Any) -> str:
    """Render the boolean FALSE literal: ``false`` on dialects with
    native boolean support, otherwise the integer ``0``."""
    return "false" if self.dialect.supports_native_boolean else "0"

2806 

2807 def _generate_delimited_list(self, elements, separator, **kw): 

2808 return separator.join( 

2809 s 

2810 for s in (c._compiler_dispatch(self, **kw) for c in elements) 

2811 if s 

2812 ) 

2813 

def _generate_delimited_and_list(self, clauses, **kw):
    """Compile a list of clauses joined by AND, first applying the
    boolean clause-list simplification step (which may collapse the
    list to a single element when TRUE/FALSE constants are involved)."""
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        # simplified down to a single clause; no separator needed
        return clauses[0]._compiler_dispatch(self, **kw)

    separator = OPERATORS[operators.and_]
    rendered = (c._compiler_dispatch(self, **kw) for c in clauses)
    return separator.join(frag for frag in rendered if frag)

2830 

def visit_tuple(self, clauselist, **kw):
    """Render a SQL tuple as a parenthesized clause list."""
    return "(%s)" % self.visit_clauselist(clauselist, **kw)

2833 

def visit_element_list(self, element, **kw):
    """Render an element list as its members separated by spaces."""
    return self._generate_delimited_list(element.clauses, " ", **kw)

2836 

def visit_order_by_list(self, element, **kw):
    """Render an ORDER BY list as its members separated by commas."""
    return self._generate_delimited_list(element.clauses, ", ", **kw)

2839 

def visit_clauselist(self, clauselist, **kw):
    """Render a ClauseList, joining members with the list's operator
    (rendered via the OPERATORS table) or a single space when the list
    has no operator."""
    op = clauselist.operator
    separator = " " if op is None else OPERATORS[clauselist.operator]
    return self._generate_delimited_list(clauselist.clauses, separator, **kw)

2848 

def visit_expression_clauselist(self, clauselist, **kw):
    """Render an ExpressionClauseList joined by its operator.

    A dialect-specific operator dispatch hook, if present, takes over
    rendering entirely; otherwise the operator's string form is looked
    up in the OPERATORS table.

    :raises exc.UnsupportedCompilationError: if the operator has no
     entry in the OPERATORS table.
    """
    operator_ = clauselist.operator

    disp = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if disp:
        return disp(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    # flag downstream visitors that we are inside an operator expression
    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)

2867 

def visit_case(self, clause, **kwargs):
    """Render a CASE expression, including the optional shorthand value,
    each WHEN/THEN pair, and the optional ELSE clause."""
    parts = ["CASE "]
    if clause.value is not None:
        # "simple" CASE form: CASE <value> WHEN ...
        parts.append(clause.value._compiler_dispatch(self, **kwargs) + " ")
    for condition, outcome in clause.whens:
        parts.append(
            "WHEN "
            + condition._compiler_dispatch(self, **kwargs)
            + " THEN "
            + outcome._compiler_dispatch(self, **kwargs)
            + " "
        )
    if clause.else_ is not None:
        parts.append(
            "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
        )
    parts.append("END")
    return "".join(parts)

2886 

def visit_type_coerce(self, type_coerce, **kw):
    """Render a type_coerce() construct; the coercion is a Python-side
    concept, so only the typed inner expression is emitted."""
    return type_coerce.typed_expression._compiler_dispatch(self, **kw)

2889 

def visit_cast(self, cast, **kwargs):
    """Render a CAST expression.

    If the rendered type includes a COLLATE suffix, the suffix is moved
    outside the closing parenthesis, i.e.
    ``CAST(x AS VARCHAR(10)) COLLATE fr`` rather than
    ``CAST(x AS VARCHAR(10) COLLATE fr)``.
    """
    type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
    collate_match = re.match("(.*)( COLLATE .*)", type_clause)
    inner = cast.clause._compiler_dispatch(self, **kwargs)
    if collate_match:
        return "CAST(%s AS %s)%s" % (
            inner,
            collate_match.group(1),
            collate_match.group(2),
        )
    return "CAST(%s AS %s)" % (inner, type_clause)

2898 

def visit_frame_clause(self, frameclause, **kw):
    """Render the two bounds of a window frame clause as
    ``<lower> AND <upper>`` for use inside RANGE/ROWS/GROUPS BETWEEN."""

    def render_bound(bound_type, integer_bind, unbounded_keyword):
        # one frame bound: UNBOUNDED <dir>, CURRENT ROW, or
        # "<n> PRECEDING/FOLLOWING" with n rendered as a bound value
        if bound_type is elements._FrameClauseType.RANGE_UNBOUNDED:
            return f"UNBOUNDED {unbounded_keyword}"
        if bound_type is elements._FrameClauseType.RANGE_CURRENT:
            return "CURRENT ROW"
        val = self.process(integer_bind, **kw)
        if bound_type is elements._FrameClauseType.RANGE_PRECEDING:
            return f"{val} PRECEDING"
        return f"{val} FOLLOWING"

    left = render_bound(
        frameclause.lower_type,
        frameclause.lower_integer_bind,
        "PRECEDING",
    )
    right = render_bound(
        frameclause.upper_type,
        frameclause.upper_integer_bind,
        "FOLLOWING",
    )
    return f"{left} AND {right}"

2930 

def visit_over(self, over, **kwargs):
    """Render an OVER clause: the windowed element followed by
    ``OVER (PARTITION BY ... ORDER BY ... <frame>)``, where each piece
    is included only when present."""
    text = over.element._compiler_dispatch(self, **kwargs)

    # at most one frame specification is present
    if over.range_ is not None:
        frame = f"RANGE BETWEEN {self.process(over.range_, **kwargs)}"
    elif over.rows is not None:
        frame = f"ROWS BETWEEN {self.process(over.rows, **kwargs)}"
    elif over.groups is not None:
        frame = f"GROUPS BETWEEN {self.process(over.groups, **kwargs)}"
    else:
        frame = None

    inner = []
    for keyword, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        if clause is not None and len(clause):
            inner.append(
                "%s BY %s"
                % (keyword, clause._compiler_dispatch(self, **kwargs))
            )
    if frame:
        inner.append(frame)

    return "%s OVER (%s)" % (text, " ".join(inner))

2957 

2958 def visit_withingroup(self, withingroup, **kwargs): 

2959 return "%s WITHIN GROUP (ORDER BY %s)" % ( 

2960 withingroup.element._compiler_dispatch(self, **kwargs), 

2961 withingroup.order_by._compiler_dispatch(self, **kwargs), 

2962 ) 

2963 

2964 def visit_funcfilter(self, funcfilter, **kwargs): 

2965 return "%s FILTER (WHERE %s)" % ( 

2966 funcfilter.func._compiler_dispatch(self, **kwargs), 

2967 funcfilter.criterion._compiler_dispatch(self, **kwargs), 

2968 ) 

2969 

    def visit_aggregateorderby(self, aggregateorderby, **kwargs):
        """Render an aggregate function's ORDER BY per the dialect's style.

        Dispatches on ``dialect.aggregate_order_by_style``: raises
        :class:`.CompileError` for NONE, rewrites the function so the ORDER
        BY renders inline within the argument list for INLINE, and otherwise
        falls back to ``WITHIN GROUP`` rendering.
        """
        if self.dialect.aggregate_order_by_style is AggregateOrderByStyle.NONE:
            raise exc.CompileError(
                "this dialect does not support "
                "ORDER BY within an aggregate function"
            )
        elif (
            self.dialect.aggregate_order_by_style
            is AggregateOrderByStyle.INLINE
        ):
            # clone the Function and graft the ORDER BY into its argument
            # expression, so plain visit_function rendering emits it inline
            new_fn = aggregateorderby.element._clone()
            new_fn.clause_expr = elements.Grouping(
                aggregate_orderby_inline(
                    new_fn.clause_expr.element, aggregateorderby.order_by
                )
            )

            return new_fn._compiler_dispatch(self, **kwargs)
        else:
            # remaining style renders as WITHIN GROUP (ORDER BY ...)
            return self.visit_withingroup(aggregateorderby, **kwargs)

2990 

2991 def visit_aggregate_orderby_inline(self, element, **kw): 

2992 return "%s ORDER BY %s" % ( 

2993 self.process(element.element, **kw), 

2994 self.process(element.aggregate_order_by, **kw), 

2995 ) 

2996 

    def visit_aggregate_strings_func(self, fn, *, use_function_name, **kw):
        """Render aggregate_strings() under the dialect-specific function
        name, forcing the delimiter argument to render as a literal.

        ``use_function_name`` is the dialect's name for the function (e.g.
        group_concat / string_agg); the second function argument (the
        delimiter) is rendered with ``literal_execute`` enabled.
        """
        # aggregate_order_by attribute is present if visit_function
        # gave us a Function with aggregate_orderby_inline() as the inner
        # contents
        order_by = getattr(fn.clauses, "aggregate_order_by", None)

        literal_exec = dict(kw)
        literal_exec["literal_execute"] = True

        # break up the function into its components so we can apply
        # literal_execute to the second argument (the delimiter)
        cl = list(fn.clauses)
        expr, delimeter = cl[0:2]
        if (
            order_by is not None
            and self.dialect.aggregate_order_by_style
            is AggregateOrderByStyle.INLINE
        ):
            return (
                f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
                f"{delimeter._compiler_dispatch(self, **literal_exec)} "
                f"ORDER BY {order_by._compiler_dispatch(self, **kw)})"
            )
        else:
            return (
                f"{use_function_name}({expr._compiler_dispatch(self, **kw)}, "
                f"{delimeter._compiler_dispatch(self, **literal_exec)})"
            )

3025 

3026 def visit_extract(self, extract, **kwargs): 

3027 field = self.extract_map.get(extract.field, extract.field) 

3028 return "EXTRACT(%s FROM %s)" % ( 

3029 field, 

3030 extract.expr._compiler_dispatch(self, **kwargs), 

3031 ) 

3032 

3033 def visit_scalar_function_column(self, element, **kw): 

3034 compiled_fn = self.visit_function(element.fn, **kw) 

3035 compiled_col = self.visit_column(element, **kw) 

3036 return "(%s).%s" % (compiled_fn, compiled_col) 

3037 

    def visit_function(
        self,
        func: Function[Any],
        add_to_result_map: Optional[_ResultMapAppender] = None,
        **kwargs: Any,
    ) -> str:
        """Render a SQL function call.

        A dialect-specific ``visit_<name>_func`` hook is tried first;
        otherwise known generic functions render via the FUNCTIONS template
        map, and unknown names render as ``name(args)``, quoted when
        required, prefixed with any package names.
        """
        if add_to_result_map is not None:
            add_to_result_map(func.name, func.name, (func.name,), func.type)

        # per-function dialect hook, e.g. visit_now_func
        disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

        text: str

        if disp:
            text = disp(func, **kwargs)
        else:
            name = FUNCTIONS.get(func._deannotate().__class__, None)
            if name:
                if func._has_args:
                    name += "%(expr)s"
            else:
                name = func.name
                name = (
                    self.preparer.quote(name)
                    if self.preparer._requires_quotes_illegal_chars(name)
                    or isinstance(name, elements.quoted_name)
                    else name
                )
                name = name + "%(expr)s"
            text = ".".join(
                [
                    (
                        self.preparer.quote(tok)
                        if self.preparer._requires_quotes_illegal_chars(tok)
                        # NOTE(review): this tests ``name``, not ``tok`` — as
                        # written, a quoted_name function name forces quoting
                        # of every package token; confirm this is intentional
                        or isinstance(name, elements.quoted_name)
                        else tok
                    )
                    for tok in func.packagenames
                ]
                + [name]
            ) % {"expr": self.function_argspec(func, **kwargs)}

        if func._with_ordinality:
            text += " WITH ORDINALITY"
        return text

3083 

    def visit_next_value_func(self, next_value, **kw):
        """Render next_value() by delegating to the dialect's sequence
        visitor.

        NOTE(review): ``**kw`` is not forwarded to visit_sequence — confirm
        whether that is intentional.
        """
        return self.visit_sequence(next_value.sequence)

3086 

3087 def visit_sequence(self, sequence, **kw): 

3088 raise NotImplementedError( 

3089 "Dialect '%s' does not support sequence increments." 

3090 % self.dialect.name 

3091 ) 

3092 

    def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
        """Render the (parenthesized) argument expression of a function."""
        return func.clause_expr._compiler_dispatch(self, **kwargs)

3095 

    def visit_compound_select(
        self, cs, asfrom=False, compound_index=None, **kwargs
    ):
        """Render a CompoundSelect (UNION / INTERSECT / EXCEPT ...).

        Pushes a new compiler stack entry, renders each component SELECT
        joined by the compound keyword, then appends GROUP BY / ORDER BY /
        row-limiting clauses and prefixes any CTEs collected along the way.
        """
        toplevel = not self.stack

        compile_state = cs._compile_state_factory(cs, self, **kwargs)

        # the first statement compiled becomes this compiler's compile_state
        if toplevel and not self.compile_state:
            self.compile_state = compile_state

        compound_stmt = compile_state.statement

        entry = self._default_stack_entry if toplevel else self.stack[-1]
        need_result_map = toplevel or (
            not compound_index
            and entry.get("need_result_map_for_compound", False)
        )

        # indicates there is already a CompoundSelect in play
        if compound_index == 0:
            entry["select_0"] = cs

        self.stack.append(
            {
                "correlate_froms": entry["correlate_froms"],
                "asfrom_froms": entry["asfrom_froms"],
                "selectable": cs,
                "compile_state": compile_state,
                "need_result_map_for_compound": need_result_map,
            }
        )

        if compound_stmt._independent_ctes:
            self._dispatch_independent_ctes(compound_stmt, kwargs)

        keyword = self.compound_keywords[cs.keyword]

        # render each component SELECT, tagging it with its position so
        # nested compounds know whether they are the first element
        text = (" " + keyword + " ").join(
            (
                c._compiler_dispatch(
                    self, asfrom=asfrom, compound_index=i, **kwargs
                )
                for i, c in enumerate(cs.selects)
            )
        )

        kwargs["include_table"] = False
        text += self.group_by_clause(cs, **dict(asfrom=asfrom, **kwargs))
        text += self.order_by_clause(cs, **kwargs)
        if cs._has_row_limiting_clause:
            text += self._row_limit_clause(cs, **kwargs)

        if self.ctes:
            # CTEs collected while rendering are emitted in front of the
            # compound statement; nesting level is tracked when not toplevel
            nesting_level = len(self.stack) if not toplevel else None
            text = (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                )
                + text
            )

        self.stack.pop(-1)
        return text

3160 

3161 def _row_limit_clause(self, cs, **kwargs): 

3162 if cs._fetch_clause is not None: 

3163 return self.fetch_clause(cs, **kwargs) 

3164 else: 

3165 return self.limit_clause(cs, **kwargs) 

3166 

3167 def _get_operator_dispatch(self, operator_, qualifier1, qualifier2): 

3168 attrname = "visit_%s_%s%s" % ( 

3169 operator_.__name__, 

3170 qualifier1, 

3171 "_" + qualifier2 if qualifier2 else "", 

3172 ) 

3173 return getattr(self, attrname, None) 

3174 

    def visit_unary(
        self, unary, add_to_result_map=None, result_map_targets=(), **kw
    ):
        """Render a unary expression.

        Exactly one of ``unary.operator`` (prefix) or ``unary.modifier``
        (postfix) must be set; a dialect-specific visit method is tried
        first, then the generic OPERATORS string rendering.
        """
        if add_to_result_map is not None:
            # propagate result-map targeting down to the wrapped element
            result_map_targets += (unary,)
            kw["add_to_result_map"] = add_to_result_map
            kw["result_map_targets"] = result_map_targets

        if unary.operator:
            if unary.modifier:
                raise exc.CompileError(
                    "Unary expression does not support operator "
                    "and modifier simultaneously"
                )
            disp = self._get_operator_dispatch(
                unary.operator, "unary", "operator"
            )
            if disp:
                return disp(unary, unary.operator, **kw)
            else:
                return self._generate_generic_unary_operator(
                    unary, OPERATORS[unary.operator], **kw
                )
        elif unary.modifier:
            disp = self._get_operator_dispatch(
                unary.modifier, "unary", "modifier"
            )
            if disp:
                return disp(unary, unary.modifier, **kw)
            else:
                return self._generate_generic_unary_modifier(
                    unary, OPERATORS[unary.modifier], **kw
                )
        else:
            raise exc.CompileError(
                "Unary expression has no operator or modifier"
            )

3212 

    def visit_truediv_binary(self, binary, operator, **kw):
        """Render true division.

        When the dialect's ``/`` performs integer floor division, the right
        operand is wrapped in a CAST to a numeric type so the division
        produces a true quotient; otherwise plain ``left / right`` renders.
        """
        if self.dialect.div_is_floordiv:
            return (
                self.process(binary.left, **kw)
                + " / "
                # TODO: would need a fast cast again here,
                # unless we want to use an implicit cast like "+ 0.0"
                + self.process(
                    elements.Cast(
                        binary.right,
                        (
                            binary.right.type
                            if binary.right.type._type_affinity
                            in (sqltypes.Numeric, sqltypes.Float)
                            else sqltypes.Numeric()
                        ),
                    ),
                    **kw,
                )
            )
        else:
            return (
                self.process(binary.left, **kw)
                + " / "
                + self.process(binary.right, **kw)
            )

3239 

3240 def visit_floordiv_binary(self, binary, operator, **kw): 

3241 if ( 

3242 self.dialect.div_is_floordiv 

3243 and binary.right.type._type_affinity is sqltypes.Integer 

3244 ): 

3245 return ( 

3246 self.process(binary.left, **kw) 

3247 + " / " 

3248 + self.process(binary.right, **kw) 

3249 ) 

3250 else: 

3251 return "FLOOR(%s)" % ( 

3252 self.process(binary.left, **kw) 

3253 + " / " 

3254 + self.process(binary.right, **kw) 

3255 ) 

3256 

3257 def visit_is_true_unary_operator(self, element, operator, **kw): 

3258 if ( 

3259 element._is_implicitly_boolean 

3260 or self.dialect.supports_native_boolean 

3261 ): 

3262 return self.process(element.element, **kw) 

3263 else: 

3264 return "%s = 1" % self.process(element.element, **kw) 

3265 

3266 def visit_is_false_unary_operator(self, element, operator, **kw): 

3267 if ( 

3268 element._is_implicitly_boolean 

3269 or self.dialect.supports_native_boolean 

3270 ): 

3271 return "NOT %s" % self.process(element.element, **kw) 

3272 else: 

3273 return "%s = 0" % self.process(element.element, **kw) 

3274 

3275 def visit_not_match_op_binary(self, binary, operator, **kw): 

3276 return "NOT %s" % self.visit_binary( 

3277 binary, override_operator=operators.match_op 

3278 ) 

3279 

3280 def visit_not_in_op_binary(self, binary, operator, **kw): 

3281 # The brackets are required in the NOT IN operation because the empty 

3282 # case is handled using the form "(col NOT IN (null) OR 1 = 1)". 

3283 # The presence of the OR makes the brackets required. 

3284 return "(%s)" % self._generate_generic_binary( 

3285 binary, OPERATORS[operator], **kw 

3286 ) 

3287 

3288 def visit_empty_set_op_expr(self, type_, expand_op, **kw): 

3289 if expand_op is operators.not_in_op: 

3290 if len(type_) > 1: 

3291 return "(%s)) OR (1 = 1" % ( 

3292 ", ".join("NULL" for element in type_) 

3293 ) 

3294 else: 

3295 return "NULL) OR (1 = 1" 

3296 elif expand_op is operators.in_op: 

3297 if len(type_) > 1: 

3298 return "(%s)) AND (1 != 1" % ( 

3299 ", ".join("NULL" for element in type_) 

3300 ) 

3301 else: 

3302 return "NULL) AND (1 != 1" 

3303 else: 

3304 return self.visit_empty_set_expr(type_) 

3305 

3306 def visit_empty_set_expr(self, element_types, **kw): 

3307 raise NotImplementedError( 

3308 "Dialect '%s' does not support empty set expression." 

3309 % self.dialect.name 

3310 ) 

3311 

    def _literal_execute_expanding_parameter_literal_binds(
        self, parameter, values, bind_expression_template=None
    ):
        """Render an expanding (IN) parameter entirely as literal SQL.

        Returns ``(to_update, replacement_expression)``; ``to_update`` is
        always empty here since no bound parameters remain after literal
        rendering.
        """
        typ_dialect_impl = parameter.type._unwrapped_dialect_impl(self.dialect)

        if not values:
            # empty IN expression. note we don't need to use
            # bind_expression_template here because there are no
            # expressions to render.

            if typ_dialect_impl._is_tuple_type:
                replacement_expression = (
                    "VALUES " if self.dialect.tuple_in_values else ""
                ) + self.visit_empty_set_op_expr(
                    parameter.type.types, parameter.expand_op
                )

            else:
                replacement_expression = self.visit_empty_set_op_expr(
                    [parameter.type], parameter.expand_op
                )

        elif typ_dialect_impl._is_tuple_type or (
            typ_dialect_impl._isnull
            and isinstance(values[0], collections_abc.Sequence)
            and not isinstance(values[0], (str, bytes))
        ):
            # tuple IN: each value is itself a sequence of elements
            if typ_dialect_impl._has_bind_expression:
                raise NotImplementedError(
                    "bind_expression() on TupleType not supported with "
                    "literal_binds"
                )

            replacement_expression = (
                "VALUES " if self.dialect.tuple_in_values else ""
            ) + ", ".join(
                "(%s)"
                % (
                    ", ".join(
                        self.render_literal_value(value, param_type)
                        for value, param_type in zip(
                            tuple_element, parameter.type.types
                        )
                    )
                )
                for i, tuple_element in enumerate(values)
            )
        else:
            if bind_expression_template:
                # a bind_expression() wraps each element; split the template
                # at the POSTCOMPILE token and re-wrap each literal with the
                # left/right fragments
                post_compile_pattern = self._post_compile_pattern
                m = post_compile_pattern.search(bind_expression_template)
                assert m and m.group(
                    2
                ), "unexpected format for expanding parameter"

                tok = m.group(2).split("~~")
                be_left, be_right = tok[1], tok[3]
                replacement_expression = ", ".join(
                    "%s%s%s"
                    % (
                        be_left,
                        self.render_literal_value(value, parameter.type),
                        be_right,
                    )
                    for value in values
                )
            else:
                replacement_expression = ", ".join(
                    self.render_literal_value(value, parameter.type)
                    for value in values
                )

        return (), replacement_expression

3385 

    def _literal_execute_expanding_parameter(self, name, parameter, values):
        """Expand an IN parameter into one bind per element.

        Returns ``(to_update, replacement_expression)`` where ``to_update``
        is a list of (param_name, value) pairs to add to the parameter set
        and ``replacement_expression`` is the SQL that replaces the
        placeholder.  Delegates to the literal-binds variant when the
        parameter requests ``literal_execute``.
        """
        if parameter.literal_execute:
            return self._literal_execute_expanding_parameter_literal_binds(
                parameter, values
            )

        dialect = self.dialect
        typ_dialect_impl = parameter.type._unwrapped_dialect_impl(dialect)

        if self._numeric_binds:
            bind_template = self.compilation_bindtemplate
        else:
            bind_template = self.bindtemplate

        if (
            self.dialect._bind_typing_render_casts
            and typ_dialect_impl.render_bind_cast
        ):
            # dialect wants each expanded bind wrapped in a render cast

            def _render_bindtemplate(name):
                return self.render_bind_cast(
                    parameter.type,
                    typ_dialect_impl,
                    bind_template % {"name": name},
                )

        else:

            def _render_bindtemplate(name):
                return bind_template % {"name": name}

        if not values:
            # empty collection: render the always-true/false placeholder
            to_update = []
            if typ_dialect_impl._is_tuple_type:
                replacement_expression = self.visit_empty_set_op_expr(
                    parameter.type.types, parameter.expand_op
                )
            else:
                replacement_expression = self.visit_empty_set_op_expr(
                    [parameter.type], parameter.expand_op
                )

        elif typ_dialect_impl._is_tuple_type or (
            typ_dialect_impl._isnull
            and isinstance(values[0], collections_abc.Sequence)
            and not isinstance(values[0], (str, bytes))
        ):
            # tuple IN: names are <name>_<tuple index>_<element index>,
            # 1-based on both axes
            assert not typ_dialect_impl._is_array
            to_update = [
                ("%s_%s_%s" % (name, i, j), value)
                for i, tuple_element in enumerate(values, 1)
                for j, value in enumerate(tuple_element, 1)
            ]

            replacement_expression = (
                "VALUES " if dialect.tuple_in_values else ""
            ) + ", ".join(
                "(%s)"
                % (
                    ", ".join(
                        _render_bindtemplate(
                            to_update[i * len(tuple_element) + j][0]
                        )
                        for j, value in enumerate(tuple_element)
                    )
                )
                for i, tuple_element in enumerate(values)
            )
        else:
            # scalar IN: names are <name>_<index>, 1-based
            to_update = [
                ("%s_%s" % (name, i), value)
                for i, value in enumerate(values, 1)
            ]
            replacement_expression = ", ".join(
                _render_bindtemplate(key) for key, value in to_update
            )

        return to_update, replacement_expression

3464 

    def visit_binary(
        self,
        binary,
        override_operator=None,
        eager_grouping=False,
        from_linter=None,
        lateral_from_linter=None,
        **kw,
    ):
        """Render a binary expression.

        Records FROM-linter edges for comparison operators, forces literal
        execution when both sides are bind parameters under ANSI bind
        rules, then dispatches to a per-operator visit method or the
        generic OPERATORS rendering.
        """
        if from_linter and operators.is_comparison(binary.operator):
            if lateral_from_linter is not None:
                # within a LATERAL subquery: edges also connect through the
                # enclosing lateral element
                enclosing_lateral = kw["enclosing_lateral"]
                lateral_from_linter.edges.update(
                    itertools.product(
                        _de_clone(
                            binary.left._from_objects + [enclosing_lateral]
                        ),
                        _de_clone(
                            binary.right._from_objects + [enclosing_lateral]
                        ),
                    )
                )
            else:
                from_linter.edges.update(
                    itertools.product(
                        _de_clone(binary.left._from_objects),
                        _de_clone(binary.right._from_objects),
                    )
                )

        # don't allow "? = ?" to render
        if (
            self.ansi_bind_rules
            and isinstance(binary.left, elements.BindParameter)
            and isinstance(binary.right, elements.BindParameter)
        ):
            kw["literal_execute"] = True

        operator_ = override_operator or binary.operator
        disp = self._get_operator_dispatch(operator_, "binary", None)
        if disp:
            return disp(binary, operator_, **kw)
        else:
            try:
                opstring = OPERATORS[operator_]
            except KeyError as err:
                raise exc.UnsupportedCompilationError(self, operator_) from err
            else:
                return self._generate_generic_binary(
                    binary,
                    opstring,
                    from_linter=from_linter,
                    lateral_from_linter=lateral_from_linter,
                    **kw,
                )

3520 

    def visit_function_as_comparison_op_binary(self, element, operator, **kw):
        """Render a function-as-comparison by emitting the underlying SQL
        function itself."""
        return self.process(element.sql_function, **kw)

3523 

3524 def visit_mod_binary(self, binary, operator, **kw): 

3525 if self.preparer._double_percents: 

3526 return ( 

3527 self.process(binary.left, **kw) 

3528 + " %% " 

3529 + self.process(binary.right, **kw) 

3530 ) 

3531 else: 

3532 return ( 

3533 self.process(binary.left, **kw) 

3534 + " % " 

3535 + self.process(binary.right, **kw) 

3536 ) 

3537 

3538 def visit_custom_op_binary(self, element, operator, **kw): 

3539 kw["eager_grouping"] = operator.eager_grouping 

3540 return self._generate_generic_binary( 

3541 element, 

3542 " " + self.escape_literal_column(operator.opstring) + " ", 

3543 **kw, 

3544 ) 

3545 

3546 def visit_custom_op_unary_operator(self, element, operator, **kw): 

3547 return self._generate_generic_unary_operator( 

3548 element, self.escape_literal_column(operator.opstring) + " ", **kw 

3549 ) 

3550 

3551 def visit_custom_op_unary_modifier(self, element, operator, **kw): 

3552 return self._generate_generic_unary_modifier( 

3553 element, " " + self.escape_literal_column(operator.opstring), **kw 

3554 ) 

3555 

3556 def _generate_generic_binary( 

3557 self, 

3558 binary: BinaryExpression[Any], 

3559 opstring: str, 

3560 eager_grouping: bool = False, 

3561 **kw: Any, 

3562 ) -> str: 

3563 _in_operator_expression = kw.get("_in_operator_expression", False) 

3564 

3565 kw["_in_operator_expression"] = True 

3566 kw["_binary_op"] = binary.operator 

3567 text = ( 

3568 binary.left._compiler_dispatch( 

3569 self, eager_grouping=eager_grouping, **kw 

3570 ) 

3571 + opstring 

3572 + binary.right._compiler_dispatch( 

3573 self, eager_grouping=eager_grouping, **kw 

3574 ) 

3575 ) 

3576 

3577 if _in_operator_expression and eager_grouping: 

3578 text = "(%s)" % text 

3579 return text 

3580 

3581 def _generate_generic_unary_operator(self, unary, opstring, **kw): 

3582 return opstring + unary.element._compiler_dispatch(self, **kw) 

3583 

3584 def _generate_generic_unary_modifier(self, unary, opstring, **kw): 

3585 return unary.element._compiler_dispatch(self, **kw) + opstring 

3586 

    @util.memoized_property
    def _like_percent_literal(self):
        # reusable literal '%' column used to assemble LIKE patterns for
        # contains()/startswith()/endswith(); memoized per compiler instance
        return elements.literal_column("'%'", type_=sqltypes.STRINGTYPE)

3590 

3591 def visit_ilike_case_insensitive_operand(self, element, **kw): 

3592 return f"lower({element.element._compiler_dispatch(self, **kw)})" 

3593 

3594 def visit_contains_op_binary(self, binary, operator, **kw): 

3595 binary = binary._clone() 

3596 percent = self._like_percent_literal 

3597 binary.right = percent.concat(binary.right).concat(percent) 

3598 return self.visit_like_op_binary(binary, operator, **kw) 

3599 

3600 def visit_not_contains_op_binary(self, binary, operator, **kw): 

3601 binary = binary._clone() 

3602 percent = self._like_percent_literal 

3603 binary.right = percent.concat(binary.right).concat(percent) 

3604 return self.visit_not_like_op_binary(binary, operator, **kw) 

3605 

3606 def visit_icontains_op_binary(self, binary, operator, **kw): 

3607 binary = binary._clone() 

3608 percent = self._like_percent_literal 

3609 binary.left = ilike_case_insensitive(binary.left) 

3610 binary.right = percent.concat( 

3611 ilike_case_insensitive(binary.right) 

3612 ).concat(percent) 

3613 return self.visit_ilike_op_binary(binary, operator, **kw) 

3614 

3615 def visit_not_icontains_op_binary(self, binary, operator, **kw): 

3616 binary = binary._clone() 

3617 percent = self._like_percent_literal 

3618 binary.left = ilike_case_insensitive(binary.left) 

3619 binary.right = percent.concat( 

3620 ilike_case_insensitive(binary.right) 

3621 ).concat(percent) 

3622 return self.visit_not_ilike_op_binary(binary, operator, **kw) 

3623 

3624 def visit_startswith_op_binary(self, binary, operator, **kw): 

3625 binary = binary._clone() 

3626 percent = self._like_percent_literal 

3627 binary.right = percent._rconcat(binary.right) 

3628 return self.visit_like_op_binary(binary, operator, **kw) 

3629 

3630 def visit_not_startswith_op_binary(self, binary, operator, **kw): 

3631 binary = binary._clone() 

3632 percent = self._like_percent_literal 

3633 binary.right = percent._rconcat(binary.right) 

3634 return self.visit_not_like_op_binary(binary, operator, **kw) 

3635 

3636 def visit_istartswith_op_binary(self, binary, operator, **kw): 

3637 binary = binary._clone() 

3638 percent = self._like_percent_literal 

3639 binary.left = ilike_case_insensitive(binary.left) 

3640 binary.right = percent._rconcat(ilike_case_insensitive(binary.right)) 

3641 return self.visit_ilike_op_binary(binary, operator, **kw) 

3642 

3643 def visit_not_istartswith_op_binary(self, binary, operator, **kw): 

3644 binary = binary._clone() 

3645 percent = self._like_percent_literal 

3646 binary.left = ilike_case_insensitive(binary.left) 

3647 binary.right = percent._rconcat(ilike_case_insensitive(binary.right)) 

3648 return self.visit_not_ilike_op_binary(binary, operator, **kw) 

3649 

3650 def visit_endswith_op_binary(self, binary, operator, **kw): 

3651 binary = binary._clone() 

3652 percent = self._like_percent_literal 

3653 binary.right = percent.concat(binary.right) 

3654 return self.visit_like_op_binary(binary, operator, **kw) 

3655 

3656 def visit_not_endswith_op_binary(self, binary, operator, **kw): 

3657 binary = binary._clone() 

3658 percent = self._like_percent_literal 

3659 binary.right = percent.concat(binary.right) 

3660 return self.visit_not_like_op_binary(binary, operator, **kw) 

3661 

3662 def visit_iendswith_op_binary(self, binary, operator, **kw): 

3663 binary = binary._clone() 

3664 percent = self._like_percent_literal 

3665 binary.left = ilike_case_insensitive(binary.left) 

3666 binary.right = percent.concat(ilike_case_insensitive(binary.right)) 

3667 return self.visit_ilike_op_binary(binary, operator, **kw) 

3668 

3669 def visit_not_iendswith_op_binary(self, binary, operator, **kw): 

3670 binary = binary._clone() 

3671 percent = self._like_percent_literal 

3672 binary.left = ilike_case_insensitive(binary.left) 

3673 binary.right = percent.concat(ilike_case_insensitive(binary.right)) 

3674 return self.visit_not_ilike_op_binary(binary, operator, **kw) 

3675 

3676 def visit_like_op_binary(self, binary, operator, **kw): 

3677 escape = binary.modifiers.get("escape", None) 

3678 

3679 return "%s LIKE %s" % ( 

3680 binary.left._compiler_dispatch(self, **kw), 

3681 binary.right._compiler_dispatch(self, **kw), 

3682 ) + ( 

3683 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

3684 if escape is not None 

3685 else "" 

3686 ) 

3687 

3688 def visit_not_like_op_binary(self, binary, operator, **kw): 

3689 escape = binary.modifiers.get("escape", None) 

3690 return "%s NOT LIKE %s" % ( 

3691 binary.left._compiler_dispatch(self, **kw), 

3692 binary.right._compiler_dispatch(self, **kw), 

3693 ) + ( 

3694 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

3695 if escape is not None 

3696 else "" 

3697 ) 

3698 

3699 def visit_ilike_op_binary(self, binary, operator, **kw): 

3700 if operator is operators.ilike_op: 

3701 binary = binary._clone() 

3702 binary.left = ilike_case_insensitive(binary.left) 

3703 binary.right = ilike_case_insensitive(binary.right) 

3704 # else we assume ilower() has been applied 

3705 

3706 return self.visit_like_op_binary(binary, operator, **kw) 

3707 

3708 def visit_not_ilike_op_binary(self, binary, operator, **kw): 

3709 if operator is operators.not_ilike_op: 

3710 binary = binary._clone() 

3711 binary.left = ilike_case_insensitive(binary.left) 

3712 binary.right = ilike_case_insensitive(binary.right) 

3713 # else we assume ilower() has been applied 

3714 

3715 return self.visit_not_like_op_binary(binary, operator, **kw) 

3716 

3717 def visit_between_op_binary(self, binary, operator, **kw): 

3718 symmetric = binary.modifiers.get("symmetric", False) 

3719 return self._generate_generic_binary( 

3720 binary, " BETWEEN SYMMETRIC " if symmetric else " BETWEEN ", **kw 

3721 ) 

3722 

3723 def visit_not_between_op_binary(self, binary, operator, **kw): 

3724 symmetric = binary.modifiers.get("symmetric", False) 

3725 return self._generate_generic_binary( 

3726 binary, 

3727 " NOT BETWEEN SYMMETRIC " if symmetric else " NOT BETWEEN ", 

3728 **kw, 

3729 ) 

3730 

3731 def visit_regexp_match_op_binary( 

3732 self, binary: BinaryExpression[Any], operator: Any, **kw: Any 

3733 ) -> str: 

3734 raise exc.CompileError( 

3735 "%s dialect does not support regular expressions" 

3736 % self.dialect.name 

3737 ) 

3738 

3739 def visit_not_regexp_match_op_binary( 

3740 self, binary: BinaryExpression[Any], operator: Any, **kw: Any 

3741 ) -> str: 

3742 raise exc.CompileError( 

3743 "%s dialect does not support regular expressions" 

3744 % self.dialect.name 

3745 ) 

3746 

3747 def visit_regexp_replace_op_binary( 

3748 self, binary: BinaryExpression[Any], operator: Any, **kw: Any 

3749 ) -> str: 

3750 raise exc.CompileError( 

3751 "%s dialect does not support regular expression replacements" 

3752 % self.dialect.name 

3753 ) 

3754 

3755 def visit_dmltargetcopy(self, element, *, bindmarkers=None, **kw): 

3756 if bindmarkers is None: 

3757 raise exc.CompileError( 

3758 "DML target objects may only be used with " 

3759 "compiled INSERT or UPDATE statements" 

3760 ) 

3761 

3762 bindmarkers[element.column.key] = element 

3763 return f"__BINDMARKER_~~{element.column.key}~~" 

3764 

    def visit_bindparam(
        self,
        bindparam,
        within_columns_clause=False,
        literal_binds=False,
        skip_bind_expression=False,
        literal_execute=False,
        render_postcompile=False,
        **kwargs,
    ):
        """Render a bound parameter.

        Handles type-level bind_expression() wrapping, literal-binds
        rendering, expanding (IN) parameters, post-compile / literal-execute
        bookkeeping, and collision checks against previously seen parameter
        names.  Registers the parameter in ``self.binds`` under both its key
        and its (possibly truncated) render name.
        """

        if not skip_bind_expression:
            impl = bindparam.type.dialect_impl(self.dialect)
            if impl._has_bind_expression:
                # render the type's wrapping SQL around this parameter,
                # re-entering this method with skip_bind_expression=True
                bind_expression = impl.bind_expression(bindparam)
                wrapped = self.process(
                    bind_expression,
                    skip_bind_expression=True,
                    within_columns_clause=within_columns_clause,
                    literal_binds=literal_binds and not bindparam.expanding,
                    literal_execute=literal_execute,
                    render_postcompile=render_postcompile,
                    **kwargs,
                )
                if bindparam.expanding:
                    # for postcompile w/ expanding, move the "wrapped" part
                    # of this into the inside

                    m = re.match(
                        r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
                    )
                    assert m, "unexpected format for expanding parameter"
                    wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                        m.group(2),
                        m.group(1),
                        m.group(3),
                    )

                if literal_binds:
                    ret = self.render_literal_bindparam(
                        bindparam,
                        within_columns_clause=True,
                        bind_expression_template=wrapped,
                        **kwargs,
                    )
                    return f"({ret})"

                return wrapped

        if not literal_binds:
            literal_execute = (
                literal_execute
                or bindparam.literal_execute
                or (within_columns_clause and self.ansi_bind_rules)
            )
            post_compile = literal_execute or bindparam.expanding
        else:
            post_compile = False

        if literal_binds:
            ret = self.render_literal_bindparam(
                bindparam, within_columns_clause=True, **kwargs
            )
            if bindparam.expanding:
                ret = f"({ret})"
            return ret

        name = self._truncate_bindparam(bindparam)

        if name in self.binds:
            # another parameter already rendered under this name; ensure
            # the reuse is legal
            existing = self.binds[name]
            if existing is not bindparam:
                if (
                    (existing.unique or bindparam.unique)
                    and not existing.proxy_set.intersection(
                        bindparam.proxy_set
                    )
                    and not existing._cloned_set.intersection(
                        bindparam._cloned_set
                    )
                ):
                    raise exc.CompileError(
                        "Bind parameter '%s' conflicts with "
                        "unique bind parameter of the same name" % name
                    )
                elif existing.expanding != bindparam.expanding:
                    raise exc.CompileError(
                        "Can't reuse bound parameter name '%s' in both "
                        "'expanding' (e.g. within an IN expression) and "
                        "non-expanding contexts. If this parameter is to "
                        "receive a list/array value, set 'expanding=True' on "
                        "it for expressions that aren't IN, otherwise use "
                        "a different parameter name." % (name,)
                    )
                elif existing._is_crud or bindparam._is_crud:
                    if existing._is_crud and bindparam._is_crud:
                        # TODO: this condition is not well understood.
                        # see tests in test/sql/test_update.py
                        raise exc.CompileError(
                            "Encountered unsupported case when compiling an "
                            "INSERT or UPDATE statement. If this is a "
                            "multi-table "
                            "UPDATE statement, please provide string-named "
                            "arguments to the "
                            "values() method with distinct names; support for "
                            "multi-table UPDATE statements that "
                            "target multiple tables for UPDATE is very "
                            "limited",
                        )
                    else:
                        raise exc.CompileError(
                            f"bindparam() name '{bindparam.key}' is reserved "
                            "for automatic usage in the VALUES or SET "
                            "clause of this "
                            "insert/update statement. Please use a "
                            "name other than column name when using "
                            "bindparam() "
                            "with insert() or update() (for example, "
                            f"'b_{bindparam.key}')."
                        )

        self.binds[bindparam.key] = self.binds[name] = bindparam

        # if we are given a cache key that we're going to match against,
        # relate the bindparam here to one that is most likely present
        # in the "extracted params" portion of the cache key. this is used
        # to set up a positional mapping that is used to determine the
        # correct parameters for a subsequent use of this compiled with
        # a different set of parameter values. here, we accommodate for
        # parameters that may have been cloned both before and after the cache
        # key was been generated.
        ckbm_tuple = self._cache_key_bind_match

        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            for bp in bindparam._cloned_set:
                if bp.key in cksm:
                    cb = cksm[bp.key]
                    ckbm[cb].append(bindparam)

        if bindparam.isoutparam:
            self.has_out_parameters = True

        if post_compile:
            if render_postcompile:
                self._render_postcompile = True

            if literal_execute:
                self.literal_execute_params |= {bindparam}
            else:
                self.post_compile_params |= {bindparam}

        ret = self.bindparam_string(
            name,
            post_compile=post_compile,
            expanding=bindparam.expanding,
            bindparam_type=bindparam.type,
            **kwargs,
        )

        # expanding parameters always render parenthesized
        if bindparam.expanding:
            ret = f"({ret})"

        return ret

3929 

3930 def render_bind_cast(self, type_, dbapi_type, sqltext): 

3931 raise NotImplementedError() 

3932 

    def render_literal_bindparam(
        self,
        bindparam,
        render_literal_value=NO_ARG,
        bind_expression_template=None,
        **kw,
    ):
        """Render a bound parameter inline as a literal SQL value.

        :param bindparam: the :class:`.BindParameter` being rendered.
        :param render_literal_value: explicit value to render; when left
         at ``NO_ARG``, the parameter's own effective value is used.
        :param bind_expression_template: passed through to the expanding
         parameter renderer for expanding params.

        Returns the rendered SQL text.  A value of ``None`` with no
        callable renders as SQL NULL, warning if it is being compared
        with an operator other than IS / IS NOT.
        """
        if render_literal_value is not NO_ARG:
            value = render_literal_value
        else:
            if bindparam.value is None and bindparam.callable is None:
                # NULL literal; warn when it appears under a binary
                # operator where "= NULL" would never match
                op = kw.get("_binary_op", None)
                if op and op not in (operators.is_, operators.is_not):
                    util.warn_limited(
                        "Bound parameter '%s' rendering literal NULL in a SQL "
                        "expression; comparisons to NULL should not use "
                        "operators outside of 'is' or 'is not'",
                        (bindparam.key,),
                    )
                return self.process(sqltypes.NULLTYPE, **kw)
            value = bindparam.effective_value

        if bindparam.expanding:
            # expanding (IN-style) params render each element separately
            leep = self._literal_execute_expanding_parameter_literal_binds
            to_update, replacement_expr = leep(
                bindparam,
                value,
                bind_expression_template=bind_expression_template,
            )
            return replacement_expr
        else:
            return self.render_literal_value(value, bindparam.type)

3965 

    def render_literal_value(
        self, value: Any, type_: sqltypes.TypeEngine[Any]
    ) -> str:
        """Render the value of a bind parameter as a quoted literal.

        This is used for statement sections that do not accept bind parameters
        on the target driver/database.

        This should be implemented by subclasses using the quoting services
        of the DBAPI.

        :param value: Python value to render inline.
        :param type_: SQL type whose literal processor performs
         quoting/conversion.
        :raises exc.CompileError: if the type's literal processor raises,
         or if the type provides no literal processor at all.
        """

        if value is None and not type_.should_evaluate_none:
            # issue #10535 - handle NULL in the compiler without placing
            # this onto each type, except for "evaluate None" types
            # (e.g. JSON)
            return self.process(elements.Null._instance())

        processor = type_._cached_literal_processor(self.dialect)
        if processor:
            try:
                return processor(value)
            except Exception as e:
                # re-raise with context; the original error stays chained
                raise exc.CompileError(
                    f"Could not render literal value "
                    f'"{sql_util._repr_single_value(value)}" '
                    f"with datatype "
                    f"{type_}; see parent stack trace for "
                    "more detail."
                ) from e

        else:
            raise exc.CompileError(
                f"No literal value renderer is available for literal value "
                f'"{sql_util._repr_single_value(value)}" '
                f"with datatype {type_}"
            )

4004 

4005 def _truncate_bindparam(self, bindparam): 

4006 if bindparam in self.bind_names: 

4007 return self.bind_names[bindparam] 

4008 

4009 bind_name = bindparam.key 

4010 if isinstance(bind_name, elements._truncated_label): 

4011 bind_name = self._truncated_identifier("bindparam", bind_name) 

4012 

4013 # add to bind_names for translation 

4014 self.bind_names[bindparam] = bind_name 

4015 

4016 return bind_name 

4017 

4018 def _truncated_identifier( 

4019 self, ident_class: str, name: _truncated_label 

4020 ) -> str: 

4021 if (ident_class, name) in self.truncated_names: 

4022 return self.truncated_names[(ident_class, name)] 

4023 

4024 anonname = name.apply_map(self.anon_map) 

4025 

4026 if len(anonname) > self.label_length - 6: 

4027 counter = self._truncated_counters.get(ident_class, 1) 

4028 truncname = ( 

4029 anonname[0 : max(self.label_length - 6, 0)] 

4030 + "_" 

4031 + hex(counter)[2:] 

4032 ) 

4033 self._truncated_counters[ident_class] = counter + 1 

4034 else: 

4035 truncname = anonname 

4036 self.truncated_names[(ident_class, name)] = truncname 

4037 return truncname 

4038 

4039 def _anonymize(self, name: str) -> str: 

4040 return name % self.anon_map 

4041 

    def bindparam_string(
        self,
        name: str,
        post_compile: bool = False,
        expanding: bool = False,
        escaped_from: Optional[str] = None,
        bindparam_type: Optional[TypeEngine[Any]] = None,
        accumulate_bind_names: Optional[Set[str]] = None,
        visited_bindparam: Optional[List[str]] = None,
        **kw: Any,
    ) -> str:
        """Return the placeholder text rendered for a bound parameter.

        :param name: parameter name; characters unusual for a paramstyle
         are escaped and the mapping recorded in ``escaped_bind_names``.
        :param post_compile: render a ``__[POSTCOMPILE_...]`` token to be
         substituted at execution time instead of a real placeholder.
        :param expanding: parameter expands to a per-item list at
         execution time (no bind cast applied here).
        :param bindparam_type: when present, may trigger a dialect bind
         cast around the placeholder.
        :param accumulate_bind_names: optional set collecting names on a
         per-value basis (crud.py).
        :param visited_bindparam: optional list collecting all parameter
         names in the statement (visit_insert()).
        """
        # TODO: accumulate_bind_names is passed by crud.py to gather
        # names on a per-value basis, visited_bindparam is passed by
        # visit_insert() to collect all parameters in the statement.
        # see if this gathering can be simplified somehow
        if accumulate_bind_names is not None:
            accumulate_bind_names.add(name)
        if visited_bindparam is not None:
            visited_bindparam.append(name)

        if not escaped_from:
            if self._bind_translate_re.search(name):
                # not quite the translate use case as we want to
                # also get a quick boolean if we even found
                # unusual characters in the name
                new_name = self._bind_translate_re.sub(
                    lambda m: self._bind_translate_chars[m.group(0)],
                    name,
                )
                escaped_from = name
                name = new_name

        if escaped_from:
            # record original -> escaped name for execution-time
            # parameter translation
            self.escaped_bind_names = self.escaped_bind_names.union(
                {escaped_from: name}
            )
        if post_compile:
            ret = "__[POSTCOMPILE_%s]" % name
            if expanding:
                # for expanding, bound parameters or literal values will be
                # rendered per item
                return ret

            # otherwise, for non-expanding "literal execute", apply
            # bind casts as determined by the datatype
            if bindparam_type is not None:
                type_impl = bindparam_type._unwrapped_dialect_impl(
                    self.dialect
                )
                if type_impl.render_literal_cast:
                    ret = self.render_bind_cast(bindparam_type, type_impl, ret)
            return ret
        elif self.state is CompilerState.COMPILING:
            # intermediate template used while statement compilation is
            # still in progress
            ret = self.compilation_bindtemplate % {"name": name}
        else:
            ret = self.bindtemplate % {"name": name}

        if (
            bindparam_type is not None
            and self.dialect._bind_typing_render_casts
        ):
            type_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
            if type_impl.render_bind_cast:
                ret = self.render_bind_cast(bindparam_type, type_impl, ret)

        return ret

4108 

4109 def _dispatch_independent_ctes(self, stmt, kw): 

4110 local_kw = kw.copy() 

4111 local_kw.pop("cte_opts", None) 

4112 for cte, opt in zip( 

4113 stmt._independent_ctes, stmt._independent_ctes_opts 

4114 ): 

4115 cte._compiler_dispatch(self, cte_opts=opt, **local_kw) 

4116 

    def visit_cte(
        self,
        cte: CTE,
        asfrom: bool = False,
        ashint: bool = False,
        fromhints: Optional[_FromHintsType] = None,
        visiting_cte: Optional[CTE] = None,
        from_linter: Optional[FromLinter] = None,
        cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
        **kwargs: Any,
    ) -> Optional[str]:
        """Render a CTE reference and register its WITH-clause text.

        Registers the CTE's body text in the compiler's per-level CTE
        collections (deduplicating same-named / cloned CTEs) and, when
        ``asfrom`` is True, returns the aliased name used at the point of
        reference; otherwise returns ``None``.
        """
        self_ctes = self._init_cte_state()
        assert self_ctes is self.ctes

        kwargs["visiting_cte"] = cte

        cte_name = cte.name

        if isinstance(cte_name, elements._truncated_label):
            cte_name = self._truncated_identifier("alias", cte_name)

        is_new_cte = True
        embedded_in_current_named_cte = False

        _reference_cte = cte._get_reference_cte()

        nesting = cte.nesting or cte_opts.nesting

        # check for CTE already encountered
        if _reference_cte in self.level_name_by_cte:
            cte_level, _, existing_cte_opts = self.level_name_by_cte[
                _reference_cte
            ]
            assert _ == cte_name

            cte_level_name = (cte_level, cte_name)
            existing_cte = self.ctes_by_level_name[cte_level_name]

            # check if we are receiving it here with a specific
            # "nest_here" location; if so, move it to this location

            if cte_opts.nesting:
                if existing_cte_opts.nesting:
                    raise exc.CompileError(
                        "CTE is stated as 'nest_here' in "
                        "more than one location"
                    )

                old_level_name = (cte_level, cte_name)
                cte_level = len(self.stack) if nesting else 1
                cte_level_name = new_level_name = (cte_level, cte_name)

                del self.ctes_by_level_name[old_level_name]
                self.ctes_by_level_name[new_level_name] = existing_cte
                self.level_name_by_cte[_reference_cte] = new_level_name + (
                    cte_opts,
                )

        else:
            # first encounter at this level; nested CTEs key on the
            # current stack depth, top-level CTEs on level 1
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = (cte_level, cte_name)

            if cte_level_name in self.ctes_by_level_name:
                existing_cte = self.ctes_by_level_name[cte_level_name]
            else:
                existing_cte = None

        if existing_cte is not None:
            embedded_in_current_named_cte = visiting_cte is existing_cte

            # we've generated a same-named CTE that we are enclosed in,
            # or this is the same CTE. just return the name.
            if cte is existing_cte._restates or cte is existing_cte:
                is_new_cte = False
            elif existing_cte is cte._restates:
                # we've generated a same-named CTE that is
                # enclosed in us - we take precedence, so
                # discard the text for the "inner".
                del self_ctes[existing_cte]

                existing_cte_reference_cte = existing_cte._get_reference_cte()

                assert existing_cte_reference_cte is _reference_cte
                assert existing_cte_reference_cte is existing_cte

                del self.level_name_by_cte[existing_cte_reference_cte]
            else:
                if (
                    # if the two CTEs have the same hash, which we expect
                    # here means that one/both is an annotated of the other
                    (hash(cte) == hash(existing_cte))
                    # or...
                    or (
                        (
                            # if they are clones, i.e. they came from the ORM
                            # or some other visit method
                            cte._is_clone_of is not None
                            or existing_cte._is_clone_of is not None
                        )
                        # and are deep-copy identical
                        and cte.compare(existing_cte)
                    )
                ):
                    # then consider these two CTEs the same
                    is_new_cte = False
                else:
                    # otherwise these are two CTEs that either will render
                    # differently, or were indicated separately by the user,
                    # with the same name
                    raise exc.CompileError(
                        "Multiple, unrelated CTEs found with "
                        "the same name: %r" % cte_name
                    )

        if not asfrom and not is_new_cte:
            return None

        if cte._cte_alias is not None:
            pre_alias_cte = cte._cte_alias
            cte_pre_alias_name = cte._cte_alias.name
            if isinstance(cte_pre_alias_name, elements._truncated_label):
                cte_pre_alias_name = self._truncated_identifier(
                    "alias", cte_pre_alias_name
                )
        else:
            pre_alias_cte = cte
            cte_pre_alias_name = None

        if is_new_cte:
            self.ctes_by_level_name[cte_level_name] = cte
            self.level_name_by_cte[_reference_cte] = cte_level_name + (
                cte_opts,
            )

            if pre_alias_cte not in self.ctes:
                self.visit_cte(pre_alias_cte, **kwargs)

            if not cte_pre_alias_name and cte not in self_ctes:
                if cte.recursive:
                    self.ctes_recursive = True
                text = self.preparer.format_alias(cte, cte_name)
                if cte.recursive or cte.element.name_cte_columns:
                    col_source = cte.element

                    # TODO: can we get at the .columns_plus_names collection
                    # that is already (or will be?) generated for the SELECT
                    # rather than calling twice?
                    recur_cols = [
                        # TODO: proxy_name is not technically safe,
                        # see test_cte->
                        # test_with_recursive_no_name_currently_buggy. not
                        # clear what should be done with such a case
                        fallback_label_name or proxy_name
                        for (
                            _,
                            proxy_name,
                            fallback_label_name,
                            c,
                            repeated,
                        ) in (col_source._generate_columns_plus_names(True))
                        if not repeated
                    ]

                    text += "(%s)" % (
                        ", ".join(
                            self.preparer.format_label_name(
                                ident, anon_map=self.anon_map
                            )
                            for ident in recur_cols
                        )
                    )

                assert kwargs.get("subquery", False) is False

                if not self.stack:
                    # toplevel, this is a stringify of the
                    # cte directly. just compile the inner
                    # the way alias() does.
                    return cte.element._compiler_dispatch(
                        self, asfrom=asfrom, **kwargs
                    )
                else:
                    prefixes = self._generate_prefixes(
                        cte, cte._prefixes, **kwargs
                    )
                    inner = cte.element._compiler_dispatch(
                        self, asfrom=True, **kwargs
                    )

                    text += " AS %s\n(%s)" % (prefixes, inner)

                    if cte._suffixes:
                        text += " " + self._generate_prefixes(
                            cte, cte._suffixes, **kwargs
                        )

                    self_ctes[cte] = text

        if asfrom:
            if from_linter:
                from_linter.froms[cte._de_clone()] = cte_name

            if not is_new_cte and embedded_in_current_named_cte:
                return self.preparer.format_alias(cte, cte_name)

            if cte_pre_alias_name:
                text = self.preparer.format_alias(cte, cte_pre_alias_name)
                if self.preparer._requires_quotes(cte_name):
                    cte_name = self.preparer.quote(cte_name)
                text += self.get_render_as_alias_suffix(cte_name)
                return text  # type: ignore[no-any-return]
            else:
                return self.preparer.format_alias(cte, cte_name)

        return None

4332 

4333 def visit_table_valued_alias(self, element, **kw): 

4334 if element.joins_implicitly: 

4335 kw["from_linter"] = None 

4336 if element._is_lateral: 

4337 return self.visit_lateral(element, **kw) 

4338 else: 

4339 return self.visit_alias(element, **kw) 

4340 

4341 def visit_table_valued_column(self, element, **kw): 

4342 return self.visit_column(element, **kw) 

4343 

    def visit_alias(
        self,
        alias,
        asfrom=False,
        ashint=False,
        iscrud=False,
        fromhints=None,
        subquery=False,
        lateral=False,
        enclosing_alias=None,
        from_linter=None,
        **kwargs,
    ):
        """Render an :class:`.Alias` construct.

        Depending on context flags this produces either the aliased name
        alone (``ashint``), ``<inner> AS <name>`` with optional derived
        column list and FROM hints (``asfrom``), or just the inner
        element's compiled text (neither flag set).
        """
        if lateral:
            if "enclosing_lateral" not in kwargs:
                # if lateral is set and enclosing_lateral is not
                # present, we assume we are being called directly
                # from visit_lateral() and we need to set enclosing_lateral.
                assert alias._is_lateral
                kwargs["enclosing_lateral"] = alias

            # for lateral objects, we track a second from_linter that is...
            # lateral! to the level above us.
            if (
                from_linter
                and "lateral_from_linter" not in kwargs
                and "enclosing_lateral" in kwargs
            ):
                kwargs["lateral_from_linter"] = from_linter

        if enclosing_alias is not None and enclosing_alias.element is alias:
            # nested alias of the same element; render the inner element
            # directly, parenthesizing only in FROM/lateral subquery
            # context
            inner = alias.element._compiler_dispatch(
                self,
                asfrom=asfrom,
                ashint=ashint,
                iscrud=iscrud,
                fromhints=fromhints,
                lateral=lateral,
                enclosing_alias=alias,
                **kwargs,
            )
            if subquery and (asfrom or lateral):
                inner = "(%s)" % (inner,)
            return inner
        else:
            kwargs["enclosing_alias"] = alias

        if asfrom or ashint:
            if isinstance(alias.name, elements._truncated_label):
                alias_name = self._truncated_identifier("alias", alias.name)
            else:
                alias_name = alias.name

            if ashint:
                return self.preparer.format_alias(alias, alias_name)
            elif asfrom:
                if from_linter:
                    from_linter.froms[alias._de_clone()] = alias_name

                inner = alias.element._compiler_dispatch(
                    self, asfrom=True, lateral=lateral, **kwargs
                )
                if subquery:
                    inner = "(%s)" % (inner,)

                ret = inner + self.get_render_as_alias_suffix(
                    self.preparer.format_alias(alias, alias_name)
                )

                if alias._supports_derived_columns and alias._render_derived:
                    # render a derived column list, optionally with types,
                    # e.g. "alias(col1 TYPE, col2 TYPE)"
                    ret += "(%s)" % (
                        ", ".join(
                            "%s%s"
                            % (
                                self.preparer.quote(col.name),
                                (
                                    " %s"
                                    % self.dialect.type_compiler_instance.process(
                                        col.type, **kwargs
                                    )
                                    if alias._render_derived_w_types
                                    else ""
                                ),
                            )
                            for col in alias.c
                        )
                    )

                if fromhints and alias in fromhints:
                    ret = self.format_from_hint_text(
                        ret, alias, fromhints[alias], iscrud
                    )

                return ret
        else:
            # note we cancel the "subquery" flag here as well
            return alias.element._compiler_dispatch(
                self, lateral=lateral, **kwargs
            )

4443 

4444 def visit_subquery(self, subquery, **kw): 

4445 kw["subquery"] = True 

4446 return self.visit_alias(subquery, **kw) 

4447 

4448 def visit_lateral(self, lateral_, **kw): 

4449 kw["lateral"] = True 

4450 return "LATERAL %s" % self.visit_alias(lateral_, **kw) 

4451 

4452 def visit_tablesample(self, tablesample, asfrom=False, **kw): 

4453 text = "%s TABLESAMPLE %s" % ( 

4454 self.visit_alias(tablesample, asfrom=True, **kw), 

4455 tablesample._get_method()._compiler_dispatch(self, **kw), 

4456 ) 

4457 

4458 if tablesample.seed is not None: 

4459 text += " REPEATABLE (%s)" % ( 

4460 tablesample.seed._compiler_dispatch(self, **kw) 

4461 ) 

4462 

4463 return text 

4464 

    def _render_values(self, element, **kw):
        """Render the ``VALUES (...), (...)`` body for a Values construct.

        Each row of ``element._data`` is compiled as a parenthesized
        :class:`.Tuple` typed by the construct's column types; the
        construct's ``literal_binds`` setting is applied unless already
        given in ``kw``.
        """
        kw.setdefault("literal_binds", element.literal_binds)
        tuples = ", ".join(
            self.process(
                elements.Tuple(
                    types=element._column_types, *elem
                ).self_group(),
                **kw,
            )
            for chunk in element._data
            for elem in chunk
        )
        return f"VALUES {tuples}"

4478 

    def visit_values(
        self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
    ):
        """Render a VALUES construct, including its alias and derived
        column list when used as a FROM element.

        When rendered inside the CTE that wraps it, only the raw VALUES
        body is emitted; LATERAL VALUES inside a CTE is rejected.
        """

        if element._independent_ctes:
            self._dispatch_independent_ctes(element, kw)

        v = self._render_values(element, **kw)

        if element._unnamed:
            name = None
        elif isinstance(element.name, elements._truncated_label):
            name = self._truncated_identifier("values", element.name)
        else:
            name = element.name

        if element._is_lateral:
            lateral = "LATERAL "
        else:
            lateral = ""

        if asfrom:
            if from_linter:
                from_linter.froms[element._de_clone()] = (
                    name if name is not None else "(unnamed VALUES element)"
                )

            if visiting_cte is not None and visiting_cte.element is element:
                # the enclosing CTE supplies the alias; keep the bare
                # VALUES text
                if element._is_lateral:
                    raise exc.CompileError(
                        "Can't use a LATERAL VALUES expression inside of a CTE"
                    )
            elif name:
                # named FROM element: "(VALUES ...) AS name (col, ...)"
                kw["include_table"] = False
                v = "%s(%s)%s (%s)" % (
                    lateral,
                    v,
                    self.get_render_as_alias_suffix(self.preparer.quote(name)),
                    (
                        ", ".join(
                            c._compiler_dispatch(self, **kw)
                            for c in element.columns
                        )
                    ),
                )
            else:
                v = "%s(%s)" % (lateral, v)
        return v

4527 

4528 def visit_scalar_values(self, element, **kw): 

4529 return f"({self._render_values(element, **kw)})" 

4530 

4531 def get_render_as_alias_suffix(self, alias_name_text): 

4532 return " AS " + alias_name_text 

4533 

    def _add_to_result_map(
        self,
        keyname: str,
        name: str,
        objects: Tuple[Any, ...],
        type_: TypeEngine[Any],
    ) -> None:
        """Record a rendered column in ``self._result_columns``.

        :param keyname: result-row key; ``None`` or ``"*"`` disables
         ordered/name-based matching for the whole result.
        :param name: rendered label name.
        :param objects: targets that should match this column in result
         lookups; must be non-empty.
        :param type_: the column's SQL type; tuple types are rejected.
        """

        # note objects must be non-empty for cursor.py to handle the
        # collection properly
        assert objects

        if keyname is None or keyname == "*":
            # textual / star column cannot be matched positionally by
            # name; fall back to ad-hoc textual matching
            self._ordered_columns = False
            self._ad_hoc_textual = True
        if type_._is_tuple_type:
            raise exc.CompileError(
                "Most backends don't support SELECTing "
                "from a tuple() object. If this is an ORM query, "
                "consider using the Bundle object."
            )
        self._result_columns.append(
            ResultColumnsEntry(keyname, name, objects, type_)
        )

4558 

4559 def _label_returning_column( 

4560 self, stmt, column, populate_result_map, column_clause_args=None, **kw 

4561 ): 

4562 """Render a column with necessary labels inside of a RETURNING clause. 

4563 

4564 This method is provided for individual dialects in place of calling 

4565 the _label_select_column method directly, so that the two use cases 

4566 of RETURNING vs. SELECT can be disambiguated going forward. 

4567 

4568 .. versionadded:: 1.4.21 

4569 

4570 """ 

4571 return self._label_select_column( 

4572 None, 

4573 column, 

4574 populate_result_map, 

4575 False, 

4576 {} if column_clause_args is None else column_clause_args, 

4577 **kw, 

4578 ) 

4579 

    def _label_select_column(
        self,
        select,
        column,
        populate_result_map,
        asfrom,
        column_clause_args,
        name=None,
        proxy_name=None,
        fallback_label_name=None,
        within_columns_clause=True,
        column_is_repeated=False,
        need_column_expressions=False,
        include_table=True,
    ):
        """produce labeled columns present in a select().

        Decides, per column, whether an ``AS <label>`` wrapper is rendered
        and wires the appropriate result-map callback into the column's
        compilation.  ``name`` forces an explicit label; otherwise
        heuristics based on the expression's kind apply.
        """
        impl = column.type.dialect_impl(self.dialect)

        if impl._has_column_expression and (
            need_column_expressions or populate_result_map
        ):
            # type supplies a SQL-side wrapping expression for SELECT
            col_expr = impl.column_expression(column)
        else:
            col_expr = column

        if populate_result_map:
            # pass an "add_to_result_map" callable into the compilation
            # of embedded columns. this collects information about the
            # column as it will be fetched in the result and is coordinated
            # with cursor.description when the query is executed.
            add_to_result_map = self._add_to_result_map

            # if the SELECT statement told us this column is a repeat,
            # wrap the callable with one that prevents the addition of the
            # targets
            if column_is_repeated:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(keyname, name, (keyname,), type_)

            # if we redefined col_expr for type expressions, wrap the
            # callable with one that adds the original column to the targets
            elif col_expr is not column:
                _add_to_result_map = add_to_result_map

                def add_to_result_map(keyname, name, objects, type_):
                    _add_to_result_map(
                        keyname, name, (column,) + objects, type_
                    )

        else:
            add_to_result_map = None

        # this method is used by some of the dialects for RETURNING,
        # which has different inputs. _label_returning_column was added
        # as the better target for this now however for 1.4 we will keep
        # _label_select_column directly compatible with this use case.
        # these assertions right now set up the current expected inputs
        assert within_columns_clause, (
            "_label_select_column is only relevant within "
            "the columns clause of a SELECT or RETURNING"
        )
        if isinstance(column, elements.Label):
            if col_expr is not column:
                result_expr = _CompileLabel(
                    col_expr, column.name, alt_names=(column.element,)
                )
            else:
                result_expr = col_expr

        elif name:
            # here, _columns_plus_names has determined there's an explicit
            # label name we need to use. this is the default for
            # tablenames_plus_columnnames as well as when columns are being
            # deduplicated on name

            assert (
                proxy_name is not None
            ), "proxy_name is required if 'name' is passed"

            result_expr = _CompileLabel(
                col_expr,
                name,
                alt_names=(
                    proxy_name,
                    # this is a hack to allow legacy result column lookups
                    # to work as they did before; this goes away in 2.0.
                    # TODO: this only seems to be tested indirectly
                    # via test/orm/test_deprecations.py. should be a
                    # resultset test for this
                    column._tq_label,
                ),
            )
        else:
            # determine here whether this column should be rendered in
            # a labelled context or not, as we were given no required label
            # name from the caller. Here we apply heuristics based on the kind
            # of SQL expression involved.

            if col_expr is not column:
                # type-specific expression wrapping the given column,
                # so we render a label
                render_with_label = True
            elif isinstance(column, elements.ColumnClause):
                # table-bound column, we render its name as a label if we are
                # inside of a subquery only
                render_with_label = (
                    asfrom
                    and not column.is_literal
                    and column.table is not None
                )
            elif isinstance(column, elements.TextClause):
                render_with_label = False
            elif isinstance(column, elements.UnaryExpression):
                # unary expression. notes added as of #12681
                #
                # By convention, the visit_unary() method
                # itself does not add an entry to the result map, and relies
                # upon either the inner expression creating a result map
                # entry, or if not, by creating a label here that produces
                # the result map entry. Where that happens is based on whether
                # or not the element immediately inside the unary is a
                # NamedColumn subclass or not.
                #
                # Now, this also impacts how the SELECT is written; if
                # we decide to generate a label here, we get the usual
                # "~(x+y) AS anon_1" thing in the columns clause. If we
                # don't, we don't get an AS at all, we get like
                # "~table.column".
                #
                # But here is the important thing as of modernish (like 1.4)
                # versions of SQLAlchemy - **whether or not the AS <label>
                # is present in the statement is not actually important**.
                # We target result columns **positionally** for a fully
                # compiled ``Select()`` object; before 1.4 we needed those
                # labels to match in cursor.description etc etc but now it
                # really doesn't matter.
                # So really, we could set render_with_label True in all cases.
                # Or we could just have visit_unary() populate the result map
                # in all cases.
                #
                # What we're doing here is strictly trying to not rock the
                # boat too much with when we do/don't render "AS label";
                # labels being present helps in the edge cases that we
                # "fall back" to named cursor.description matching, labels
                # not being present for columns keeps us from having awkward
                # phrases like "SELECT DISTINCT table.x AS x".
                render_with_label = (
                    (
                        # exception case to detect if we render "not boolean"
                        # as "not <col>" for native boolean or "<col> = 1"
                        # for non-native boolean. this is controlled by
                        # visit_is_<true|false>_unary_operator
                        column.operator
                        in (operators.is_false, operators.is_true)
                        and not self.dialect.supports_native_boolean
                    )
                    or column._wraps_unnamed_column()
                    or asfrom
                )
            elif (
                # general class of expressions that don't have a SQL-column
                # addressible name. includes scalar selects, bind parameters,
                # SQL functions, others
                not isinstance(column, elements.NamedColumn)
                # deeper check that indicates there's no natural "name" to
                # this element, which accommodates for custom SQL constructs
                # that might have a ".name" attribute (but aren't SQL
                # functions) but are not implementing this more recently added
                # base class. in theory the "NamedColumn" check should be
                # enough, however here we seek to maintain legacy behaviors
                # as well.
                and column._non_anon_label is None
            ):
                render_with_label = True
            else:
                render_with_label = False

            if render_with_label:
                if not fallback_label_name:
                    # used by the RETURNING case right now. we generate it
                    # here as 3rd party dialects may be referring to
                    # _label_select_column method directly instead of the
                    # just-added _label_returning_column method
                    assert not column_is_repeated
                    fallback_label_name = column._anon_name_label

                fallback_label_name = (
                    elements._truncated_label(fallback_label_name)
                    if not isinstance(
                        fallback_label_name, elements._truncated_label
                    )
                    else fallback_label_name
                )

                result_expr = _CompileLabel(
                    col_expr, fallback_label_name, alt_names=(proxy_name,)
                )
            else:
                result_expr = col_expr

        column_clause_args.update(
            within_columns_clause=within_columns_clause,
            add_to_result_map=add_to_result_map,
            include_table=include_table,
        )
        return result_expr._compiler_dispatch(self, **column_clause_args)

4788 

4789 def format_from_hint_text(self, sqltext, table, hint, iscrud): 

4790 hinttext = self.get_from_hint_text(table, hint) 

4791 if hinttext: 

4792 sqltext += " " + hinttext 

4793 return sqltext 

4794 

4795 def get_select_hint_text(self, byfroms): 

4796 return None 

4797 

4798 def get_from_hint_text( 

4799 self, table: FromClause, text: Optional[str] 

4800 ) -> Optional[str]: 

4801 return None 

4802 

4803 def get_crud_hint_text(self, table, text): 

4804 return None 

4805 

4806 def get_statement_hint_text(self, hint_texts): 

4807 return " ".join(hint_texts) 

4808 

    # default compiler stack entry used when the stack is empty; declared
    # as a typed attribute for type checkers and assigned only at runtime
    _default_stack_entry: _CompilerStackEntry

    if not typing.TYPE_CHECKING:
        # immutable so the shared default can never be mutated in place
        _default_stack_entry = util.immutabledict(
            [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
        )

4815 

4816 def _display_froms_for_select( 

4817 self, select_stmt, asfrom, lateral=False, **kw 

4818 ): 

4819 # utility method to help external dialects 

4820 # get the correct from list for a select. 

4821 # specifically the oracle dialect needs this feature 

4822 # right now. 

4823 toplevel = not self.stack 

4824 entry = self._default_stack_entry if toplevel else self.stack[-1] 

4825 

4826 compile_state = select_stmt._compile_state_factory(select_stmt, self) 

4827 

4828 correlate_froms = entry["correlate_froms"] 

4829 asfrom_froms = entry["asfrom_froms"] 

4830 

4831 if asfrom and not lateral: 

4832 froms = compile_state._get_display_froms( 

4833 explicit_correlate_froms=correlate_froms.difference( 

4834 asfrom_froms 

4835 ), 

4836 implicit_correlate_froms=(), 

4837 ) 

4838 else: 

4839 froms = compile_state._get_display_froms( 

4840 explicit_correlate_froms=correlate_froms, 

4841 implicit_correlate_froms=asfrom_froms, 

4842 ) 

4843 return froms 

4844 

    # hook point for dialects (e.g. Oracle, SQL Server) that must rewrite
    # the SELECT structure before rendering, typically for LIMIT/OFFSET
    translate_select_structure: Any = None
    """if not ``None``, should be a callable which accepts ``(select_stmt,
    **kw)`` and returns a select object. this is used for structural changes
    mostly to accommodate for LIMIT/OFFSET schemes

    """

4851 

    def visit_select(
        self,
        select_stmt,
        asfrom=False,
        insert_into=False,
        fromhints=None,
        compound_index=None,
        select_wraps_for=None,
        lateral=False,
        from_linter=None,
        **kwargs,
    ):
        """Render a SELECT statement, producing the full SQL string.

        Builds the column list, delegates body rendering to
        ``_compose_select_body``, then appends statement hints, suffixes
        and any accumulated CTE clause.
        """
        # passing select_wraps_for directly is no longer supported; the
        # translate_select_structure hook below sets it internally
        assert select_wraps_for is None, (
            "SQLAlchemy 1.4 requires use of "
            "the translate_select_structure hook for structural "
            "translations of SELECT objects"
        )

        # initial setup of SELECT. the compile_state_factory may now
        # be creating a totally different SELECT from the one that was
        # passed in. for ORM use this will convert from an ORM-state
        # SELECT to a regular "Core" SELECT. other composed operations
        # such as computation of joins will be performed.

        kwargs["within_columns_clause"] = False

        compile_state = select_stmt._compile_state_factory(
            select_stmt, self, **kwargs
        )
        kwargs["ambiguous_table_name_map"] = (
            compile_state._ambiguous_table_name_map
        )

        select_stmt = compile_state.statement

        toplevel = not self.stack

        if toplevel and not self.compile_state:
            self.compile_state = compile_state

        is_embedded_select = compound_index is not None or insert_into

        # translate step for Oracle, SQL Server which often need to
        # restructure the SELECT to allow for LIMIT/OFFSET and possibly
        # other conditions
        if self.translate_select_structure:
            new_select_stmt = self.translate_select_structure(
                select_stmt, asfrom=asfrom, **kwargs
            )

            # if SELECT was restructured, maintain a link to the originals
            # and assemble a new compile state
            if new_select_stmt is not select_stmt:
                compile_state_wraps_for = compile_state
                select_wraps_for = select_stmt
                select_stmt = new_select_stmt

                compile_state = select_stmt._compile_state_factory(
                    select_stmt, self, **kwargs
                )
                select_stmt = compile_state.statement

        entry = self._default_stack_entry if toplevel else self.stack[-1]

        populate_result_map = need_column_expressions = (
            toplevel
            or entry.get("need_result_map_for_compound", False)
            or entry.get("need_result_map_for_nested", False)
        )

        # indicates there is a CompoundSelect in play and we are not the
        # first select
        if compound_index:
            populate_result_map = False

        # this was first proposed as part of #3372; however, it is not
        # reached in current tests and could possibly be an assertion
        # instead.
        if not populate_result_map and "add_to_result_map" in kwargs:
            del kwargs["add_to_result_map"]

        froms = self._setup_select_stack(
            select_stmt, compile_state, entry, asfrom, lateral, compound_index
        )

        column_clause_args = kwargs.copy()
        column_clause_args.update(
            {"within_label_clause": False, "within_columns_clause": False}
        )

        text = "SELECT "  # we're off to a good start !

        if select_stmt._post_select_clause is not None:
            psc = self.process(select_stmt._post_select_clause, **kwargs)
            if psc is not None:
                text += psc + " "

        if select_stmt._hints:
            hint_text, byfrom = self._setup_select_hints(select_stmt)
            if hint_text:
                text += hint_text + " "
        else:
            byfrom = None

        if select_stmt._independent_ctes:
            self._dispatch_independent_ctes(select_stmt, kwargs)

        if select_stmt._prefixes:
            text += self._generate_prefixes(
                select_stmt, select_stmt._prefixes, **kwargs
            )

        text += self.get_select_precolumns(select_stmt, **kwargs)

        if select_stmt._pre_columns_clause is not None:
            pcc = self.process(select_stmt._pre_columns_clause, **kwargs)
            if pcc is not None:
                text += pcc + " "

        # the actual list of columns to print in the SELECT column list.
        # _label_select_column may return None for a column; those are
        # filtered out here
        inner_columns = [
            c
            for c in [
                self._label_select_column(
                    select_stmt,
                    column,
                    populate_result_map,
                    asfrom,
                    column_clause_args,
                    name=name,
                    proxy_name=proxy_name,
                    fallback_label_name=fallback_label_name,
                    column_is_repeated=repeated,
                    need_column_expressions=need_column_expressions,
                )
                for (
                    name,
                    proxy_name,
                    fallback_label_name,
                    column,
                    repeated,
                ) in compile_state.columns_plus_names
            ]
            if c is not None
        ]

        if populate_result_map and select_wraps_for is not None:
            # if this select was generated from translate_select,
            # rewrite the targeted columns in the result map

            translate = dict(
                zip(
                    [
                        name
                        for (
                            key,
                            proxy_name,
                            fallback_label_name,
                            name,
                            repeated,
                        ) in compile_state.columns_plus_names
                    ],
                    [
                        name
                        for (
                            key,
                            proxy_name,
                            fallback_label_name,
                            name,
                            repeated,
                        ) in compile_state_wraps_for.columns_plus_names
                    ],
                )
            )

            self._result_columns = [
                ResultColumnsEntry(
                    key, name, tuple(translate.get(o, o) for o in obj), type_
                )
                for key, name, obj, type_ in self._result_columns
            ]

        text = self._compose_select_body(
            text,
            select_stmt,
            compile_state,
            inner_columns,
            froms,
            byfrom,
            toplevel,
            kwargs,
        )

        if select_stmt._post_body_clause is not None:
            pbc = self.process(select_stmt._post_body_clause, **kwargs)
            if pbc:
                text += " " + pbc

        if select_stmt._statement_hints:
            per_dialect = [
                ht
                for (dialect_name, ht) in select_stmt._statement_hints
                if dialect_name in ("*", self.dialect.name)
            ]
            if per_dialect:
                text += " " + self.get_statement_hint_text(per_dialect)

        # In compound query, CTEs are shared at the compound level
        if self.ctes and (not is_embedded_select or toplevel):
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if select_stmt._suffixes:
            text += " " + self._generate_prefixes(
                select_stmt, select_stmt._suffixes, **kwargs
            )

        # pop the entry pushed by _setup_select_stack
        self.stack.pop(-1)

        return text

5072 

5073 def _setup_select_hints( 

5074 self, select: Select[Unpack[TupleAny]] 

5075 ) -> Tuple[str, _FromHintsType]: 

5076 byfrom = { 

5077 from_: hinttext 

5078 % {"name": from_._compiler_dispatch(self, ashint=True)} 

5079 for (from_, dialect), hinttext in select._hints.items() 

5080 if dialect in ("*", self.dialect.name) 

5081 } 

5082 hint_text = self.get_select_hint_text(byfrom) 

5083 return hint_text, byfrom 

5084 

    def _setup_select_stack(
        self, select, compile_state, entry, asfrom, lateral, compound_index
    ):
        """Compute the FROM list for a SELECT and push a new compiler
        stack entry for it; returns the FROM list.

        Also validates column counts across the members of a
        CompoundSelect, using the first SELECT as the reference.
        """
        correlate_froms = entry["correlate_froms"]
        asfrom_froms = entry["asfrom_froms"]

        if compound_index == 0:
            # first SELECT of a compound; remember it so later members can
            # be validated against its column count
            entry["select_0"] = select
        elif compound_index:
            select_0 = entry["select_0"]
            numcols = len(select_0._all_selected_columns)

            if len(compile_state.columns_plus_names) != numcols:
                raise exc.CompileError(
                    "All selectables passed to "
                    "CompoundSelect must have identical numbers of "
                    "columns; select #%d has %d columns, select "
                    "#%d has %d"
                    % (
                        1,
                        numcols,
                        compound_index + 1,
                        len(select._all_selected_columns),
                    )
                )

        if asfrom and not lateral:
            # non-lateral subquery in an enclosing FROM: correlate only
            # against froms outside the enclosing FROM list
            froms = compile_state._get_display_froms(
                explicit_correlate_froms=correlate_froms.difference(
                    asfrom_froms
                ),
                implicit_correlate_froms=(),
            )
        else:
            froms = compile_state._get_display_froms(
                explicit_correlate_froms=correlate_froms,
                implicit_correlate_froms=asfrom_froms,
            )

        new_correlate_froms = set(_from_objects(*froms))
        all_correlate_froms = new_correlate_froms.union(correlate_froms)

        new_entry: _CompilerStackEntry = {
            "asfrom_froms": new_correlate_froms,
            "correlate_froms": all_correlate_froms,
            "selectable": select,
            "compile_state": compile_state,
        }
        self.stack.append(new_entry)

        return froms

5136 

    def _compose_select_body(
        self,
        text,
        select,
        compile_state,
        inner_columns,
        froms,
        byfrom,
        toplevel,
        kwargs,
    ):
        """Append the column list, FROM, WHERE, GROUP BY, HAVING,
        ORDER BY, row-limiting and FOR UPDATE clauses to ``text`` and
        return the result.
        """
        text += ", ".join(inner_columns)

        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        # adjust the whitespace for no inner columns, part of #9440,
        # so that a no-col SELECT comes out as "SELECT WHERE..." or
        # "SELECT FROM ...".
        # while it would be better to have built the SELECT starting string
        # without trailing whitespace first, then add whitespace only if inner
        # cols were present, this breaks compatibility with various custom
        # compilation schemes that are currently being tested.
        if not inner_columns:
            text = text.rstrip()

        if froms:
            text += " \nFROM "

            if select._hints:
                # render each FROM with its per-table hint mapping
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            fromhints=byfrom,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
            else:
                text += ", ".join(
                    [
                        f._compiler_dispatch(
                            self,
                            asfrom=True,
                            from_linter=from_linter,
                            **kwargs,
                        )
                        for f in froms
                    ]
                )
        else:
            # dialect-specific placeholder for a FROM-less SELECT
            text += self.default_from()

        if select._where_criteria:
            t = self._generate_delimited_and_list(
                select._where_criteria, from_linter=from_linter, **kwargs
            )
            if t:
                text += " \nWHERE " + t

        if warn_linting:
            assert from_linter is not None
            from_linter.warn()

        if select._group_by_clauses:
            text += self.group_by_clause(select, **kwargs)

        if select._having_criteria:
            t = self._generate_delimited_and_list(
                select._having_criteria, **kwargs
            )
            if t:
                text += " \nHAVING " + t

        if select._post_criteria_clause is not None:
            pcc = self.process(select._post_criteria_clause, **kwargs)
            if pcc is not None:
                text += " \n" + pcc

        if select._order_by_clauses:
            text += self.order_by_clause(select, **kwargs)

        if select._has_row_limiting_clause:
            text += self._row_limit_clause(select, **kwargs)

        if select._for_update_arg is not None:
            text += self.for_update_clause(select, **kwargs)

        return text

5236 

5237 def _generate_prefixes(self, stmt, prefixes, **kw): 

5238 clause = " ".join( 

5239 prefix._compiler_dispatch(self, **kw) 

5240 for prefix, dialect_name in prefixes 

5241 if dialect_name in (None, "*") or dialect_name == self.dialect.name 

5242 ) 

5243 if clause: 

5244 clause += " " 

5245 return clause 

5246 

5247 def _render_cte_clause( 

5248 self, 

5249 nesting_level=None, 

5250 include_following_stack=False, 

5251 ): 

5252 """ 

5253 include_following_stack 

5254 Also render the nesting CTEs on the next stack. Useful for 

5255 SQL structures like UNION or INSERT that can wrap SELECT 

5256 statements containing nesting CTEs. 

5257 """ 

5258 if not self.ctes: 

5259 return "" 

5260 

5261 ctes: MutableMapping[CTE, str] 

5262 

5263 if nesting_level and nesting_level > 1: 

5264 ctes = util.OrderedDict() 

5265 for cte in list(self.ctes.keys()): 

5266 cte_level, cte_name, cte_opts = self.level_name_by_cte[ 

5267 cte._get_reference_cte() 

5268 ] 

5269 nesting = cte.nesting or cte_opts.nesting 

5270 is_rendered_level = cte_level == nesting_level or ( 

5271 include_following_stack and cte_level == nesting_level + 1 

5272 ) 

5273 if not (nesting and is_rendered_level): 

5274 continue 

5275 

5276 ctes[cte] = self.ctes[cte] 

5277 

5278 else: 

5279 ctes = self.ctes 

5280 

5281 if not ctes: 

5282 return "" 

5283 ctes_recursive = any([cte.recursive for cte in ctes]) 

5284 

5285 cte_text = self.get_cte_preamble(ctes_recursive) + " " 

5286 cte_text += ", \n".join([txt for txt in ctes.values()]) 

5287 cte_text += "\n " 

5288 

5289 if nesting_level and nesting_level > 1: 

5290 for cte in list(ctes.keys()): 

5291 cte_level, cte_name, cte_opts = self.level_name_by_cte[ 

5292 cte._get_reference_cte() 

5293 ] 

5294 del self.ctes[cte] 

5295 del self.ctes_by_level_name[(cte_level, cte_name)] 

5296 del self.level_name_by_cte[cte._get_reference_cte()] 

5297 

5298 return cte_text 

5299 

5300 def get_cte_preamble(self, recursive): 

5301 if recursive: 

5302 return "WITH RECURSIVE" 

5303 else: 

5304 return "WITH" 

5305 

5306 def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str: 

5307 """Called when building a ``SELECT`` statement, position is just 

5308 before column list. 

5309 

5310 """ 

5311 if select._distinct_on: 

5312 util.warn_deprecated( 

5313 "DISTINCT ON is currently supported only by the PostgreSQL " 

5314 "dialect. Use of DISTINCT ON for other backends is currently " 

5315 "silently ignored, however this usage is deprecated, and will " 

5316 "raise CompileError in a future release for all backends " 

5317 "that do not support this syntax.", 

5318 version="1.4", 

5319 ) 

5320 return "DISTINCT " if select._distinct else "" 

5321 

5322 def group_by_clause(self, select, **kw): 

5323 """allow dialects to customize how GROUP BY is rendered.""" 

5324 

5325 group_by = self._generate_delimited_list( 

5326 select._group_by_clauses, OPERATORS[operators.comma_op], **kw 

5327 ) 

5328 if group_by: 

5329 return " GROUP BY " + group_by 

5330 else: 

5331 return "" 

5332 

5333 def order_by_clause(self, select, **kw): 

5334 """allow dialects to customize how ORDER BY is rendered.""" 

5335 

5336 order_by = self._generate_delimited_list( 

5337 select._order_by_clauses, OPERATORS[operators.comma_op], **kw 

5338 ) 

5339 

5340 if order_by: 

5341 return " ORDER BY " + order_by 

5342 else: 

5343 return "" 

5344 

5345 def for_update_clause(self, select, **kw): 

5346 return " FOR UPDATE" 

5347 

5348 def returning_clause( 

5349 self, 

5350 stmt: UpdateBase, 

5351 returning_cols: Sequence[_ColumnsClauseElement], 

5352 *, 

5353 populate_result_map: bool, 

5354 **kw: Any, 

5355 ) -> str: 

5356 columns = [ 

5357 self._label_returning_column( 

5358 stmt, 

5359 column, 

5360 populate_result_map, 

5361 fallback_label_name=fallback_label_name, 

5362 column_is_repeated=repeated, 

5363 name=name, 

5364 proxy_name=proxy_name, 

5365 **kw, 

5366 ) 

5367 for ( 

5368 name, 

5369 proxy_name, 

5370 fallback_label_name, 

5371 column, 

5372 repeated, 

5373 ) in stmt._generate_columns_plus_names( 

5374 True, cols=base._select_iterables(returning_cols) 

5375 ) 

5376 ] 

5377 

5378 return "RETURNING " + ", ".join(columns) 

5379 

5380 def limit_clause(self, select, **kw): 

5381 text = "" 

5382 if select._limit_clause is not None: 

5383 text += "\n LIMIT " + self.process(select._limit_clause, **kw) 

5384 if select._offset_clause is not None: 

5385 if select._limit_clause is None: 

5386 text += "\n LIMIT -1" 

5387 text += " OFFSET " + self.process(select._offset_clause, **kw) 

5388 return text 

5389 

    def fetch_clause(
        self,
        select,
        fetch_clause=None,
        require_offset=False,
        use_literal_execute_for_simple_int=False,
        **kw,
    ):
        """Render the ANSI ``OFFSET ... ROWS`` / ``FETCH FIRST ... ROWS``
        pair for a SELECT.

        :param fetch_clause: optional explicit FETCH expression supplied
         by a dialect; when given, plain row-count semantics are assumed.
        :param require_offset: emit ``OFFSET 0 ROWS`` even when no OFFSET
         is present (for backends that require one before FETCH).
        :param use_literal_execute_for_simple_int: render simple integer
         clauses as literal-execute parameters.
        """
        if fetch_clause is None:
            fetch_clause = select._fetch_clause
            fetch_clause_options = select._fetch_clause_options
        else:
            fetch_clause_options = {"percent": False, "with_ties": False}

        text = ""

        if select._offset_clause is not None:
            offset_clause = select._offset_clause
            if (
                use_literal_execute_for_simple_int
                and select._simple_int_clause(offset_clause)
            ):
                offset_clause = offset_clause.render_literal_execute()
            offset_str = self.process(offset_clause, **kw)
            text += "\n OFFSET %s ROWS" % offset_str
        elif require_offset:
            text += "\n OFFSET 0 ROWS"

        if fetch_clause is not None:
            if (
                use_literal_execute_for_simple_int
                and select._simple_int_clause(fetch_clause)
            ):
                fetch_clause = fetch_clause.render_literal_execute()
            # PERCENT / WITH TIES modifiers come from the options mapping
            text += "\n FETCH FIRST %s%s ROWS %s" % (
                self.process(fetch_clause, **kw),
                " PERCENT" if fetch_clause_options["percent"] else "",
                "WITH TIES" if fetch_clause_options["with_ties"] else "ONLY",
            )
        return text

5430 

    def visit_table(
        self,
        table,
        asfrom=False,
        iscrud=False,
        ashint=False,
        fromhints=None,
        use_schema=True,
        from_linter=None,
        ambiguous_table_name_map=None,
        enclosing_alias=None,
        **kwargs,
    ):
        """Render a table name (with optional schema, disambiguating
        alias and hint text) when it appears in a FROM context or as a
        hint target; render ``""`` otherwise.
        """
        if from_linter:
            from_linter.froms[table] = table.fullname

        if asfrom or ashint:
            effective_schema = self.preparer.schema_for_object(table)

            if use_schema and effective_schema:
                ret = (
                    self.preparer.quote_schema(effective_schema)
                    + "."
                    + self.preparer.quote(table.name)
                )
            else:
                ret = self.preparer.quote(table.name)

            # schema-less table whose name collides with another table in
            # the statement: render an anonymized alias, unless this table
            # is the direct element of the enclosing alias
            if (
                (
                    enclosing_alias is None
                    or enclosing_alias.element is not table
                )
                and not effective_schema
                and ambiguous_table_name_map
                and table.name in ambiguous_table_name_map
            ):
                anon_name = self._truncated_identifier(
                    "alias", ambiguous_table_name_map[table.name]
                )

                ret = ret + self.get_render_as_alias_suffix(
                    self.preparer.format_alias(None, anon_name)
                )

            if fromhints and table in fromhints:
                ret = self.format_from_hint_text(
                    ret, table, fromhints[table], iscrud
                )
            return ret
        else:
            return ""

5483 

5484 def visit_join(self, join, asfrom=False, from_linter=None, **kwargs): 

5485 if from_linter: 

5486 from_linter.edges.update( 

5487 itertools.product( 

5488 _de_clone(join.left._from_objects), 

5489 _de_clone(join.right._from_objects), 

5490 ) 

5491 ) 

5492 

5493 if join.full: 

5494 join_type = " FULL OUTER JOIN " 

5495 elif join.isouter: 

5496 join_type = " LEFT OUTER JOIN " 

5497 else: 

5498 join_type = " JOIN " 

5499 return ( 

5500 join.left._compiler_dispatch( 

5501 self, asfrom=True, from_linter=from_linter, **kwargs 

5502 ) 

5503 + join_type 

5504 + join.right._compiler_dispatch( 

5505 self, asfrom=True, from_linter=from_linter, **kwargs 

5506 ) 

5507 + " ON " 

5508 # TODO: likely need asfrom=True here? 

5509 + join.onclause._compiler_dispatch( 

5510 self, from_linter=from_linter, **kwargs 

5511 ) 

5512 ) 

5513 

5514 def _setup_crud_hints(self, stmt, table_text): 

5515 dialect_hints = { 

5516 table: hint_text 

5517 for (table, dialect), hint_text in stmt._hints.items() 

5518 if dialect in ("*", self.dialect.name) 

5519 } 

5520 if stmt.table in dialect_hints: 

5521 table_text = self.format_from_hint_text( 

5522 table_text, stmt.table, dialect_hints[stmt.table], True 

5523 ) 

5524 return dialect_hints, table_text 

5525 

    # within the realm of "insertmanyvalues sentinel columns",
    # these lookups match different kinds of Column() configurations
    # to specific backend capabilities. they are broken into two
    # lookups, one for autoincrement columns and the other for non
    # autoincrement columns
    _sentinel_col_non_autoinc_lookup = util.immutabledict(
        {
            _SentinelDefaultCharacterization.CLIENTSIDE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.SENTINEL_DEFAULT: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT
            ),
            _SentinelDefaultCharacterization.IDENTITY: (
                InsertmanyvaluesSentinelOpts.IDENTITY
            ),
            _SentinelDefaultCharacterization.SEQUENCE: (
                InsertmanyvaluesSentinelOpts.SEQUENCE
            ),
        }
    )
    # autoincrement columns differ only in how a "no default" column is
    # characterized: it maps to the AUTOINCREMENT capability bit
    _sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
        {
            _SentinelDefaultCharacterization.NONE: (
                InsertmanyvaluesSentinelOpts.AUTOINCREMENT
            ),
        }
    )

5557 

    def _get_sentinel_column_for_table(
        self, table: Table
    ) -> Optional[Sequence[Column[Any]]]:
        """given a :class:`.Table`, return a usable sentinel column or
        columns for this dialect if any.

        Return None if no sentinel columns could be identified, or raise an
        error if a column was marked as a sentinel explicitly but isn't
        compatible with this dialect.

        """

        sentinel_opts = self.dialect.insertmanyvalues_implicit_sentinel
        sentinel_characteristics = table._sentinel_column_characteristics

        sent_cols = sentinel_characteristics.columns

        if sent_cols is None:
            return None

        # choose the capability lookup based on whether the candidate
        # sentinel columns are autoincrement
        if sentinel_characteristics.is_autoinc:
            bitmask = self._sentinel_col_autoinc_lookup.get(
                sentinel_characteristics.default_characterization, 0
            )
        else:
            bitmask = self._sentinel_col_non_autoinc_lookup.get(
                sentinel_characteristics.default_characterization, 0
            )

        # dialect supports this kind of sentinel: use the columns
        if sentinel_opts & bitmask:
            return sent_cols

        if sentinel_characteristics.is_explicit:
            # a column was explicitly marked as insert_sentinel=True,
            # however it is not compatible with this dialect. they should
            # not indicate this column as a sentinel if they need to include
            # this dialect.

            # TODO: do we want non-primary key explicit sentinel cols
            # that can gracefully degrade for some backends?
            # insert_sentinel="degrade" perhaps. not for the initial release.
            # I am hoping people are generally not dealing with this sentinel
            # business at all.

            # if is_explicit is True, there will be only one sentinel column.

            raise exc.InvalidRequestError(
                f"Column {sent_cols[0]} can't be explicitly "
                "marked as a sentinel column when using the "
                f"{self.dialect.name} dialect, as the "
                "particular type of default generation on this column is "
                "not currently compatible with this dialect's specific "
                f"INSERT..RETURNING syntax which can receive the "
                "server-generated value in "
                "a deterministic way. To remove this error, remove "
                "insert_sentinel=True from primary key autoincrement "
                "columns; these columns are automatically used as "
                "sentinels for supported dialects in any case."
            )

        return None

5619 

5620 def _deliver_insertmanyvalues_batches( 

5621 self, 

5622 statement: str, 

5623 parameters: _DBAPIMultiExecuteParams, 

5624 compiled_parameters: List[_MutableCoreSingleExecuteParams], 

5625 generic_setinputsizes: Optional[_GenericSetInputSizesType], 

5626 batch_size: int, 

5627 sort_by_parameter_order: bool, 

5628 schema_translate_map: Optional[SchemaTranslateMapType], 

5629 ) -> Iterator[_InsertManyValuesBatch]: 

5630 imv = self._insertmanyvalues 

5631 assert imv is not None 

5632 

5633 if not imv.sentinel_param_keys: 

5634 _sentinel_from_params = None 

5635 else: 

5636 _sentinel_from_params = operator.itemgetter( 

5637 *imv.sentinel_param_keys 

5638 ) 

5639 

5640 lenparams = len(parameters) 

5641 if imv.is_default_expr and not self.dialect.supports_default_metavalue: 

5642 # backend doesn't support 

5643 # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ... 

5644 # at the moment this is basically SQL Server due to 

5645 # not being able to use DEFAULT for identity column 

5646 # just yield out that many single statements! still 

5647 # faster than a whole connection.execute() call ;) 

5648 # 

5649 # note we still are taking advantage of the fact that we know 

5650 # we are using RETURNING. The generalized approach of fetching 

5651 # cursor.lastrowid etc. still goes through the more heavyweight 

5652 # "ExecutionContext per statement" system as it isn't usable 

5653 # as a generic "RETURNING" approach 

5654 use_row_at_a_time = True 

5655 downgraded = False 

5656 elif not self.dialect.supports_multivalues_insert or ( 

5657 sort_by_parameter_order 

5658 and self._result_columns 

5659 and (imv.sentinel_columns is None or imv.includes_upsert_behaviors) 

5660 ): 

5661 # deterministic order was requested and the compiler could 

5662 # not organize sentinel columns for this dialect/statement. 

5663 # use row at a time 

5664 use_row_at_a_time = True 

5665 downgraded = True 

5666 else: 

5667 use_row_at_a_time = False 

5668 downgraded = False 

5669 

5670 if use_row_at_a_time: 

5671 for batchnum, (param, compiled_param) in enumerate( 

5672 cast( 

5673 "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]", # noqa: E501 

5674 zip(parameters, compiled_parameters), 

5675 ), 

5676 1, 

5677 ): 

5678 yield _InsertManyValuesBatch( 

5679 statement, 

5680 param, 

5681 generic_setinputsizes, 

5682 [param], 

5683 ( 

5684 [_sentinel_from_params(compiled_param)] 

5685 if _sentinel_from_params 

5686 else [] 

5687 ), 

5688 1, 

5689 batchnum, 

5690 lenparams, 

5691 sort_by_parameter_order, 

5692 downgraded, 

5693 ) 

5694 return 

5695 

5696 if schema_translate_map: 

5697 rst = functools.partial( 

5698 self.preparer._render_schema_translates, 

5699 schema_translate_map=schema_translate_map, 

5700 ) 

5701 else: 

5702 rst = None 

5703 

5704 imv_single_values_expr = imv.single_values_expr 

5705 if rst: 

5706 imv_single_values_expr = rst(imv_single_values_expr) 

5707 

5708 executemany_values = f"({imv_single_values_expr})" 

5709 statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__") 

5710 

5711 # Use optional insertmanyvalues_max_parameters 

5712 # to further shrink the batch size so that there are no more than 

5713 # insertmanyvalues_max_parameters params. 

5714 # Currently used by SQL Server, which limits statements to 2100 bound 

5715 # parameters (actually 2099). 

5716 max_params = self.dialect.insertmanyvalues_max_parameters 

5717 if max_params: 

5718 total_num_of_params = len(self.bind_names) 

5719 num_params_per_batch = len(imv.insert_crud_params) 

5720 num_params_outside_of_batch = ( 

5721 total_num_of_params - num_params_per_batch 

5722 ) 

5723 batch_size = min( 

5724 batch_size, 

5725 ( 

5726 (max_params - num_params_outside_of_batch) 

5727 // num_params_per_batch 

5728 ), 

5729 ) 

5730 

5731 batches = cast("List[Sequence[Any]]", list(parameters)) 

5732 compiled_batches = cast( 

5733 "List[Sequence[Any]]", list(compiled_parameters) 

5734 ) 

5735 

5736 processed_setinputsizes: Optional[_GenericSetInputSizesType] = None 

5737 batchnum = 1 

5738 total_batches = lenparams // batch_size + ( 

5739 1 if lenparams % batch_size else 0 

5740 ) 

5741 

5742 insert_crud_params = imv.insert_crud_params 

5743 assert insert_crud_params is not None 

5744 

5745 if rst: 

5746 insert_crud_params = [ 

5747 (col, key, rst(expr), st) 

5748 for col, key, expr, st in insert_crud_params 

5749 ] 

5750 

5751 escaped_bind_names: Mapping[str, str] 

5752 expand_pos_lower_index = expand_pos_upper_index = 0 

5753 

5754 if not self.positional: 

5755 if self.escaped_bind_names: 

5756 escaped_bind_names = self.escaped_bind_names 

5757 else: 

5758 escaped_bind_names = {} 

5759 

5760 all_keys = set(parameters[0]) 

5761 

5762 def apply_placeholders(keys, formatted): 

5763 for key in keys: 

5764 key = escaped_bind_names.get(key, key) 

5765 formatted = formatted.replace( 

5766 self.bindtemplate % {"name": key}, 

5767 self.bindtemplate 

5768 % {"name": f"{key}__EXECMANY_INDEX__"}, 

5769 ) 

5770 return formatted 

5771 

5772 if imv.embed_values_counter: 

5773 imv_values_counter = ", _IMV_VALUES_COUNTER" 

5774 else: 

5775 imv_values_counter = "" 

5776 formatted_values_clause = f"""({', '.join( 

5777 apply_placeholders(bind_keys, formatted) 

5778 for _, _, formatted, bind_keys in insert_crud_params 

5779 )}{imv_values_counter})""" 

5780 

5781 keys_to_replace = all_keys.intersection( 

5782 escaped_bind_names.get(key, key) 

5783 for _, _, _, bind_keys in insert_crud_params 

5784 for key in bind_keys 

5785 ) 

5786 base_parameters = { 

5787 key: parameters[0][key] 

5788 for key in all_keys.difference(keys_to_replace) 

5789 } 

5790 executemany_values_w_comma = "" 

5791 else: 

5792 formatted_values_clause = "" 

5793 keys_to_replace = set() 

5794 base_parameters = {} 

5795 

5796 if imv.embed_values_counter: 

5797 executemany_values_w_comma = ( 

5798 f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), " 

5799 ) 

5800 else: 

5801 executemany_values_w_comma = f"({imv_single_values_expr}), " 

5802 

5803 all_names_we_will_expand: Set[str] = set() 

5804 for elem in imv.insert_crud_params: 

5805 all_names_we_will_expand.update(elem[3]) 

5806 

5807 # get the start and end position in a particular list 

5808 # of parameters where we will be doing the "expanding". 

5809 # statements can have params on either side or both sides, 

5810 # given RETURNING and CTEs 

5811 if all_names_we_will_expand: 

5812 positiontup = self.positiontup 

5813 assert positiontup is not None 

5814 

5815 all_expand_positions = { 

5816 idx 

5817 for idx, name in enumerate(positiontup) 

5818 if name in all_names_we_will_expand 

5819 } 

5820 expand_pos_lower_index = min(all_expand_positions) 

5821 expand_pos_upper_index = max(all_expand_positions) + 1 

5822 assert ( 

5823 len(all_expand_positions) 

5824 == expand_pos_upper_index - expand_pos_lower_index 

5825 ) 

5826 

5827 if self._numeric_binds: 

5828 escaped = re.escape(self._numeric_binds_identifier_char) 

5829 executemany_values_w_comma = re.sub( 

5830 rf"{escaped}\d+", "%s", executemany_values_w_comma 

5831 ) 

5832 

5833 while batches: 

5834 batch = batches[0:batch_size] 

5835 compiled_batch = compiled_batches[0:batch_size] 

5836 

5837 batches[0:batch_size] = [] 

5838 compiled_batches[0:batch_size] = [] 

5839 

5840 if batches: 

5841 current_batch_size = batch_size 

5842 else: 

5843 current_batch_size = len(batch) 

5844 

5845 if generic_setinputsizes: 

5846 # if setinputsizes is present, expand this collection to 

5847 # suit the batch length as well 

5848 # currently this will be mssql+pyodbc for internal dialects 

5849 processed_setinputsizes = [ 

5850 (new_key, len_, typ) 

5851 for new_key, len_, typ in ( 

5852 (f"{key}_{index}", len_, typ) 

5853 for index in range(current_batch_size) 

5854 for key, len_, typ in generic_setinputsizes 

5855 ) 

5856 ] 

5857 

5858 replaced_parameters: Any 

5859 if self.positional: 

5860 num_ins_params = imv.num_positional_params_counted 

5861 

5862 batch_iterator: Iterable[Sequence[Any]] 

5863 extra_params_left: Sequence[Any] 

5864 extra_params_right: Sequence[Any] 

5865 

5866 if num_ins_params == len(batch[0]): 

5867 extra_params_left = extra_params_right = () 

5868 batch_iterator = batch 

5869 else: 

5870 extra_params_left = batch[0][:expand_pos_lower_index] 

5871 extra_params_right = batch[0][expand_pos_upper_index:] 

5872 batch_iterator = ( 

5873 b[expand_pos_lower_index:expand_pos_upper_index] 

5874 for b in batch 

5875 ) 

5876 

5877 if imv.embed_values_counter: 

5878 expanded_values_string = ( 

5879 "".join( 

5880 executemany_values_w_comma.replace( 

5881 "_IMV_VALUES_COUNTER", str(i) 

5882 ) 

5883 for i, _ in enumerate(batch) 

5884 ) 

5885 )[:-2] 

5886 else: 

5887 expanded_values_string = ( 

5888 (executemany_values_w_comma * current_batch_size) 

5889 )[:-2] 

5890 

5891 if self._numeric_binds and num_ins_params > 0: 

5892 # numeric will always number the parameters inside of 

5893 # VALUES (and thus order self.positiontup) to be higher 

5894 # than non-VALUES parameters, no matter where in the 

5895 # statement those non-VALUES parameters appear (this is 

5896 # ensured in _process_numeric by numbering first all 

5897 # params that are not in _values_bindparam) 

5898 # therefore all extra params are always 

5899 # on the left side and numbered lower than the VALUES 

5900 # parameters 

5901 assert not extra_params_right 

5902 

5903 start = expand_pos_lower_index + 1 

5904 end = num_ins_params * (current_batch_size) + start 

5905 

5906 # need to format here, since statement may contain 

5907 # unescaped %, while values_string contains just (%s, %s) 

5908 positions = tuple( 

5909 f"{self._numeric_binds_identifier_char}{i}" 

5910 for i in range(start, end) 

5911 ) 

5912 expanded_values_string = expanded_values_string % positions 

5913 

5914 replaced_statement = statement.replace( 

5915 "__EXECMANY_TOKEN__", expanded_values_string 

5916 ) 

5917 

5918 replaced_parameters = tuple( 

5919 itertools.chain.from_iterable(batch_iterator) 

5920 ) 

5921 

5922 replaced_parameters = ( 

5923 extra_params_left 

5924 + replaced_parameters 

5925 + extra_params_right 

5926 ) 

5927 

5928 else: 

5929 replaced_values_clauses = [] 

5930 replaced_parameters = base_parameters.copy() 

5931 

5932 for i, param in enumerate(batch): 

5933 fmv = formatted_values_clause.replace( 

5934 "EXECMANY_INDEX__", str(i) 

5935 ) 

5936 if imv.embed_values_counter: 

5937 fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i)) 

5938 

5939 replaced_values_clauses.append(fmv) 

5940 replaced_parameters.update( 

5941 {f"{key}__{i}": param[key] for key in keys_to_replace} 

5942 ) 

5943 

5944 replaced_statement = statement.replace( 

5945 "__EXECMANY_TOKEN__", 

5946 ", ".join(replaced_values_clauses), 

5947 ) 

5948 

5949 yield _InsertManyValuesBatch( 

5950 replaced_statement, 

5951 replaced_parameters, 

5952 processed_setinputsizes, 

5953 batch, 

5954 ( 

5955 [_sentinel_from_params(cb) for cb in compiled_batch] 

5956 if _sentinel_from_params 

5957 else [] 

5958 ), 

5959 current_batch_size, 

5960 batchnum, 

5961 total_batches, 

5962 sort_by_parameter_order, 

5963 False, 

5964 ) 

5965 batchnum += 1 

5966 

5967 def visit_insert( 

5968 self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw 

5969 ): 

5970 compile_state = insert_stmt._compile_state_factory( 

5971 insert_stmt, self, **kw 

5972 ) 

5973 insert_stmt = compile_state.statement 

5974 

5975 if visiting_cte is not None: 

5976 kw["visiting_cte"] = visiting_cte 

5977 toplevel = False 

5978 else: 

5979 toplevel = not self.stack 

5980 

5981 if toplevel: 

5982 self.isinsert = True 

5983 if not self.dml_compile_state: 

5984 self.dml_compile_state = compile_state 

5985 if not self.compile_state: 

5986 self.compile_state = compile_state 

5987 

5988 self.stack.append( 

5989 { 

5990 "correlate_froms": set(), 

5991 "asfrom_froms": set(), 

5992 "selectable": insert_stmt, 

5993 } 

5994 ) 

5995 

5996 counted_bindparam = 0 

5997 

5998 # reset any incoming "visited_bindparam" collection 

5999 visited_bindparam = None 

6000 

6001 # for positional, insertmanyvalues needs to know how many 

6002 # bound parameters are in the VALUES sequence; there's no simple 

6003 # rule because default expressions etc. can have zero or more 

6004 # params inside them. After multiple attempts to figure this out, 

6005 # this very simplistic "count after" works and is 

6006 # likely the least amount of callcounts, though looks clumsy 

6007 if self.positional and visiting_cte is None: 

6008 # if we are inside a CTE, don't count parameters 

6009 # here since they wont be for insertmanyvalues. keep 

6010 # visited_bindparam at None so no counting happens. 

6011 # see #9173 

6012 visited_bindparam = [] 

6013 

6014 crud_params_struct = crud._get_crud_params( 

6015 self, 

6016 insert_stmt, 

6017 compile_state, 

6018 toplevel, 

6019 visited_bindparam=visited_bindparam, 

6020 **kw, 

6021 ) 

6022 

6023 if self.positional and visited_bindparam is not None: 

6024 counted_bindparam = len(visited_bindparam) 

6025 if self._numeric_binds: 

6026 if self._values_bindparam is not None: 

6027 self._values_bindparam += visited_bindparam 

6028 else: 

6029 self._values_bindparam = visited_bindparam 

6030 

6031 crud_params_single = crud_params_struct.single_params 

6032 

6033 if ( 

6034 not crud_params_single 

6035 and not self.dialect.supports_default_values 

6036 and not self.dialect.supports_default_metavalue 

6037 and not self.dialect.supports_empty_insert 

6038 ): 

6039 raise exc.CompileError( 

6040 "The '%s' dialect with current database " 

6041 "version settings does not support empty " 

6042 "inserts." % self.dialect.name 

6043 ) 

6044 

6045 if compile_state._has_multi_parameters: 

6046 if not self.dialect.supports_multivalues_insert: 

6047 raise exc.CompileError( 

6048 "The '%s' dialect with current database " 

6049 "version settings does not support " 

6050 "in-place multirow inserts." % self.dialect.name 

6051 ) 

6052 elif ( 

6053 self.implicit_returning or insert_stmt._returning 

6054 ) and insert_stmt._sort_by_parameter_order: 

6055 raise exc.CompileError( 

6056 "RETURNING cannot be determinstically sorted when " 

6057 "using an INSERT which includes multi-row values()." 

6058 ) 

6059 crud_params_single = crud_params_struct.single_params 

6060 else: 

6061 crud_params_single = crud_params_struct.single_params 

6062 

6063 preparer = self.preparer 

6064 supports_default_values = self.dialect.supports_default_values 

6065 

6066 text = "INSERT " 

6067 

6068 if insert_stmt._prefixes: 

6069 text += self._generate_prefixes( 

6070 insert_stmt, insert_stmt._prefixes, **kw 

6071 ) 

6072 

6073 text += "INTO " 

6074 table_text = preparer.format_table(insert_stmt.table) 

6075 

6076 if insert_stmt._hints: 

6077 _, table_text = self._setup_crud_hints(insert_stmt, table_text) 

6078 

6079 if insert_stmt._independent_ctes: 

6080 self._dispatch_independent_ctes(insert_stmt, kw) 

6081 

6082 text += table_text 

6083 

6084 if crud_params_single or not supports_default_values: 

6085 text += " (%s)" % ", ".join( 

6086 [expr for _, expr, _, _ in crud_params_single] 

6087 ) 

6088 

6089 # look for insertmanyvalues attributes that would have been configured 

6090 # by crud.py as it scanned through the columns to be part of the 

6091 # INSERT 

6092 use_insertmanyvalues = crud_params_struct.use_insertmanyvalues 

6093 named_sentinel_params: Optional[Sequence[str]] = None 

6094 add_sentinel_cols = None 

6095 implicit_sentinel = False 

6096 

6097 returning_cols = self.implicit_returning or insert_stmt._returning 

6098 if returning_cols: 

6099 add_sentinel_cols = crud_params_struct.use_sentinel_columns 

6100 if add_sentinel_cols is not None: 

6101 assert use_insertmanyvalues 

6102 

6103 # search for the sentinel column explicitly present 

6104 # in the INSERT columns list, and additionally check that 

6105 # this column has a bound parameter name set up that's in the 

6106 # parameter list. If both of these cases are present, it means 

6107 # we will have a client side value for the sentinel in each 

6108 # parameter set. 

6109 

6110 _params_by_col = { 

6111 col: param_names 

6112 for col, _, _, param_names in crud_params_single 

6113 } 

6114 named_sentinel_params = [] 

6115 for _add_sentinel_col in add_sentinel_cols: 

6116 if _add_sentinel_col not in _params_by_col: 

6117 named_sentinel_params = None 

6118 break 

6119 param_name = self._within_exec_param_key_getter( 

6120 _add_sentinel_col 

6121 ) 

6122 if param_name not in _params_by_col[_add_sentinel_col]: 

6123 named_sentinel_params = None 

6124 break 

6125 named_sentinel_params.append(param_name) 

6126 

6127 if named_sentinel_params is None: 

6128 # if we are not going to have a client side value for 

6129 # the sentinel in the parameter set, that means it's 

6130 # an autoincrement, an IDENTITY, or a server-side SQL 

6131 # expression like nextval('seqname'). So this is 

6132 # an "implicit" sentinel; we will look for it in 

6133 # RETURNING 

6134 # only, and then sort on it. For this case on PG, 

6135 # SQL Server we have to use a special INSERT form 

6136 # that guarantees the server side function lines up with 

6137 # the entries in the VALUES. 

6138 if ( 

6139 self.dialect.insertmanyvalues_implicit_sentinel 

6140 & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT 

6141 ): 

6142 implicit_sentinel = True 

6143 else: 

6144 # here, we are not using a sentinel at all 

6145 # and we are likely the SQLite dialect. 

6146 # The first add_sentinel_col that we have should not 

6147 # be marked as "insert_sentinel=True". if it was, 

6148 # an error should have been raised in 

6149 # _get_sentinel_column_for_table. 

6150 assert not add_sentinel_cols[0]._insert_sentinel, ( 

6151 "sentinel selection rules should have prevented " 

6152 "us from getting here for this dialect" 

6153 ) 

6154 

6155 # always put the sentinel columns last. even if they are 

6156 # in the returning list already, they will be there twice 

6157 # then. 

6158 returning_cols = list(returning_cols) + list(add_sentinel_cols) 

6159 

6160 returning_clause = self.returning_clause( 

6161 insert_stmt, 

6162 returning_cols, 

6163 populate_result_map=toplevel, 

6164 ) 

6165 

6166 if self.returning_precedes_values: 

6167 text += " " + returning_clause 

6168 

6169 else: 

6170 returning_clause = None 

6171 

6172 if insert_stmt.select is not None: 

6173 # placed here by crud.py 

6174 select_text = self.process( 

6175 self.stack[-1]["insert_from_select"], insert_into=True, **kw 

6176 ) 

6177 

6178 if self.ctes and self.dialect.cte_follows_insert: 

6179 nesting_level = len(self.stack) if not toplevel else None 

6180 text += " %s%s" % ( 

6181 self._render_cte_clause( 

6182 nesting_level=nesting_level, 

6183 include_following_stack=True, 

6184 ), 

6185 select_text, 

6186 ) 

6187 else: 

6188 text += " %s" % select_text 

6189 elif not crud_params_single and supports_default_values: 

6190 text += " DEFAULT VALUES" 

6191 if use_insertmanyvalues: 

6192 self._insertmanyvalues = _InsertManyValues( 

6193 True, 

6194 self.dialect.default_metavalue_token, 

6195 cast( 

6196 "List[crud._CrudParamElementStr]", crud_params_single 

6197 ), 

6198 counted_bindparam, 

6199 sort_by_parameter_order=( 

6200 insert_stmt._sort_by_parameter_order 

6201 ), 

6202 includes_upsert_behaviors=( 

6203 insert_stmt._post_values_clause is not None 

6204 ), 

6205 sentinel_columns=add_sentinel_cols, 

6206 num_sentinel_columns=( 

6207 len(add_sentinel_cols) if add_sentinel_cols else 0 

6208 ), 

6209 implicit_sentinel=implicit_sentinel, 

6210 ) 

6211 elif compile_state._has_multi_parameters: 

6212 text += " VALUES %s" % ( 

6213 ", ".join( 

6214 "(%s)" 

6215 % (", ".join(value for _, _, value, _ in crud_param_set)) 

6216 for crud_param_set in crud_params_struct.all_multi_params 

6217 ), 

6218 ) 

6219 else: 

6220 insert_single_values_expr = ", ".join( 

6221 [ 

6222 value 

6223 for _, _, value, _ in cast( 

6224 "List[crud._CrudParamElementStr]", 

6225 crud_params_single, 

6226 ) 

6227 ] 

6228 ) 

6229 

6230 if use_insertmanyvalues: 

6231 if ( 

6232 implicit_sentinel 

6233 and ( 

6234 self.dialect.insertmanyvalues_implicit_sentinel 

6235 & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

6236 ) 

6237 # this is checking if we have 

6238 # INSERT INTO table (id) VALUES (DEFAULT). 

6239 and not (crud_params_struct.is_default_metavalue_only) 

6240 ): 

6241 # if we have a sentinel column that is server generated, 

6242 # then for selected backends render the VALUES list as a 

6243 # subquery. This is the orderable form supported by 

6244 # PostgreSQL and SQL Server. 

6245 embed_sentinel_value = True 

6246 

6247 render_bind_casts = ( 

6248 self.dialect.insertmanyvalues_implicit_sentinel 

6249 & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS 

6250 ) 

6251 

6252 colnames = ", ".join( 

6253 f"p{i}" for i, _ in enumerate(crud_params_single) 

6254 ) 

6255 

6256 if render_bind_casts: 

6257 # render casts for the SELECT list. For PG, we are 

6258 # already rendering bind casts in the parameter list, 

6259 # selectively for the more "tricky" types like ARRAY. 

6260 # however, even for the "easy" types, if the parameter 

6261 # is NULL for every entry, PG gives up and says 

6262 # "it must be TEXT", which fails for other easy types 

6263 # like ints. So we cast on this side too. 

6264 colnames_w_cast = ", ".join( 

6265 self.render_bind_cast( 

6266 col.type, 

6267 col.type._unwrapped_dialect_impl(self.dialect), 

6268 f"p{i}", 

6269 ) 

6270 for i, (col, *_) in enumerate(crud_params_single) 

6271 ) 

6272 else: 

6273 colnames_w_cast = colnames 

6274 

6275 text += ( 

6276 f" SELECT {colnames_w_cast} FROM " 

6277 f"(VALUES ({insert_single_values_expr})) " 

6278 f"AS imp_sen({colnames}, sen_counter) " 

6279 "ORDER BY sen_counter" 

6280 ) 

6281 else: 

6282 # otherwise, if no sentinel or backend doesn't support 

6283 # orderable subquery form, use a plain VALUES list 

6284 embed_sentinel_value = False 

6285 text += f" VALUES ({insert_single_values_expr})" 

6286 

6287 self._insertmanyvalues = _InsertManyValues( 

6288 is_default_expr=False, 

6289 single_values_expr=insert_single_values_expr, 

6290 insert_crud_params=cast( 

6291 "List[crud._CrudParamElementStr]", 

6292 crud_params_single, 

6293 ), 

6294 num_positional_params_counted=counted_bindparam, 

6295 sort_by_parameter_order=( 

6296 insert_stmt._sort_by_parameter_order 

6297 ), 

6298 includes_upsert_behaviors=( 

6299 insert_stmt._post_values_clause is not None 

6300 ), 

6301 sentinel_columns=add_sentinel_cols, 

6302 num_sentinel_columns=( 

6303 len(add_sentinel_cols) if add_sentinel_cols else 0 

6304 ), 

6305 sentinel_param_keys=named_sentinel_params, 

6306 implicit_sentinel=implicit_sentinel, 

6307 embed_values_counter=embed_sentinel_value, 

6308 ) 

6309 

6310 else: 

6311 text += f" VALUES ({insert_single_values_expr})" 

6312 

6313 if insert_stmt._post_values_clause is not None: 

6314 post_values_clause = self.process( 

6315 insert_stmt._post_values_clause, **kw 

6316 ) 

6317 if post_values_clause: 

6318 text += " " + post_values_clause 

6319 

6320 if returning_clause and not self.returning_precedes_values: 

6321 text += " " + returning_clause 

6322 

6323 if self.ctes and not self.dialect.cte_follows_insert: 

6324 nesting_level = len(self.stack) if not toplevel else None 

6325 text = ( 

6326 self._render_cte_clause( 

6327 nesting_level=nesting_level, 

6328 include_following_stack=True, 

6329 ) 

6330 + text 

6331 ) 

6332 

6333 self.stack.pop(-1) 

6334 

6335 return text 

6336 

6337 def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw): 

6338 """Provide a hook to override the initial table clause 

6339 in an UPDATE statement. 

6340 

6341 MySQL overrides this. 

6342 

6343 """ 

6344 kw["asfrom"] = True 

6345 return from_table._compiler_dispatch(self, iscrud=True, **kw) 

6346 

6347 def update_from_clause( 

6348 self, update_stmt, from_table, extra_froms, from_hints, **kw 

6349 ): 

6350 """Provide a hook to override the generation of an 

6351 UPDATE..FROM clause. 

6352 MySQL and MSSQL override this. 

6353 """ 

6354 raise NotImplementedError( 

6355 "This backend does not support multiple-table " 

6356 "criteria within UPDATE" 

6357 ) 

6358 

6359 def update_post_criteria_clause( 

6360 self, update_stmt: Update, **kw: Any 

6361 ) -> Optional[str]: 

6362 """provide a hook to override generation after the WHERE criteria 

6363 in an UPDATE statement 

6364 

6365 .. versionadded:: 2.1 

6366 

6367 """ 

6368 if update_stmt._post_criteria_clause is not None: 

6369 return self.process( 

6370 update_stmt._post_criteria_clause, 

6371 **kw, 

6372 ) 

6373 else: 

6374 return None 

6375 

6376 def delete_post_criteria_clause( 

6377 self, delete_stmt: Delete, **kw: Any 

6378 ) -> Optional[str]: 

6379 """provide a hook to override generation after the WHERE criteria 

6380 in a DELETE statement 

6381 

6382 .. versionadded:: 2.1 

6383 

6384 """ 

6385 if delete_stmt._post_criteria_clause is not None: 

6386 return self.process( 

6387 delete_stmt._post_criteria_clause, 

6388 **kw, 

6389 ) 

6390 else: 

6391 return None 

6392 

    def visit_update(
        self,
        update_stmt: Update,
        visiting_cte: Optional[CTE] = None,
        **kw: Any,
    ) -> str:
        """Compile an UPDATE construct into its SQL string.

        Renders prefixes, the target table, the SET list, optional
        UPDATE..FROM tables, WHERE criteria, post-criteria SQL, RETURNING
        and CTE clauses.

        :param update_stmt: the Update statement object to compile
        :param visiting_cte: set when this UPDATE is being rendered as the
         element of a CTE rather than as the top-level statement
        """
        compile_state = update_stmt._compile_state_factory(
            update_stmt, self, **kw
        )
        if TYPE_CHECKING:
            assert isinstance(compile_state, UpdateDMLState)
        # the compile_state factory may substitute a new statement object
        update_stmt = compile_state.statement  # type: ignore[assignment]

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isupdate = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # set up cartesian-product linting of the FROM elements if enabled
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms
        is_multitable = bool(extra_froms)

        if is_multitable:
            # main table might be a JOIN
            main_froms = set(_from_objects(update_stmt.table))
            render_extra_froms = [
                f for f in extra_froms if f not in main_froms
            ]
            correlate_froms = main_froms.union(extra_froms)
        else:
            render_extra_froms = []
            correlate_froms = {update_stmt.table}

        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": update_stmt,
            }
        )

        text = "UPDATE "

        if update_stmt._prefixes:
            text += self._generate_prefixes(
                update_stmt, update_stmt._prefixes, **kw
            )

        # dialect hook; MySQL renders the extra FROM tables here
        table_text = self.update_tables_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            from_linter=from_linter,
            **kw,
        )
        crud_params_struct = crud._get_crud_params(
            self, update_stmt, compile_state, toplevel, **kw
        )
        crud_params = crud_params_struct.single_params

        if update_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                update_stmt, table_text
            )
        else:
            dialect_hints = None

        if update_stmt._independent_ctes:
            self._dispatch_independent_ctes(update_stmt, kw)

        text += table_text

        text += " SET "
        text += ", ".join(
            expr + "=" + value
            for _, expr, value, _ in cast(
                "List[Tuple[Any, str, str, Any]]", crud_params
            )
        )

        if self.implicit_returning or update_stmt._returning:
            if self.returning_precedes_values:
                text += " " + self.returning_clause(
                    update_stmt,
                    self.implicit_returning or update_stmt._returning,
                    populate_result_map=toplevel,
                )

        if extra_froms:
            # dialect hook; raises on backends with no UPDATE..FROM support
            extra_from_text = self.update_from_clause(
                update_stmt,
                update_stmt.table,
                render_extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if update_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                update_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        ulc = self.update_post_criteria_clause(
            update_stmt, from_linter=from_linter, **kw
        )
        if ulc:
            text += " " + ulc

        if (
            self.implicit_returning or update_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                update_stmt,
                self.implicit_returning or update_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # WITH clauses are prepended to the completed statement
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="UPDATE")

        self.stack.pop(-1)

        return text  # type: ignore[no-any-return]

6542 

6543 def delete_extra_from_clause( 

6544 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

6545 ): 

6546 """Provide a hook to override the generation of an 

6547 DELETE..FROM clause. 

6548 

6549 This can be used to implement DELETE..USING for example. 

6550 

6551 MySQL and MSSQL override this. 

6552 

6553 """ 

6554 raise NotImplementedError( 

6555 "This backend does not support multiple-table " 

6556 "criteria within DELETE" 

6557 ) 

6558 

6559 def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw): 

6560 return from_table._compiler_dispatch( 

6561 self, asfrom=True, iscrud=True, **kw 

6562 ) 

6563 

    def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
        """Compile a DELETE construct into its SQL string.

        Renders prefixes, the target table, optional extra FROM tables,
        WHERE criteria, post-criteria SQL, RETURNING and CTE clauses.

        :param delete_stmt: the Delete statement object to compile
        :param visiting_cte: set when this DELETE is being rendered as the
         element of a CTE rather than as the top-level statement
        """
        compile_state = delete_stmt._compile_state_factory(
            delete_stmt, self, **kw
        )
        # the compile_state factory may substitute a new statement object
        delete_stmt = compile_state.statement

        if visiting_cte is not None:
            kw["visiting_cte"] = visiting_cte
            toplevel = False
        else:
            toplevel = not self.stack

        if toplevel:
            self.isdelete = True
            if not self.dml_compile_state:
                self.dml_compile_state = compile_state
            if not self.compile_state:
                self.compile_state = compile_state

        # set up cartesian-product linting of the FROM elements if enabled
        if self.linting & COLLECT_CARTESIAN_PRODUCTS:
            from_linter = FromLinter({}, set())
            warn_linting = self.linting & WARN_LINTING
            if toplevel:
                self.from_linter = from_linter
        else:
            from_linter = None
            warn_linting = False

        extra_froms = compile_state._extra_froms

        correlate_froms = {delete_stmt.table}.union(extra_froms)
        self.stack.append(
            {
                "correlate_froms": correlate_froms,
                "asfrom_froms": correlate_froms,
                "selectable": delete_stmt,
            }
        )

        text = "DELETE "

        if delete_stmt._prefixes:
            text += self._generate_prefixes(
                delete_stmt, delete_stmt._prefixes, **kw
            )

        text += "FROM "

        try:
            table_text = self.delete_table_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                from_linter=from_linter,
            )
        except TypeError:
            # anticipate 3rd party dialects that don't include **kw
            # TODO: remove in 2.1
            table_text = self.delete_table_clause(
                delete_stmt, delete_stmt.table, extra_froms
            )
            if from_linter:
                # the fallback call above did not receive the linter;
                # process the table once more so it is still recorded
                _ = self.process(delete_stmt.table, from_linter=from_linter)

        crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

        if delete_stmt._hints:
            dialect_hints, table_text = self._setup_crud_hints(
                delete_stmt, table_text
            )
        else:
            dialect_hints = None

        if delete_stmt._independent_ctes:
            self._dispatch_independent_ctes(delete_stmt, kw)

        text += table_text

        if (
            self.implicit_returning or delete_stmt._returning
        ) and self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if extra_froms:
            # dialect hook; raises on backends with no multi-table DELETE
            extra_from_text = self.delete_extra_from_clause(
                delete_stmt,
                delete_stmt.table,
                extra_froms,
                dialect_hints,
                from_linter=from_linter,
                **kw,
            )
            if extra_from_text:
                text += " " + extra_from_text

        if delete_stmt._where_criteria:
            t = self._generate_delimited_and_list(
                delete_stmt._where_criteria, from_linter=from_linter, **kw
            )
            if t:
                text += " WHERE " + t

        dlc = self.delete_post_criteria_clause(
            delete_stmt, from_linter=from_linter, **kw
        )
        if dlc:
            text += " " + dlc

        if (
            self.implicit_returning or delete_stmt._returning
        ) and not self.returning_precedes_values:
            text += " " + self.returning_clause(
                delete_stmt,
                self.implicit_returning or delete_stmt._returning,
                populate_result_map=toplevel,
            )

        if self.ctes:
            # WITH clauses are prepended to the completed statement
            nesting_level = len(self.stack) if not toplevel else None
            text = self._render_cte_clause(nesting_level=nesting_level) + text

        if warn_linting:
            assert from_linter is not None
            from_linter.warn(stmt_type="DELETE")

        self.stack.pop(-1)

        return text

6696 

6697 def visit_savepoint(self, savepoint_stmt, **kw): 

6698 return "SAVEPOINT %s" % self.preparer.format_savepoint(savepoint_stmt) 

6699 

6700 def visit_rollback_to_savepoint(self, savepoint_stmt, **kw): 

6701 return "ROLLBACK TO SAVEPOINT %s" % self.preparer.format_savepoint( 

6702 savepoint_stmt 

6703 ) 

6704 

6705 def visit_release_savepoint(self, savepoint_stmt, **kw): 

6706 return "RELEASE SAVEPOINT %s" % self.preparer.format_savepoint( 

6707 savepoint_stmt 

6708 ) 

6709 

6710 

class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass which allows a small selection
    of non-standard SQL features to render into a string value.

    The :class:`.StrSQLCompiler` is invoked whenever a Core expression
    element is directly stringified without calling upon the
    :meth:`_expression.ClauseElement.compile` method.
    It can render a limited set
    of non-standard SQL constructs to assist in basic stringification,
    however for more substantial custom or dialect-specific SQL constructs,
    it will be necessary to make use of
    :meth:`_expression.ClauseElement.compile`
    directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        # render a placeholder rather than raising for a column whose
        # name can't be determined
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        # when the element names a specific dialect for stringification,
        # construct that dialect's statement compiler and delegate to it;
        # otherwise fall through to the generic "unsupported" handling
        if element.stringify_dialect != "default":
            url = util.preloaded.engine_url
            dialect = url.URL.create(element.stringify_dialect).get_dialect()()

            compiler = dialect.statement_compiler(
                dialect, None, _supporting_against=self
            )
            if not isinstance(compiler, StrSQLCompiler):
                return compiler.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        # generic bracketed index notation, e.g. "expr[index]"
        return "%s[%s]" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        # there is no generic SQL form for "next sequence value"; render
        # a descriptive placeholder instead
        return (
            f"<next sequence value: {self.preparer.format_sequence(sequence)}>"
        )

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        # render a generic RETURNING clause from the given column list
        columns = [
            self._label_select_column(None, c, True, False, {})
            for c in base._select_iterables(returning_cols)
        ]
        return "RETURNING " + ", ".join(columns)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        # generic UPDATE..FROM rendering for stringification purposes
        kw["asfrom"] = True
        return "FROM " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        # generic multi-table DELETE rendering; extra tables are appended
        # after the main table with a leading comma
        kw["asfrom"] = True
        return ", " + ", ".join(
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        )

    def visit_empty_set_expr(self, element_types, **kw):
        # a SELECT guaranteed to return no rows, standing in for an
        # empty set expression
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        # regexp operators have no generic SQL spelling; use placeholders
        return self._generate_generic_binary(binary, " <regexp> ", **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        return self._generate_generic_binary(binary, " <not regexp> ", **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        return "<regexp replace>(%s, %s)" % (
            binary.left._compiler_dispatch(self, **kw),
            binary.right._compiler_dispatch(self, **kw),
        )

    def visit_try_cast(self, cast, **kwargs):
        # TRY_CAST is not standard SQL; render it generically
        return "TRY_CAST(%s AS %s)" % (
            cast.clause._compiler_dispatch(self, **kwargs),
            cast.typeclause._compiler_dispatch(self, **kwargs),
        )

6820 

6821 

6822class DDLCompiler(Compiled): 

6823 is_ddl = True 

6824 

6825 if TYPE_CHECKING: 

6826 

6827 def __init__( 

6828 self, 

6829 dialect: Dialect, 

6830 statement: ExecutableDDLElement, 

6831 schema_translate_map: Optional[SchemaTranslateMapType] = ..., 

6832 render_schema_translate: bool = ..., 

6833 compile_kwargs: Mapping[str, Any] = ..., 

6834 ): ... 

6835 

    @util.ro_memoized_property
    def sql_compiler(self) -> SQLCompiler:
        # a statement compiler for the same dialect, used to render SQL
        # expressions embedded within DDL; memoized so the same instance
        # is reused for this DDLCompiler
        return self.dialect.statement_compiler(
            self.dialect, None, schema_translate_map=self.schema_translate_map
        )

6841 

    @util.memoized_property
    def type_compiler(self):
        # the dialect's type compiler, used to render column datatypes
        return self.dialect.type_compiler_instance

6845 

6846 def construct_params( 

6847 self, 

6848 params: Optional[_CoreSingleExecuteParams] = None, 

6849 extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None, 

6850 escape_names: bool = True, 

6851 ) -> Optional[_MutableCoreSingleExecuteParams]: 

6852 return None 

6853 

    def visit_ddl(self, ddl, **kwargs):
        """Render a literal DDL string, interpolating its ``context``
        dictionary into the statement via ``%``-formatting.
        """
        # table events can substitute table and schema name
        context = ddl.context
        if isinstance(ddl.target, schema.Table):
            # copy before mutating so the original context dict on the
            # DDL object is left unchanged
            context = context.copy()

            preparer = self.preparer
            path = preparer.format_table_seq(ddl.target)
            if len(path) == 1:
                table, sch = path[0], ""
            else:
                table, sch = path[-1], path[0]

            # only fill in values the caller did not already provide
            context.setdefault("table", table)
            context.setdefault("schema", sch)
            context.setdefault("fullname", preparer.format_table(ddl.target))

        return self.sql_compiler.post_process_text(ddl.statement % context)

6872 

6873 def visit_create_schema(self, create, **kw): 

6874 text = "CREATE SCHEMA " 

6875 if create.if_not_exists: 

6876 text += "IF NOT EXISTS " 

6877 return text + self.preparer.format_schema(create.element) 

6878 

6879 def visit_drop_schema(self, drop, **kw): 

6880 text = "DROP SCHEMA " 

6881 if drop.if_exists: 

6882 text += "IF EXISTS " 

6883 text += self.preparer.format_schema(drop.element) 

6884 if drop.cascade: 

6885 text += " CASCADE" 

6886 return text 

6887 

    def visit_create_table(self, create, **kw):
        """Render a full CREATE TABLE statement: prefixes, column list,
        table-level constraints, and dialect suffixes."""
        table = create.element
        preparer = self.preparer

        text = "\nCREATE "
        if table._prefixes:
            text += " ".join(table._prefixes) + " "

        text += "TABLE "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += preparer.format_table(table) + " "

        create_table_suffix = self.create_table_suffix(table)
        if create_table_suffix:
            text += create_table_suffix + " "

        text += "("

        separator = "\n"

        # if only one primary key, specify it along with the column
        first_pk = False
        for create_column in create.columns:
            column = create_column.element
            try:
                processed = self.process(
                    create_column, first_pk=column.primary_key and not first_pk
                )
                # process() returns None for columns that render nothing
                # (e.g. system columns); these are skipped entirely
                if processed is not None:
                    text += separator
                    separator = ", \n"
                    text += "\t" + processed
                if column.primary_key:
                    first_pk = True
            except exc.CompileError as ce:
                # re-raise with table/column context for easier debugging
                raise exc.CompileError(
                    "(in table '%s', column '%s'): %s"
                    % (table.description, column.name, ce.args[0])
                ) from ce

        const = self.create_table_constraints(
            table,
            _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
        )
        if const:
            text += separator + "\t" + const

        text += "\n)%s\n\n" % self.post_create_table(table)
        return text

6939 

6940 def visit_create_column(self, create, first_pk=False, **kw): 

6941 column = create.element 

6942 

6943 if column.system: 

6944 return None 

6945 

6946 text = self.get_column_specification(column, first_pk=first_pk) 

6947 const = " ".join( 

6948 self.process(constraint) for constraint in column.constraints 

6949 ) 

6950 if const: 

6951 text += " " + const 

6952 

6953 return text 

6954 

    def create_table_constraints(
        self, table, _include_foreign_key_constraints=None, **kw
    ):
        """Render the table-level constraint clauses of CREATE TABLE,
        joined with ", \\n\\t"."""
        # On some DB order is significant: visit PK first, then the
        # other constraints (engine.ReflectionTest.testbasic failed on FB2)
        constraints = []
        if table.primary_key:
            constraints.append(table.primary_key)

        all_fkcs = table.foreign_key_constraints
        if _include_foreign_key_constraints is not None:
            # render only the FK constraints the caller asked for
            omit_fkcs = all_fkcs.difference(_include_foreign_key_constraints)
        else:
            omit_fkcs = set()

        constraints.extend(
            [
                c
                for c in table._sorted_constraints
                if c is not table.primary_key and c not in omit_fkcs
            ]
        )

        return ", \n\t".join(
            p
            for p in (
                self.process(constraint)
                for constraint in constraints
                if (constraint._should_create_for_compiler(self))
                and (
                    # constraints marked use_alter are emitted later via
                    # ALTER, unless the dialect cannot ALTER at all
                    not self.dialect.supports_alter
                    or not getattr(constraint, "use_alter", False)
                )
            )
            if p is not None
        )

6991 

6992 def visit_drop_table(self, drop, **kw): 

6993 text = "\nDROP TABLE " 

6994 if drop.if_exists: 

6995 text += "IF EXISTS " 

6996 return text + self.preparer.format_table(drop.element) 

6997 

6998 def visit_drop_view(self, drop, **kw): 

6999 return "\nDROP VIEW " + self.preparer.format_table(drop.element) 

7000 

7001 def _verify_index_table(self, index: Index) -> None: 

7002 if index.table is None: 

7003 raise exc.CompileError( 

7004 "Index '%s' is not associated with any table." % index.name 

7005 ) 

7006 

    def visit_create_index(
        self, create, include_schema=False, include_table_schema=True, **kw
    ):
        """Render CREATE [UNIQUE] INDEX; the index must be named and
        associated with a table."""
        index = create.element
        self._verify_index_table(index)
        preparer = self.preparer
        text = "CREATE "
        if index.unique:
            text += "UNIQUE "
        if index.name is None:
            raise exc.CompileError(
                "CREATE INDEX requires that the index have a name"
            )

        text += "INDEX "
        if create.if_not_exists:
            text += "IF NOT EXISTS "

        text += "%s ON %s (%s)" % (
            self._prepared_index_name(index, include_schema=include_schema),
            preparer.format_table(
                index.table, use_schema=include_table_schema
            ),
            ", ".join(
                # index expressions render without table qualification
                # and with literal (inline) bind values
                self.sql_compiler.process(
                    expr, include_table=False, literal_binds=True
                )
                for expr in index.expressions
            ),
        )
        return text

7038 

7039 def visit_drop_index(self, drop, **kw): 

7040 index = drop.element 

7041 

7042 if index.name is None: 

7043 raise exc.CompileError( 

7044 "DROP INDEX requires that the index have a name" 

7045 ) 

7046 text = "\nDROP INDEX " 

7047 if drop.if_exists: 

7048 text += "IF EXISTS " 

7049 

7050 return text + self._prepared_index_name(index, include_schema=True) 

7051 

7052 def _prepared_index_name( 

7053 self, index: Index, include_schema: bool = False 

7054 ) -> str: 

7055 if index.table is not None: 

7056 effective_schema = self.preparer.schema_for_object(index.table) 

7057 else: 

7058 effective_schema = None 

7059 if include_schema and effective_schema: 

7060 schema_name = self.preparer.quote_schema(effective_schema) 

7061 else: 

7062 schema_name = None 

7063 

7064 index_name: str = self.preparer.format_index(index) 

7065 

7066 if schema_name: 

7067 index_name = schema_name + "." + index_name 

7068 return index_name 

7069 

7070 def visit_add_constraint(self, create, **kw): 

7071 return "ALTER TABLE %s ADD %s" % ( 

7072 self.preparer.format_table(create.element.table), 

7073 self.process(create.element), 

7074 ) 

7075 

7076 def visit_set_table_comment(self, create, **kw): 

7077 return "COMMENT ON TABLE %s IS %s" % ( 

7078 self.preparer.format_table(create.element), 

7079 self.sql_compiler.render_literal_value( 

7080 create.element.comment, sqltypes.String() 

7081 ), 

7082 ) 

7083 

7084 def visit_drop_table_comment(self, drop, **kw): 

7085 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table( 

7086 drop.element 

7087 ) 

7088 

7089 def visit_set_column_comment(self, create, **kw): 

7090 return "COMMENT ON COLUMN %s IS %s" % ( 

7091 self.preparer.format_column( 

7092 create.element, use_table=True, use_schema=True 

7093 ), 

7094 self.sql_compiler.render_literal_value( 

7095 create.element.comment, sqltypes.String() 

7096 ), 

7097 ) 

7098 

7099 def visit_drop_column_comment(self, drop, **kw): 

7100 return "COMMENT ON COLUMN %s IS NULL" % self.preparer.format_column( 

7101 drop.element, use_table=True 

7102 ) 

7103 

7104 def visit_set_constraint_comment(self, create, **kw): 

7105 raise exc.UnsupportedCompilationError(self, type(create)) 

7106 

7107 def visit_drop_constraint_comment(self, drop, **kw): 

7108 raise exc.UnsupportedCompilationError(self, type(drop)) 

7109 

    def get_identity_options(self, identity_options):
        """Render the option keywords shared by sequences and identity
        columns (INCREMENT BY, START WITH, MINVALUE, CACHE, CYCLE...)."""
        text = []
        if identity_options.increment is not None:
            text.append("INCREMENT BY %d" % identity_options.increment)
        if identity_options.start is not None:
            text.append("START WITH %d" % identity_options.start)
        if identity_options.minvalue is not None:
            text.append("MINVALUE %d" % identity_options.minvalue)
        if identity_options.maxvalue is not None:
            text.append("MAXVALUE %d" % identity_options.maxvalue)
        # nominvalue/nomaxvalue are flags: any non-None value emits the
        # NO MINVALUE / NO MAXVALUE keyword
        if identity_options.nominvalue is not None:
            text.append("NO MINVALUE")
        if identity_options.nomaxvalue is not None:
            text.append("NO MAXVALUE")
        if identity_options.cache is not None:
            text.append("CACHE %d" % identity_options.cache)
        if identity_options.cycle is not None:
            text.append("CYCLE" if identity_options.cycle else "NO CYCLE")
        return " ".join(text)

7129 

7130 def visit_create_sequence(self, create, prefix=None, **kw): 

7131 text = "CREATE SEQUENCE " 

7132 if create.if_not_exists: 

7133 text += "IF NOT EXISTS " 

7134 text += self.preparer.format_sequence(create.element) 

7135 

7136 if prefix: 

7137 text += prefix 

7138 options = self.get_identity_options(create.element) 

7139 if options: 

7140 text += " " + options 

7141 return text 

7142 

7143 def visit_drop_sequence(self, drop, **kw): 

7144 text = "DROP SEQUENCE " 

7145 if drop.if_exists: 

7146 text += "IF EXISTS " 

7147 return text + self.preparer.format_sequence(drop.element) 

7148 

7149 def visit_drop_constraint(self, drop, **kw): 

7150 constraint = drop.element 

7151 if constraint.name is not None: 

7152 formatted_name = self.preparer.format_constraint(constraint) 

7153 else: 

7154 formatted_name = None 

7155 

7156 if formatted_name is None: 

7157 raise exc.CompileError( 

7158 "Can't emit DROP CONSTRAINT for constraint %r; " 

7159 "it has no name" % drop.element 

7160 ) 

7161 return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % ( 

7162 self.preparer.format_table(drop.element.table), 

7163 "IF EXISTS " if drop.if_exists else "", 

7164 formatted_name, 

7165 " CASCADE" if drop.cascade else "", 

7166 ) 

7167 

    def get_column_specification(self, column, **kwargs):
        """Render one column's DDL: name, type, DEFAULT, computed /
        identity clause, and NOT NULL."""
        colspec = (
            self.preparer.format_column(column)
            + " "
            + self.dialect.type_compiler_instance.process(
                column.type, type_expression=column
            )
        )
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        if column.computed is not None:
            colspec += " " + self.process(column.computed)

        if (
            column.identity is not None
            and self.dialect.supports_identity_columns
        ):
            colspec += " " + self.process(column.identity)

        # a natively-supported identity clause already implies NOT NULL
        if not column.nullable and (
            not column.identity or not self.dialect.supports_identity_columns
        ):
            colspec += " NOT NULL"
        return colspec

7194 

7195 def create_table_suffix(self, table): 

7196 return "" 

7197 

7198 def post_create_table(self, table): 

7199 return "" 

7200 

7201 def get_column_default_string(self, column: Column[Any]) -> Optional[str]: 

7202 if isinstance(column.server_default, schema.DefaultClause): 

7203 return self.render_default_string(column.server_default.arg) 

7204 else: 

7205 return None 

7206 

7207 def render_default_string(self, default: Union[Visitable, str]) -> str: 

7208 if isinstance(default, str): 

7209 return self.sql_compiler.render_literal_value( 

7210 default, sqltypes.STRINGTYPE 

7211 ) 

7212 else: 

7213 return self.sql_compiler.process(default, literal_binds=True) 

7214 

7215 def visit_table_or_column_check_constraint(self, constraint, **kw): 

7216 if constraint.is_column_level: 

7217 return self.visit_column_check_constraint(constraint) 

7218 else: 

7219 return self.visit_check_constraint(constraint) 

7220 

7221 def visit_check_constraint(self, constraint, **kw): 

7222 text = "" 

7223 if constraint.name is not None: 

7224 formatted_name = self.preparer.format_constraint(constraint) 

7225 if formatted_name is not None: 

7226 text += "CONSTRAINT %s " % formatted_name 

7227 text += "CHECK (%s)" % self.sql_compiler.process( 

7228 constraint.sqltext, include_table=False, literal_binds=True 

7229 ) 

7230 text += self.define_constraint_deferrability(constraint) 

7231 return text 

7232 

7233 def visit_column_check_constraint(self, constraint, **kw): 

7234 text = "" 

7235 if constraint.name is not None: 

7236 formatted_name = self.preparer.format_constraint(constraint) 

7237 if formatted_name is not None: 

7238 text += "CONSTRAINT %s " % formatted_name 

7239 text += "CHECK (%s)" % self.sql_compiler.process( 

7240 constraint.sqltext, include_table=False, literal_binds=True 

7241 ) 

7242 text += self.define_constraint_deferrability(constraint) 

7243 return text 

7244 

    def visit_primary_key_constraint(
        self, constraint: PrimaryKeyConstraint, **kw: Any
    ) -> str:
        """Render a PRIMARY KEY clause; empty constraints render as ''."""
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "PRIMARY KEY "
        text += "(%s)" % ", ".join(
            self.preparer.quote(c.name)
            for c in (
                # implicitly generated PKs list the autoincrement
                # column first
                constraint.columns_autoinc_first
                if constraint._implicit_generated
                else constraint.columns
            )
        )
        text += self.define_constraint_deferrability(constraint)
        return text

7266 

    def visit_foreign_key_constraint(self, constraint, **kw):
        """Render a FOREIGN KEY clause including MATCH, ON DELETE /
        ON UPDATE, and deferrability."""
        preparer = self.preparer
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        # all elements of a composite FK reference the same remote table
        remote_table = list(constraint.elements)[0].column.table
        text += "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
            ", ".join(
                preparer.quote(f.parent.name) for f in constraint.elements
            ),
            self.define_constraint_remote_table(
                constraint, remote_table, preparer
            ),
            ", ".join(
                preparer.quote(f.column.name) for f in constraint.elements
            ),
        )
        text += self.define_constraint_match(constraint)
        text += self.define_constraint_cascades(constraint)
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_constraint_remote_table(self, constraint, table, preparer):
        """Format the remote table clause of a CREATE CONSTRAINT clause."""

        return preparer.format_table(table)

7295 

    def visit_unique_constraint(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Render a UNIQUE clause; empty constraints render as ''."""
        if len(constraint) == 0:
            return ""
        text = ""
        if constraint.name is not None:
            formatted_name = self.preparer.format_constraint(constraint)
            if formatted_name is not None:
                text += "CONSTRAINT %s " % formatted_name
        text += "UNIQUE %s(%s)" % (
            self.define_unique_constraint_distinct(constraint, **kw),
            ", ".join(self.preparer.quote(c.name) for c in constraint),
        )
        text += self.define_constraint_deferrability(constraint)
        return text

    def define_unique_constraint_distinct(
        self, constraint: UniqueConstraint, **kw: Any
    ) -> str:
        """Hook for dialects that support e.g. NULLS NOT DISTINCT;
        no-op by default."""
        return ""

7317 

7318 def define_constraint_cascades( 

7319 self, constraint: ForeignKeyConstraint 

7320 ) -> str: 

7321 text = "" 

7322 if constraint.ondelete is not None: 

7323 text += self.define_constraint_ondelete_cascade(constraint) 

7324 

7325 if constraint.onupdate is not None: 

7326 text += self.define_constraint_onupdate_cascade(constraint) 

7327 return text 

7328 

7329 def define_constraint_ondelete_cascade( 

7330 self, constraint: ForeignKeyConstraint 

7331 ) -> str: 

7332 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

7333 constraint.ondelete, FK_ON_DELETE 

7334 ) 

7335 

7336 def define_constraint_onupdate_cascade( 

7337 self, constraint: ForeignKeyConstraint 

7338 ) -> str: 

7339 return " ON UPDATE %s" % self.preparer.validate_sql_phrase( 

7340 constraint.onupdate, FK_ON_UPDATE 

7341 ) 

7342 

7343 def define_constraint_deferrability(self, constraint: Constraint) -> str: 

7344 text = "" 

7345 if constraint.deferrable is not None: 

7346 if constraint.deferrable: 

7347 text += " DEFERRABLE" 

7348 else: 

7349 text += " NOT DEFERRABLE" 

7350 if constraint.initially is not None: 

7351 text += " INITIALLY %s" % self.preparer.validate_sql_phrase( 

7352 constraint.initially, FK_INITIALLY 

7353 ) 

7354 return text 

7355 

7356 def define_constraint_match(self, constraint): 

7357 text = "" 

7358 if constraint.match is not None: 

7359 text += " MATCH %s" % constraint.match 

7360 return text 

7361 

7362 def visit_computed_column(self, generated, **kw): 

7363 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( 

7364 generated.sqltext, include_table=False, literal_binds=True 

7365 ) 

7366 if generated.persisted is True: 

7367 text += " STORED" 

7368 elif generated.persisted is False: 

7369 text += " VIRTUAL" 

7370 return text 

7371 

7372 def visit_identity_column(self, identity, **kw): 

7373 text = "GENERATED %s AS IDENTITY" % ( 

7374 "ALWAYS" if identity.always else "BY DEFAULT", 

7375 ) 

7376 options = self.get_identity_options(identity) 

7377 if options: 

7378 text += " (%s)" % options 

7379 return text 

7380 

7381 

class GenericTypeCompiler(TypeCompiler):
    """Render types using generic, spec-style DDL names."""

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

7396 

7397 def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str: 

7398 if type_.precision is None: 

7399 return "NUMERIC" 

7400 elif type_.scale is None: 

7401 return "NUMERIC(%(precision)s)" % {"precision": type_.precision} 

7402 else: 

7403 return "NUMERIC(%(precision)s, %(scale)s)" % { 

7404 "precision": type_.precision, 

7405 "scale": type_.scale, 

7406 } 

7407 

7408 def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str: 

7409 if type_.precision is None: 

7410 return "DECIMAL" 

7411 elif type_.scale is None: 

7412 return "DECIMAL(%(precision)s)" % {"precision": type_.precision} 

7413 else: 

7414 return "DECIMAL(%(precision)s, %(scale)s)" % { 

7415 "precision": type_.precision, 

7416 "scale": type_.scale, 

7417 } 

7418 

    # fixed-name numeric, date/time and LOB types
    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

7445 

7446 def _render_string_type( 

7447 self, name: str, length: Optional[int], collation: Optional[str] 

7448 ) -> str: 

7449 text = name 

7450 if length: 

7451 text += f"({length})" 

7452 if collation: 

7453 text += f' COLLATE "{collation}"' 

7454 return text 

7455 

7456 def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str: 

7457 return self._render_string_type("CHAR", type_.length, type_.collation) 

7458 

7459 def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str: 

7460 return self._render_string_type("NCHAR", type_.length, type_.collation) 

7461 

7462 def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str: 

7463 return self._render_string_type( 

7464 "VARCHAR", type_.length, type_.collation 

7465 ) 

7466 

7467 def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str: 

7468 return self._render_string_type( 

7469 "NVARCHAR", type_.length, type_.collation 

7470 ) 

7471 

7472 def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str: 

7473 return self._render_string_type("TEXT", type_.length, type_.collation) 

7474 

7475 def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str: 

7476 return "UUID" 

7477 

7478 def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str: 

7479 return "BLOB" 

7480 

7481 def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str: 

7482 return "BINARY" + (type_.length and "(%d)" % type_.length or "") 

7483 

7484 def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str: 

7485 return "VARBINARY" + (type_.length and "(%d)" % type_.length or "") 

7486 

7487 def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str: 

7488 return "BOOLEAN" 

7489 

7490 def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str: 

7491 if not type_.native_uuid or not self.dialect.supports_native_uuid: 

7492 return self._render_string_type("CHAR", length=32, collation=None) 

7493 else: 

7494 return self.visit_UUID(type_, **kw) 

7495 

    # the lowercase "generic" type visitors delegate to their uppercase
    # DDL-name counterparts; dialects override the uppercase methods to
    # change rendering for both spellings
    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

7552 

7553 def visit_null(self, type_, **kw): 

7554 raise exc.CompileError( 

7555 "Can't generate DDL for %r; " 

7556 "did you forget to specify a " 

7557 "type on this Column?" % type_ 

7558 ) 

7559 

7560 def visit_type_decorator( 

7561 self, type_: TypeDecorator[Any], **kw: Any 

7562 ) -> str: 

7563 return self.process(type_.type_engine(self.dialect), **kw) 

7564 

7565 def visit_user_defined( 

7566 self, type_: UserDefinedType[Any], **kw: Any 

7567 ) -> str: 

7568 return type_.get_col_spec(**kw) 

7569 

7570 

class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler used for str() of dialect-agnostic constructs;
    renders unknown types as a best-effort placeholder instead of
    raising."""

    def process(self, type_, **kw):
        try:
            _compiler_dispatch = type_._compiler_dispatch
        except AttributeError:
            # not a visitable type object; fall back to a printable form
            return self._visit_unknown(type_, **kw)
        else:
            return _compiler_dispatch(self, **kw)

    def __getattr__(self, key):
        # any visit_XYZ not defined on GenericTypeCompiler resolves to
        # the unknown-type fallback rather than raising
        if key.startswith("visit_"):
            return self._visit_unknown
        else:
            raise AttributeError(key)

    def _visit_unknown(self, type_, **kw):
        # all-caps class names look like SQL type names; render them
        # as-is, otherwise show the repr for debugging
        if type_.__class__.__name__ == type_.__class__.__name__.upper():
            return type_.__class__.__name__
        else:
            return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        # tolerate objects without get_col_spec() in string mode
        try:
            get_col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        else:
            return get_col_spec(**kw)

7602 

7603 

class _SchemaForObjectCallable(Protocol):
    """Callable returning the effective schema name for a schema item."""

    def __call__(self, obj: Any, /) -> str: ...


class _BindNameForColProtocol(Protocol):
    """Callable returning the bind-parameter name for a column."""

    def __call__(self, col: ColumnClause[Any]) -> str: ...

7610 

7611 

class IdentifierPreparer:
    """Handle quoting and case-folding of identifiers based on options."""

    # lowercase words that must always be quoted
    reserved_words = RESERVED_WORDS

    # regex matching identifiers that need no quoting
    legal_characters = LEGAL_CHARACTERS

    # characters that may not begin an unquoted identifier
    illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS

    initial_quote: str

    final_quote: str

    # cache of identifier -> (possibly quoted) rendering
    _strings: MutableMapping[str, str]

    schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema")
    """Return the .schema attribute for an object.

    For the default IdentifierPreparer, the schema for an object is always
    the value of the ".schema" attribute. if the preparer is replaced
    with one that has a non-empty schema_translate_map, the value of the
    ".schema" attribute is rendered a symbol that will be converted to a
    real schema name from the mapping post-compile.

    """

    # set True by _with_schema_translate() when the map contains None
    _includes_none_schema_translate: bool = False

7639 

    def __init__(
        self,
        dialect: Dialect,
        initial_quote: str = '"',
        final_quote: Optional[str] = None,
        escape_quote: str = '"',
        quote_case_sensitive_collations: bool = True,
        omit_schema: bool = False,
    ):
        """Construct a new ``IdentifierPreparer`` object.

        initial_quote
          Character that begins a delimited identifier.

        final_quote
          Character that ends a delimited identifier. Defaults to
          `initial_quote`.

        omit_schema
          Prevent prepending schema name. Useful for databases that do
          not support schemae.
        """

        self.dialect = dialect
        self.initial_quote = initial_quote
        self.final_quote = final_quote or self.initial_quote
        self.escape_quote = escape_quote
        # an embedded quote character is escaped by doubling it
        self.escape_to_quote = self.escape_quote * 2
        self.omit_schema = omit_schema
        self.quote_case_sensitive_collations = quote_case_sensitive_collations
        self._strings = {}
        # "format"/"pyformat" paramstyles treat "%" as special, so any
        # literal percent sign in an identifier must be doubled
        self._double_percents = self.dialect.paramstyle in (
            "format",
            "pyformat",
        )

7675 

    def _with_schema_translate(self, schema_translate_map):
        """Return a copy of this preparer whose schema_for_object emits
        translatable placeholder tokens instead of literal schema names."""
        prep = self.__class__.__new__(self.__class__)
        prep.__dict__.update(self.__dict__)

        includes_none = None in schema_translate_map

        def symbol_getter(obj):
            # emit a "__[SCHEMA_name]" token for translatable schemas;
            # replaced post-compile by _render_schema_translates()
            name = obj.schema
            if obj._use_schema_map and (name is not None or includes_none):
                if name is not None and ("[" in name or "]" in name):
                    # square brackets would corrupt the placeholder syntax
                    raise exc.CompileError(
                        "Square bracket characters ([]) not supported "
                        "in schema translate name '%s'" % name
                    )
                return quoted_name(
                    "__[SCHEMA_%s]" % (name or "_none"), quote=False
                )
            else:
                return obj.schema

        prep.schema_for_object = symbol_getter
        prep._includes_none_schema_translate = includes_none
        return prep

7699 

    def _render_schema_translates(
        self, statement: str, schema_translate_map: SchemaTranslateMapType
    ) -> str:
        """Replace "__[SCHEMA_name]" placeholders in *statement* with
        real schema names taken from *schema_translate_map*."""
        d = schema_translate_map
        if None in d:
            if not self._includes_none_schema_translate:
                raise exc.InvalidRequestError(
                    "schema translate map which previously did not have "
                    "`None` present as a key now has `None` present; compiled "
                    "statement may lack adequate placeholders. Please use "
                    "consistent keys in successive "
                    "schema_translate_map dictionaries."
                )

            # placeholders spell the None key as "_none"
            d["_none"] = d[None]  # type: ignore[index]

        def replace(m):
            name = m.group(2)
            if name in d:
                effective_schema = d[name]
            else:
                if name in (None, "_none"):
                    raise exc.InvalidRequestError(
                        "schema translate map which previously had `None` "
                        "present as a key now no longer has it present; don't "
                        "know how to apply schema for compiled statement. "
                        "Please use consistent keys in successive "
                        "schema_translate_map dictionaries."
                    )
                # unmapped names pass through unchanged
                effective_schema = name

            if not effective_schema:
                effective_schema = self.dialect.default_schema_name
                if not effective_schema:
                    # TODO: no coverage here
                    raise exc.CompileError(
                        "Dialect has no default schema name; can't "
                        "use None as dynamic schema target."
                    )
            return self.quote_schema(effective_schema)

        return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement)

7742 

7743 def _escape_identifier(self, value: str) -> str: 

7744 """Escape an identifier. 

7745 

7746 Subclasses should override this to provide database-dependent 

7747 escaping behavior. 

7748 """ 

7749 

7750 value = value.replace(self.escape_quote, self.escape_to_quote) 

7751 if self._double_percents: 

7752 value = value.replace("%", "%%") 

7753 return value 

7754 

7755 def _unescape_identifier(self, value: str) -> str: 

7756 """Canonicalize an escaped identifier. 

7757 

7758 Subclasses should override this to provide database-dependent 

7759 unescaping behavior that reverses _escape_identifier. 

7760 """ 

7761 

7762 return value.replace(self.escape_to_quote, self.escape_quote) 

7763 

7764 def validate_sql_phrase(self, element, reg): 

7765 """keyword sequence filter. 

7766 

7767 a filter for elements that are intended to represent keyword sequences, 

7768 such as "INITIALLY", "INITIALLY DEFERRED", etc. no special characters 

7769 should be present. 

7770 

7771 """ 

7772 

7773 if element is not None and not reg.match(element): 

7774 raise exc.CompileError( 

7775 "Unexpected SQL phrase: %r (matching against %r)" 

7776 % (element, reg.pattern) 

7777 ) 

7778 return element 

7779 

7780 def quote_identifier(self, value: str) -> str: 

7781 """Quote an identifier. 

7782 

7783 Subclasses should override this to provide database-dependent 

7784 quoting behavior. 

7785 """ 

7786 

7787 return ( 

7788 self.initial_quote 

7789 + self._escape_identifier(value) 

7790 + self.final_quote 

7791 ) 

7792 

7793 def _requires_quotes(self, value: str) -> bool: 

7794 """Return True if the given identifier requires quoting.""" 

7795 lc_value = value.lower() 

7796 return ( 

7797 lc_value in self.reserved_words 

7798 or value[0] in self.illegal_initial_characters 

7799 or not self.legal_characters.match(str(value)) 

7800 or (lc_value != value) 

7801 ) 

7802 

def _requires_quotes_illegal_chars(self, value):
    """Return True if the given identifier requires quoting, but
    not taking case convention into account."""
    return self.legal_characters.match(str(value)) is None

7807 

def quote_schema(self, schema: str) -> str:
    """Conditionally quote a schema name.

    The name is quoted if it is a reserved word, contains
    quote-necessary characters, or is an instance of
    :class:`.quoted_name` which includes ``quote`` set to ``True``.

    Subclasses can override this to provide database-dependent
    quoting behavior for schema names.

    :param schema: string schema name
    """
    return self.quote(schema)

7822 

def quote(self, ident: str) -> str:
    """Conditionally quote an identifier.

    The identifier is quoted if it is a reserved word, contains
    quote-necessary characters, or is an instance of
    :class:`.quoted_name` which includes ``quote`` set to ``True``.

    Subclasses can override this to provide database-dependent
    quoting behavior for identifier names.

    :param ident: string identifier
    """
    # a quoted_name instance carries an explicit True/False preference;
    # plain strings yield None and use the conditional/cached path
    force = getattr(ident, "quote", None)

    if force:
        return self.quote_identifier(ident)
    if force is not None:
        # force is False: caller explicitly disabled quoting
        return ident

    # memoize per-identifier results; the preparer is long-lived
    try:
        return self._strings[ident]
    except KeyError:
        if self._requires_quotes(ident):
            rendered = self.quote_identifier(ident)
        else:
            rendered = ident
        self._strings[ident] = rendered
        return rendered

7850 

def format_collation(self, collation_name):
    """Render a collation name, quoting it only when the dialect
    treats case-sensitive collation names as quotable."""
    if not self.quote_case_sensitive_collations:
        return collation_name
    return self.quote(collation_name)

7856 

def format_sequence(
    self, sequence: schema.Sequence, use_schema: bool = True
) -> str:
    """Render a sequence name, schema-qualified when a schema is in
    effect and ``use_schema`` is True."""
    name = self.quote(sequence.name)

    effective_schema = self.schema_for_object(sequence)

    if (
        use_schema
        and effective_schema is not None
        and not self.omit_schema
    ):
        return self.quote_schema(effective_schema) + "." + name
    return name

7871 

def format_label(
    self, label: Label[Any], name: Optional[str] = None
) -> str:
    """Quote the effective name of a label construct; an explicit
    ``name`` overrides ``label.name``."""
    effective = name or label.name
    return self.quote(effective)

7876 

def format_alias(
    self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
) -> str:
    """Quote an alias name; ``name``, when given, takes precedence
    over ``alias.name``."""
    if name is not None:
        return self.quote(name)
    assert alias is not None
    return self.quote(alias.name)

7885 

def format_savepoint(self, savepoint, name=None):
    """Render a savepoint name.

    Running the savepoint name through quoting is unnecessary for
    all known dialects; quoting is applied only when required, to
    support potential third party use cases.
    """
    ident = name or savepoint.ident
    if not self._requires_quotes(ident):
        return ident
    return self.quote_identifier(ident)

7894 

@util.preload_module("sqlalchemy.sql.naming")
def format_constraint(
    self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
) -> Optional[str]:
    """Render a constraint or index name, applying truncation and
    conditional quoting.

    Returns ``None`` when the constraint is unnamed and no naming
    convention produces a name for it.
    """
    naming = util.preloaded.sql_naming

    if constraint.name is _NONE_NAME:
        # no explicit name given; consult the naming-convention
        # machinery for a generated one
        name = naming._constraint_name_for_table(
            constraint, constraint.table
        )
        if name is None:
            return None
    else:
        name = constraint.name

    assert name is not None
    # indexes and constraints may have distinct dialect length limits
    if constraint.__visit_name__ == "index":
        return self.truncate_and_render_index_name(
            name, _alembic_quote=_alembic_quote
        )
    return self.truncate_and_render_constraint_name(
        name, _alembic_quote=_alembic_quote
    )

7920 

def truncate_and_render_index_name(
    self, name: str, _alembic_quote: bool = True
) -> str:
    """Truncate and conditionally quote an index name.

    The limit is read at format time so that ad-hoc changes to
    dialect.max_identifier_length etc. can be reflected, as
    IdentifierPreparer is long lived.
    """
    limit = (
        self.dialect.max_index_name_length
        or self.dialect.max_identifier_length
    )
    return self._truncate_and_render_maxlen_name(
        name, limit, _alembic_quote
    )

7934 

def truncate_and_render_constraint_name(
    self, name: str, _alembic_quote: bool = True
) -> str:
    """Truncate and conditionally quote a constraint name.

    The limit is read at format time so that ad-hoc changes to
    dialect.max_identifier_length etc. can be reflected, as
    IdentifierPreparer is long lived.
    """
    limit = (
        self.dialect.max_constraint_name_length
        or self.dialect.max_identifier_length
    )
    return self._truncate_and_render_maxlen_name(
        name, limit, _alembic_quote
    )

7948 

def _truncate_and_render_maxlen_name(
    self, name: str, max_: int, _alembic_quote: bool
) -> str:
    """Apply length truncation, then optional quoting, to a name.

    Only names marked truncatable (_truncated_label) are shortened;
    other names are validated against the dialect's limit instead.
    """
    if isinstance(name, elements._truncated_label):
        if len(name) > max_:
            # deterministic truncation: keep a prefix and append a
            # 4-hex-char md5 suffix so distinct names remain distinct
            name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]
    else:
        self.dialect.validate_identifier(name)

    if _alembic_quote:
        return self.quote(name)
    return name

7962 

def format_index(self, index: Index) -> str:
    """Render an index name; the index is expected to resolve to a
    non-None name via format_constraint."""
    name = self.format_constraint(index)
    assert name is not None
    return name

7967 

def format_table(
    self,
    table: FromClause,
    use_schema: bool = True,
    name: Optional[str] = None,
) -> str:
    """Prepare a quoted table and schema name."""
    if name is None:
        if TYPE_CHECKING:
            assert isinstance(table, NamedFromClause)
        name = table.name

    result = self.quote(name)

    effective_schema = self.schema_for_object(table)

    # empty/None schema is treated as "no schema"
    if effective_schema and use_schema and not self.omit_schema:
        result = self.quote_schema(effective_schema) + "." + result
    return result

7987 

def format_schema(self, name):
    """Prepare a quoted schema name.

    Delegates to :meth:`.quote`, which applies quoting only when
    the name requires it.
    """
    return self.quote(name)

7992 

def format_label_name(
    self,
    name,
    anon_map=None,
):
    """Prepare a quoted column label name.

    When ``anon_map`` is supplied and the name is a deferred
    anonymous label, the name is first resolved through the map.
    """
    if anon_map is not None and isinstance(
        name, elements._truncated_label
    ):
        name = name.apply_map(anon_map)

    return self.quote(name)

8006 

def format_column(
    self,
    column: ColumnElement[Any],
    use_table: bool = False,
    name: Optional[str] = None,
    table_name: Optional[str] = None,
    use_schema: bool = False,
    anon_map: Optional[Mapping[str, Any]] = None,
) -> str:
    """Prepare a quoted column name, optionally table-qualified.

    Literal textual elements get stuck into ColumnClause a lot,
    which shouldn't get quoted; those are rendered as-is.
    """
    if name is None:
        name = column.name
        assert name is not None

    if anon_map is not None and isinstance(
        name, elements._truncated_label
    ):
        name = name.apply_map(anon_map)

    # literal text bypasses quoting entirely
    if getattr(column, "is_literal", False):
        rendered = name
    else:
        rendered = self.quote(name)

    if not use_table:
        return rendered

    qualifier = self.format_table(
        column.table, use_schema=use_schema, name=table_name
    )
    return qualifier + "." + rendered

8052 

def format_table_seq(self, table, use_schema=True):
    """Format table name and schema as a tuple.

    Dialects with more levels in their fully qualified references
    ('database', 'owner', etc.) could override this and return
    a longer sequence.
    """
    effective_schema = self.schema_for_object(table)
    unqualified = self.format_table(table, use_schema=False)

    if effective_schema and use_schema and not self.omit_schema:
        return (self.quote_schema(effective_schema), unqualified)
    return (unqualified,)

8069 

@util.memoized_property
def _r_identifiers(self):
    """Compiled pattern that splits a dotted identifier chain.

    Each segment is either a quoted identifier (allowing escaped
    closing quotes inside) or an unquoted run up to the next dot.
    Computed once per preparer since the quote characters are fixed.
    """
    quoted_open, quoted_close, escaped_close = (
        re.escape(part)
        for part in (
            self.initial_quote,
            self.final_quote,
            self._escape_identifier(self.final_quote),
        )
    )
    return re.compile(
        r"(?:"
        r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
        r"|([^\.]+))(?=\.|$))+"
        % {
            "initial": quoted_open,
            "final": quoted_close,
            "escaped": escaped_close,
        }
    )

8087 

8088 def unformat_identifiers(self, identifiers: str) -> Sequence[str]: 

8089 """Unpack 'schema.table.column'-like strings into components.""" 

8090 

8091 r = self._r_identifiers 

8092 return [ 

8093 self._unescape_identifier(i) 

8094 for i in [a or b for a, b in r.findall(identifiers)] 

8095 ]