Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/compiler.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

3064 statements  

1# sql/compiler.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Base SQL and DDL compiler implementations. 

10 

11Classes provided include: 

12 

13:class:`.compiler.SQLCompiler` - renders SQL 

14strings 

15 

16:class:`.compiler.DDLCompiler` - renders DDL 

17(data definition language) strings 

18 

19:class:`.compiler.GenericTypeCompiler` - renders 

20type specification strings. 

21 

22To generate user-defined SQL strings, see 

23:doc:`/ext/compiler`. 

24 

25""" 

26 

27from __future__ import annotations 

28 

29import collections 

30import collections.abc as collections_abc 

31import contextlib 

32from enum import IntEnum 

33import functools 

34import itertools 

35import operator 

36import re 

37from time import perf_counter 

38import typing 

39from typing import Any 

40from typing import Callable 

41from typing import cast 

42from typing import ClassVar 

43from typing import Dict 

44from typing import FrozenSet 

45from typing import Iterable 

46from typing import Iterator 

47from typing import List 

48from typing import Mapping 

49from typing import MutableMapping 

50from typing import NamedTuple 

51from typing import NoReturn 

52from typing import Optional 

53from typing import Pattern 

54from typing import Sequence 

55from typing import Set 

56from typing import Tuple 

57from typing import Type 

58from typing import TYPE_CHECKING 

59from typing import Union 

60 

61from . import base 

62from . import coercions 

63from . import crud 

64from . import elements 

65from . import functions 

66from . import operators 

67from . import roles 

68from . import schema 

69from . import selectable 

70from . import sqltypes 

71from . import util as sql_util 

72from ._typing import is_column_element 

73from ._typing import is_dml 

74from .base import _de_clone 

75from .base import _from_objects 

76from .base import _NONE_NAME 

77from .base import _SentinelDefaultCharacterization 

78from .base import NO_ARG 

79from .elements import quoted_name 

80from .sqltypes import TupleType 

81from .visitors import prefix_anon_map 

82from .. import exc 

83from .. import util 

84from ..util import FastIntFlag 

85from ..util.typing import Literal 

86from ..util.typing import Protocol 

87from ..util.typing import Self 

88from ..util.typing import TypedDict 

89 

90if typing.TYPE_CHECKING: 

91 from .annotation import _AnnotationDict 

92 from .base import _AmbiguousTableNameMap 

93 from .base import CompileState 

94 from .base import Executable 

95 from .cache_key import CacheKey 

96 from .ddl import ExecutableDDLElement 

97 from .dml import Insert 

98 from .dml import Update 

99 from .dml import UpdateBase 

100 from .dml import UpdateDMLState 

101 from .dml import ValuesBase 

102 from .elements import _truncated_label 

103 from .elements import BinaryExpression 

104 from .elements import BindParameter 

105 from .elements import ClauseElement 

106 from .elements import ColumnClause 

107 from .elements import ColumnElement 

108 from .elements import False_ 

109 from .elements import Label 

110 from .elements import Null 

111 from .elements import True_ 

112 from .functions import Function 

113 from .schema import CheckConstraint 

114 from .schema import Column 

115 from .schema import Constraint 

116 from .schema import ForeignKeyConstraint 

117 from .schema import IdentityOptions 

118 from .schema import Index 

119 from .schema import PrimaryKeyConstraint 

120 from .schema import Table 

121 from .schema import UniqueConstraint 

122 from .selectable import _ColumnsClauseElement 

123 from .selectable import AliasedReturnsRows 

124 from .selectable import CompoundSelectState 

125 from .selectable import CTE 

126 from .selectable import FromClause 

127 from .selectable import NamedFromClause 

128 from .selectable import ReturnsRows 

129 from .selectable import Select 

130 from .selectable import SelectState 

131 from .type_api import _BindProcessorType 

132 from .type_api import TypeDecorator 

133 from .type_api import TypeEngine 

134 from .type_api import UserDefinedType 

135 from .visitors import Visitable 

136 from ..engine.cursor import CursorResultMetaData 

137 from ..engine.interfaces import _CoreSingleExecuteParams 

138 from ..engine.interfaces import _DBAPIAnyExecuteParams 

139 from ..engine.interfaces import _DBAPIMultiExecuteParams 

140 from ..engine.interfaces import _DBAPISingleExecuteParams 

141 from ..engine.interfaces import _ExecuteOptions 

142 from ..engine.interfaces import _GenericSetInputSizesType 

143 from ..engine.interfaces import _MutableCoreSingleExecuteParams 

144 from ..engine.interfaces import Dialect 

145 from ..engine.interfaces import SchemaTranslateMapType 

146 

147 

# Type alias: a dictionary keyed by FromClause objects with string values.
# NOTE(review): going by the alias name the strings are presumably hint text
# attached to each FROM element -- confirm against the call sites.
_FromHintsType = Dict["FromClause", str]

149 

# Lower-case reserved words, kept in alphabetical order.  The whitespace
# separated block below is split into the individual set members.
RESERVED_WORDS = set(
    """
    all analyse analyze and any array as asc asymmetric authorization
    between binary both
    case cast check collate column constraint create cross
    current_date current_role current_time current_timestamp current_user
    default deferrable desc distinct do
    else end except
    false for foreign freeze from full
    grant group
    having
    ilike in initially inner intersect into is isnull
    join
    leading left like limit localtime localtimestamp
    natural new not notnull null
    off offset old on only or order outer overlaps
    placing primary
    references right
    select session_user set similar some symmetric
    table then to trailing true
    union unique user using
    verbose
    when where
    """.split()
)

246 

# Whole-string matchers (case-insensitive): one or more ASCII letters,
# digits, underscores or dollar signs; the second form also admits spaces.
LEGAL_CHARACTERS = re.compile(r"^[A-Z0-9_$]+$", re.IGNORECASE)
LEGAL_CHARACTERS_PLUS_SPACE = re.compile(r"^[A-Z0-9_ $]+$", re.IGNORECASE)

# The ten decimal digit characters plus "$".
ILLEGAL_INITIAL_CHARACTERS = set("0123456789") | {"$"}

# Whole-string, case-insensitive matchers for the keyword phrases listed in
# each pattern.  ON DELETE and ON UPDATE accept the same set of phrases.
FK_ON_DELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.IGNORECASE
)
FK_ON_UPDATE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT)$", re.IGNORECASE
)
FK_INITIALLY = re.compile(r"^(?:DEFERRED|IMMEDIATE)$", re.IGNORECASE)

# ":name" tokens: the colon must not be preceded by a colon, word character,
# "$" or backslash (\x5c), and the name must not be followed by a colon,
# word character or "$".  Group 1 is the name without the colon.
BIND_PARAMS = re.compile(r"(?<![:\w\$\x5c]):([\w\$]+)(?![:\w\$])", re.UNICODE)

# Backslash-prefixed ":name" tokens; group 1 is the text after the backslash.
BIND_PARAMS_ESC = re.compile(r"\x5c(:[\w\$]*)(?![:\w\$])", re.UNICODE)

260 

# Placeholder template for each paramstyle name.  The doubled "%%" in the
# pyformat/format entries is a literal percent sign once the template has
# been through one round of %-formatting, e.g.
# "%%(%(name)s)s" % {"name": "x"} == "%(x)s".
# NOTE(review): "[_POSITION]" in the numeric styles looks like a marker that
# is substituted with the parameter position later -- confirm in the code
# that consumes these templates.
_pyformat_template = "%%(%(name)s)s"
BIND_TEMPLATES = {
    "pyformat": _pyformat_template,
    "qmark": "?",
    "format": "%%s",
    "numeric": ":[_POSITION]",
    "numeric_dollar": "$[_POSITION]",
    "named": ":%(name)s",
}

270 

271 

# Maps operator callables from the ``operators`` module to the SQL text
# associated with each one.  The spacing is part of each string: infix
# operators are padded on both sides, prefix operators carry a trailing
# space (or none, for "-" and "~"), and modifiers carry a leading space.
OPERATORS = {
    # binary
    operators.and_: " AND ",
    operators.or_: " OR ",
    operators.add: " + ",
    operators.mul: " * ",
    operators.sub: " - ",
    operators.mod: " % ",
    operators.neg: "-",
    operators.lt: " < ",
    operators.le: " <= ",
    operators.ne: " != ",
    operators.gt: " > ",
    operators.ge: " >= ",
    operators.eq: " = ",
    operators.is_distinct_from: " IS DISTINCT FROM ",
    operators.is_not_distinct_from: " IS NOT DISTINCT FROM ",
    operators.concat_op: " || ",
    operators.match_op: " MATCH ",
    operators.not_match_op: " NOT MATCH ",
    operators.in_op: " IN ",
    operators.not_in_op: " NOT IN ",
    operators.comma_op: ", ",
    operators.from_: " FROM ",
    operators.as_: " AS ",
    operators.is_: " IS ",
    operators.is_not: " IS NOT ",
    operators.collate: " COLLATE ",
    # unary
    operators.exists: "EXISTS ",
    operators.distinct_op: "DISTINCT ",
    operators.inv: "NOT ",
    operators.any_op: "ANY ",
    operators.all_op: "ALL ",
    # modifiers
    operators.desc_op: " DESC",
    operators.asc_op: " ASC",
    operators.nulls_first_op: " NULLS FIRST",
    operators.nulls_last_op: " NULLS LAST",
    # bitwise
    operators.bitwise_xor_op: " ^ ",
    operators.bitwise_or_op: " | ",
    operators.bitwise_and_op: " & ",
    operators.bitwise_not_op: "~",
    operators.bitwise_lshift_op: " << ",
    operators.bitwise_rshift_op: " >> ",
}

319 

# Maps function classes from the ``functions`` module to a fixed name string.
# NOTE(review): presumably this is the text rendered for the function in
# place of a derived name -- confirm where FUNCTIONS is consumed.
FUNCTIONS: Dict[Type[Function[Any]], str] = {
    functions.coalesce: "coalesce",
    functions.current_date: "CURRENT_DATE",
    functions.current_time: "CURRENT_TIME",
    functions.current_timestamp: "CURRENT_TIMESTAMP",
    functions.current_user: "CURRENT_USER",
    functions.localtime: "LOCALTIME",
    functions.localtimestamp: "LOCALTIMESTAMP",
    functions.random: "random",
    functions.sysdate: "sysdate",
    functions.session_user: "SESSION_USER",
    functions.user: "USER",
    functions.cube: "CUBE",
    functions.rollup: "ROLLUP",
    functions.grouping_sets: "GROUPING SETS",
}

336 

337 

# Identity mapping of field names: every key maps to itself.  Built from a
# tuple so each name is written once; insertion order matches the tuple.
EXTRACT_MAP = {
    field: field
    for field in (
        "month",
        "day",
        "year",
        "second",
        "hour",
        "doy",
        "minute",
        "quarter",
        "dow",
        "week",
        "epoch",
        "milliseconds",
        "microseconds",
        "timezone_hour",
        "timezone_minute",
    )
}

355 

# Maps each ``selectable._CompoundSelectKeyword`` member to the SQL keyword
# text associated with it.
COMPOUND_KEYWORDS = {
    selectable._CompoundSelectKeyword.UNION: "UNION",
    selectable._CompoundSelectKeyword.UNION_ALL: "UNION ALL",
    selectable._CompoundSelectKeyword.EXCEPT: "EXCEPT",
    selectable._CompoundSelectKeyword.EXCEPT_ALL: "EXCEPT ALL",
    selectable._CompoundSelectKeyword.INTERSECT: "INTERSECT",
    selectable._CompoundSelectKeyword.INTERSECT_ALL: "INTERSECT ALL",
}

364 

365 

class ResultColumnsEntry(NamedTuple):
    """Describes one column expression expected in a statement's result rows.

    Usually this corresponds to an entry in the columns clause of a SELECT,
    but it may equally describe a RETURNING clause or a dialect-specific
    emulation.

    """

    keyname: str
    """The string name expected to appear in ``cursor.description``."""

    name: str
    """The column name; this may be a label."""

    objects: Tuple[Any, ...]
    """Objects that should each be able to locate this column in a
    RowMapping; typically string names and aliases as well as Column
    objects.

    """

    type: TypeEngine[Any]
    """The datatype associated with this column.

    This is the point at which "result processing" logic ties the compiled
    statement to the rows returned by the cursor.

    """

395 

396 

397class _ResultMapAppender(Protocol): 

398 def __call__( 

399 self, 

400 keyname: str, 

401 name: str, 

402 objects: Sequence[Any], 

403 type_: TypeEngine[Any], 

404 ) -> None: ... 

405 

406 

# Integer indexes into ResultColumnsEntry, used by cursor.py.  Some profiling
# showed integer access to be faster than named tuple attribute access.
# The values follow the field order of ResultColumnsEntry:
# keyname (0), name (1), objects (2), type (3).
RM_RENDERED_NAME: Literal[0] = 0
RM_NAME: Literal[1] = 1
RM_OBJECTS: Literal[2] = 2
RM_TYPE: Literal[3] = 3

413 

414 

415class _BaseCompilerStackEntry(TypedDict): 

416 asfrom_froms: Set[FromClause] 

417 correlate_froms: Set[FromClause] 

418 selectable: ReturnsRows 

419 

420 

class _CompilerStackEntry(_BaseCompilerStackEntry, total=False):
    """Compiler stack entry dictionary.

    Extends :class:`._BaseCompilerStackEntry`; because of ``total=False``
    every key declared here is optional.

    """

    compile_state: CompileState

    need_result_map_for_nested: bool
    need_result_map_for_compound: bool

    select_0: ReturnsRows
    insert_from_select: Select[Any]

427 

428 

class ExpandedState(NamedTuple):
    """State used when producing "expanded" and "post compile" bound
    parameters for a statement.

    "expanded" parameters are generated at statement execution time to fit
    the number of values passed; the most prominent example is the set of
    individual elements inside of an IN expression.

    "post compile" parameters are those whose SQL literal value is rendered
    into the SQL statement at execution time, rather than being passed to
    the driver as separate parameters.

    To create an :class:`.ExpandedState` instance, use the
    :meth:`.SQLCompiler.construct_expanded_state` method on any
    :class:`.SQLCompiler` instance.

    """

    statement: str
    """String SQL statement with parameters fully expanded"""

    parameters: _CoreSingleExecuteParams
    """Parameter dictionary with parameters fully expanded.

    For a statement using named parameters, the keys of this dictionary
    correspond exactly to the names in the statement.  For a statement using
    positional parameters, :attr:`.ExpandedState.positional_parameters`
    yields the positional parameter set as a tuple.

    """

    processors: Mapping[str, _BindProcessorType[Any]]
    """mapping of bound value processors"""

    positiontup: Optional[Sequence[str]]
    """Sequence of string names giving the order of positional parameters,
    or ``None``."""

    parameter_expansion: Mapping[str, List[str]]
    """Mapping from an original parameter name to the list of "expanded"
    parameter names generated for it, for those parameters that were
    expanded."""

    @property
    def positional_parameters(self) -> Tuple[Any, ...]:
        """Tuple of positional parameters, for statements that were compiled
        using a positional paramstyle.

        :raises InvalidRequestError: if ``positiontup`` is ``None``.

        """
        ordered_names = self.positiontup
        if ordered_names is None:
            raise exc.InvalidRequestError(
                "statement does not use a positional paramstyle"
            )
        values = self.parameters
        return tuple([values[name] for name in ordered_names])

    @property
    def additional_parameters(self) -> _CoreSingleExecuteParams:
        """synonym for :attr:`.ExpandedState.parameters`."""
        return self.parameters

488 

489 

490class _InsertManyValues(NamedTuple): 

491 """represents state to use for executing an "insertmanyvalues" statement. 

492 

493 The primary consumers of this object are the 

494 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and 

495 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods. 

496 

497 .. versionadded:: 2.0 

498 

499 """ 

500 

501 is_default_expr: bool 

502 """if True, the statement is of the form 

503 ``INSERT INTO TABLE DEFAULT VALUES``, and can't be rewritten as a "batch" 

504 

505 """ 

506 

507 single_values_expr: str 

508 """The rendered "values" clause of the INSERT statement. 

509 

510 This is typically the parenthesized section e.g. "(?, ?, ?)" or similar. 

511 The insertmanyvalues logic uses this string as a search and replace 

512 target. 

513 

514 """ 

515 

516 insert_crud_params: List[crud._CrudParamElementStr] 

517 """List of Column / bind names etc. used while rewriting the statement""" 

518 

519 num_positional_params_counted: int 

520 """the number of bound parameters in a single-row statement. 

521 

522 This count may be larger or smaller than the actual number of columns 

523 targeted in the INSERT, as it accommodates for SQL expressions 

524 in the values list that may have zero or more parameters embedded 

525 within them. 

526 

527 This count is part of what's used to organize rewritten parameter lists 

528 when batching. 

529 

530 """ 

531 

532 sort_by_parameter_order: bool = False 

533 """if the deterministic_returnined_order parameter were used on the 

534 insert. 

535 

536 All of the attributes following this will only be used if this is True. 

537 

538 """ 

539 

540 includes_upsert_behaviors: bool = False 

541 """if True, we have to accommodate for upsert behaviors. 

542 

543 This will in some cases downgrade "insertmanyvalues" that requests 

544 deterministic ordering. 

545 

546 """ 

547 

548 sentinel_columns: Optional[Sequence[Column[Any]]] = None 

549 """List of sentinel columns that were located. 

550 

551 This list is only here if the INSERT asked for 

552 sort_by_parameter_order=True, 

553 and dialect-appropriate sentinel columns were located. 

554 

555 .. versionadded:: 2.0.10 

556 

557 """ 

558 

559 num_sentinel_columns: int = 0 

560 """how many sentinel columns are in the above list, if any. 

561 

562 This is the same as 

563 ``len(sentinel_columns) if sentinel_columns is not None else 0`` 

564 

565 """ 

566 

567 sentinel_param_keys: Optional[Sequence[str]] = None 

568 """parameter str keys in each param dictionary / tuple 

569 that would link to the client side "sentinel" values for that row, which 

570 we can use to match up parameter sets to result rows. 

571 

572 This is only present if sentinel_columns is present and the INSERT 

573 statement actually refers to client side values for these sentinel 

574 columns. 

575 

576 .. versionadded:: 2.0.10 

577 

578 .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys 

579 only, used against the "compiled parameteters" collection before 

580 the parameters were converted by bound parameter processors 

581 

582 """ 

583 

584 implicit_sentinel: bool = False 

585 """if True, we have exactly one sentinel column and it uses a server side 

586 value, currently has to generate an incrementing integer value. 

587 

588 The dialect in question would have asserted that it supports receiving 

589 these values back and sorting on that value as a means of guaranteeing 

590 correlation with the incoming parameter list. 

591 

592 .. versionadded:: 2.0.10 

593 

594 """ 

595 

596 has_upsert_bound_parameters: bool = False 

597 """if True, the upsert SET clause contains bound parameters that will 

598 receive their values from the parameters dict (i.e., parametrized 

599 bindparams where value is None and callable is None). 

600 

601 This means we can't batch multiple rows in a single statement, since 

602 each row would need different values in the SET clause but there's only 

603 one SET clause per statement. See issue #13130. 

604 

605 .. versionadded:: 2.0.37 

606 

607 """ 

608 

609 embed_values_counter: bool = False 

610 """Whether to embed an incrementing integer counter in each parameter 

611 set within the VALUES clause as parameters are batched over. 

612 

613 This is only used for a specific INSERT..SELECT..VALUES..RETURNING syntax 

614 where a subquery is used to produce value tuples. Current support 

615 includes PostgreSQL, Microsoft SQL Server. 

616 

617 .. versionadded:: 2.0.10 

618 

619 """ 

620 

621 

622class _InsertManyValuesBatch(NamedTuple): 

623 """represents an individual batch SQL statement for insertmanyvalues. 

624 

625 This is passed through the 

626 :meth:`.SQLCompiler._deliver_insertmanyvalues_batches` and 

627 :meth:`.DefaultDialect._deliver_insertmanyvalues_batches` methods out 

628 to the :class:`.Connection` within the 

629 :meth:`.Connection._exec_insertmany_context` method. 

630 

631 .. versionadded:: 2.0.10 

632 

633 """ 

634 

635 replaced_statement: str 

636 replaced_parameters: _DBAPIAnyExecuteParams 

637 processed_setinputsizes: Optional[_GenericSetInputSizesType] 

638 batch: Sequence[_DBAPISingleExecuteParams] 

639 sentinel_values: Sequence[Tuple[Any, ...]] 

640 current_batch_size: int 

641 batchnum: int 

642 total_batches: int 

643 rows_sorted: bool 

644 is_downgraded: bool 

645 

646 

class InsertmanyvaluesSentinelOpts(FastIntFlag):
    """Bitflag enum indicating the styles of PK defaults which can work as
    implicit sentinel columns.

    Each basic flag occupies a single bit, written below as a shift so the
    bit position is visible; the numeric values are 1, 2, 4, 8, 16 and 64.

    """

    NOT_SUPPORTED = 1 << 0
    AUTOINCREMENT = 1 << 1
    IDENTITY = 1 << 2
    SEQUENCE = 1 << 3

    # combinations of the flags above
    ANY_AUTOINCREMENT = AUTOINCREMENT | IDENTITY | SEQUENCE
    _SUPPORTED_OR_NOT = NOT_SUPPORTED | ANY_AUTOINCREMENT

    # note that bit 5 (value 32) is not assigned in this class
    USE_INSERT_FROM_SELECT = 1 << 4
    RENDER_SELECT_COL_CASTS = 1 << 6

663 

664 

class CompilerState(IntEnum):
    """Enumerates the states recorded in a compiler's ``state`` attribute."""

    COMPILING = 0
    """A statement is present and the compilation phase is in progress."""

    STRING_APPLIED = 1
    """A statement is present and its string form has been applied.

    Additional processors by subclasses may still be pending.

    """

    NO_STATEMENT = 2
    """The compiler has no statement to compile and is used for method
    access only."""

679 

680 

class Linting(IntEnum):
    """Preferences for the 'SQL linting' feature.

    The feature currently includes support for flagging cartesian products
    in SQL statements.

    """

    NO_LINTING = 0
    "Disable all linting."

    COLLECT_CARTESIAN_PRODUCTS = 1
    """Collect data on FROMs and cartesian products and gather it into
    'self.from_linter'"""

    WARN_LINTING = 2
    "Emit warnings for linters that find problems"

    FROM_LINTING = COLLECT_CARTESIAN_PRODUCTS | WARN_LINTING
    """Warn for cartesian products; the combination (value 3) of
    COLLECT_CARTESIAN_PRODUCTS and WARN_LINTING"""

702 

703 

# Module-level aliases for each member of the Linting enum.
NO_LINTING = Linting.NO_LINTING
COLLECT_CARTESIAN_PRODUCTS = Linting.COLLECT_CARTESIAN_PRODUCTS
WARN_LINTING = Linting.WARN_LINTING
FROM_LINTING = Linting.FROM_LINTING

707 

708 

class FromLinter(collections.namedtuple("FromLinter", ["froms", "edges"])):
    """Current state for the "cartesian product" detection feature.

    ``froms`` is iterated for its keys and indexed by key when building the
    warning message; ``edges`` is an iterable of two-element sequences that
    each link two FROM elements.

    """

    def lint(self, start=None):
        """Find the FROM elements that cannot be reached from a start
        element by following ``edges``.

        :param start: element to begin from; it must be present in
         ``froms``.  When omitted, an arbitrary element is chosen.

        :return: ``(unreached, start_element)`` when some elements are not
         linked to the start element, else ``(None, None)``.  Also
         ``(None, None)`` when ``froms`` is empty.

        """
        froms = self.froms
        if not froms:
            return None, None

        remaining_edges = set(self.edges)
        unreached = set(froms)

        if start is None:
            origin = unreached.pop()
        else:
            origin = start
            unreached.remove(origin)

        pending = collections.deque([origin])

        while pending and unreached:
            current = pending.popleft()
            unreached.discard(current)

            # nodes are compared within edges by hash equality, since there
            # are "annotated" elements that match the non-annotated ones.
            # native containment routines ("current in edge",
            # "edge.index(current)") avoid in-python hash() calls.
            touching = {edge for edge in remaining_edges if current in edge}

            # queue the opposite end of every matching edge:
            # index 0 selects edge[1], index 1 selects edge[0]
            pending.extendleft(
                edge[not edge.index(current)] for edge in touching
            )
            remaining_edges -= touching

        # anything still unreached is not joined to the start element
        if unreached:
            return unreached, origin
        return None, None

    def warn(self, stmt_type="SELECT"):
        """Emit a warning via ``util.warn`` if :meth:`.lint` reports FROM
        elements that are not linked to the rest.

        :param stmt_type: statement type name placed at the start of the
         warning message.

        """
        the_rest, start_with = self.lint()

        if not the_rest:
            return

        froms_str = ", ".join(
            f'"{self.froms[from_]}"' for from_ in the_rest
        )
        message = (
            "{stmt_type} statement has a cartesian product between "
            "FROM element(s) {froms} and "
            'FROM element "{start}". Apply join condition(s) '
            "between each element to resolve."
        ).format(
            stmt_type=stmt_type,
            froms=froms_str,
            start=self.froms[start_with],
        )

        util.warn(message)

773 

774 

class Compiled:
    """Represent a compiled SQL or DDL expression.

    The ``__str__`` method of the ``Compiled`` object should produce
    the actual text of the statement.  ``Compiled`` objects are
    specific to their underlying database dialect, and also may
    or may not be specific to the columns referenced within a
    particular set of bind parameters.  In no case should the
    ``Compiled`` object be dependent on the actual values of those
    bind parameters, even though it may reference those values as
    defaults.
    """

    statement: Optional[ClauseElement] = None
    "The statement to compile."
    string: str = ""
    "The string representation of the ``statement``"

    state: CompilerState
    """description of the compiler's state"""

    is_sql = False
    is_ddl = False

    _cached_metadata: Optional[CursorResultMetaData] = None

    _result_columns: Optional[List[ResultColumnsEntry]] = None

    schema_translate_map: Optional[SchemaTranslateMapType] = None

    execution_options: _ExecuteOptions = util.EMPTY_DICT
    """
    Execution options propagated from the statement.   In some cases,
    sub-elements of the statement can modify these.
    """

    preparer: IdentifierPreparer

    _annotations: _AnnotationDict = util.EMPTY_DICT

    compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` object that maintains additional
    state used by the compiler.

    Major executable objects such as :class:`_expression.Insert`,
    :class:`_expression.Update`, :class:`_expression.Delete`,
    :class:`_expression.Select` will generate this
    state when compiled in order to calculate additional information about the
    object.   For the top level object that is to be executed, the state can be
    stored here where it can also have applicability towards result set
    processing.

    .. versionadded:: 1.4

    """

    dml_compile_state: Optional[CompileState] = None
    """Optional :class:`.CompileState` assigned at the same point that
    .isinsert, .isupdate, or .isdelete is assigned.

    This will normally be the same object as .compile_state, with the
    exception of cases like the :class:`.ORMFromStatementCompileState`
    object.

    .. versionadded:: 1.4.40

    """

    cache_key: Optional[CacheKey] = None
    """The :class:`.CacheKey` that was generated ahead of creating this
    :class:`.Compiled` object.

    This is used for routines that need access to the original
    :class:`.CacheKey` instance generated when the :class:`.Compiled`
    instance was first cached, typically in order to reconcile
    the original list of :class:`.BindParameter` objects with a
    per-statement list that's generated on each call.

    """

    _gen_time: float
    """Generation time of this :class:`.Compiled`, used for reporting
    cache stats."""

    def __init__(
        self,
        dialect: Dialect,
        statement: Optional[ClauseElement],
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        render_schema_translate: bool = False,
        compile_kwargs: Mapping[str, Any] = util.immutabledict(),
    ):
        """Construct a new :class:`.Compiled` object.

        :param dialect: :class:`.Dialect` to compile against.

        :param statement: :class:`_expression.ClauseElement` to be compiled.
         When ``None``, nothing is compiled and the state is set to
         :attr:`.CompilerState.NO_STATEMENT`.

        :param schema_translate_map: dictionary of schema names to be
         translated when forming the resultant SQL

         .. seealso::

            :ref:`schema_translating`

        :param render_schema_translate: when True, the compiled string is
         passed through the preparer's ``_render_schema_translates`` with
         ``schema_translate_map``, which must then not be ``None``.

        :param compile_kwargs: additional kwargs that will be
         passed to the initial call to :meth:`.Compiled.process`.


        """
        self.dialect = dialect
        self.preparer = self.dialect.identifier_preparer
        if schema_translate_map:
            self.schema_translate_map = schema_translate_map
            self.preparer = self.preparer._with_schema_translate(
                schema_translate_map
            )

        if statement is None:
            self.state = CompilerState.NO_STATEMENT
        else:
            self.state = CompilerState.COMPILING
            self.statement = statement
            self.can_execute = statement.supports_execution
            self._annotations = statement._annotations
            if self.can_execute:
                if TYPE_CHECKING:
                    assert isinstance(statement, Executable)
                self.execution_options = statement._execution_options

            # the compilation itself; dispatches to the statement's visitor
            self.string = self.process(self.statement, **compile_kwargs)

            if render_schema_translate:
                assert schema_translate_map is not None
                self.string = self.preparer._render_schema_translates(
                    self.string, schema_translate_map
                )

            self.state = CompilerState.STRING_APPLIED

        # recorded in both branches, after any compilation has finished
        self._gen_time = perf_counter()

    def __init_subclass__(cls) -> None:
        # give each subclass a chance to set itself up at class creation
        cls._init_compiler_cls()
        return super().__init_subclass__()

    @classmethod
    def _init_compiler_cls(cls):
        """Hook invoked for every subclass from ``__init_subclass__``; does
        nothing in this base class."""

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        """Execute this compiled object via
        ``connection._execute_compiled()``.

        :raises ObjectNotExecutableError: if ``can_execute`` is false.

        """
        # NOTE: can_execute is only assigned by __init__ when a statement
        # was supplied.
        if not self.can_execute:
            raise exc.ObjectNotExecutableError(self.statement)

        return connection._execute_compiled(
            self, distilled_params, execution_options
        )

    def visit_unsupported_compilation(self, element, err, **kw):
        """Raise :class:`.UnsupportedCompilationError` for the type of
        ``element``, chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, type(element)) from err

    @property
    def sql_compiler(self) -> SQLCompiler:
        """Return a Compiled that is capable of processing SQL expressions.

        If this compiler is one, it would likely just return 'self'.

        Not implemented in this base class.

        """

        raise NotImplementedError()

    def process(self, obj: Visitable, **kwargs: Any) -> str:
        """Compile ``obj`` by calling its ``_compiler_dispatch`` with this
        compiler and the given keyword arguments."""
        return obj._compiler_dispatch(self, **kwargs)

    def __str__(self) -> str:
        """Return the string text of the generated SQL or DDL.

        An empty string is returned unless the state is
        :attr:`.CompilerState.STRING_APPLIED`.

        """

        applied = self.state is CompilerState.STRING_APPLIED
        return self.string if applied else ""

    def construct_params(
        self,
        params: Optional[_CoreSingleExecuteParams] = None,
        extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
        escape_names: bool = True,
    ) -> Optional[_MutableCoreSingleExecuteParams]:
        """Return the bind params for this compiled object.

        Not implemented in this base class.

        :param params: a dict of string/object pairs whose values will
                       override bind values compiled in to the
                       statement.
        """

        raise NotImplementedError()

    @property
    def params(self):
        """Return the bind params for this compiled object."""
        return self.construct_params()

977 

978 

class TypeCompiler(util.EnsureKWArg):
    """Produces DDL specification for TypeEngine objects."""

    # regular expression naming the methods covered by EnsureKWArg
    ensure_kwarg = r"visit_\w+"

    def __init__(self, dialect: Dialect):
        self.dialect = dialect

    def process(self, type_: TypeEngine[Any], **kw: Any) -> str:
        """Compile ``type_`` by calling its ``_compiler_dispatch``.

        If the type has a variant mapping containing an entry for this
        dialect's name, that entry is compiled in its place.

        """
        variants = type_._variant_mapping
        dialect_name = self.dialect.name
        if variants and dialect_name in variants:
            type_ = variants[dialect_name]
        return type_._compiler_dispatch(self, **kw)

    def visit_unsupported_compilation(
        self, element: Any, err: Exception, **kw: Any
    ) -> NoReturn:
        """Raise :class:`.UnsupportedCompilationError` for ``element``,
        chained from ``err``."""
        raise exc.UnsupportedCompilationError(self, element) from err

999 

1000 

1001# this was a Visitable, but to allow accurate detection of 

1002# column elements this is actually a column element 

class _CompileLabel(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """lightweight label object which acts as an expression.Label."""

    __visit_name__ = "label"
    __slots__ = ("element", "name", "_alt_names")

    def __init__(self, col, name, alt_names=()):
        """Wrap ``col`` under ``name``.

        ``_alt_names`` is stored as ``col`` followed by ``alt_names``.
        """
        self.element, self.name = col, name
        self._alt_names = (col,) + alt_names

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        wrapped = self.element
        return wrapped.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        wrapped = self.element
        return wrapped.type

    def self_group(self, **kw):
        """Return this object unchanged; all keyword arguments ignored."""
        return self

1026 

1027 

class ilike_case_insensitive(
    roles.BinaryElementRole[Any], elements.CompilerColumnElement
):
    """produce a wrapping element for a case-insensitive portion of
    an ILIKE construct.

    The construct usually renders the ``lower()`` function, but on
    PostgreSQL will pass silently with the assumption that "ILIKE"
    is being used.

    .. versionadded:: 2.0

    """

    __visit_name__ = "ilike_case_insensitive_operand"
    __slots__ = ("element", "comparator")

    def __init__(self, element):
        """Wrap ``element``, sharing its ``comparator``."""
        self.element = element
        self.comparator = element.comparator

    @property
    def proxy_set(self):
        """The ``proxy_set`` of the wrapped element."""
        wrapped = self.element
        return wrapped.proxy_set

    @property
    def type(self):
        """The ``type`` of the wrapped element."""
        wrapped = self.element
        return wrapped.type

    def self_group(self, **kw):
        """Return this object unchanged; all keyword arguments ignored."""
        return self

    def _with_binary_element_type(self, type_):
        """Return a new wrapper around the re-typed inner element."""
        retyped = self.element._with_binary_element_type(type_)
        return ilike_case_insensitive(retyped)

1064 

1065 

1066class SQLCompiler(Compiled): 

1067 """Default implementation of :class:`.Compiled`. 

1068 

1069 Compiles :class:`_expression.ClauseElement` objects into SQL strings. 

1070 

1071 """ 

1072 

1073 extract_map = EXTRACT_MAP 

1074 

1075 bindname_escape_characters: ClassVar[Mapping[str, str]] = ( 

1076 util.immutabledict( 

1077 { 

1078 "%": "P", 

1079 "(": "A", 

1080 ")": "Z", 

1081 ":": "C", 

1082 ".": "_", 

1083 "[": "_", 

1084 "]": "_", 

1085 " ": "_", 

1086 } 

1087 ) 

1088 ) 

1089 """A mapping (e.g. dict or similar) containing a lookup of 

1090 characters keyed to replacement characters which will be applied to all 

1091 'bind names' used in SQL statements as a form of 'escaping'; the given 

1092 characters are replaced entirely with the 'replacement' character when 

1093 rendered in the SQL statement, and a similar translation is performed 

1094 on the incoming names used in parameter dictionaries passed to methods 

1095 like :meth:`_engine.Connection.execute`. 

1096 

1097 This allows bound parameter names used in :func:`_sql.bindparam` and 

1098 other constructs to have any arbitrary characters present without any 

1099 concern for characters that aren't allowed at all on the target database. 

1100 

1101 Third party dialects can establish their own dictionary here to replace the 

1102 default mapping, which will ensure that the particular characters in the 

1103 mapping will never appear in a bound parameter name. 

1104 

1105 The dictionary is evaluated at **class creation time**, so cannot be 

1106 modified at runtime; it must be present on the class when the class 

1107 is first declared. 

1108 

1109 Note that for dialects that have additional bound parameter rules such 

1110 as additional restrictions on leading characters, the 

1111 :meth:`_sql.SQLCompiler.bindparam_string` method may need to be augmented. 

1112 See the cx_Oracle compiler for an example of this. 

1113 

1114 .. versionadded:: 2.0.0rc1 

1115 

1116 """ 

1117 

1118 _bind_translate_re: ClassVar[Pattern[str]] 

1119 _bind_translate_chars: ClassVar[Mapping[str, str]] 

1120 

1121 is_sql = True 

1122 

1123 compound_keywords = COMPOUND_KEYWORDS 

1124 

1125 isdelete: bool = False 

1126 isinsert: bool = False 

1127 isupdate: bool = False 

1128 """class-level defaults which can be set at the instance 

1129 level to define if this Compiled instance represents 

1130 INSERT/UPDATE/DELETE 

1131 """ 

1132 

1133 postfetch: Optional[List[Column[Any]]] 

1134 """list of columns that can be post-fetched after INSERT or UPDATE to 

1135 receive server-updated values""" 

1136 

1137 insert_prefetch: Sequence[Column[Any]] = () 

1138 """list of columns for which default values should be evaluated before 

1139 an INSERT takes place""" 

1140 

1141 update_prefetch: Sequence[Column[Any]] = () 

1142 """list of columns for which onupdate default values should be evaluated 

1143 before an UPDATE takes place""" 

1144 

1145 implicit_returning: Optional[Sequence[ColumnElement[Any]]] = None 

1146 """list of "implicit" returning columns for a toplevel INSERT or UPDATE 

1147 statement, used to receive newly generated values of columns. 

1148 

1149 .. versionadded:: 2.0 ``implicit_returning`` replaces the previous 

1150 ``returning`` collection, which was not a generalized RETURNING 

1151 collection and instead was in fact specific to the "implicit returning" 

1152 feature. 

1153 

1154 """ 

1155 

1156 isplaintext: bool = False 

1157 

1158 binds: Dict[str, BindParameter[Any]] 

1159 """a dictionary of bind parameter keys to BindParameter instances.""" 

1160 

1161 bind_names: Dict[BindParameter[Any], str] 

1162 """a dictionary of BindParameter instances to "compiled" names 

1163 that are actually present in the generated SQL""" 

1164 

1165 stack: List[_CompilerStackEntry] 

1166 """major statements such as SELECT, INSERT, UPDATE, DELETE are 

1167 tracked in this stack using an entry format.""" 

1168 

1169 returning_precedes_values: bool = False 

1170 """set to True classwide to generate RETURNING 

1171 clauses before the VALUES or WHERE clause (i.e. MSSQL) 

1172 """ 

1173 

1174 render_table_with_column_in_update_from: bool = False 

1175 """set to True classwide to indicate the SET clause 

1176 in a multi-table UPDATE statement should qualify 

1177 columns with the table name (i.e. MySQL only) 

1178 """ 

1179 

1180 ansi_bind_rules: bool = False 

1181 """SQL 92 doesn't allow bind parameters to be used 

1182 in the columns clause of a SELECT, nor does it allow 

1183 ambiguous expressions like "? = ?". A compiler 

1184 subclass can set this flag to True if the target 

1185 driver/DB enforces this 

1186 """ 

1187 

1188 bindtemplate: str 

1189 """template to render bound parameters based on paramstyle.""" 

1190 

1191 compilation_bindtemplate: str 

1192 """template used by compiler to render parameters before positional 

1193 paramstyle application""" 

1194 

1195 _numeric_binds_identifier_char: str 

1196 """Character that's used as the identifier of a numerical bind param. 

1197 For example if this char is set to ``$``, numerical binds will be rendered 

1198 in the form ``$1, $2, $3``. 

1199 """ 

1200 

1201 _result_columns: List[ResultColumnsEntry] 

1202 """relates label names in the final SQL to a tuple of local 

1203 column/label name, ColumnElement object (if any) and 

1204 TypeEngine. CursorResult uses this for type processing and 

1205 column targeting""" 

1206 

1207 _textual_ordered_columns: bool = False 

1208 """tell the result object that the column names as rendered are important, 

1209 but they are also "ordered" vs. what is in the compiled object here. 

1210 

1211 As of 1.4.42 this condition is only present when the statement is a 

1212 TextualSelect, e.g. text("....").columns(...), where it is required 

1213 that the columns are considered positionally and not by name. 

1214 

1215 """ 

1216 

1217 _ad_hoc_textual: bool = False 

1218 """tell the result that we encountered text() or '*' constructs in the 

1219 middle of the result columns, but we also have compiled columns, so 

1220 if the number of columns in cursor.description does not match how many 

1221 expressions we have, that means we can't rely on positional at all and 

1222 should match on name. 

1223 

1224 """ 

1225 

1226 _ordered_columns: bool = True 

1227 """ 

1228 if False, means we can't be sure the list of entries 

1229 in _result_columns is actually the rendered order. Usually 

1230 True unless using an unordered TextualSelect. 

1231 """ 

1232 

1233 _loose_column_name_matching: bool = False 

1234 """tell the result object that the SQL statement is textual, wants to match 

1235 up to Column objects, and may be using the ._tq_label in the SELECT rather 

1236 than the base name. 

1237 

1238 """ 

1239 

1240 _numeric_binds: bool = False 

1241 """ 

1242 True if paramstyle is "numeric". This paramstyle is trickier than 

1243 all the others. 

1244 

1245 """ 

1246 

1247 _render_postcompile: bool = False 

1248 """ 

1249 whether to render out POSTCOMPILE params during the compile phase. 

1250 

1251 This attribute is used only for end-user invocation of stmt.compile(); 

1252 it's never used for actual statement execution, where instead the 

1253 dialect internals access and render the internal postcompile structure 

1254 directly. 

1255 

1256 """ 

1257 

1258 _post_compile_expanded_state: Optional[ExpandedState] = None 

1259 """When render_postcompile is used, the ``ExpandedState`` used to create 

1260 the "expanded" SQL is assigned here, and then used by the ``.params`` 

1261 accessor and ``.construct_params()`` methods for their return values. 

1262 

1263 .. versionadded:: 2.0.0rc1 

1264 

1265 """ 

1266 

1267 _pre_expanded_string: Optional[str] = None 

1268 """Stores the original string SQL before 'post_compile' is applied, 

1269 for cases where 'post_compile' were used. 

1270 

1271 """ 

1272 

1273 _pre_expanded_positiontup: Optional[List[str]] = None 

1274 

1275 _insertmanyvalues: Optional[_InsertManyValues] = None 

1276 

1277 _insert_crud_params: Optional[crud._CrudParamSequence] = None 

1278 

1279 literal_execute_params: FrozenSet[BindParameter[Any]] = frozenset() 

1280 """bindparameter objects that are rendered as literal values at statement 

1281 execution time. 

1282 

1283 """ 

1284 

1285 post_compile_params: FrozenSet[BindParameter[Any]] = frozenset() 

1286 """bindparameter objects that are rendered as bound parameter placeholders 

1287 at statement execution time. 

1288 

1289 """ 

1290 

1291 escaped_bind_names: util.immutabledict[str, str] = util.EMPTY_DICT 

1292 """Late escaping of bound parameter names that has to be converted 

1293 to the original name when looking in the parameter dictionary. 

1294 

1295 """ 

1296 

1297 has_out_parameters = False 

1298 """if True, there are bindparam() objects that have the isoutparam 

1299 flag set.""" 

1300 

1301 postfetch_lastrowid = False 

1302 """if True, and this is an insert, use cursor.lastrowid to populate 

1303 result.inserted_primary_key. """ 

1304 

1305 _cache_key_bind_match: Optional[ 

1306 Tuple[ 

1307 Dict[ 

1308 BindParameter[Any], 

1309 List[BindParameter[Any]], 

1310 ], 

1311 Dict[ 

1312 str, 

1313 BindParameter[Any], 

1314 ], 

1315 ] 

1316 ] = None 

1317 """a mapping that will relate the BindParameter object we compile 

1318 to those that are part of the extracted collection of parameters 

1319 in the cache key, if we were given a cache key. 

1320 

1321 """ 

1322 

1323 positiontup: Optional[List[str]] = None 

1324 """for a compiled construct that uses a positional paramstyle, will be 

1325 a sequence of strings, indicating the names of bound parameters in order. 

1326 

1327 This is used in order to render bound parameters in their correct order, 

1328 and is combined with the :attr:`_sql.Compiled.params` dictionary to 

1329 render parameters. 

1330 

1331 This sequence always contains the unescaped name of the parameters. 

1332 

1333 .. seealso:: 

1334 

1335 :ref:`faq_sql_expression_string` - includes a usage example for 

1336 debugging use cases. 

1337 

1338 """ 

1339 _values_bindparam: Optional[List[str]] = None 

1340 

1341 _visited_bindparam: Optional[List[str]] = None 

1342 

1343 inline: bool = False 

1344 

1345 ctes: Optional[MutableMapping[CTE, str]] 

1346 

1347 # Detect same CTE references - Dict[(level, name), cte] 

1348 # Level is required for supporting nesting 

1349 ctes_by_level_name: Dict[Tuple[int, str], CTE] 

1350 

1351 # To retrieve key/level in ctes_by_level_name - 

1352 # Dict[cte_reference, (level, cte_name, cte_opts)] 

1353 level_name_by_cte: Dict[CTE, Tuple[int, str, selectable._CTEOpts]] 

1354 

1355 ctes_recursive: bool 

1356 

1357 _post_compile_pattern = re.compile(r"__\[POSTCOMPILE_(\S+?)(~~.+?~~)?\]") 

1358 _pyformat_pattern = re.compile(r"%\(([^)]+?)\)s") 

1359 _positional_pattern = re.compile( 

1360 f"{_pyformat_pattern.pattern}|{_post_compile_pattern.pattern}" 

1361 ) 

1362 

@classmethod
def _init_compiler_cls(cls):
    """Run class-level compiler setup.

    Currently this only delegates to ``_init_bind_translate()``.
    """
    cls._init_bind_translate()

1366 

@classmethod
def _init_bind_translate(cls):
    """Build the bind-name escaping helpers for this class.

    Sets ``_bind_translate_re`` to a compiled character class matching
    any single key of ``bindname_escape_characters``, and
    ``_bind_translate_chars`` to that same mapping.
    """
    escape_map = cls.bindname_escape_characters
    # iterating the mapping yields its keys, i.e. the characters to escape
    char_class = re.escape("".join(escape_map))
    cls._bind_translate_re = re.compile(f"[{char_class}]")
    cls._bind_translate_chars = escape_map

1372 

def __init__(
    self,
    dialect: Dialect,
    statement: Optional[ClauseElement],
    cache_key: Optional[CacheKey] = None,
    column_keys: Optional[Sequence[str]] = None,
    for_executemany: bool = False,
    linting: Linting = NO_LINTING,
    _supporting_against: Optional[SQLCompiler] = None,
    **kwargs: Any,
):
    """Construct a new :class:`.SQLCompiler` object.

    :param dialect: :class:`.Dialect` to be used

    :param statement: :class:`_expression.ClauseElement` to be compiled

    :param cache_key: optional cache key; when given, the bound
     parameters in ``cache_key[1]`` are indexed into
     ``self._cache_key_bind_match``.

    :param column_keys: a list of column names to be compiled into an
     INSERT or UPDATE statement.

    :param for_executemany: whether INSERT / UPDATE statements should
     expect that they are to be invoked in an "executemany" style,
     which may impact how the statement will be expected to return the
     values of defaults and autoincrement / sequences and similar.
     Depending on the backend and driver in use, support for retrieving
     these values may be disabled which means SQL expressions may
     be rendered inline, RETURNING may not be rendered, etc.

    :param linting: stored as ``self.linting``.

    :param _supporting_against: optional other :class:`.SQLCompiler`
     whose instance ``__dict__`` is copied onto this one, except for a
     fixed set of dialect- and paramstyle-specific keys.

    :param kwargs: additional keyword arguments to be consumed by the
     superclass.

    """
    self.column_keys = column_keys

    self.cache_key = cache_key

    if cache_key:
        # cksm: parameter key -> parameter; ckbm: parameter -> list
        # initially holding only that parameter
        cksm = {b.key: b for b in cache_key[1]}
        ckbm = {b: [b] for b in cache_key[1]}
        self._cache_key_bind_match = (ckbm, cksm)

    # compile INSERT/UPDATE defaults/sequences to expect executemany
    # style execution, which may mean no pre-execute of defaults,
    # or no RETURNING
    self.for_executemany = for_executemany

    self.linting = linting

    # a dictionary of bind parameter keys to BindParameter
    # instances.
    self.binds = {}

    # a dictionary of BindParameter instances to "compiled" names
    # that are actually present in the generated SQL
    self.bind_names = util.column_dict()

    # stack which keeps track of nested SELECT statements
    self.stack = []

    self._result_columns = []

    # true if the paramstyle is positional
    self.positional = dialect.positional
    if self.positional:
        self._numeric_binds = nb = dialect.paramstyle.startswith("numeric")
        if nb:
            self._numeric_binds_identifier_char = (
                "$" if dialect.paramstyle == "numeric_dollar" else ":"
            )

        # positional styles compile with the pyformat template first;
        # _process_positional() / _process_numeric() rewrite the
        # placeholders afterwards (see the end of this method)
        self.compilation_bindtemplate = _pyformat_template
    else:
        self.compilation_bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

    self.ctes = None

    self.label_length = (
        dialect.label_length or dialect.max_identifier_length
    )

    # a map which tracks "anonymous" identifiers that are created on
    # the fly here
    self.anon_map = prefix_anon_map()

    # a map which tracks "truncated" names based on
    # dialect.label_length or dialect.max_identifier_length
    self.truncated_names: Dict[Tuple[str, str], str] = {}
    self._truncated_counters: Dict[str, int] = {}

    # NOTE(review): the code after this call tests
    # ``self.state is CompilerState.STRING_APPLIED``, so the superclass
    # constructor presumably performs the compilation -- confirm
    Compiled.__init__(self, dialect, statement, **kwargs)

    if self.isinsert or self.isupdate or self.isdelete:
        if TYPE_CHECKING:
            assert isinstance(statement, UpdateBase)

        if self.isinsert or self.isupdate:
            if TYPE_CHECKING:
                assert isinstance(statement, ValuesBase)
            if statement._inline:
                self.inline = True
            elif self.for_executemany and (
                not self.isinsert
                or (
                    self.dialect.insert_executemany_returning
                    and statement._return_defaults
                )
            ):
                self.inline = True

    self.bindtemplate = BIND_TEMPLATES[dialect.paramstyle]

    if _supporting_against:
        # adopt the other compiler's instance state, leaving out the
        # keys listed below
        self.__dict__.update(
            {
                k: v
                for k, v in _supporting_against.__dict__.items()
                if k
                not in {
                    "state",
                    "dialect",
                    "preparer",
                    "positional",
                    "_numeric_binds",
                    "compilation_bindtemplate",
                    "bindtemplate",
                }
            }
        )

    if self.state is CompilerState.STRING_APPLIED:
        if self.positional:
            if self._numeric_binds:
                self._process_numeric()
            else:
                self._process_positional()

        if self._render_postcompile:
            parameters = self.construct_params(
                escape_names=False,
                _no_postcompile=True,
            )

            self._process_parameters_for_postcompile(
                parameters, _populate_self=True
            )

1518 

@property
def insert_single_values_expr(self) -> Optional[str]:
    """The VALUES expression string of a single-parameter-set INSERT.

    When an INSERT is compiled with a single set of parameters inside
    a VALUES expression, the string is available here, where it can be
    used for insert batching schemes to rewrite the VALUES expression.
    ``None`` is returned when there is no ``_insertmanyvalues`` state.

    .. versionadded:: 1.3.8

    .. versionchanged:: 2.0 This collection is no longer used by
        SQLAlchemy's built-in dialects, in favor of the currently
        internal ``_insertmanyvalues`` collection that is used only by
        :class:`.SQLCompiler`.

    """
    imv = self._insertmanyvalues
    return None if imv is None else imv.single_values_expr

1537 

@util.ro_memoized_property
def effective_returning(self) -> Optional[Sequence[ColumnElement[Any]]]:
    """The effective "returning" columns for INSERT, UPDATE or DELETE.

    A non-empty ``implicit_returning`` collection (calculated by the
    compiler on the fly) takes precedence.  Otherwise, for a DML
    statement, this is the column elements found in the statement's
    ``._all_selected_columns``, i.e. those set explicitly using the
    :meth:`.UpdateBase.returning` method.  In all other cases the
    value is ``None``.

    .. versionadded:: 2.0

    """
    implicit = self.implicit_returning
    if implicit:
        return implicit

    stmt = self.statement
    if stmt is None or not is_dml(stmt):
        return None

    return [col for col in stmt._all_selected_columns if is_column_element(col)]

1562 

@property
def returning(self):
    """backwards compatibility; returns the
    effective_returning collection.

    The value may be ``None``, as ``effective_returning`` is declared
    to return an optional sequence.

    """
    return self.effective_returning

1570 

@property
def current_executable(self):
    """Return the current 'executable' that is being compiled.

    This is currently the :class:`_sql.Select`, :class:`_sql.Insert`,
    :class:`_sql.Update`, :class:`_sql.Delete`,
    :class:`_sql.CompoundSelect` object that is being compiled.
    Specifically it's the ``"selectable"`` member of the last entry
    in the ``self.stack`` list.

    When a statement like the above is being compiled, it normally
    is also assigned to the ``.statement`` attribute of the
    :class:`_sql.Compiler` object.   However, all SQL constructs are
    ultimately nestable, and ``.statement`` should never be consulted
    by a ``visit_`` method, as it is not guaranteed to be assigned
    nor guaranteed to correspond to the current statement being compiled.

    .. versionadded:: 1.3.21

        For compatibility with previous versions, use the following
        recipe::

            statement = getattr(self, "current_executable", False)
            if statement is False:
                statement = self.stack[-1]["selectable"]

        For versions 1.4 and above, ensure only .current_executable
        is used; the format of "self.stack" may change.

    :raises IndexError: if the compiler stack is empty.

    """
    try:
        top_entry = self.stack[-1]
        return top_entry["selectable"]
    except IndexError as ie:
        raise IndexError("Compiler does not have a stack entry") from ie

1605 

@property
def prefetch(self):
    """New list of the ``insert_prefetch`` columns followed by the
    ``update_prefetch`` columns."""
    return [*self.insert_prefetch, *self.update_prefetch]

1609 

@util.memoized_property
def _global_attributes(self) -> Dict[Any, Any]:
    # starts out as an empty dictionary; wrapped in
    # util.memoized_property
    return {}

1613 

@util.memoized_instancemethod
def _init_cte_state(self) -> MutableMapping[CTE, str]:
    """Initialize collections related to CTEs only if
    a CTE is located, to save on the overhead of
    these collections otherwise.

    :return: the new, empty ``self.ctes`` mapping.

    """
    self.ctes_recursive = False

    # Detect same CTE references - Dict[(level, name), cte]
    # Level is required for supporting nesting
    self.ctes_by_level_name = {}

    # To retrieve key/level in ctes_by_level_name -
    # Dict[cte_reference, (level, cte_name, cte_opts)]
    self.level_name_by_cte = {}

    # collect CTEs to tack on top of a SELECT
    # To store the query to print - Dict[cte, text_query]
    cte_text: MutableMapping[CTE, str] = util.OrderedDict()
    self.ctes = cte_text
    return cte_text

1637 

@contextlib.contextmanager
def _nested_result(self):
    """special API to support the use case of 'nested result sets'

    For the duration of the block, ``_result_columns`` is replaced by
    a new empty list and ``_ordered_columns`` by ``False``; both are
    yielded as a tuple.  If the compiler stack is non-empty, its top
    entry is flagged with ``"need_result_map_for_nested"`` for the
    duration.  The previous values are restored on exit.
    """
    saved_columns = self._result_columns
    saved_ordered = self._ordered_columns
    self._result_columns = []
    self._ordered_columns = False

    try:
        entry = None
        if self.stack:
            entry = self.stack[-1]
            entry["need_result_map_for_nested"] = True
        yield self._result_columns, self._ordered_columns
    finally:
        if entry:
            entry.pop("need_result_map_for_nested")
        self._result_columns = saved_columns
        self._ordered_columns = saved_ordered

1661 

def _process_positional(self):
    """Rewrite the compiled string for the "format" / "qmark" paramstyles.

    Every pyformat-style bind in ``self.string`` is replaced by the
    dialect's positional placeholder, and the bind names are recorded
    in order of appearance as ``self.positiontup``.  Post-compile
    tokens are left in the string unchanged, but their names are still
    recorded.  When ``_insertmanyvalues`` is present, its
    ``single_values_expr`` and the third member of each
    ``insert_crud_params`` entry get the same substitution.
    """
    assert not self.positiontup
    assert self.state is CompilerState.STRING_APPLIED
    assert not self._numeric_binds

    if self.dialect.paramstyle == "format":
        placeholder = "%s"
    else:
        assert self.dialect.paramstyle == "qmark"
        placeholder = "?"

    positions = []

    def find_position(m: re.Match[str]) -> str:
        # group 1 comes from the pyformat half of _positional_pattern,
        # group 2 from the post-compile half
        normal_bind = m.group(1)
        if normal_bind:
            positions.append(normal_bind)
            return placeholder
        else:
            # this is a post-compile bind
            positions.append(m.group(2))
            return m.group(0)

    self.string = re.sub(
        self._positional_pattern, find_position, self.string
    )

    if self.escaped_bind_names:
        # names found in the string are the escaped ones; map them
        # back to the original names for positiontup
        reverse_escape = {v: k for k, v in self.escaped_bind_names.items()}
        assert len(self.escaped_bind_names) == len(reverse_escape)
        self.positiontup = [
            reverse_escape.get(name, name) for name in positions
        ]
    else:
        self.positiontup = positions

    if self._insertmanyvalues:
        # rebind to a fresh list so that the substitutions below do not
        # append to the list already stored as self.positiontup
        positions = []

        single_values_expr = re.sub(
            self._positional_pattern,
            find_position,
            self._insertmanyvalues.single_values_expr,
        )
        insert_crud_params = [
            (
                v[0],
                v[1],
                re.sub(self._positional_pattern, find_position, v[2]),
                v[3],
            )
            for v in self._insertmanyvalues.insert_crud_params
        ]

        self._insertmanyvalues = self._insertmanyvalues._replace(
            single_values_expr=single_values_expr,
            insert_crud_params=insert_crud_params,
        )

1720 

def _process_numeric(self):
    """Rewrite the compiled string for the "numeric" paramstyles.

    Each bind name is given a numbered placeholder made of
    ``_numeric_binds_identifier_char`` plus a counter starting at 1,
    and the pyformat binds in ``self.string`` are replaced by those
    placeholders.  Binds that are post-compile or literal-execute
    params get no number, but are still listed in ``self.positiontup``.
    The next unused number is stored as ``self.next_numeric_pos``.
    """
    assert self._numeric_binds
    assert self.state is CompilerState.STRING_APPLIED

    num = 1
    param_pos: Dict[str, str] = {}
    order: Iterable[str]
    if self._insertmanyvalues and self._values_bindparam is not None:
        # bindparams that are not in values are always placed first.
        # this avoids the need of changing them when using executemany
        # values () ()
        order = itertools.chain(
            (
                name
                for name in self.bind_names.values()
                if name not in self._values_bindparam
            ),
            self.bind_names.values(),
        )
    else:
        order = self.bind_names.values()

    for bind_name in order:
        # names can repeat in the chained ordering above; the first
        # occurrence wins
        if bind_name in param_pos:
            continue
        bind = self.binds[bind_name]
        if (
            bind in self.post_compile_params
            or bind in self.literal_execute_params
        ):
            # set to None to just mark them in positiontup, it will not
            # be replaced below.
            param_pos[bind_name] = None  # type: ignore
        else:
            ph = f"{self._numeric_binds_identifier_char}{num}"
            num += 1
            param_pos[bind_name] = ph

    self.next_numeric_pos = num

    # positiontup is taken before the keys are swapped for their
    # escaped names below
    self.positiontup = list(param_pos)
    if self.escaped_bind_names:
        len_before = len(param_pos)
        param_pos = {
            self.escaped_bind_names.get(name, name): pos
            for name, pos in param_pos.items()
        }
        assert len(param_pos) == len_before

    # Can't use format here since % chars are not escaped.
    self.string = self._pyformat_pattern.sub(
        lambda m: param_pos[m.group(1)], self.string
    )

    if self._insertmanyvalues:
        single_values_expr = (
            # format is ok here since single_values_expr includes only
            # place-holders
            self._insertmanyvalues.single_values_expr
            % param_pos
        )
        insert_crud_params = [
            (v[0], v[1], "%s", v[3])
            for v in self._insertmanyvalues.insert_crud_params
        ]

        self._insertmanyvalues = self._insertmanyvalues._replace(
            # This has the numbers (:1, :2)
            single_values_expr=single_values_expr,
            # The single binds are instead %s so they can be formatted
            insert_crud_params=insert_crud_params,
        )

1793 

@util.memoized_property
def _bind_processors(
    self,
) -> MutableMapping[
    str, Union[_BindProcessorType[Any], Sequence[_BindProcessorType[Any]]]
]:
    # Maps each compiled bind name to the cached bind processor of its
    # parameter's type for this dialect.  For tuple types the value is
    # a tuple with one processor per element type.  Entries whose value
    # is None are left out.

    # mypy is not able to see the two value types as the above Union,
    # it just sees "object".  don't know how to resolve
    return {
        key: value  # type: ignore
        for key, value in (
            (
                self.bind_names[bindparam],
                (
                    bindparam.type._cached_bind_processor(self.dialect)
                    if not bindparam.type._is_tuple_type
                    else tuple(
                        elem_type._cached_bind_processor(self.dialect)
                        for elem_type in cast(
                            TupleType, bindparam.type
                        ).types
                    )
                ),
            )
            for bindparam in self.bind_names
        )
        if value is not None
    }

1822 

def is_subquery(self):
    """Return True when the compiler stack holds more than one entry."""
    depth = len(self.stack)
    return depth > 1

1825 

@property
def sql_compiler(self) -> Self:
    """Return this compiler itself."""
    return self

1829 

def construct_expanded_state(
    self,
    params: Optional[_CoreSingleExecuteParams] = None,
    escape_names: bool = True,
) -> ExpandedState:
    """Return a new :class:`.ExpandedState` for a given parameter set.

    For queries that use "expanding" or other late-rendered parameters,
    this method will provide for both the finalized SQL string as well
    as the parameters that would be used for a particular parameter set.

    :param params: optional parameter values, passed on to
     :meth:`construct_params`.
    :param escape_names: passed on to :meth:`construct_params`.

    .. versionadded:: 2.0.0rc1

    """
    # build the parameters with post-compile handling disabled, then
    # run the post-compile step on them
    base_parameters = self.construct_params(
        params, escape_names=escape_names, _no_postcompile=True
    )
    return self._process_parameters_for_postcompile(base_parameters)

1852 

def construct_params(
    self,
    params: Optional[_CoreSingleExecuteParams] = None,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
    escape_names: bool = True,
    _group_number: Optional[int] = None,
    _check: bool = True,
    _no_postcompile: bool = False,
) -> _MutableCoreSingleExecuteParams:
    """return a dictionary of bind parameter keys and values

    :param params: optional values overriding those compiled into the
     statement; looked up first by the bind parameter's ``key``, then
     by its compiled name.
    :param extracted_parameters: optional parameters that line up
     positionally with those of this compiler's original cache key;
     their values replace the compiled-in ones.
    :param escape_names: when True, result keys use the escaped form of
     any name present in ``escaped_bind_names``.
    :raises InvalidRequestError: if a required bind parameter has no
     value and ``_check`` is True, or if ``params`` is given while
     ``render_postcompile`` is in use.
    :raises CompileError: if ``extracted_parameters`` is given but the
     compiler has no cache key.
    """

    if self._render_postcompile and not _no_postcompile:
        assert self._post_compile_expanded_state is not None
        if params:
            raise exc.InvalidRequestError(
                "can't construct new parameters when render_postcompile "
                "is used; the statement is hard-linked to the original "
                "parameters. Use construct_expanded_state to generate a "
                "new statement and parameters."
            )
        return dict(self._post_compile_expanded_state.parameters)

    has_escaped_names = escape_names and bool(self.escaped_bind_names)

    resolved_extracted = None
    if extracted_parameters:
        # relate the bound parameters collected in the original cache key
        # to those collected in the incoming cache key.  They will not
        # have matching names but they will line up positionally in the
        # same way.   The parameters present in self.bind_names may be
        # clones of these original cache key params in the case of DML
        # but the .key will be guaranteed to match.
        if self.cache_key is None:
            raise exc.CompileError(
                "This compiled object has no original cache key; "
                "can't pass extracted_parameters to construct_params"
            )
        orig_extracted = self.cache_key[1]

        ckbm_tuple = self._cache_key_bind_match
        assert ckbm_tuple is not None
        ckbm, _ = ckbm_tuple
        resolved_extracted = {
            bind: extracted
            for b, extracted in zip(orig_extracted, extracted_parameters)
            for bind in ckbm[b]
        }

    pd = {}
    for bindparam, name in self.bind_names.items():
        if has_escaped_names:
            escaped_name = self.escaped_bind_names.get(name, name)
        else:
            escaped_name = name

        # explicitly passed values win, by .key first, then by name
        if params:
            if bindparam.key in params:
                pd[escaped_name] = params[bindparam.key]
                continue
            if name in params:
                pd[escaped_name] = params[name]
                continue

        if _check and bindparam.required:
            if _group_number:
                raise exc.InvalidRequestError(
                    "A value is required for bind parameter %r, "
                    "in parameter group %d"
                    % (bindparam.key, _group_number),
                    code="cd3x",
                )
            raise exc.InvalidRequestError(
                "A value is required for bind parameter %r"
                % bindparam.key,
                code="cd3x",
            )

        # fall back to the value carried by the parameter itself, or by
        # its counterpart among the extracted parameters
        if resolved_extracted:
            value_param = resolved_extracted.get(bindparam, bindparam)
        else:
            value_param = bindparam

        pd[escaped_name] = (
            value_param.effective_value
            if bindparam.callable
            else value_param.value
        )

    return pd

1980 

@util.memoized_instancemethod
def _get_set_input_sizes_lookup(self):
    """Build the per-bind-parameter lookup of DBAPI types.

    Returns a dictionary keyed by each bind parameter in
    ``self.bind_names`` that is not a literal-execute parameter.  The
    value is the DBAPI type reported by the parameter's dialect-level type
    implementation, or ``None`` when that type is absent or filtered out
    by the dialect's ``include_set_input_sizes`` /
    ``exclude_set_input_sizes`` collections.  For tuple types the value is
    a list with one such entry per member type.

    """
    dialect = self.dialect
    dbapi = dialect.dbapi
    include_types = dialect.include_set_input_sizes
    exclude_types = dialect.exclude_set_input_sizes

    def lookup_type(typ):
        dbtype = typ._unwrapped_dialect_impl(dialect).get_dbapi_type(dbapi)

        if dbtype is None:
            return None
        if exclude_types is not None and dbtype in exclude_types:
            return None
        if include_types is not None and dbtype not in include_types:
            return None
        return dbtype

    skipped = self.literal_execute_params
    inputsizes = {}

    for bindparam in self.bind_names:
        if bindparam in skipped:
            # rendered inline at execution time, so not sent to the DBAPI
            continue

        bind_type = bindparam.type
        if bind_type._is_tuple_type:
            inputsizes[bindparam] = [
                lookup_type(member)
                for member in cast(TupleType, bind_type).types
            ]
        else:
            inputsizes[bindparam] = lookup_type(bind_type)

    return inputsizes

2019 

@property
def params(self):
    """Return the dictionary of bind parameter values embedded in this
    compiled object, for those values that are present.

    The dictionary comes from ``construct_params()`` invoked with its
    ``_check`` flag turned off.

    .. seealso::

        :ref:`faq_sql_expression_string` - includes a usage example for
        debugging use cases.

    """
    embedded_params = self.construct_params(_check=False)
    return embedded_params

2032 

def _process_parameters_for_postcompile(
    self,
    parameters: _MutableCoreSingleExecuteParams,
    _populate_self: bool = False,
) -> ExpandedState:
    """Handle special post compile parameters.

    These include:

    * "expanding" parameters - typically IN tuples that are rendered
      on a per-parameter basis for an otherwise fixed SQL statement string.

    * literal_binds compiled with the literal_execute flag.  Used for
      things like SQL Server "TOP N" where the driver does not accommodate
      N as a bound parameter.

    :param parameters: mutable dictionary of parameter values.  It is
     modified in place: post-compile and literal-execute entries are
     popped from it and the individual expanded entries are added to it.

    :param _populate_self: when True, the expanded statement string,
     the new position tuple and the resulting state are also stored on
     this compiled object.

    :return: an ``ExpandedState`` built from the expanded statement, the
     (mutated) parameters, the processors for newly created parameter
     names, the new position tuple and, per expanded parameter, the list
     of names it was expanded into.

    """

    expanded_parameters = {}
    new_positiontup: Optional[List[str]]

    # start from the un-expanded string / position tuple if a previous
    # call with _populate_self already replaced them on this object
    pre_expanded_string = self._pre_expanded_string
    if pre_expanded_string is None:
        pre_expanded_string = self.string

    if self.positional:
        new_positiontup = []

        pre_expanded_positiontup = self._pre_expanded_positiontup
        if pre_expanded_positiontup is None:
            pre_expanded_positiontup = self.positiontup

    else:
        new_positiontup = pre_expanded_positiontup = None

    # the same processors mapping is viewed under two typings; which one
    # applies depends on whether the parameter has a tuple type
    processors = self._bind_processors
    single_processors = cast(
        "Mapping[str, _BindProcessorType[Any]]", processors
    )
    tuple_processors = cast(
        "Mapping[str, Sequence[_BindProcessorType[Any]]]", processors
    )

    new_processors: Dict[str, _BindProcessorType[Any]] = {}

    replacement_expressions: Dict[str, Any] = {}
    to_update_sets: Dict[str, Any] = {}

    # notes:
    # *unescaped* parameter names in:
    # self.bind_names, self.binds, self._bind_processors, self.positiontup
    #
    # *escaped* parameter names in:
    # construct_params(), replacement_expressions

    numeric_positiontup: Optional[List[str]] = None

    if self.positional and pre_expanded_positiontup is not None:
        names: Iterable[str] = pre_expanded_positiontup
        if self._numeric_binds:
            # expanded names are collected separately and numbered after
            # the statement has been rewritten, further below
            numeric_positiontup = []
    else:
        names = self.bind_names.values()

    ebn = self.escaped_bind_names
    for name in names:
        escaped_name = ebn.get(name, name) if ebn else name
        parameter = self.binds[name]

        if parameter in self.literal_execute_params:
            # the value is rendered inline; it is removed from the
            # parameters and its name is not added to new_positiontup
            if escaped_name not in replacement_expressions:
                replacement_expressions[escaped_name] = (
                    self.render_literal_bindparam(
                        parameter,
                        render_literal_value=parameters.pop(escaped_name),
                    )
                )
            continue

        if parameter in self.post_compile_params:
            if escaped_name in replacement_expressions:
                # this name was already expanded on an earlier iteration;
                # reuse the entries produced then
                to_update = to_update_sets[escaped_name]
                values = None
            else:
                # we are removing the parameter from parameters
                # because it is a list value, which is not expected by
                # TypeEngine objects that would otherwise be asked to
                # process it. the single name is being replaced with
                # individual numbered parameters for each value in the
                # param.
                #
                # note we are also inserting *escaped* parameter names
                # into the given dictionary.   default dialect will
                # use these param names directly as they will not be
                # in the escaped_bind_names dictionary.
                values = parameters.pop(name)

                leep_res = self._literal_execute_expanding_parameter(
                    escaped_name, parameter, values
                )
                to_update, replacement_expr = leep_res

                to_update_sets[escaped_name] = to_update
                replacement_expressions[escaped_name] = replacement_expr

            if not parameter.literal_execute:
                parameters.update(to_update)
                if parameter.type._is_tuple_type:
                    assert values is not None
                    # one processor per (value index, tuple member index),
                    # keyed "<name>_<i>_<j>" with both indexes 1-based
                    new_processors.update(
                        (
                            "%s_%s_%s" % (name, i, j),
                            tuple_processors[name][j - 1],
                        )
                        for i, tuple_element in enumerate(values, 1)
                        for j, _ in enumerate(tuple_element, 1)
                        if name in tuple_processors
                        and tuple_processors[name][j - 1] is not None
                    )
                else:
                    # every expanded name shares the original processor
                    new_processors.update(
                        (key, single_processors[name])
                        for key, _ in to_update
                        if name in single_processors
                    )
                if numeric_positiontup is not None:
                    numeric_positiontup.extend(
                        name for name, _ in to_update
                    )
                elif new_positiontup is not None:
                    # to_update has escaped names, but that's ok since
                    # these are new names, that aren't in the
                    # escaped_bind_names dict.
                    new_positiontup.extend(name for name, _ in to_update)
                expanded_parameters[name] = [
                    expand_key for expand_key, _ in to_update
                ]
        elif new_positiontup is not None:
            # ordinary parameter: keeps its position unchanged
            new_positiontup.append(name)

    def process_expanding(m):
        # regex callback: swap one post-compile token in the statement
        # for the replacement expression recorded for its key
        key = m.group(1)
        expr = replacement_expressions[key]

        # if POSTCOMPILE included a bind_expression, render that
        # around each element
        if m.group(2):
            tok = m.group(2).split("~~")
            be_left, be_right = tok[1], tok[3]
            expr = ", ".join(
                "%s%s%s" % (be_left, exp, be_right)
                for exp in expr.split(", ")
            )
        return expr

    statement = re.sub(
        self._post_compile_pattern, process_expanding, pre_expanded_string
    )

    if numeric_positiontup is not None:
        assert new_positiontup is not None
        # number the expanded names, counting up from next_numeric_pos
        param_pos = {
            key: f"{self._numeric_binds_identifier_char}{num}"
            for num, key in enumerate(
                numeric_positiontup, self.next_numeric_pos
            )
        }
        # Can't use format here since % chars are not escaped.
        statement = self._pyformat_pattern.sub(
            lambda m: param_pos[m.group(1)], statement
        )
        new_positiontup.extend(numeric_positiontup)

    expanded_state = ExpandedState(
        statement,
        parameters,
        new_processors,
        new_positiontup,
        expanded_parameters,
    )

    if _populate_self:
        # this is for the "render_postcompile" flag, which is not
        # otherwise used internally and is for end-user debugging and
        # special use cases.
        self._pre_expanded_string = pre_expanded_string
        self._pre_expanded_positiontup = pre_expanded_positiontup
        self.string = expanded_state.statement
        self.positiontup = (
            list(expanded_state.positiontup or ())
            if self.positional
            else None
        )
        self._post_compile_expanded_state = expanded_state

    return expanded_state

2229 

@util.preload_module("sqlalchemy.engine.cursor")
def _create_result_map(self):
    """Return a description match map built from ``self._result_columns``.

    Utility method used for unit tests only.

    """
    metadata_cls = util.preloaded.engine_cursor.CursorResultMetaData
    return metadata_cls._create_description_match_map(self._result_columns)

2237 

2238 # assigned by crud.py for insert/update statements 

2239 _get_bind_name_for_col: _BindNameForColProtocol 

2240 

@util.memoized_property
def _within_exec_param_key_getter(self) -> Callable[[Any], str]:
    """Return the ``_get_bind_name_for_col`` callable of this compiler."""
    return self._get_bind_name_for_col

2245 

@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_lastrowid_getter(self):
    """Return a ``get(lastrowid, parameters)`` callable producing the
    inserted primary key "row" for the compiled INSERT.

    Each primary key column is read from the INSERT parameters, except
    the table's autoincrement column, which takes ``cursor.lastrowid``
    when one is available.

    """
    result = util.preloaded.engine_result
    param_key_getter = self._within_exec_param_key_getter

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    table = statement.table

    # pair every primary key column with a getter that pulls its value
    # out of the parameter dictionary (None when absent)
    getters = []
    for col in table.primary_key:
        from_params = operator.methodcaller(
            "get", param_key_getter(col), None
        )
        getters.append((from_params, col))

    autoinc_col = table._autoincrement_column
    autoinc_getter = None
    lastrowid_processor = None

    if autoinc_col is not None:
        # apply type post processors to the lastrowid
        lastrowid_processor = autoinc_col.type._cached_result_processor(
            self.dialect, None
        )
        autoinc_key = param_key_getter(autoinc_col)

        # if a bind value is present for the autoincrement column
        # in the parameters, we need to do the logic dictated by
        # #7998; honor a non-None user-passed parameter over lastrowid.
        # previously in the 1.4 series we weren't fetching lastrowid
        # at all if the key were present in the parameters
        if autoinc_key in self.binds:

            def _autoinc_getter(lastrowid, parameters):
                # a supplied non-None parameter wins.  SQLite at least
                # is observed to return the wrong cursor.lastrowid for
                # INSERT..ON CONFLICT so lastrowid can't be used in all
                # cases
                param_value = parameters.get(autoinc_key, lastrowid)
                return lastrowid if param_value is None else param_value

            # work around mypy https://github.com/python/mypy/issues/14027
            autoinc_getter = _autoinc_getter

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(lastrowid, parameters):
        """given cursor.lastrowid value and the parameters used for INSERT,
        return a "row" that represents the primary key, either by
        using the "lastrowid" or by extracting values from the parameters
        that were sent along with the INSERT.

        """
        if lastrowid_processor is not None:
            lastrowid = lastrowid_processor(lastrowid)

        if lastrowid is None:
            return row_fn(getter(parameters) for getter, _ in getters)

        return row_fn(
            (
                getter(parameters)
                if col is not autoinc_col
                else (
                    lastrowid
                    if autoinc_getter is None
                    else autoinc_getter(lastrowid, parameters)
                )
            )
            for getter, col in getters
        )

    return get

2329 

@util.memoized_property
@util.preload_module("sqlalchemy.engine.result")
def _inserted_primary_key_from_returning_getter(self):
    """Return a ``get(row, parameters)`` callable producing the inserted
    primary key "row" for the compiled INSERT.

    A primary key column present in ``self.implicit_returning`` is read
    from the returned row by position; any other primary key column is
    read from the INSERT parameters.

    """
    result = util.preloaded.engine_result

    assert self.compile_state is not None
    statement = self.compile_state.statement

    if TYPE_CHECKING:
        assert isinstance(statement, Insert)

    param_key_getter = self._within_exec_param_key_getter
    table = statement.table

    returning = self.implicit_returning
    assert returning is not None
    position_of = {col: idx for idx, col in enumerate(returning)}

    # each entry is (getter, use_row): use_row says whether the getter
    # is applied to the returned row or to the parameter dictionary
    getters: List[Tuple[Callable[[Any], Any], bool]] = []
    for col in table.primary_key:
        if col in position_of:
            getters.append((operator.itemgetter(position_of[col]), True))
        else:
            getters.append(
                (
                    operator.methodcaller(
                        "get", param_key_getter(col), None
                    ),
                    False,
                )
            )

    row_fn = result.result_tuple([col.key for col in table.primary_key])

    def get(row, parameters):
        return row_fn(
            getter(row if use_row else parameters)
            for getter, use_row in getters
        )

    return get

2374 

def default_from(self) -> str:
    """Return the text to render when a SELECT statement has no froms,
    and no FROM clause is to be appended.

    This implementation renders an empty string.  The hook gives Oracle
    Database a chance to tack on a ``FROM DUAL`` to the string output.

    """
    no_from_text = ""
    return no_from_text

2384 

def visit_override_binds(self, override_binds, **kw):
    """SQL compile the nested element of an _OverrideBinds with
    bindparams swapped out.

    The _OverrideBinds is not normally expected to be compiled; it
    is meant to be used when an already cached statement is to be used,
    the compilation was already performed, and only the bound params should
    be swapped in at execution time.

    However, there are test cases that exercise this object, and
    additionally the ORM subquery loader is known to feed in expressions
    which include this construct into new queries (discovered in #11173),
    so it has to do the right thing at compile time as well.

    """

    # get SQL text first
    sqltext = override_binds.element._compiler_dispatch(self, **kw)

    # for a test compile that is not for caching, change binds after the
    # fact.  note that we don't try to
    # swap the bindparam as we compile, because our element may be
    # elsewhere in the statement already (e.g. a subquery or perhaps a
    # CTE) and was already visited / compiled. See
    # test_relationship_criteria.py ->
    # test_selectinload_local_criteria_subquery
    translate = override_binds.translate
    for key in translate:
        if key not in self.binds:
            continue
        orig_bp = self.binds[key]

        # assigning the new value onto orig_bp in place would work, but
        # we don't want to mutate things outside.  instead, replace
        # orig_bp with new_bp in all internal collections
        new_bp = orig_bp._with_value(
            translate[orig_bp.key],
            maintain_key=True,
            required=False,
        )

        bind_name = self.bind_names[orig_bp]
        self.binds[key] = new_bp
        self.binds[bind_name] = new_bp
        self.bind_names[new_bp] = bind_name
        self.bind_names.pop(orig_bp, None)

        if orig_bp in self.post_compile_params:
            self.post_compile_params |= {new_bp}
        if orig_bp in self.literal_execute_params:
            self.literal_execute_params |= {new_bp}

        ckbm_tuple = self._cache_key_bind_match
        if ckbm_tuple:
            ckbm, cksm = ckbm_tuple
            for cloned_bp in orig_bp._cloned_set:
                if cloned_bp.key in cksm:
                    ckbm[cksm[cloned_bp.key]].append(new_bp)

    return sqltext

2448 

def visit_grouping(self, grouping, asfrom=False, **kwargs):
    """Render the grouped element wrapped in parentheses."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"

2451 

def visit_select_statement_grouping(self, grouping, **kwargs):
    """Render the grouped SELECT element wrapped in parentheses."""
    inner = grouping.element._compiler_dispatch(self, **kwargs)
    return "(" + inner + ")"

2454 

def visit_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Render the element wrapped by a label reference.

    When there is an enclosing statement on the stack and the dialect
    supports simple ORDER BY label, the wrapped element's label is looked
    up by name in the enclosing compile state; if it resolves to an
    element sharing lineage, ``render_label_as_label`` is passed along so
    that the label name is rendered rather than the full expression.

    :raises CompileError: if the current stack entry has no
     ``"compile_state"``.

    """
    wrapped = element.element

    if self.stack and self.dialect.supports_simple_order_by_label:
        try:
            compile_state = cast(
                "Union[SelectState, CompoundSelectState]",
                self.stack[-1]["compile_state"],
            )
        except KeyError as ke:
            raise exc.CompileError(
                "Can't resolve label reference for ORDER BY / "
                "GROUP BY / DISTINCT etc."
            ) from ke

        _with_cols, only_froms, only_cols = (
            compile_state._label_resolve_dict
        )
        resolve_dict = only_froms if within_columns_clause else only_cols

        # this can be None in the case that a _label_reference()
        # were subject to a replacement operation, in which case
        # the replacement of the Label element may have changed
        # to something else like a ColumnClause expression.
        order_by_elem = wrapped._order_by_label_element

        if order_by_elem is not None:
            label_name = order_by_elem.name
            if label_name in resolve_dict and order_by_elem.shares_lineage(
                resolve_dict[label_name]
            ):
                kwargs["render_label_as_label"] = order_by_elem

    return self.process(
        wrapped,
        within_columns_clause=within_columns_clause,
        **kwargs,
    )

2501 

def visit_textual_label_reference(
    self, element, within_columns_clause=False, **kwargs
):
    """Render a label reference that was given as a plain string.

    Outside of any enclosing statement the element's text clause is
    rendered as is.  Otherwise the string is looked up in the enclosing
    compile state's label dictionaries and the column found is rendered
    by its label; a failed lookup is handed to
    ``coercions._no_text_coercion`` with ``CompileError`` as the exception
    class.

    """
    if not self.stack:
        # compiling the element outside of the context of a SELECT
        return self.process(element._text_clause)

    unresolved_msg = (
        "Can't resolve label reference for ORDER BY / "
        "GROUP BY / DISTINCT etc."
    )

    try:
        compile_state = cast(
            "Union[SelectState, CompoundSelectState]",
            self.stack[-1]["compile_state"],
        )
    except KeyError as ke:
        coercions._no_text_coercion(
            element.element,
            extra=unresolved_msg,
            exc_cls=exc.CompileError,
            err=ke,
        )

    with_cols, only_froms, _only_cols = compile_state._label_resolve_dict
    try:
        lookup = only_froms if within_columns_clause else with_cols
        col = lookup[element.element]
    except KeyError as err:
        coercions._no_text_coercion(
            element.element,
            extra=unresolved_msg,
            exc_cls=exc.CompileError,
            err=err,
        )
    else:
        kwargs["render_label_as_label"] = col
    return self.process(
        col, within_columns_clause=within_columns_clause, **kwargs
    )

2546 

def visit_label(
    self,
    label,
    add_to_result_map=None,
    within_label_clause=False,
    within_columns_clause=False,
    render_label_as_label=None,
    result_map_targets=(),
    **kw,
):
    """Render a label in one of three forms.

    * ``<element> AS <name>`` when inside the columns clause and not
      already inside another label;
    * only ``<name>`` when ``render_label_as_label`` is this very label;
    * otherwise only the labeled element.

    """
    # only render labels within the columns clause
    # or ORDER BY clause of a select.  dialect-specific compilers
    # can modify this behavior.
    render_label_with_as = (
        within_columns_clause and not within_label_clause
    )
    render_label_only = render_label_as_label is label

    if not (render_label_only or render_label_with_as):
        return label.element._compiler_dispatch(
            self, within_columns_clause=False, **kw
        )

    if isinstance(label.name, elements._truncated_label):
        labelname = self._truncated_identifier("colident", label.name)
    else:
        labelname = label.name

    if not render_label_with_as:
        # render_label_only is necessarily true at this point
        return self.preparer.format_label(label, labelname)

    if add_to_result_map is not None:
        add_to_result_map(
            labelname,
            label.name,
            (label, labelname) + label._alt_names + result_map_targets,
            label.type,
        )

    rendered_element = label.element._compiler_dispatch(
        self,
        within_columns_clause=True,
        within_label_clause=True,
        **kw,
    )
    return (
        rendered_element
        + OPERATORS[operators.as_]
        + self.preparer.format_label(label, labelname)
    )

2595 

def _fallback_column_name(self, column):
    """Hook invoked for a column that has no name; this implementation
    always raises ``CompileError``.

    """
    message = "Cannot compile Column object until its 'name' is assigned."
    raise exc.CompileError(message)

2600 

def visit_lambda_element(self, element, **kw):
    """Render the SQL element that the lambda element resolves to."""
    return self.process(element._resolved, **kw)

2604 

def visit_column(
    self,
    column: ColumnClause[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    include_table: bool = True,
    result_map_targets: Tuple[Any, ...] = (),
    ambiguous_table_name_map: Optional[_AmbiguousTableNameMap] = None,
    **kwargs: Any,
) -> str:
    """Render a column reference, qualified by schema and table name
    where applicable.

    :param column: the column being rendered.
    :param add_to_result_map: optional callable that receives the
     rendered name, the original name, the lookup targets and the type.
    :param include_table: when False, only the column name is rendered.
    :param result_map_targets: extra targets appended to the result map
     entry.
    :param ambiguous_table_name_map: optional mapping of table name to
     replacement name, consulted only when the table has no effective
     schema.
    :return: the rendered column string.

    """
    orig_name = column.name
    name = (
        self._fallback_column_name(column)
        if orig_name is None
        else orig_name
    )

    is_literal = column.is_literal
    if not is_literal and isinstance(name, elements._truncated_label):
        name = self._truncated_identifier("colident", name)

    if add_to_result_map is not None:
        targets = (column, name, column.key) + result_map_targets
        if column._tq_label:
            targets += (column._tq_label,)

        add_to_result_map(name, orig_name, targets, column.type)

    # note we are not currently accommodating for
    # literal_column(quoted_name('ident', True)) here
    name = (
        self.escape_literal_column(name)
        if is_literal
        else self.preparer.quote(name)
    )

    table = column.table
    if table is None or not include_table or not table.named_with_column:
        return name

    effective_schema = self.preparer.schema_for_object(table)
    schema_prefix = (
        (self.preparer.quote_schema(effective_schema) + ".")
        if effective_schema
        else ""
    )

    if TYPE_CHECKING:
        assert isinstance(table, NamedFromClause)
    tablename = table.name

    if (
        not effective_schema
        and ambiguous_table_name_map
        and tablename in ambiguous_table_name_map
    ):
        tablename = ambiguous_table_name_map[tablename]

    if isinstance(tablename, elements._truncated_label):
        tablename = self._truncated_identifier("alias", tablename)

    return schema_prefix + self.preparer.quote(tablename) + "." + name

2663 

def visit_collation(self, element, **kw):
    """Render the element's collation name via the identifier preparer."""
    collation_name = element.collation
    return self.preparer.format_collation(collation_name)

2666 

def visit_fromclause(self, fromclause, **kwargs):
    """Render a FROM clause element as its ``name`` attribute."""
    rendered = fromclause.name
    return rendered

2669 

def visit_index(self, index, **kwargs):
    """Render an index as its ``name`` attribute."""
    rendered = index.name
    return rendered

2672 

def visit_typeclause(self, typeclause, **kw):
    """Render the type of a type clause with the dialect's type compiler.

    The type clause itself and this compiler's identifier preparer are
    passed along as ``type_expression`` and ``identifier_preparer``.

    """
    kw.update(type_expression=typeclause, identifier_preparer=self.preparer)
    type_compiler = self.dialect.type_compiler_instance
    return type_compiler.process(typeclause.type, **kw)

2679 

def post_process_text(self, text):
    """Return ``text`` with percent signs doubled when the identifier
    preparer has ``_double_percents`` set; otherwise return it unchanged.

    """
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")

2684 

def escape_literal_column(self, text):
    """Return literal column ``text`` with percent signs doubled when the
    identifier preparer has ``_double_percents`` set; otherwise return it
    unchanged.

    """
    if not self.preparer._double_percents:
        return text
    return text.replace("%", "%%")

2689 

def visit_textclause(self, textclause, add_to_result_map=None, **kw):
    """Render a text clause, substituting its bind parameter markers.

    Each marker matched by ``BIND_PARAMS`` is replaced with the rendering
    of the bind parameter registered under that name on the text clause,
    or with a plain bind parameter string when none is registered.
    Escaped markers matched by ``BIND_PARAMS_ESC`` are un-escaped last.

    """

    def do_bindparam(match):
        bind_name = match.group(1)
        explicit_binds = textclause._bindparams
        if bind_name not in explicit_binds:
            return self.bindparam_string(bind_name, **kw)
        return self.process(explicit_binds[bind_name], **kw)

    if not self.stack:
        # nothing encloses this text: the statement is plain text
        self.isplaintext = True

    if add_to_result_map:
        # text() object is present in the columns clause of a
        # select().   Add a no-name entry to the result map so that
        # row[text()] produces a result
        add_to_result_map(None, None, (textclause,), sqltypes.NULLTYPE)

    processed_text = self.post_process_text(textclause.text)
    with_binds = BIND_PARAMS.sub(do_bindparam, processed_text)

    # un-escape any \:params
    return BIND_PARAMS_ESC.sub(lambda m: m.group(1), with_binds)

2714 

def visit_textual_select(
    self, taf, compound_index=None, asfrom=False, **kw
):
    """Render a textual SELECT.

    A stack entry is pushed for the duration of the call.  When the
    construct is at the top level, or the enclosing entry requests a
    result map, the declared columns are processed so that they are
    recorded in the result map.  Any pending CTEs are rendered in front
    of the text.

    """
    toplevel = not self.stack
    entry = self._default_stack_entry if toplevel else self.stack[-1]

    new_entry: _CompilerStackEntry = {
        "correlate_froms": set(),
        "asfrom_froms": set(),
        "selectable": taf,
    }
    self.stack.append(new_entry)

    if taf._independent_ctes:
        self._dispatch_independent_ctes(taf, kw)

    if toplevel:
        populate_result_map = True
    elif compound_index == 0 and entry.get(
        "need_result_map_for_compound", False
    ):
        populate_result_map = True
    else:
        populate_result_map = entry.get("need_result_map_for_nested", False)

    if populate_result_map:
        positional = taf.positional
        self._ordered_columns = positional
        self._textual_ordered_columns = positional

        # enable looser result column matching when the SQL text links to
        # Column objects by name only
        self._loose_column_name_matching = not taf.positional and bool(
            taf.column_args
        )

        for column in taf.column_args:
            self.process(
                column,
                within_columns_clause=True,
                add_to_result_map=self._add_to_result_map,
            )

    text = self.process(taf.element, **kw)
    if self.ctes:
        nesting_level = None if toplevel else len(self.stack)
        text = self._render_cte_clause(nesting_level=nesting_level) + text

    self.stack.pop(-1)

    return text

2766 

def visit_null(self, expr: Null, **kw: Any) -> str:
    """Render the SQL ``NULL`` keyword."""
    null_keyword = "NULL"
    return null_keyword

2769 

def visit_true(self, expr: True_, **kw: Any) -> str:
    """Render boolean true: ``true`` when the dialect supports native
    booleans, otherwise ``1``.

    """
    return "true" if self.dialect.supports_native_boolean else "1"

2775 

def visit_false(self, expr: False_, **kw: Any) -> str:
    """Render boolean false: ``false`` when the dialect supports native
    booleans, otherwise ``0``.

    """
    return "false" if self.dialect.supports_native_boolean else "0"

2781 

def _generate_delimited_list(self, elements, separator, **kw):
    """Render each element and join the non-empty renderings with
    ``separator``.

    """
    rendered = []
    for clause in elements:
        text = clause._compiler_dispatch(self, **kw)
        if text:
            rendered.append(text)
    return separator.join(rendered)

2788 

def _generate_delimited_and_list(self, clauses, **kw):
    """Render ``clauses`` as a conjunction.

    The clauses are first passed through
    ``BooleanClauseList._process_clauses_for_boolean`` for the ``and_``
    operator.  If a single clause comes back it is rendered alone;
    otherwise the non-empty renderings are joined with the AND operator
    string.

    """
    lcc, clauses = elements.BooleanClauseList._process_clauses_for_boolean(
        operators.and_,
        elements.True_._singleton,
        elements.False_._singleton,
        clauses,
    )
    if lcc == 1:
        return clauses[0]._compiler_dispatch(self, **kw)

    and_separator = OPERATORS[operators.and_]
    rendered = (clause._compiler_dispatch(self, **kw) for clause in clauses)
    return and_separator.join(text for text in rendered if text)

2805 

def visit_tuple(self, clauselist, **kw):
    """Render a tuple as its clause list wrapped in parentheses."""
    members = self.visit_clauselist(clauselist, **kw)
    return "(%s)" % members

2808 

def visit_clauselist(self, clauselist, **kw):
    """Render the clauses of a clause list joined by its operator string,
    or by a single space when the list has no operator.

    """
    list_operator = clauselist.operator
    sep = " " if list_operator is None else OPERATORS[list_operator]
    return self._generate_delimited_list(clauselist.clauses, sep, **kw)

2817 

def visit_expression_clauselist(self, clauselist, **kw):
    """Render an expression clause list.

    An operator-specific handler obtained from
    ``_get_operator_dispatch`` takes precedence.  Otherwise the clauses
    are joined with the operator's string from ``OPERATORS``.

    :raises UnsupportedCompilationError: if the operator has no entry in
     ``OPERATORS``.

    """
    operator_ = clauselist.operator

    handler = self._get_operator_dispatch(
        operator_, "expression_clauselist", None
    )
    if handler:
        return handler(clauselist, operator_, **kw)

    try:
        opstring = OPERATORS[operator_]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, operator_) from err

    kw["_in_operator_expression"] = True
    return self._generate_delimited_list(clauselist.clauses, opstring, **kw)

2836 

def visit_case(self, clause, **kwargs):
    """Render a ``CASE [value] WHEN ... THEN ... [ELSE ...] END``
    expression.

    """
    parts = ["CASE "]

    if clause.value is not None:
        parts.append(clause.value._compiler_dispatch(self, **kwargs) + " ")

    for cond, result in clause.whens:
        parts.append("WHEN ")
        parts.append(cond._compiler_dispatch(self, **kwargs))
        parts.append(" THEN ")
        parts.append(result._compiler_dispatch(self, **kwargs))
        parts.append(" ")

    if clause.else_ is not None:
        parts.append(
            "ELSE " + clause.else_._compiler_dispatch(self, **kwargs) + " "
        )

    parts.append("END")
    return "".join(parts)

2855 

def visit_type_coerce(self, type_coerce, **kw):
    """Render a type coercion as its ``typed_expression``."""
    typed_expression = type_coerce.typed_expression
    return typed_expression._compiler_dispatch(self, **kw)

2858 

def visit_cast(self, cast, **kwargs):
    """Render ``CAST(<clause> AS <type>)``.

    If the rendered type contains a `` COLLATE ...`` suffix, that suffix
    is moved outside the closing parenthesis of the CAST.

    """
    type_clause = cast.typeclause._compiler_dispatch(self, **kwargs)
    match = re.match("(.*)( COLLATE .*)", type_clause)

    if match:
        type_text, collation_suffix = match.group(1), match.group(2)
    else:
        type_text, collation_suffix = type_clause, ""

    casted = cast.clause._compiler_dispatch(self, **kwargs)
    return "CAST(%s AS %s)%s" % (casted, type_text, collation_suffix)

2867 

def _format_frame_clause(self, range_, **kw):
    """Render the ``<lower> AND <upper>`` part of a window frame.

    ``range_`` is indexed at 0 for the lower bound and at 1 for the upper
    bound.  Each bound is either the unbounded marker, the current-row
    marker, or a number; a negative number renders as ``N PRECEDING`` and
    any other number as ``N FOLLOWING``.

    """

    def render_bound(value, unbounded_text):
        if value is elements.RANGE_UNBOUNDED:
            return unbounded_text
        if value is elements.RANGE_CURRENT:
            return "CURRENT ROW"
        if value < 0:
            return "%s PRECEDING" % (
                self.process(elements.literal(abs(value)), **kw),
            )
        return "%s FOLLOWING" % (
            self.process(elements.literal(value), **kw),
        )

    lower = render_bound(range_[0], "UNBOUNDED PRECEDING")
    upper = render_bound(range_[1], "UNBOUNDED FOLLOWING")
    return "%s AND %s" % (lower, upper)

2909 

def visit_over(self, over, **kwargs):
    """Render ``<element> OVER (...)``.

    The parenthesized part holds, in order, ``PARTITION BY`` and
    ``ORDER BY`` for whichever of those clause lists is present and
    non-empty, followed by at most one frame specification taken from
    ``range_``, ``rows`` or ``groups`` (first one that is not None).

    """
    text = over.element._compiler_dispatch(self, **kwargs)

    frame = None
    for attr, template in (
        ("range_", "RANGE BETWEEN %s"),
        ("rows", "ROWS BETWEEN %s"),
        ("groups", "GROUPS BETWEEN %s"),
    ):
        bounds = getattr(over, attr)
        if bounds is not None:
            frame = template % self._format_frame_clause(bounds, **kwargs)
            break

    window_parts = []
    for word, clause in (
        ("PARTITION", over.partition_by),
        ("ORDER", over.order_by),
    ):
        if clause is not None and len(clause):
            window_parts.append(
                "%s BY %s" % (word, clause._compiler_dispatch(self, **kwargs))
            )
    if frame:
        window_parts.append(frame)

    return "%s OVER (%s)" % (text, " ".join(window_parts))

2942 

def visit_withingroup(self, withingroup, **kwargs):
    """Render ``<element> WITHIN GROUP (ORDER BY <order_by>)``."""
    rendered_element = withingroup.element._compiler_dispatch(self, **kwargs)
    rendered_order = withingroup.order_by._compiler_dispatch(self, **kwargs)
    return "%s WITHIN GROUP (ORDER BY %s)" % (rendered_element, rendered_order)

2948 

def visit_funcfilter(self, funcfilter, **kwargs):
    """Render ``<func> FILTER (WHERE <criterion>)``."""
    rendered_func = funcfilter.func._compiler_dispatch(self, **kwargs)
    rendered_criterion = funcfilter.criterion._compiler_dispatch(
        self, **kwargs
    )
    return "%s FILTER (WHERE %s)" % (rendered_func, rendered_criterion)

2954 

def visit_extract(self, extract, **kwargs):
    """Render ``EXTRACT(<field> FROM <expr>)``.

    The field name is translated through ``self.extract_map``; names
    missing from the map are rendered unchanged.
    """
    requested = extract.field
    field_sql = self.extract_map.get(requested, requested)
    expr_sql = extract.expr._compiler_dispatch(self, **kwargs)
    return f"EXTRACT({field_sql} FROM {expr_sql})"

2961 

def visit_scalar_function_column(self, element, **kw):
    """Render ``(<function>).<column>`` for a column of a function result."""
    function_sql = self.visit_function(element.fn, **kw)
    column_sql = self.visit_column(element, **kw)
    return f"({function_sql}).{column_sql}"

2966 

def visit_function(
    self,
    func: Function[Any],
    add_to_result_map: Optional[_ResultMapAppender] = None,
    **kwargs: Any,
) -> str:
    """Render a SQL function invocation.

    Resolution order:

    1. a ``visit_<lowercased name>_func`` method on this compiler, if any;
    2. otherwise the ``FUNCTIONS`` registry entry for the function's
       (deannotated) class, with the argument list appended only when the
       function has arguments;
    3. otherwise the function's own name, quoted when it contains illegal
       characters or is a ``quoted_name``, always followed by the argument
       list.

    For cases 2 and 3 each entry of ``func.packagenames`` is prefixed,
    joined with dots.

    :param func: the function element to render.
    :param add_to_result_map: optional callable; when given it is invoked
     with ``(func.name, func.name, (func.name,), func.type)``.
    :return: the rendered SQL text, with ``" WITH ORDINALITY"`` appended
     when ``func._with_ordinality`` is set.
    """
    if add_to_result_map is not None:
        add_to_result_map(func.name, func.name, (func.name,), func.type)

    disp = getattr(self, "visit_%s_func" % func.name.lower(), None)

    text: str

    if disp:
        text = disp(func, **kwargs)
    else:
        name = FUNCTIONS.get(func._deannotate().__class__, None)
        if name:
            if func._has_args:
                name += "%(expr)s"
        else:
            name = func.name
            name = (
                self.preparer.quote(name)
                if self.preparer._requires_quotes_illegal_chars(name)
                or isinstance(name, elements.quoted_name)
                else name
            )
            name = name + "%(expr)s"
        text = ".".join(
            [
                (
                    self.preparer.quote(tok)
                    if self.preparer._requires_quotes_illegal_chars(tok)
                    # test the package token itself: ``name`` is a plain
                    # str template by this point, so checking it here
                    # could never be true
                    or isinstance(tok, elements.quoted_name)
                    else tok
                )
                for tok in func.packagenames
            ]
            + [name]
        ) % {"expr": self.function_argspec(func, **kwargs)}

    if func._with_ordinality:
        text += " WITH ORDINALITY"
    return text

3012 

def visit_next_value_func(self, next_value, **kw):
    """Render a "next value" function by delegating to ``visit_sequence``.

    Only the sequence is handed over; keyword arguments are not forwarded.
    """
    target_sequence = next_value.sequence
    return self.visit_sequence(target_sequence)

3015 

def visit_sequence(self, sequence, **kw):
    """Base implementation: always raises.

    :raises NotImplementedError: unconditionally, naming the dialect.
    """
    message = (
        "Dialect '%s' does not support sequence increments."
        % self.dialect.name
    )
    raise NotImplementedError(message)

3021 

def function_argspec(self, func: Function[Any], **kwargs: Any) -> str:
    """Render the argument clause of ``func`` via its ``clause_expr``."""
    argument_clause = func.clause_expr
    return argument_clause._compiler_dispatch(self, **kwargs)

3024 

def visit_compound_select(
    self, cs, asfrom=False, compound_index=None, **kwargs
):
    """Render a compound SELECT (the member selects joined by a keyword).

    A stack entry is pushed for the duration of the rendering and popped
    before returning.  After the member selects, the GROUP BY, ORDER BY and
    (if present) row-limiting clauses are appended with ``include_table``
    forced to ``False``; any collected CTEs are prepended.
    """
    is_toplevel = not self.stack

    state = cs._compile_state_factory(cs, self, **kwargs)
    if is_toplevel and not self.compile_state:
        self.compile_state = state
    statement = state.statement

    if is_toplevel:
        parent_entry = self._default_stack_entry
    else:
        parent_entry = self.stack[-1]

    wants_result_map = is_toplevel or (
        not compound_index
        and parent_entry.get("need_result_map_for_compound", False)
    )

    # indicates there is already a CompoundSelect in play
    if compound_index == 0:
        parent_entry["select_0"] = cs

    self.stack.append(
        {
            "correlate_froms": parent_entry["correlate_froms"],
            "asfrom_froms": parent_entry["asfrom_froms"],
            "selectable": cs,
            "compile_state": state,
            "need_result_map_for_compound": wants_result_map,
        }
    )

    if statement._independent_ctes:
        self._dispatch_independent_ctes(statement, kwargs)

    separator = " " + self.compound_keywords[cs.keyword] + " "
    rendered_selects = [
        member._compiler_dispatch(
            self, asfrom=asfrom, compound_index=position, **kwargs
        )
        for position, member in enumerate(cs.selects)
    ]
    text = separator.join(rendered_selects)

    # trailing clauses are rendered with include_table disabled
    kwargs["include_table"] = False
    text += self.group_by_clause(cs, asfrom=asfrom, **kwargs)
    text += self.order_by_clause(cs, **kwargs)
    if cs._has_row_limiting_clause:
        text += self._row_limit_clause(cs, **kwargs)

    if self.ctes:
        nesting_level = None if is_toplevel else len(self.stack)
        cte_prefix = self._render_cte_clause(
            nesting_level=nesting_level,
            include_following_stack=True,
        )
        text = cte_prefix + text

    self.stack.pop(-1)
    return text

3089 

def _row_limit_clause(self, cs, **kwargs):
    """Render the row-limiting clause of ``cs``.

    Uses ``fetch_clause`` when a fetch clause is set on the statement and
    ``limit_clause`` otherwise.
    """
    renderer = (
        self.fetch_clause
        if cs._fetch_clause is not None
        else self.limit_clause
    )
    return renderer(cs, **kwargs)

3095 

def _get_operator_dispatch(self, operator_, qualifier1, qualifier2):
    """Look up the visitor method for an operator, or ``None``.

    The attribute name is ``visit_<operator name>_<qualifier1>`` with
    ``_<qualifier2>`` appended when ``qualifier2`` is truthy.
    """
    suffix = f"_{qualifier2}" if qualifier2 else ""
    method_name = f"visit_{operator_.__name__}_{qualifier1}{suffix}"
    return getattr(self, method_name, None)

3103 

def visit_unary(
    self, unary, add_to_result_map=None, result_map_targets=(), **kw
):
    """Render a unary expression that has an operator or a modifier.

    A dedicated ``visit_<op>_unary_operator`` / ``visit_<op>_unary_modifier``
    method is used when one exists; otherwise the generic prefix/suffix
    rendering with the ``OPERATORS`` string is used.

    :raises CompileError: when both an operator and a modifier are set, or
     when neither is.
    """
    if add_to_result_map is not None:
        # record this element as an additional result map target and
        # pass the appender down to the inner element
        result_map_targets += (unary,)
        kw.update(
            add_to_result_map=add_to_result_map,
            result_map_targets=result_map_targets,
        )

    op = unary.operator
    modifier = unary.modifier

    if op:
        if modifier:
            raise exc.CompileError(
                "Unary expression does not support operator "
                "and modifier simultaneously"
            )
        handler = self._get_operator_dispatch(op, "unary", "operator")
        if handler:
            return handler(unary, op, **kw)
        return self._generate_generic_unary_operator(
            unary, OPERATORS[op], **kw
        )

    if modifier:
        handler = self._get_operator_dispatch(modifier, "unary", "modifier")
        if handler:
            return handler(unary, modifier, **kw)
        return self._generate_generic_unary_modifier(
            unary, OPERATORS[modifier], **kw
        )

    raise exc.CompileError("Unary expression has no operator or modifier")

3141 

def visit_truediv_binary(self, binary, operator, **kw):
    """Render true division as ``<left> / <right>``.

    When the dialect's ``/`` performs floor division, the right operand is
    wrapped in a CAST to a numeric type (its own type if that already has
    Numeric affinity, a plain ``Numeric()`` otherwise).
    """
    needs_cast = self.dialect.div_is_floordiv
    numerator_sql = self.process(binary.left, **kw)

    denominator = binary.right
    if needs_cast:
        # TODO: would need a fast cast again here,
        # unless we want to use an implicit cast like "+ 0.0"
        if denominator.type._type_affinity is sqltypes.Numeric:
            cast_type = denominator.type
        else:
            cast_type = sqltypes.Numeric()
        denominator = elements.Cast(denominator, cast_type)

    return numerator_sql + " / " + self.process(denominator, **kw)

3168 

def visit_floordiv_binary(self, binary, operator, **kw):
    """Render floor division.

    A bare ``<left> / <right>`` is emitted only when the dialect's ``/``
    already floors and both operands have Integer affinity; in every other
    case the division is wrapped in ``FLOOR(...)``.
    """
    native_floor = (
        self.dialect.div_is_floordiv
        and binary.right.type._type_affinity is sqltypes.Integer
        and binary.left.type._type_affinity is sqltypes.Integer
    )
    quotient = (
        self.process(binary.left, **kw)
        + " / "
        + self.process(binary.right, **kw)
    )
    if native_floor:
        return quotient
    return "FLOOR(%s)" % quotient

3186 

def visit_is_true_unary_operator(self, element, operator, **kw):
    """Render an "is true" test on the inner element.

    The inner expression is emitted as-is when it is implicitly boolean or
    the dialect has a native boolean; otherwise ``<expr> = 1`` is emitted.
    """
    usable_as_boolean = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    inner_sql = self.process(element.element, **kw)
    if usable_as_boolean:
        return inner_sql
    return "%s = 1" % inner_sql

3195 

def visit_is_false_unary_operator(self, element, operator, **kw):
    """Render an "is false" test on the inner element.

    ``NOT <expr>`` is emitted when the expression is implicitly boolean or
    the dialect has a native boolean; otherwise ``<expr> = 0`` is emitted.
    """
    usable_as_boolean = (
        element._is_implicitly_boolean
        or self.dialect.supports_native_boolean
    )
    inner_sql = self.process(element.element, **kw)
    if usable_as_boolean:
        return "NOT %s" % inner_sql
    return "%s = 0" % inner_sql

3204 

def visit_not_match_op_binary(self, binary, operator, **kw):
    """Render a negated MATCH as ``NOT <match expression>``.

    The binary is re-rendered through ``visit_binary`` with the operator
    overridden to ``match_op``.  Compilation keyword arguments are
    forwarded so that options in effect for the enclosing statement
    (for example ``literal_binds``) also apply to the inner MATCH
    expression instead of being silently dropped.
    """
    return "NOT %s" % self.visit_binary(
        binary, override_operator=operators.match_op, **kw
    )

3209 

def visit_not_in_op_binary(self, binary, operator, **kw):
    """Render NOT IN, always wrapped in parentheses."""
    # The empty-set form of NOT IN is "(col NOT IN (null) OR 1 = 1)";
    # because that form contains an OR, the whole expression has to be
    # bracketed.
    inner_sql = self._generate_generic_binary(
        binary, OPERATORS[operator], **kw
    )
    return "(%s)" % inner_sql

3217 

def visit_empty_set_op_expr(self, type_, expand_op, **kw):
    """Render the replacement text for an empty expanding IN / NOT IN.

    The returned fragment is meant to be placed inside an existing pair of
    parentheses: it closes the IN list and opens a trailing always-true
    (NOT IN) or always-false (IN) condition.  For any other ``expand_op``
    this defers to ``visit_empty_set_expr``.
    """
    if expand_op is operators.not_in_op:
        trailer = ") OR (1 = 1"
    elif expand_op is operators.in_op:
        trailer = ") AND (1 != 1"
    else:
        return self.visit_empty_set_expr(type_)

    width = len(type_)
    if width > 1:
        # tuple comparison: one NULL per element, in its own parens
        return "(%s)" % ", ".join(["NULL"] * width) + trailer
    return "NULL" + trailer

3235 

def visit_empty_set_expr(self, element_types, **kw):
    """Base implementation: always raises.

    :raises NotImplementedError: unconditionally, naming the dialect.
    """
    message = (
        "Dialect '%s' does not support empty set expression."
        % self.dialect.name
    )
    raise NotImplementedError(message)

3241 

def _literal_execute_expanding_parameter_literal_binds(
    self, parameter, values, bind_expression_template=None
):
    """Render an expanding parameter's values inline as SQL literals.

    :param parameter: the expanding bind parameter.
    :param values: the values to render.
    :param bind_expression_template: optional post-compile token whose
     second regex group carries ``~~``-separated text to place around
     each rendered scalar value.
    :return: ``((), replacement_expression)``; the first element is always
     an empty tuple.
    :raises NotImplementedError: for tuple values when the type has a bind
     expression.
    """
    impl = parameter.type._unwrapped_dialect_impl(self.dialect)

    if not values:
        # empty IN expression.  bind_expression_template is not needed
        # here because there are no expressions to render.
        if impl._is_tuple_type:
            prefix = "VALUES " if self.dialect.tuple_in_values else ""
            return (), prefix + self.visit_empty_set_op_expr(
                parameter.type.types, parameter.expand_op
            )
        return (), self.visit_empty_set_op_expr(
            [parameter.type], parameter.expand_op
        )

    is_tuple_like = impl._is_tuple_type or (
        impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    )
    if is_tuple_like:
        if impl._has_bind_expression:
            raise NotImplementedError(
                "bind_expression() on TupleType not supported with "
                "literal_binds"
            )
        prefix = "VALUES " if self.dialect.tuple_in_values else ""
        rendered_rows = (
            "(%s)"
            % ", ".join(
                self.render_literal_value(item, item_type)
                for item, item_type in zip(row, parameter.type.types)
            )
            for row in values
        )
        return (), prefix + ", ".join(rendered_rows)

    if bind_expression_template:
        found = self._post_compile_pattern.search(bind_expression_template)
        assert found and found.group(
            2
        ), "unexpected format for expanding parameter"

        # group 2 splits on "~~"; fields 1 and 3 surround each value
        fields = found.group(2).split("~~")
        before, after = fields[1], fields[3]
        return (), ", ".join(
            "%s%s%s"
            % (
                before,
                self.render_literal_value(item, parameter.type),
                after,
            )
            for item in values
        )

    return (), ", ".join(
        self.render_literal_value(item, parameter.type) for item in values
    )

3315 

def _literal_execute_expanding_parameter(self, name, parameter, values):
    """Expand an expanding parameter into individual bound parameters.

    Parameters flagged ``literal_execute`` are delegated to the literal
    rendering variant.  Otherwise one new parameter name is generated per
    value (``<name>_<i>``, or ``<name>_<i>_<j>`` for tuple values, both
    1-based).

    :return: ``(to_update, replacement_expression)`` where ``to_update``
     is a list of ``(new_name, value)`` pairs and the expression is the
     comma-joined rendering of the new parameters.
    """
    if parameter.literal_execute:
        return self._literal_execute_expanding_parameter_literal_binds(
            parameter, values
        )

    dialect = self.dialect
    impl = parameter.type._unwrapped_dialect_impl(dialect)

    template = (
        self.compilation_bindtemplate
        if self._numeric_binds
        else self.bindtemplate
    )

    if self.dialect._bind_typing_render_casts and impl.render_bind_cast:

        def render_one(param_name):
            return self.render_bind_cast(
                parameter.type,
                impl,
                template % {"name": param_name},
            )

    else:

        def render_one(param_name):
            return template % {"name": param_name}

    if not values:
        empty_types = (
            parameter.type.types
            if impl._is_tuple_type
            else [parameter.type]
        )
        return [], self.visit_empty_set_op_expr(
            empty_types, parameter.expand_op
        )

    is_tuple_like = impl._is_tuple_type or (
        impl._isnull
        and isinstance(values[0], collections_abc.Sequence)
        and not isinstance(values[0], (str, bytes))
    )
    if is_tuple_like:
        assert not impl._is_array
        to_update = [
            (f"{name}_{row_no}_{col_no}", item)
            for row_no, row in enumerate(values, 1)
            for col_no, item in enumerate(row, 1)
        ]

        prefix = "VALUES " if dialect.tuple_in_values else ""
        # names are looked up by flat position: row index times the row
        # width, plus the column index
        rendered_rows = (
            "(%s)"
            % ", ".join(
                render_one(to_update[row_idx * len(row) + col_idx][0])
                for col_idx, _item in enumerate(row)
            )
            for row_idx, row in enumerate(values)
        )
        return to_update, prefix + ", ".join(rendered_rows)

    to_update = [
        (f"{name}_{position}", item)
        for position, item in enumerate(values, 1)
    ]
    replacement_expression = ", ".join(
        render_one(new_name) for new_name, _item in to_update
    )
    return to_update, replacement_expression

3394 

def visit_binary(
    self,
    binary,
    override_operator=None,
    eager_grouping=False,
    from_linter=None,
    lateral_from_linter=None,
    **kw,
):
    """Render a binary expression.

    When a linter is supplied and the operator is a comparison, the FROM
    objects of both sides are recorded as linter edges.  The expression is
    then rendered by a ``visit_<operator>_binary`` method when one exists,
    otherwise generically using the ``OPERATORS`` string.

    :raises UnsupportedCompilationError: when no visitor method and no
     ``OPERATORS`` entry exist for the operator.
    """
    if from_linter and operators.is_comparison(binary.operator):
        if lateral_from_linter is not None:
            # inside a LATERAL: the enclosing lateral takes part in the
            # edges on both sides
            extra = [kw["enclosing_lateral"]]
            edge_target = lateral_from_linter
            left_froms = binary.left._from_objects + extra
            right_froms = binary.right._from_objects + extra
        else:
            edge_target = from_linter
            left_froms = binary.left._from_objects
            right_froms = binary.right._from_objects
        edge_target.edges.update(
            itertools.product(_de_clone(left_froms), _de_clone(right_froms))
        )

    # don't allow "? = ?" to render
    both_sides_bound = (
        self.ansi_bind_rules
        and isinstance(binary.left, elements.BindParameter)
        and isinstance(binary.right, elements.BindParameter)
    )
    if both_sides_bound:
        kw["literal_execute"] = True

    effective_op = override_operator or binary.operator
    handler = self._get_operator_dispatch(effective_op, "binary", None)
    if handler:
        return handler(binary, effective_op, **kw)

    try:
        opstring = OPERATORS[effective_op]
    except KeyError as err:
        raise exc.UnsupportedCompilationError(self, effective_op) from err

    return self._generate_generic_binary(
        binary,
        opstring,
        from_linter=from_linter,
        lateral_from_linter=lateral_from_linter,
        **kw,
    )

3450 

def visit_function_as_comparison_op_binary(self, element, operator, **kw):
    """Render the element by processing its ``sql_function``."""
    wrapped_function = element.sql_function
    return self.process(wrapped_function, **kw)

3453 

def visit_mod_binary(self, binary, operator, **kw):
    """Render the modulo operator.

    The percent sign is doubled when the preparer's ``_double_percents``
    flag is set.
    """
    symbol = " %% " if self.preparer._double_percents else " % "
    left_sql = self.process(binary.left, **kw)
    right_sql = self.process(binary.right, **kw)
    return left_sql + symbol + right_sql

3467 

def visit_custom_op_binary(self, element, operator, **kw):
    """Render a binary expression that uses a custom operator.

    The operator string is escaped as a literal column and padded with one
    space on each side; the operator's ``eager_grouping`` setting replaces
    any incoming value.
    """
    kw["eager_grouping"] = operator.eager_grouping
    escaped = self.escape_literal_column(operator.opstring)
    return self._generate_generic_binary(element, f" {escaped} ", **kw)

3475 

def visit_custom_op_unary_operator(self, element, operator, **kw):
    """Render a custom prefix operator: escaped opstring, a space, operand."""
    escaped = self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_operator(
        element, f"{escaped} ", **kw
    )

3480 

def visit_custom_op_unary_modifier(self, element, operator, **kw):
    """Render a custom suffix operator: operand, a space, escaped opstring."""
    escaped = self.escape_literal_column(operator.opstring)
    return self._generate_generic_unary_modifier(
        element, f" {escaped}", **kw
    )

3485 

def _generate_generic_binary(
    self,
    binary: BinaryExpression[Any],
    opstring: str,
    eager_grouping: bool = False,
    **kw: Any,
) -> str:
    """Render ``<left><opstring><right>``.

    Both operands are compiled with ``_in_operator_expression=True`` and
    ``_binary_op`` set to this binary's operator.  The result is wrapped in
    parentheses only when this call was itself nested inside another
    operator expression and ``eager_grouping`` is set.
    """
    was_nested = kw.get("_in_operator_expression", False)

    kw.update(_in_operator_expression=True, _binary_op=binary.operator)

    left_sql = binary.left._compiler_dispatch(
        self, eager_grouping=eager_grouping, **kw
    )
    right_sql = binary.right._compiler_dispatch(
        self, eager_grouping=eager_grouping, **kw
    )
    text = left_sql + opstring + right_sql

    if was_nested and eager_grouping:
        return "(%s)" % text
    return text

3510 

def _generate_generic_unary_operator(self, unary, opstring, **kw):
    """Render a prefix unary expression: ``opstring`` then the operand."""
    operand_sql = unary.element._compiler_dispatch(self, **kw)
    return opstring + operand_sql

3513 

def _generate_generic_unary_modifier(self, unary, opstring, **kw):
    """Render a suffix unary expression: the operand then ``opstring``."""
    operand_sql = unary.element._compiler_dispatch(self, **kw)
    return operand_sql + opstring

3516 

@util.memoized_property
def _like_percent_literal(self):
    """Literal column ``'%'`` with string type, used to build LIKE patterns."""
    percent_literal = elements.literal_column(
        "'%'", type_=sqltypes.STRINGTYPE
    )
    return percent_literal

3520 

def visit_ilike_case_insensitive_operand(self, element, **kw):
    """Render the wrapped element inside ``lower(...)``."""
    inner_sql = element.element._compiler_dispatch(self, **kw)
    return "lower(%s)" % (inner_sql,)

3523 

def visit_contains_op_binary(self, binary, operator, **kw):
    """Render ``contains`` as LIKE.

    Works on a clone of ``binary`` whose right side is the percent literal
    concatenated with the original right side and the percent literal
    again.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_like_op_binary(like_expr, operator, **kw)

3529 

def visit_not_contains_op_binary(self, binary, operator, **kw):
    """Render negated ``contains`` as NOT LIKE.

    Works on a clone of ``binary`` whose right side is wrapped with the
    percent literal on both ends.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right).concat(wildcard)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)

3535 

def visit_icontains_op_binary(self, binary, operator, **kw):
    """Render case-insensitive ``contains`` via the ILIKE visitor.

    Works on a clone of ``binary``: both sides are passed through
    ``ilike_case_insensitive`` and the right side is additionally wrapped
    with the percent literal on both ends.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)

3544 

def visit_not_icontains_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive ``contains`` via the NOT ILIKE visitor.

    Works on a clone of ``binary``: both sides are passed through
    ``ilike_case_insensitive`` and the right side is additionally wrapped
    with the percent literal on both ends.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right).concat(wildcard)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)

3553 

def visit_startswith_op_binary(self, binary, operator, **kw):
    """Render ``startswith`` as LIKE.

    Works on a clone of ``binary`` whose right side is the result of the
    percent literal's ``_rconcat`` with the original right side.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)

3559 

def visit_not_startswith_op_binary(self, binary, operator, **kw):
    """Render negated ``startswith`` as NOT LIKE.

    Works on a clone of ``binary`` whose right side is the result of the
    percent literal's ``_rconcat`` with the original right side.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard._rconcat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)

3565 

def visit_istartswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive ``startswith`` via the ILIKE visitor.

    Works on a clone of ``binary``: both sides are passed through
    ``ilike_case_insensitive`` and the right side is combined with the
    percent literal through ``_rconcat``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)

3572 

def visit_not_istartswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive ``startswith`` via NOT ILIKE.

    Works on a clone of ``binary``: both sides are passed through
    ``ilike_case_insensitive`` and the right side is combined with the
    percent literal through ``_rconcat``.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard._rconcat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)

3579 

def visit_endswith_op_binary(self, binary, operator, **kw):
    """Render ``endswith`` as LIKE.

    Works on a clone of ``binary`` whose right side is the percent literal
    concatenated with the original right side.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_like_op_binary(like_expr, operator, **kw)

3585 

def visit_not_endswith_op_binary(self, binary, operator, **kw):
    """Render negated ``endswith`` as NOT LIKE.

    Works on a clone of ``binary`` whose right side is the percent literal
    concatenated with the original right side.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.right = wildcard.concat(like_expr.right)
    return self.visit_not_like_op_binary(like_expr, operator, **kw)

3591 

def visit_iendswith_op_binary(self, binary, operator, **kw):
    """Render case-insensitive ``endswith`` via the ILIKE visitor.

    Works on a clone of ``binary``: both sides are passed through
    ``ilike_case_insensitive`` and the percent literal is concatenated in
    front of the right side.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_ilike_op_binary(like_expr, operator, **kw)

3598 

def visit_not_iendswith_op_binary(self, binary, operator, **kw):
    """Render negated case-insensitive ``endswith`` via NOT ILIKE.

    Works on a clone of ``binary``: both sides are passed through
    ``ilike_case_insensitive`` and the percent literal is concatenated in
    front of the right side.
    """
    like_expr = binary._clone()
    wildcard = self._like_percent_literal
    like_expr.left = ilike_case_insensitive(like_expr.left)
    folded_right = ilike_case_insensitive(like_expr.right)
    like_expr.right = wildcard.concat(folded_right)
    return self.visit_not_ilike_op_binary(like_expr, operator, **kw)

3605 

def visit_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> LIKE <right>``.

    An ``ESCAPE <literal>`` suffix is added when the binary's modifiers
    carry a non-``None`` ``"escape"`` entry.
    """
    escape_char = binary.modifiers.get("escape", None)

    left_sql = binary.left._compiler_dispatch(self, **kw)
    right_sql = binary.right._compiler_dispatch(self, **kw)
    like_sql = "%s LIKE %s" % (left_sql, right_sql)

    if escape_char is None:
        return like_sql
    return (
        like_sql
        + " ESCAPE "
        + self.render_literal_value(escape_char, sqltypes.STRINGTYPE)
    )

3617 

def visit_not_like_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT LIKE <right>``.

    An ``ESCAPE <literal>`` suffix is added when the binary's modifiers
    carry a non-``None`` ``"escape"`` entry.
    """
    escape_char = binary.modifiers.get("escape", None)

    left_sql = binary.left._compiler_dispatch(self, **kw)
    right_sql = binary.right._compiler_dispatch(self, **kw)
    not_like_sql = "%s NOT LIKE %s" % (left_sql, right_sql)

    if escape_char is None:
        return not_like_sql
    return (
        not_like_sql
        + " ESCAPE "
        + self.render_literal_value(escape_char, sqltypes.STRINGTYPE)
    )

3628 

def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render ILIKE through the LIKE visitor.

    When invoked for the ``ilike_op`` operator itself, both sides of a
    clone of ``binary`` are wrapped with ``ilike_case_insensitive`` first.
    For any other operator the operands are assumed to have been wrapped
    already by the caller.
    """
    target = binary
    if operator is operators.ilike_op:
        target = binary._clone()
        target.left = ilike_case_insensitive(target.left)
        target.right = ilike_case_insensitive(target.right)

    return self.visit_like_op_binary(target, operator, **kw)

3637 

def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render NOT ILIKE through the NOT LIKE visitor.

    When invoked for the ``not_ilike_op`` operator itself, both sides of a
    clone of ``binary`` are wrapped with ``ilike_case_insensitive`` first.
    For any other operator the operands are assumed to have been wrapped
    already by the caller.
    """
    target = binary
    if operator is operators.not_ilike_op:
        target = binary._clone()
        target.left = ilike_case_insensitive(target.left)
        target.right = ilike_case_insensitive(target.right)

    return self.visit_not_like_op_binary(target, operator, **kw)

3646 

def visit_between_op_binary(self, binary, operator, **kw):
    """Render BETWEEN, or BETWEEN SYMMETRIC when the modifier is set."""
    if binary.modifiers.get("symmetric", False):
        opstring = " BETWEEN SYMMETRIC "
    else:
        opstring = " BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)

3652 

def visit_not_between_op_binary(self, binary, operator, **kw):
    """Render NOT BETWEEN, or NOT BETWEEN SYMMETRIC when the modifier is set."""
    if binary.modifiers.get("symmetric", False):
        opstring = " NOT BETWEEN SYMMETRIC "
    else:
        opstring = " NOT BETWEEN "
    return self._generate_generic_binary(binary, opstring, **kw)

3660 

def visit_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation: always raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    message = (
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )
    raise exc.CompileError(message)

3668 

def visit_not_regexp_match_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation: always raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    message = (
        "%s dialect does not support regular expressions"
        % self.dialect.name
    )
    raise exc.CompileError(message)

3676 

def visit_regexp_replace_op_binary(
    self, binary: BinaryExpression[Any], operator: Any, **kw: Any
) -> str:
    """Base implementation: always raises.

    :raises CompileError: unconditionally, naming the dialect.
    """
    message = (
        "%s dialect does not support regular expression replacements"
        % self.dialect.name
    )
    raise exc.CompileError(message)

3684 

def visit_bindparam(
    self,
    bindparam,
    within_columns_clause=False,
    literal_binds=False,
    skip_bind_expression=False,
    literal_execute=False,
    render_postcompile=False,
    is_upsert_set=False,
    **kwargs,
):
    """Render a bound parameter.

    Depending on the flags this yields a type-level bind expression
    wrapped around the parameter, an inline literal (``literal_binds``),
    or a parameter placeholder from ``bindparam_string``.  In the
    placeholder case the parameter is registered in ``self.binds`` and,
    when applicable, in the literal-execute / post-compile sets.
    Expanding parameters are wrapped in parentheses.

    :raises CompileError: when the parameter name collides with a
     different, incompatible parameter already registered.
    """
    # Detect parametrized bindparams in upsert SET clause for issue #13130
    if (
        is_upsert_set
        and bindparam.value is None
        and bindparam.callable is None
        and self._insertmanyvalues is not None
    ):
        self._insertmanyvalues = self._insertmanyvalues._replace(
            has_upsert_bound_parameters=True
        )

    if not skip_bind_expression:
        type_impl = bindparam.type.dialect_impl(self.dialect)
        if type_impl._has_bind_expression:
            wrapped = self.process(
                type_impl.bind_expression(bindparam),
                skip_bind_expression=True,
                within_columns_clause=within_columns_clause,
                literal_binds=literal_binds and not bindparam.expanding,
                literal_execute=literal_execute,
                render_postcompile=render_postcompile,
                **kwargs,
            )
            if not bindparam.expanding:
                return wrapped

            # for postcompile w/ expanding, move the "wrapped" part
            # of this into the inside
            found = re.match(
                r"^(.*)\(__\[POSTCOMPILE_(\S+?)\]\)(.*)$", wrapped
            )
            assert found, "unexpected format for expanding parameter"
            wrapped = "(__[POSTCOMPILE_%s~~%s~~REPL~~%s~~])" % (
                found.group(2),
                found.group(1),
                found.group(3),
            )

            if not literal_binds:
                return wrapped

            rendered = self.render_literal_bindparam(
                bindparam,
                within_columns_clause=True,
                bind_expression_template=wrapped,
                **kwargs,
            )
            return "(%s)" % rendered

    if literal_binds:
        rendered = self.render_literal_bindparam(
            bindparam, within_columns_clause=True, **kwargs
        )
        if bindparam.expanding:
            rendered = "(%s)" % rendered
        return rendered

    literal_execute = (
        literal_execute
        or bindparam.literal_execute
        or (within_columns_clause and self.ansi_bind_rules)
    )
    post_compile = literal_execute or bindparam.expanding

    name = self._truncate_bindparam(bindparam)

    existing = self.binds[name] if name in self.binds else bindparam
    if existing is not bindparam:
        if (
            (existing.unique or bindparam.unique)
            and not existing.proxy_set.intersection(bindparam.proxy_set)
            and not existing._cloned_set.intersection(
                bindparam._cloned_set
            )
        ):
            raise exc.CompileError(
                "Bind parameter '%s' conflicts with "
                "unique bind parameter of the same name" % name
            )
        elif existing.expanding != bindparam.expanding:
            raise exc.CompileError(
                "Can't reuse bound parameter name '%s' in both "
                "'expanding' (e.g. within an IN expression) and "
                "non-expanding contexts. If this parameter is to "
                "receive a list/array value, set 'expanding=True' on "
                "it for expressions that aren't IN, otherwise use "
                "a different parameter name." % (name,)
            )
        elif existing._is_crud and bindparam._is_crud:
            # TODO: this condition is not well understood.
            # see tests in test/sql/test_update.py
            raise exc.CompileError(
                "Encountered unsupported case when compiling an "
                "INSERT or UPDATE statement. If this is a "
                "multi-table "
                "UPDATE statement, please provide string-named "
                "arguments to the "
                "values() method with distinct names; support for "
                "multi-table UPDATE statements that "
                "target multiple tables for UPDATE is very "
                "limited",
            )
        elif existing._is_crud or bindparam._is_crud:
            raise exc.CompileError(
                f"bindparam() name '{bindparam.key}' is reserved "
                "for automatic usage in the VALUES or SET "
                "clause of this "
                "insert/update statement. Please use a "
                "name other than column name when using "
                "bindparam() "
                "with insert() or update() (for example, "
                f"'b_{bindparam.key}')."
            )

    self.binds[bindparam.key] = self.binds[name] = bindparam

    # if we are given a cache key that we're going to match against,
    # relate the bindparam here to one that is most likely present
    # in the "extracted params" portion of the cache key.  this is used
    # to set up a positional mapping that is used to determine the
    # correct parameters for a subsequent use of this compiled with
    # a different set of parameter values.  here, we accommodate for
    # parameters that may have been cloned both before and after the
    # cache key was generated.
    bind_match = self._cache_key_bind_match

    if bind_match:
        params_by_cache_bind, cache_binds_by_key = bind_match
        for cloned in bindparam._cloned_set:
            if cloned.key in cache_binds_by_key:
                cache_bind = cache_binds_by_key[cloned.key]
                params_by_cache_bind[cache_bind].append(bindparam)

    if bindparam.isoutparam:
        self.has_out_parameters = True

    if post_compile:
        if render_postcompile:
            self._render_postcompile = True

        if literal_execute:
            self.literal_execute_params |= {bindparam}
        else:
            self.post_compile_params |= {bindparam}

    placeholder = self.bindparam_string(
        name,
        post_compile=post_compile,
        expanding=bindparam.expanding,
        bindparam_type=bindparam.type,
        **kwargs,
    )

    if bindparam.expanding:
        placeholder = "(%s)" % placeholder

    return placeholder

3860 

def render_bind_cast(self, type_, dbapi_type, sqltext):
    """Base implementation: always raises ``NotImplementedError``."""
    raise NotImplementedError()

3863 

def render_literal_bindparam(
    self,
    bindparam,
    render_literal_value=NO_ARG,
    bind_expression_template=None,
    **kw,
):
    """Render a bound parameter inline as a literal.

    :param bindparam: the parameter to render.
    :param render_literal_value: explicit value to render in place of the
     parameter's own effective value.
    :param bind_expression_template: passed through to the expanding
     literal renderer for expanding parameters.
    :return: the literal SQL text.  A parameter with neither a value nor a
     callable is rendered by processing the NULL type; a warning is issued
     first when the enclosing binary operator is something other than
     ``is`` / ``is not``.
    """
    if render_literal_value is NO_ARG:
        if bindparam.value is None and bindparam.callable is None:
            enclosing_op = kw.get("_binary_op", None)
            if enclosing_op and enclosing_op not in (
                operators.is_,
                operators.is_not,
            ):
                util.warn_limited(
                    "Bound parameter '%s' rendering literal NULL in a SQL "
                    "expression; comparisons to NULL should not use "
                    "operators outside of 'is' or 'is not'",
                    (bindparam.key,),
                )
            return self.process(sqltypes.NULLTYPE, **kw)
        value = bindparam.effective_value
    else:
        value = render_literal_value

    if not bindparam.expanding:
        return self.render_literal_value(value, bindparam.type)

    expand = self._literal_execute_expanding_parameter_literal_binds
    _unused, replacement_expr = expand(
        bindparam,
        value,
        bind_expression_template=bind_expression_template,
    )
    return replacement_expr

3896 

def render_literal_value(
    self, value: Any, type_: sqltypes.TypeEngine[Any]
) -> str:
    """Render a Python value as an inline SQL literal for ``type_``.

    Intended for statement sections where the target driver/database does
    not accept bound parameters.

    :param value: the value to render.
    :param type_: the datatype whose literal processor performs the
     rendering.
    :return: the literal SQL text.
    :raises CompileError: when the type provides no literal processor, or
     when the processor raises for this value.
    """
    if value is None and not type_.should_evaluate_none:
        # issue #10535 - handle NULL in the compiler without placing
        # this onto each type, except for "evaluate None" types
        # (e.g. JSON)
        return self.process(elements.Null._instance())

    literal_processor = type_._cached_literal_processor(self.dialect)
    if not literal_processor:
        raise exc.CompileError(
            f"No literal value renderer is available for literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype {type_}"
        )

    try:
        return literal_processor(value)
    except Exception as e:
        raise exc.CompileError(
            f"Could not render literal value "
            f'"{sql_util._repr_single_value(value)}" '
            f"with datatype "
            f"{type_}; see parent stack trace for "
            "more detail."
        ) from e

3935 

def _truncate_bindparam(self, bindparam):
    """Return the name under which ``bindparam`` is rendered.

    The name is looked up in ``self.bind_names`` first.  Otherwise it is
    taken from ``bindparam.key``, passed through
    ``_truncated_identifier()`` when the key is a ``_truncated_label``,
    and then stored in ``self.bind_names``.

    """
    if bindparam in self.bind_names:
        return self.bind_names[bindparam]

    resolved = bindparam.key
    if isinstance(resolved, elements._truncated_label):
        resolved = self._truncated_identifier("bindparam", resolved)

    # add to bind_names for translation
    self.bind_names[bindparam] = resolved
    return resolved

3948 

def _truncated_identifier(
    self, ident_class: str, name: _truncated_label
) -> str:
    """Return the final identifier for a truncated label.

    Results are cached in ``self.truncated_names`` under
    ``(ident_class, name)``.  The label is first resolved through
    ``self.anon_map``.  If the result is longer than
    ``self.label_length - 6`` it is cut to that length (never below
    zero) and given a ``_<hex counter>`` suffix, with one counter kept
    per ``ident_class``.

    """
    cache_key = (ident_class, name)
    if cache_key in self.truncated_names:
        return self.truncated_names[cache_key]

    resolved = name.apply_map(self.anon_map)
    budget = self.label_length - 6

    if len(resolved) > budget:
        seq = self._truncated_counters.get(ident_class, 1)
        shortened = resolved[: max(budget, 0)] + "_" + hex(seq)[2:]
        self._truncated_counters[ident_class] = seq + 1
        resolved = shortened

    self.truncated_names[cache_key] = resolved
    return resolved

3969 

def _anonymize(self, name: str) -> str:
    """Interpolate ``self.anon_map`` into ``name`` with the ``%`` operator."""
    resolved = name % self.anon_map
    return resolved

3972 

def bindparam_string(
    self,
    name: str,
    post_compile: bool = False,
    expanding: bool = False,
    escaped_from: Optional[str] = None,
    bindparam_type: Optional[TypeEngine[Any]] = None,
    accumulate_bind_names: Optional[Set[str]] = None,
    visited_bindparam: Optional[List[str]] = None,
    **kw: Any,
) -> str:
    """Return the SQL text that stands in for the bound parameter ``name``.

    :param name: the parameter name; characters matched by
     ``self._bind_translate_re`` are replaced unless ``escaped_from`` is
     already given.
    :param post_compile: render a ``__[POSTCOMPILE_<name>]`` token rather
     than applying a bind template.
    :param expanding: with ``post_compile``, return the token as is.
    :param escaped_from: the original name if ``name`` is already escaped.
    :param bindparam_type: type consulted to decide if a cast is rendered.
    :param accumulate_bind_names: optional set that receives ``name``.
    :param visited_bindparam: optional list that receives ``name``.

    """
    # TODO: accumulate_bind_names is passed by crud.py to gather
    # names on a per-value basis, visited_bindparam is passed by
    # visit_insert() to collect all parameters in the statement.
    # see if this gathering can be simplified somehow
    if accumulate_bind_names is not None:
        accumulate_bind_names.add(name)
    if visited_bindparam is not None:
        visited_bindparam.append(name)

    if not escaped_from and self._bind_translate_re.search(name):
        # not quite the translate use case as we want to
        # also get a quick boolean if we even found
        # unusual characters in the name
        escaped_from = name
        name = self._bind_translate_re.sub(
            lambda match: self._bind_translate_chars[match.group(0)],
            escaped_from,
        )

    if escaped_from:
        self.escaped_bind_names = self.escaped_bind_names.union(
            {escaped_from: name}
        )

    if post_compile:
        placeholder = "__[POSTCOMPILE_%s]" % name

        # for expanding, bound parameters or literal values will be
        # rendered per item, so the plain token is returned; the same
        # happens when there is no type to consult
        if expanding or bindparam_type is None:
            return placeholder

        # otherwise, for non-expanding "literal execute", apply
        # bind casts as determined by the datatype
        literal_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if literal_impl.render_literal_cast:
            placeholder = self.render_bind_cast(
                bindparam_type, literal_impl, placeholder
            )
        return placeholder

    if self.state is CompilerState.COMPILING:
        template = self.compilation_bindtemplate
    else:
        template = self.bindtemplate
    rendered = template % {"name": name}

    if bindparam_type is not None and self.dialect._bind_typing_render_casts:
        bind_impl = bindparam_type._unwrapped_dialect_impl(self.dialect)
        if bind_impl.render_bind_cast:
            rendered = self.render_bind_cast(
                bindparam_type, bind_impl, rendered
            )

    return rendered

4039 

def _dispatch_independent_ctes(self, stmt, kw):
    """Compile each independent CTE of ``stmt`` with its own options.

    ``kw`` is not modified.  Any ``cte_opts`` entry in it is left out of
    the forwarded arguments and replaced by the entry of
    ``stmt._independent_ctes_opts`` paired with each CTE.

    """
    forwarded = {key: val for key, val in kw.items() if key != "cte_opts"}
    pairs = zip(stmt._independent_ctes, stmt._independent_ctes_opts)
    for independent_cte, opts in pairs:
        independent_cte._compiler_dispatch(
            self, cte_opts=opts, **forwarded
        )

4047 

def visit_cte(
    self,
    cte: CTE,
    asfrom: bool = False,
    ashint: bool = False,
    fromhints: Optional[_FromHintsType] = None,
    visiting_cte: Optional[CTE] = None,
    from_linter: Optional[FromLinter] = None,
    cte_opts: selectable._CTEOpts = selectable._CTEOpts(False),
    **kwargs: Any,
) -> Optional[str]:
    """Register a CTE with the compiler and render a reference to it.

    The text of a newly seen CTE is stored in the collection returned by
    ``_init_cte_state()``, keyed by the CTE.  Bookkeeping of which CTE
    owns which ``(level, name)`` slot is kept in
    ``self.ctes_by_level_name`` and ``self.level_name_by_cte``.

    :param cte: the CTE being visited.
    :param asfrom: when true, the name (or "pre-alias AS name") by which
     the CTE is referenced is returned.
    :param ashint: not referenced by this method body.
    :param fromhints: not referenced by this method body.
    :param visiting_cte: the CTE currently being compiled by an enclosing
     call, if any; this call passes ``cte`` down under the same key.
    :param from_linter: if given and ``asfrom`` is set, the CTE is
     recorded in its ``froms`` collection.
    :param cte_opts: options whose ``nesting`` flag, together with
     ``cte.nesting``, selects the nesting level.
    :return: ``None`` when ``asfrom`` is false, except that a new CTE
     visited with an empty ``self.stack`` returns the compiled inner
     element directly.
    :raises CompileError: if the CTE is nested "here" in more than one
     location, or if two unrelated CTEs share a name at the same level.

    """
    self_ctes = self._init_cte_state()
    assert self_ctes is self.ctes

    # nested visits below see this CTE as the one being compiled
    kwargs["visiting_cte"] = cte

    cte_name = cte.name

    if isinstance(cte_name, elements._truncated_label):
        cte_name = self._truncated_identifier("alias", cte_name)

    is_new_cte = True
    embedded_in_current_named_cte = False

    _reference_cte = cte._get_reference_cte()

    nesting = cte.nesting or cte_opts.nesting

    # check for CTE already encountered
    if _reference_cte in self.level_name_by_cte:
        cte_level, _, existing_cte_opts = self.level_name_by_cte[
            _reference_cte
        ]
        assert _ == cte_name

        cte_level_name = (cte_level, cte_name)
        existing_cte = self.ctes_by_level_name[cte_level_name]

        # check if we are receiving it here with a specific
        # "nest_here" location; if so, move it to this location

        if cte_opts.nesting:
            if existing_cte_opts.nesting:
                raise exc.CompileError(
                    "CTE is stated as 'nest_here' in "
                    "more than one location"
                )

            old_level_name = (cte_level, cte_name)
            # nested CTEs are keyed by the current stack depth,
            # non-nested ones always by level 1
            cte_level = len(self.stack) if nesting else 1
            cte_level_name = new_level_name = (cte_level, cte_name)

            del self.ctes_by_level_name[old_level_name]
            self.ctes_by_level_name[new_level_name] = existing_cte
            self.level_name_by_cte[_reference_cte] = new_level_name + (
                cte_opts,
            )

    else:
        cte_level = len(self.stack) if nesting else 1
        cte_level_name = (cte_level, cte_name)

        if cte_level_name in self.ctes_by_level_name:
            existing_cte = self.ctes_by_level_name[cte_level_name]
        else:
            existing_cte = None

    if existing_cte is not None:
        embedded_in_current_named_cte = visiting_cte is existing_cte

        # we've generated a same-named CTE that we are enclosed in,
        # or this is the same CTE. just return the name.
        if cte is existing_cte._restates or cte is existing_cte:
            is_new_cte = False
        elif existing_cte is cte._restates:
            # we've generated a same-named CTE that is
            # enclosed in us - we take precedence, so
            # discard the text for the "inner".
            del self_ctes[existing_cte]

            existing_cte_reference_cte = existing_cte._get_reference_cte()

            assert existing_cte_reference_cte is _reference_cte
            assert existing_cte_reference_cte is existing_cte

            del self.level_name_by_cte[existing_cte_reference_cte]
        else:
            if (
                # if the two CTEs have the same hash, which we expect
                # here means that one/both is an annotated of the other
                (hash(cte) == hash(existing_cte))
                # or...
                or (
                    (
                        # if they are clones, i.e. they came from the ORM
                        # or some other visit method
                        cte._is_clone_of is not None
                        or existing_cte._is_clone_of is not None
                    )
                    # and are deep-copy identical
                    and cte.compare(existing_cte)
                )
            ):
                # then consider these two CTEs the same
                is_new_cte = False
            else:
                # otherwise these are two CTEs that either will render
                # differently, or were indicated separately by the user,
                # with the same name
                raise exc.CompileError(
                    "Multiple, unrelated CTEs found with "
                    "the same name: %r" % cte_name
                )

    # an already-registered CTE that is not being rendered as a FROM
    # element has nothing further to do
    if not asfrom and not is_new_cte:
        return None

    if cte._cte_alias is not None:
        pre_alias_cte = cte._cte_alias
        cte_pre_alias_name = cte._cte_alias.name
        if isinstance(cte_pre_alias_name, elements._truncated_label):
            cte_pre_alias_name = self._truncated_identifier(
                "alias", cte_pre_alias_name
            )
    else:
        pre_alias_cte = cte
        cte_pre_alias_name = None

    if is_new_cte:
        self.ctes_by_level_name[cte_level_name] = cte
        self.level_name_by_cte[_reference_cte] = cte_level_name + (
            cte_opts,
        )

        # make sure the CTE this one is an alias of is registered first
        if pre_alias_cte not in self.ctes:
            self.visit_cte(pre_alias_cte, **kwargs)

        if not cte_pre_alias_name and cte not in self_ctes:
            if cte.recursive:
                self.ctes_recursive = True
            text = self.preparer.format_alias(cte, cte_name)
            if cte.recursive or cte.element.name_cte_columns:
                col_source = cte.element

                # TODO: can we get at the .columns_plus_names collection
                # that is already (or will be?) generated for the SELECT
                # rather than calling twice?
                recur_cols = [
                    # TODO: proxy_name is not technically safe,
                    # see test_cte->
                    # test_with_recursive_no_name_currently_buggy. not
                    # clear what should be done with such a case
                    fallback_label_name or proxy_name
                    for (
                        _,
                        proxy_name,
                        fallback_label_name,
                        c,
                        repeated,
                    ) in (col_source._generate_columns_plus_names(True))
                    if not repeated
                ]

                # explicit column list: "name(col1, col2, ...)"
                text += "(%s)" % (
                    ", ".join(
                        self.preparer.format_label_name(
                            ident, anon_map=self.anon_map
                        )
                        for ident in recur_cols
                    )
                )

            assert kwargs.get("subquery", False) is False

            if not self.stack:
                # toplevel, this is a stringify of the
                # cte directly. just compile the inner
                # the way alias() does.
                return cte.element._compiler_dispatch(
                    self, asfrom=asfrom, **kwargs
                )
            else:
                prefixes = self._generate_prefixes(
                    cte, cte._prefixes, **kwargs
                )
                inner = cte.element._compiler_dispatch(
                    self, asfrom=True, **kwargs
                )

                text += " AS %s\n(%s)" % (prefixes, inner)

            if cte._suffixes:
                text += " " + self._generate_prefixes(
                    cte, cte._suffixes, **kwargs
                )

            self_ctes[cte] = text

    if asfrom:
        if from_linter:
            from_linter.froms[cte._de_clone()] = cte_name

        if not is_new_cte and embedded_in_current_named_cte:
            return self.preparer.format_alias(cte, cte_name)

        if cte_pre_alias_name:
            # render "<pre-alias name> AS <this name>"
            text = self.preparer.format_alias(cte, cte_pre_alias_name)
            if self.preparer._requires_quotes(cte_name):
                cte_name = self.preparer.quote(cte_name)
            text += self.get_render_as_alias_suffix(cte_name)
            return text  # type: ignore[no-any-return]
        else:
            return self.preparer.format_alias(cte, cte_name)

    return None

4263 

def visit_table_valued_alias(self, element, **kw):
    """Render a table-valued alias as a lateral or as a plain alias.

    An element that joins implicitly has ``from_linter`` forced to
    ``None`` before it is passed on.

    """
    if element.joins_implicitly:
        kw["from_linter"] = None

    visitor = self.visit_lateral if element._is_lateral else self.visit_alias
    return visitor(element, **kw)

4271 

def visit_table_valued_column(self, element, **kw):
    """Render a table-valued column the same way as a regular column."""
    rendered = self.visit_column(element, **kw)
    return rendered

4274 

def visit_alias(
    self,
    alias,
    asfrom=False,
    ashint=False,
    iscrud=False,
    fromhints=None,
    subquery=False,
    lateral=False,
    enclosing_alias=None,
    from_linter=None,
    **kwargs,
):
    """Render an alias, either as a FROM element, a hint name or inline.

    :param alias: the alias construct to render.
    :param asfrom: render ``<inner> AS <name>`` (plus derived columns and
     FROM hints where they apply).
    :param ashint: return only the formatted alias name.
    :param iscrud: passed through to ``format_from_hint_text()``.
    :param fromhints: optional mapping consulted for a hint on ``alias``.
    :param subquery: wrap the inner element in parentheses.
    :param lateral: the alias is being rendered as a lateral.
    :param enclosing_alias: the alias whose element is currently being
     compiled, if any.
    :param from_linter: if given, the alias is recorded in its ``froms``
     collection when rendered as a FROM element.

    """
    if lateral:
        if "enclosing_lateral" not in kwargs:
            # if lateral is set and enclosing_lateral is not
            # present, we assume we are being called directly
            # from visit_lateral() and we need to set enclosing_lateral.
            assert alias._is_lateral
            kwargs["enclosing_lateral"] = alias

        # for lateral objects, we track a second from_linter that is...
        # lateral! to the level above us.
        if (
            from_linter
            and "lateral_from_linter" not in kwargs
            and "enclosing_lateral" in kwargs
        ):
            kwargs["lateral_from_linter"] = from_linter

    if enclosing_alias is not None and enclosing_alias.element is alias:
        # this alias is itself the element of the alias being compiled:
        # render the inner element only, without an "AS <name>" of its own
        inner = alias.element._compiler_dispatch(
            self,
            asfrom=asfrom,
            ashint=ashint,
            iscrud=iscrud,
            fromhints=fromhints,
            lateral=lateral,
            enclosing_alias=alias,
            **kwargs,
        )
        if subquery and (asfrom or lateral):
            inner = "(%s)" % (inner,)
        return inner
    else:
        kwargs["enclosing_alias"] = alias

    if asfrom or ashint:
        if isinstance(alias.name, elements._truncated_label):
            alias_name = self._truncated_identifier("alias", alias.name)
        else:
            alias_name = alias.name

    if ashint:
        return self.preparer.format_alias(alias, alias_name)
    elif asfrom:
        if from_linter:
            from_linter.froms[alias._de_clone()] = alias_name

        inner = alias.element._compiler_dispatch(
            self, asfrom=True, lateral=lateral, **kwargs
        )
        if subquery:
            inner = "(%s)" % (inner,)

        ret = inner + self.get_render_as_alias_suffix(
            self.preparer.format_alias(alias, alias_name)
        )

        if alias._supports_derived_columns and alias._render_derived:
            # derived column list "(col1 [type], col2 [type], ...)";
            # the type is rendered only if _render_derived_w_types is set
            ret += "(%s)" % (
                ", ".join(
                    "%s%s"
                    % (
                        self.preparer.quote(col.name),
                        (
                            " %s"
                            % self.dialect.type_compiler_instance.process(
                                col.type, **kwargs
                            )
                            if alias._render_derived_w_types
                            else ""
                        ),
                    )
                    for col in alias.c
                )
            )

        if fromhints and alias in fromhints:
            ret = self.format_from_hint_text(
                ret, alias, fromhints[alias], iscrud
            )

        return ret
    else:
        # note we cancel the "subquery" flag here as well
        return alias.element._compiler_dispatch(
            self, lateral=lateral, **kwargs
        )

4374 

4375 def visit_subquery(self, subquery, **kw): 

4376 kw["subquery"] = True 

4377 return self.visit_alias(subquery, **kw) 

4378 

def visit_lateral(self, lateral_, **kw):
    """Render ``LATERAL <alias>``, delegating to ``visit_alias()``."""
    kw["lateral"] = True
    aliased_sql = self.visit_alias(lateral_, **kw)
    return "LATERAL %s" % aliased_sql

4382 

def visit_tablesample(self, tablesample, asfrom=False, **kw):
    """Render ``<alias> TABLESAMPLE <method> [REPEATABLE (<seed>)]``.

    The aliased element is always rendered with ``asfrom=True``; the
    ``asfrom`` argument itself is accepted but not otherwise used.

    """
    aliased_sql = self.visit_alias(tablesample, asfrom=True, **kw)
    method_sql = tablesample._get_method()._compiler_dispatch(self, **kw)
    text = "%s TABLESAMPLE %s" % (aliased_sql, method_sql)

    seed = tablesample.seed
    if seed is not None:
        text += " REPEATABLE (%s)" % (seed._compiler_dispatch(self, **kw))

    return text

4395 

def _render_values(self, element, **kw):
    """Render the ``VALUES (...), (...)`` text for a VALUES construct.

    ``literal_binds`` defaults to ``element.literal_binds`` unless the
    caller passed it.  Every row of every chunk in ``element._data`` is
    rendered as a grouped tuple typed with ``element._column_types``.

    """
    kw.setdefault("literal_binds", element.literal_binds)

    rendered_rows = []
    for chunk in element._data:
        for row in chunk:
            row_tuple = elements.Tuple(*row, types=element._column_types)
            rendered_rows.append(
                self.process(row_tuple.self_group(), **kw)
            )

    tuples = ", ".join(rendered_rows)
    return f"VALUES {tuples}"

4409 

def visit_values(
    self, element, asfrom=False, from_linter=None, visiting_cte=None, **kw
):
    """Render a VALUES construct.

    Outside of a FROM context the bare ``VALUES ...`` text is returned.
    As a FROM element it is parenthesized, prefixed with ``LATERAL`` where
    applicable, and given an alias with a column list if it has a name.
    When it is the direct element of the CTE being compiled, the bare
    text is returned.

    :raises CompileError: if a LATERAL VALUES is the element of the CTE
     being compiled.

    """
    if element._independent_ctes:
        self._dispatch_independent_ctes(element, kw)

    values_sql = self._render_values(element, **kw)

    if element._unnamed:
        name = None
    elif isinstance(element.name, elements._truncated_label):
        name = self._truncated_identifier("values", element.name)
    else:
        name = element.name

    lateral = "LATERAL " if element._is_lateral else ""

    if not asfrom:
        return values_sql

    if from_linter:
        from_linter.froms[element._de_clone()] = (
            name if name is not None else "(unnamed VALUES element)"
        )

    if visiting_cte is not None and visiting_cte.element is element:
        if element._is_lateral:
            raise exc.CompileError(
                "Can't use a LATERAL VALUES expression inside of a CTE"
            )
        return values_sql

    if name:
        kw["include_table"] = False
        alias_suffix = self.get_render_as_alias_suffix(
            self.preparer.quote(name)
        )
        column_list = ", ".join(
            c._compiler_dispatch(self, **kw) for c in element.columns
        )
        return "%s(%s)%s (%s)" % (
            lateral,
            values_sql,
            alias_suffix,
            column_list,
        )

    return "%s(%s)" % (lateral, values_sql)

4458 

def visit_scalar_values(self, element, **kw):
    """Render a VALUES construct wrapped in parentheses."""
    values_sql = self._render_values(element, **kw)
    return f"({values_sql})"

4461 

def get_render_as_alias_suffix(self, alias_name_text):
    """Return the suffix that names an alias: ``" AS <alias_name_text>"``."""
    suffix = " AS " + alias_name_text
    return suffix

4464 

def _add_to_result_map(
    self,
    keyname: str,
    name: str,
    objects: Tuple[Any, ...],
    type_: TypeEngine[Any],
) -> None:
    """Append one entry to ``self._result_columns``.

    A ``keyname`` of ``None`` or ``"*"`` also marks the result columns as
    unordered and ad-hoc textual.

    :raises CompileError: if ``type_`` is a tuple type.

    """

    # note objects must be non-empty for cursor.py to handle the
    # collection properly
    assert objects

    if keyname is None or keyname == "*":
        self._ordered_columns = False
        self._ad_hoc_textual = True

    if type_._is_tuple_type:
        raise exc.CompileError(
            "Most backends don't support SELECTing "
            "from a tuple() object. If this is an ORM query, "
            "consider using the Bundle object."
        )

    entry = ResultColumnsEntry(keyname, name, objects, type_)
    self._result_columns.append(entry)

4489 

def _label_returning_column(
    self, stmt, column, populate_result_map, column_clause_args=None, **kw
):
    """Render a column with necessary labels inside of a RETURNING clause.

    This method is provided for individual dialects in place of calling
    the _label_select_column method directly, so that the two use cases
    of RETURNING vs. SELECT can be disambiguated going forward.

    ``stmt`` is accepted but not passed on; ``_label_select_column()`` is
    called with ``None`` for its select and ``False`` for ``asfrom``.

    .. versionadded:: 1.4.21

    """
    if column_clause_args is None:
        clause_args = {}
    else:
        clause_args = column_clause_args

    return self._label_select_column(
        None,
        column,
        populate_result_map,
        False,
        clause_args,
        **kw,
    )

4510 

def _label_select_column(
    self,
    select,
    column,
    populate_result_map,
    asfrom,
    column_clause_args,
    name=None,
    proxy_name=None,
    fallback_label_name=None,
    within_columns_clause=True,
    column_is_repeated=False,
    need_column_expressions=False,
    include_table=True,
):
    """produce labeled columns present in a select().

    :param select: not referenced by this method body.
    :param column: the column expression to render.
    :param populate_result_map: when true, an ``add_to_result_map``
     callable is passed into the compilation of the column.
    :param asfrom: whether the enclosing statement is rendered as a FROM
     element; feeds the labeling heuristics.
    :param column_clause_args: dictionary of compile arguments; it is
     updated in place before dispatching.
    :param name: explicit label name; requires ``proxy_name``.
    :param proxy_name: alternate name recorded on the generated label.
    :param fallback_label_name: label used when the heuristics decide a
     label is needed and ``name`` was not given.
    :param within_columns_clause: must be true.
    :param column_is_repeated: the column repeats an earlier one, so its
     result map entry only targets the key name.
    :param need_column_expressions: apply the type's column expression
     even when the result map is not populated.
    :param include_table: passed through in ``column_clause_args``.
    :return: the compiled text of the (possibly labeled) column.

    """
    impl = column.type.dialect_impl(self.dialect)

    if impl._has_column_expression and (
        need_column_expressions or populate_result_map
    ):
        col_expr = impl.column_expression(column)
    else:
        col_expr = column

    if populate_result_map:
        # pass an "add_to_result_map" callable into the compilation
        # of embedded columns. this collects information about the
        # column as it will be fetched in the result and is coordinated
        # with cursor.description when the query is executed.
        add_to_result_map = self._add_to_result_map

        # if the SELECT statement told us this column is a repeat,
        # wrap the callable with one that prevents the addition of the
        # targets
        if column_is_repeated:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(keyname, name, (keyname,), type_)

        # if we redefined col_expr for type expressions, wrap the
        # callable with one that adds the original column to the targets
        elif col_expr is not column:
            _add_to_result_map = add_to_result_map

            def add_to_result_map(keyname, name, objects, type_):
                _add_to_result_map(
                    keyname, name, (column,) + objects, type_
                )

    else:
        add_to_result_map = None

    # this method is used by some of the dialects for RETURNING,
    # which has different inputs. _label_returning_column was added
    # as the better target for this now however for 1.4 we will keep
    # _label_select_column directly compatible with this use case.
    # these assertions right now set up the current expected inputs
    assert within_columns_clause, (
        "_label_select_column is only relevant within "
        "the columns clause of a SELECT or RETURNING"
    )
    result_expr: Union[elements.Label[Any], _CompileLabel]

    if isinstance(column, elements.Label):
        # an existing Label is re-wrapped only if the type-level column
        # expression replaced it
        if col_expr is not column:
            result_expr = _CompileLabel(
                col_expr, column.name, alt_names=(column.element,)
            )
        else:
            result_expr = col_expr

    elif name:
        # here, _columns_plus_names has determined there's an explicit
        # label name we need to use. this is the default for
        # tablenames_plus_columnnames as well as when columns are being
        # deduplicated on name

        assert (
            proxy_name is not None
        ), "proxy_name is required if 'name' is passed"

        result_expr = _CompileLabel(
            col_expr,
            name,
            alt_names=(
                proxy_name,
                # this is a hack to allow legacy result column lookups
                # to work as they did before; this goes away in 2.0.
                # TODO: this only seems to be tested indirectly
                # via test/orm/test_deprecations.py. should be a
                # resultset test for this
                column._tq_label,
            ),
        )
    else:
        # determine here whether this column should be rendered in
        # a labelled context or not, as we were given no required label
        # name from the caller. Here we apply heuristics based on the kind
        # of SQL expression involved.

        if col_expr is not column:
            # type-specific expression wrapping the given column,
            # so we render a label
            render_with_label = True
        elif isinstance(column, elements.ColumnClause):
            # table-bound column, we render its name as a label if we are
            # inside of a subquery only
            render_with_label = (
                asfrom
                and not column.is_literal
                and column.table is not None
            )
        elif isinstance(column, elements.TextClause):
            render_with_label = False
        elif isinstance(column, elements.UnaryExpression):
            # unary expression. notes added as of #12681
            #
            # By convention, the visit_unary() method
            # itself does not add an entry to the result map, and relies
            # upon either the inner expression creating a result map
            # entry, or if not, by creating a label here that produces
            # the result map entry. Where that happens is based on whether
            # or not the element immediately inside the unary is a
            # NamedColumn subclass or not.
            #
            # Now, this also impacts how the SELECT is written; if
            # we decide to generate a label here, we get the usual
            # "~(x+y) AS anon_1" thing in the columns clause. If we
            # don't, we don't get an AS at all, we get like
            # "~table.column".
            #
            # But here is the important thing as of modernish (like 1.4)
            # versions of SQLAlchemy - **whether or not the AS <label>
            # is present in the statement is not actually important**.
            # We target result columns **positionally** for a fully
            # compiled ``Select()`` object; before 1.4 we needed those
            # labels to match in cursor.description etc etc but now it
            # really doesn't matter.
            # So really, we could set render_with_label True in all cases.
            # Or we could just have visit_unary() populate the result map
            # in all cases.
            #
            # What we're doing here is strictly trying to not rock the
            # boat too much with when we do/don't render "AS label";
            # labels being present helps in the edge cases that we
            # "fall back" to named cursor.description matching, labels
            # not being present for columns keeps us from having awkward
            # phrases like "SELECT DISTINCT table.x AS x".
            render_with_label = (
                (
                    # exception case to detect if we render "not boolean"
                    # as "not <col>" for native boolean or "<col> = 1"
                    # for non-native boolean. this is controlled by
                    # visit_is_<true|false>_unary_operator
                    column.operator
                    in (operators.is_false, operators.is_true)
                    and not self.dialect.supports_native_boolean
                )
                or column._wraps_unnamed_column()
                or asfrom
            )
        elif (
            # general class of expressions that don't have a SQL-column
            # addressable name. includes scalar selects, bind parameters,
            # SQL functions, others
            not isinstance(column, elements.NamedColumn)
            # deeper check that indicates there's no natural "name" to
            # this element, which accommodates for custom SQL constructs
            # that might have a ".name" attribute (but aren't SQL
            # functions) but are not implementing this more recently added
            # base class. in theory the "NamedColumn" check should be
            # enough, however here we seek to maintain legacy behaviors
            # as well.
            and column._non_anon_label is None
        ):
            render_with_label = True
        else:
            render_with_label = False

        if render_with_label:
            if not fallback_label_name:
                # used by the RETURNING case right now. we generate it
                # here as 3rd party dialects may be referring to
                # _label_select_column method directly instead of the
                # just-added _label_returning_column method
                assert not column_is_repeated
                fallback_label_name = column._anon_name_label

            # make sure the label name is a _truncated_label
            fallback_label_name = (
                elements._truncated_label(fallback_label_name)
                if not isinstance(
                    fallback_label_name, elements._truncated_label
                )
                else fallback_label_name
            )

            result_expr = _CompileLabel(
                col_expr, fallback_label_name, alt_names=(proxy_name,)
            )
        else:
            result_expr = col_expr

    # note: this updates the caller's dictionary in place
    column_clause_args.update(
        within_columns_clause=within_columns_clause,
        add_to_result_map=add_to_result_map,
        include_table=include_table,
    )
    return result_expr._compiler_dispatch(self, **column_clause_args)

4721 

def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Append the FROM hint for ``table`` to ``sqltext``, if there is one.

    The hint text comes from ``get_from_hint_text()``; an empty or
    ``None`` result leaves ``sqltext`` unchanged.  ``iscrud`` is accepted
    but not used here.

    """
    hinttext = self.get_from_hint_text(table, hint)
    if not hinttext:
        return sqltext
    return sqltext + " " + hinttext

4727 

def get_select_hint_text(self, byfroms):
    """Return hint text for a SELECT; this implementation has none."""
    return None

4730 

def get_from_hint_text(
    self, table: FromClause, text: Optional[str]
) -> Optional[str]:
    """Return hint text for a FROM element; this implementation has none.

    ``format_from_hint_text()`` appends whatever is returned here, so a
    ``None`` result means no hint is rendered.

    """
    return None

4735 

def get_crud_hint_text(self, table, text):
    """Return hint text for a CRUD statement; this implementation has none."""
    return None

4738 

def get_statement_hint_text(self, hint_texts):
    """Join the given statement-level hint texts with single spaces."""
    joined = " ".join(hint_texts)
    return joined

4741 

# stack entry used in place of ``self.stack[-1]`` when the compiler stack
# is empty, i.e. for a statement compiled at the top level; it supplies
# empty "correlate_froms" and "asfrom_froms" collections
_default_stack_entry: _CompilerStackEntry

# the value is assigned only when not type checking; under TYPE_CHECKING
# just the annotation above is visible
if not typing.TYPE_CHECKING:
    _default_stack_entry = util.immutabledict(
        [("correlate_froms", frozenset()), ("asfrom_froms", frozenset())]
    )

4748 

def _display_froms_for_select(
    self, select_stmt, asfrom, lateral=False, **kw
):
    """Return the FROM list that a SELECT displays in the current context.

    Utility method to help external dialects get the correct from list
    for a select; specifically the oracle dialect needs this feature
    right now.

    The correlation collections come from the top of ``self.stack``, or
    from ``self._default_stack_entry`` when the stack is empty.  Extra
    keyword arguments are accepted but not used.

    """
    entry = self.stack[-1] if self.stack else self._default_stack_entry

    compile_state = select_stmt._compile_state_factory(select_stmt, self)

    correlate_froms = entry["correlate_froms"]
    asfrom_froms = entry["asfrom_froms"]

    if asfrom and not lateral:
        # rendered as a non-lateral FROM element: no implicit correlation,
        # and the enclosing "asfrom" froms are left out of the explicit set
        explicit = correlate_froms.difference(asfrom_froms)
        implicit = ()
    else:
        explicit = correlate_froms
        implicit = asfrom_froms

    return compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

4777 

# hook consulted by visit_select(): when set, it is called as
# ``translate_select_structure(select_stmt, asfrom=asfrom, **kwargs)``
# and, if it returns a different statement, that statement is compiled
# in place of the original
translate_select_structure: Any = None
"""if not ``None``, should be a callable which accepts ``(select_stmt,
**kw)`` and returns a select object. this is used for structural changes
mostly to accommodate for LIMIT/OFFSET schemes

"""

4784 

4785 def visit_select( 

4786 self, 

4787 select_stmt, 

4788 asfrom=False, 

4789 insert_into=False, 

4790 fromhints=None, 

4791 compound_index=None, 

4792 select_wraps_for=None, 

4793 lateral=False, 

4794 from_linter=None, 

4795 **kwargs, 

4796 ): 

4797 assert select_wraps_for is None, ( 

4798 "SQLAlchemy 1.4 requires use of " 

4799 "the translate_select_structure hook for structural " 

4800 "translations of SELECT objects" 

4801 ) 

4802 

4803 # initial setup of SELECT. the compile_state_factory may now 

4804 # be creating a totally different SELECT from the one that was 

4805 # passed in. for ORM use this will convert from an ORM-state 

4806 # SELECT to a regular "Core" SELECT. other composed operations 

4807 # such as computation of joins will be performed. 

4808 

4809 kwargs["within_columns_clause"] = False 

4810 

4811 compile_state = select_stmt._compile_state_factory( 

4812 select_stmt, self, **kwargs 

4813 ) 

4814 kwargs["ambiguous_table_name_map"] = ( 

4815 compile_state._ambiguous_table_name_map 

4816 ) 

4817 

4818 select_stmt = compile_state.statement 

4819 

4820 toplevel = not self.stack 

4821 

4822 if toplevel and not self.compile_state: 

4823 self.compile_state = compile_state 

4824 

4825 is_embedded_select = compound_index is not None or insert_into 

4826 

4827 # translate step for Oracle, SQL Server which often need to 

4828 # restructure the SELECT to allow for LIMIT/OFFSET and possibly 

4829 # other conditions 

4830 if self.translate_select_structure: 

4831 new_select_stmt = self.translate_select_structure( 

4832 select_stmt, asfrom=asfrom, **kwargs 

4833 ) 

4834 

4835 # if SELECT was restructured, maintain a link to the originals 

4836 # and assemble a new compile state 

4837 if new_select_stmt is not select_stmt: 

4838 compile_state_wraps_for = compile_state 

4839 select_wraps_for = select_stmt 

4840 select_stmt = new_select_stmt 

4841 

4842 compile_state = select_stmt._compile_state_factory( 

4843 select_stmt, self, **kwargs 

4844 ) 

4845 select_stmt = compile_state.statement 

4846 

4847 entry = self._default_stack_entry if toplevel else self.stack[-1] 

4848 

4849 populate_result_map = need_column_expressions = ( 

4850 toplevel 

4851 or entry.get("need_result_map_for_compound", False) 

4852 or entry.get("need_result_map_for_nested", False) 

4853 ) 

4854 

4855 # indicates there is a CompoundSelect in play and we are not the 

4856 # first select 

4857 if compound_index: 

4858 populate_result_map = False 

4859 

4860 # this was first proposed as part of #3372; however, it is not 

4861 # reached in current tests and could possibly be an assertion 

4862 # instead. 

4863 if not populate_result_map and "add_to_result_map" in kwargs: 

4864 del kwargs["add_to_result_map"] 

4865 

4866 froms = self._setup_select_stack( 

4867 select_stmt, compile_state, entry, asfrom, lateral, compound_index 

4868 ) 

4869 

4870 column_clause_args = kwargs.copy() 

4871 column_clause_args.update( 

4872 {"within_label_clause": False, "within_columns_clause": False} 

4873 ) 

4874 

4875 text = "SELECT " # we're off to a good start ! 

4876 

4877 if select_stmt._hints: 

4878 hint_text, byfrom = self._setup_select_hints(select_stmt) 

4879 if hint_text: 

4880 text += hint_text + " " 

4881 else: 

4882 byfrom = None 

4883 

4884 if select_stmt._independent_ctes: 

4885 self._dispatch_independent_ctes(select_stmt, kwargs) 

4886 

4887 if select_stmt._prefixes: 

4888 text += self._generate_prefixes( 

4889 select_stmt, select_stmt._prefixes, **kwargs 

4890 ) 

4891 

4892 text += self.get_select_precolumns(select_stmt, **kwargs) 

4893 # the actual list of columns to print in the SELECT column list. 

4894 inner_columns = [ 

4895 c 

4896 for c in [ 

4897 self._label_select_column( 

4898 select_stmt, 

4899 column, 

4900 populate_result_map, 

4901 asfrom, 

4902 column_clause_args, 

4903 name=name, 

4904 proxy_name=proxy_name, 

4905 fallback_label_name=fallback_label_name, 

4906 column_is_repeated=repeated, 

4907 need_column_expressions=need_column_expressions, 

4908 ) 

4909 for ( 

4910 name, 

4911 proxy_name, 

4912 fallback_label_name, 

4913 column, 

4914 repeated, 

4915 ) in compile_state.columns_plus_names 

4916 ] 

4917 if c is not None 

4918 ] 

4919 

4920 if populate_result_map and select_wraps_for is not None: 

4921 # if this select was generated from translate_select, 

4922 # rewrite the targeted columns in the result map 

4923 

4924 translate = dict( 

4925 zip( 

4926 [ 

4927 name 

4928 for ( 

4929 key, 

4930 proxy_name, 

4931 fallback_label_name, 

4932 name, 

4933 repeated, 

4934 ) in compile_state.columns_plus_names 

4935 ], 

4936 [ 

4937 name 

4938 for ( 

4939 key, 

4940 proxy_name, 

4941 fallback_label_name, 

4942 name, 

4943 repeated, 

4944 ) in compile_state_wraps_for.columns_plus_names 

4945 ], 

4946 ) 

4947 ) 

4948 

4949 self._result_columns = [ 

4950 ResultColumnsEntry( 

4951 key, name, tuple(translate.get(o, o) for o in obj), type_ 

4952 ) 

4953 for key, name, obj, type_ in self._result_columns 

4954 ] 

4955 

4956 text = self._compose_select_body( 

4957 text, 

4958 select_stmt, 

4959 compile_state, 

4960 inner_columns, 

4961 froms, 

4962 byfrom, 

4963 toplevel, 

4964 kwargs, 

4965 ) 

4966 

4967 if select_stmt._statement_hints: 

4968 per_dialect = [ 

4969 ht 

4970 for (dialect_name, ht) in select_stmt._statement_hints 

4971 if dialect_name in ("*", self.dialect.name) 

4972 ] 

4973 if per_dialect: 

4974 text += " " + self.get_statement_hint_text(per_dialect) 

4975 

4976 # In compound query, CTEs are shared at the compound level 

4977 if self.ctes and (not is_embedded_select or toplevel): 

4978 nesting_level = len(self.stack) if not toplevel else None 

4979 text = self._render_cte_clause(nesting_level=nesting_level) + text 

4980 

4981 if select_stmt._suffixes: 

4982 text += " " + self._generate_prefixes( 

4983 select_stmt, select_stmt._suffixes, **kwargs 

4984 ) 

4985 

4986 self.stack.pop(-1) 

4987 

4988 return text 

4989 

def _setup_select_hints(
    self, select: Select[Any]
) -> Tuple[str, _FromHintsType]:
    """Collect the hints of ``select`` that apply to this dialect.

    Each entry of ``select._hints`` is keyed by ``(from_, dialect)``;
    entries whose dialect is ``"*"`` or the name of the current dialect
    are kept.  The hint template is interpolated with a ``name`` value,
    which is the FROM element rendered with ``ashint=True``.

    :return: a two-tuple of the text produced by
     ``get_select_hint_text()`` for the collected hints, and the mapping
     of FROM element to rendered hint text.

    """
    byfrom = {}
    for (from_, dialect), hinttext in select._hints.items():
        if dialect not in ("*", self.dialect.name):
            continue
        rendered_name = from_._compiler_dispatch(self, ashint=True)
        byfrom[from_] = hinttext % {"name": rendered_name}

    return self.get_select_hint_text(byfrom), byfrom

5001 

def _setup_select_stack(
    self, select, compile_state, entry, asfrom, lateral, compound_index
):
    """Push a new compiler stack entry for ``select`` and return the
    FROM objects it should display.

    ``entry`` is the enclosing stack entry; its ``correlate_froms`` and
    ``asfrom_froms`` collections determine which FROMs are correlated.
    When ``compound_index`` is given, the first SELECT of the compound is
    stored in ``entry["select_0"]`` and every following SELECT is checked
    against its column count.

    :raises CompileError: if a non-first member of a compound has a
     different number of columns than the first member.

    """
    outer_correlate = entry["correlate_froms"]
    outer_asfrom = entry["asfrom_froms"]

    if compound_index == 0:
        # first member of the compound; later members compare to it
        entry["select_0"] = select
    elif compound_index:
        expected = len(entry["select_0"]._all_selected_columns)

        if len(compile_state.columns_plus_names) != expected:
            raise exc.CompileError(
                "All selectables passed to "
                "CompoundSelect must have identical numbers of "
                "columns; select #%d has %d columns, select "
                "#%d has %d"
                % (
                    1,
                    expected,
                    compound_index + 1,
                    len(select._all_selected_columns),
                )
            )

    if asfrom and not lateral:
        explicit = outer_correlate.difference(outer_asfrom)
        implicit = ()
    else:
        explicit = outer_correlate
        implicit = outer_asfrom

    froms = compile_state._get_display_froms(
        explicit_correlate_froms=explicit,
        implicit_correlate_froms=implicit,
    )

    displayed = set(_from_objects(*froms))

    new_entry: _CompilerStackEntry = {
        "asfrom_froms": displayed,
        "correlate_froms": displayed.union(outer_correlate),
        "selectable": select,
        "compile_state": compile_state,
    }
    self.stack.append(new_entry)

    return froms

5053 

def _compose_select_body(
    self,
    text,
    select,
    compile_state,
    inner_columns,
    froms,
    byfrom,
    toplevel,
    kwargs,
):
    """Append the column list and the trailing clauses of a SELECT to
    ``text`` and return the result.

    Clauses are rendered in this order: columns, FROM (or
    ``default_from()`` when there are no FROMs), WHERE, GROUP BY, HAVING,
    ORDER BY, the row limiting clause and FOR UPDATE.  ``kwargs`` is the
    dictionary of compiler keyword arguments forwarded to each clause.

    """
    text += ", ".join(inner_columns)

    collect_products = self.linting & COLLECT_CARTESIAN_PRODUCTS
    if not collect_products:
        from_linter = None
        warn_linting = False
    else:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter

    # part of #9440: a SELECT with no columns should render as
    # "SELECT WHERE ..." / "SELECT FROM ...".  the leading "SELECT "
    # string keeps its trailing space for compatibility with custom
    # compilation schemes, so the space is stripped here instead.
    if not inner_columns:
        text = text.rstrip()

    if not froms:
        text += self.default_from()
    else:
        text += " \nFROM "

        # "fromhints" is passed along only when the statement has hints
        hint_kw = {"fromhints": byfrom} if select._hints else {}
        text += ", ".join(
            [
                from_._compiler_dispatch(
                    self,
                    asfrom=True,
                    **hint_kw,
                    from_linter=from_linter,
                    **kwargs,
                )
                for from_ in froms
            ]
        )

    if select._where_criteria:
        where_text = self._generate_delimited_and_list(
            select._where_criteria, from_linter=from_linter, **kwargs
        )
        if where_text:
            text += " \nWHERE " + where_text

    if warn_linting:
        assert from_linter is not None
        from_linter.warn()

    if select._group_by_clauses:
        text += self.group_by_clause(select, **kwargs)

    if select._having_criteria:
        having_text = self._generate_delimited_and_list(
            select._having_criteria, **kwargs
        )
        if having_text:
            text += " \nHAVING " + having_text

    if select._order_by_clauses:
        text += self.order_by_clause(select, **kwargs)

    if select._has_row_limiting_clause:
        text += self._row_limit_clause(select, **kwargs)

    if select._for_update_arg is not None:
        text += self.for_update_clause(select, **kwargs)

    return text

5148 

def _generate_prefixes(self, stmt, prefixes, **kw):
    """Render the given ``(prefix, dialect_name)`` pairs as one string.

    A prefix is rendered when its dialect name is ``None``, ``"*"`` or
    the name of the current dialect.  Rendered prefixes are separated by
    single spaces, and a trailing space is added when the result is
    non-empty.

    """
    rendered = []
    for prefix, dialect_name in prefixes:
        if dialect_name in (None, "*") or dialect_name == self.dialect.name:
            rendered.append(prefix._compiler_dispatch(self, **kw))

    clause = " ".join(rendered)
    return clause + " " if clause else clause

5158 

def _render_cte_clause(
    self,
    nesting_level=None,
    include_following_stack=False,
):
    """Render the ``WITH ...`` clause for the collected CTEs.

    When ``nesting_level`` is greater than one, only nesting CTEs that
    belong to that level are rendered, and they are removed from the
    compiler's CTE collections afterwards.  Otherwise every CTE in
    ``self.ctes`` is rendered.  An empty string is returned when there is
    nothing to render.

    include_following_stack
        Also render the nesting CTEs on the next stack. Useful for
        SQL structures like UNION or INSERT that can wrap SELECT
        statements containing nesting CTEs.
    """
    if not self.ctes:
        return ""

    nested = bool(nesting_level and nesting_level > 1)

    ctes: MutableMapping[CTE, str]

    if not nested:
        ctes = self.ctes
    else:
        ctes = util.OrderedDict()
        for cte in list(self.ctes.keys()):
            cte_level, cte_name, cte_opts = self.level_name_by_cte[
                cte._get_reference_cte()
            ]
            nesting = cte.nesting or cte_opts.nesting
            on_rendered_level = cte_level == nesting_level or (
                include_following_stack and cte_level == nesting_level + 1
            )
            if nesting and on_rendered_level:
                ctes[cte] = self.ctes[cte]

    if not ctes:
        return ""

    any_recursive = any(cte.recursive for cte in ctes)
    preamble = self.get_cte_preamble(any_recursive)
    cte_text = preamble + " " + ", \n".join(ctes.values()) + "\n "

    if nested:
        # the rendered nesting CTEs are done; drop their bookkeeping
        for cte in list(ctes.keys()):
            reference = cte._get_reference_cte()
            cte_level, cte_name, cte_opts = self.level_name_by_cte[
                reference
            ]
            del self.ctes[cte]
            del self.ctes_by_level_name[(cte_level, cte_name)]
            del self.level_name_by_cte[cte._get_reference_cte()]

    return cte_text

5211 

def get_cte_preamble(self, recursive):
    """Return the keyword(s) that open a CTE clause.

    ``WITH RECURSIVE`` is returned when ``recursive`` is true, else
    ``WITH``.

    """
    return "WITH RECURSIVE" if recursive else "WITH"

5217 

def get_select_precolumns(self, select: Select[Any], **kw: Any) -> str:
    """Called when building a ``SELECT`` statement, position is just
    before column list.

    Returns ``"DISTINCT "`` when the statement is marked distinct and an
    empty string otherwise.  A deprecation warning is emitted when the
    statement carries DISTINCT ON expressions, which this base
    implementation does not render.

    """
    if select._distinct_on:
        util.warn_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
            "dialect.  Use of DISTINCT ON for other backends is currently "
            "silently ignored, however this usage is deprecated, and will "
            "raise CompileError in a future release for all backends "
            "that do not support this syntax.",
            version="1.4",
        )

    if select._distinct:
        return "DISTINCT "
    return ""

5233 

def group_by_clause(self, select, **kw):
    """allow dialects to customize how GROUP BY is rendered.

    Returns the ``GROUP BY`` text with a leading space, or an empty
    string when the rendered expression list is empty.

    """
    rendered = self._generate_delimited_list(
        select._group_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return (" GROUP BY " + rendered) if rendered else ""

5244 

def order_by_clause(self, select, **kw):
    """allow dialects to customize how ORDER BY is rendered.

    Returns the ``ORDER BY`` text with a leading space, or an empty
    string when the rendered expression list is empty.

    """
    rendered = self._generate_delimited_list(
        select._order_by_clauses, OPERATORS[operators.comma_op], **kw
    )
    return (" ORDER BY " + rendered) if rendered else ""

5256 

def for_update_clause(self, select, **kw):
    """Return the FOR UPDATE text appended to a SELECT.

    This implementation ignores ``select`` and always returns
    ``" FOR UPDATE"``.

    """
    clause = " FOR UPDATE"
    return clause

5259 

def returning_clause(
    self,
    stmt: UpdateBase,
    returning_cols: Sequence[_ColumnsClauseElement],
    *,
    populate_result_map: bool,
    **kw: Any,
) -> str:
    """Render the ``RETURNING`` clause for a DML statement.

    Each column produced by ``stmt._generate_columns_plus_names()`` for
    ``returning_cols`` is labeled through ``_label_returning_column()``
    and the results are joined with commas.

    """
    columns_plus_names = stmt._generate_columns_plus_names(
        True, cols=base._select_iterables(returning_cols)
    )

    rendered = []
    for (
        name,
        proxy_name,
        fallback_label_name,
        column,
        repeated,
    ) in columns_plus_names:
        rendered.append(
            self._label_returning_column(
                stmt,
                column,
                populate_result_map,
                fallback_label_name=fallback_label_name,
                column_is_repeated=repeated,
                name=name,
                proxy_name=proxy_name,
                **kw,
            )
        )

    return "RETURNING " + ", ".join(rendered)

5291 

def limit_clause(self, select, **kw):
    """Render ``LIMIT`` / ``OFFSET`` text for ``select``.

    When an offset is present without a limit, ``LIMIT -1`` is rendered
    ahead of the ``OFFSET``.  Returns an empty string when neither is
    set.

    """
    limit = select._limit_clause
    offset = select._offset_clause

    if limit is not None:
        text = "\n LIMIT " + self.process(limit, **kw)
    elif offset is not None:
        text = "\n LIMIT -1"
    else:
        text = ""

    if offset is not None:
        text += " OFFSET " + self.process(offset, **kw)
    return text

5301 

def fetch_clause(
    self,
    select,
    fetch_clause=None,
    require_offset=False,
    use_literal_execute_for_simple_int=False,
    **kw,
):
    """Render ``OFFSET n ROWS`` and ``FETCH FIRST n ROWS`` text.

    :param select: statement supplying the offset clause and, when
     ``fetch_clause`` is not passed, the fetch clause and its options.
    :param fetch_clause: explicit fetch expression; when given, the
     ``percent`` and ``with_ties`` options are both treated as False.
    :param require_offset: render ``OFFSET 0 ROWS`` when the statement
     has no offset.
    :param use_literal_execute_for_simple_int: when true, clauses for
     which ``select._simple_int_clause()`` is true are replaced with the
     result of their ``render_literal_execute()`` before rendering.

    """
    if fetch_clause is not None:
        options = {"percent": False, "with_ties": False}
    else:
        fetch_clause = select._fetch_clause
        options = select._fetch_clause_options

    def _literal_if_simple_int(clause):
        if use_literal_execute_for_simple_int and select._simple_int_clause(
            clause
        ):
            return clause.render_literal_execute()
        return clause

    offset_clause = select._offset_clause
    if offset_clause is not None:
        offset_str = self.process(
            _literal_if_simple_int(offset_clause), **kw
        )
        text = "\n OFFSET %s ROWS" % offset_str
    elif require_offset:
        text = "\n OFFSET 0 ROWS"
    else:
        text = ""

    if fetch_clause is None:
        return text

    fetch_clause = _literal_if_simple_int(fetch_clause)
    return text + "\n FETCH FIRST %s%s ROWS %s" % (
        self.process(fetch_clause, **kw),
        " PERCENT" if options["percent"] else "",
        "WITH TIES" if options["with_ties"] else "ONLY",
    )

5342 

def visit_table(
    self,
    table,
    asfrom=False,
    iscrud=False,
    ashint=False,
    fromhints=None,
    use_schema=True,
    from_linter=None,
    ambiguous_table_name_map=None,
    enclosing_alias=None,
    **kwargs,
):
    """Render a table for use in a FROM clause or inside a hint.

    Returns an empty string unless ``asfrom`` or ``ashint`` is set.
    Otherwise the quoted table name is returned, schema-qualified when
    ``use_schema`` is true and the preparer reports a schema for the
    table.  A schema-less table whose name appears in
    ``ambiguous_table_name_map`` is additionally given an anonymous
    alias, unless ``enclosing_alias`` already wraps this same table.
    Hint text from ``fromhints`` is applied last.

    """
    if from_linter:
        from_linter.froms[table] = table.fullname

    if not (asfrom or ashint):
        return ""

    preparer = self.preparer
    effective_schema = preparer.schema_for_object(table)

    if use_schema and effective_schema:
        ret = (
            preparer.quote_schema(effective_schema)
            + "."
            + preparer.quote(table.name)
        )
    else:
        ret = preparer.quote(table.name)

    not_self_aliased = (
        enclosing_alias is None or enclosing_alias.element is not table
    )
    if (
        not_self_aliased
        and not effective_schema
        and ambiguous_table_name_map
        and table.name in ambiguous_table_name_map
    ):
        anon_name = self._truncated_identifier(
            "alias", ambiguous_table_name_map[table.name]
        )
        ret += self.get_render_as_alias_suffix(
            self.preparer.format_alias(None, anon_name)
        )

    if fromhints and table in fromhints:
        ret = self.format_from_hint_text(
            ret, table, fromhints[table], iscrud
        )
    return ret

5395 

def visit_join(self, join, asfrom=False, from_linter=None, **kwargs):
    """Render ``<left> [FULL OUTER|LEFT OUTER] JOIN <right> ON <cond>``.

    Both sides are always rendered with ``asfrom=True``.  When a
    ``from_linter`` is given, every pairing of the left and right FROM
    objects is recorded in its ``edges``.

    """
    if from_linter:
        from_linter.edges.update(
            itertools.product(
                _de_clone(join.left._from_objects),
                _de_clone(join.right._from_objects),
            )
        )

    # "full" takes precedence over "isouter"
    if join.full:
        join_type = " FULL OUTER JOIN "
    elif join.isouter:
        join_type = " LEFT OUTER JOIN "
    else:
        join_type = " JOIN "

    left_text = join.left._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    right_text = join.right._compiler_dispatch(
        self, asfrom=True, from_linter=from_linter, **kwargs
    )
    # TODO: likely need asfrom=True here?
    on_text = join.onclause._compiler_dispatch(
        self, from_linter=from_linter, **kwargs
    )
    return left_text + join_type + right_text + " ON " + on_text

5425 

def _setup_crud_hints(self, stmt, table_text):
    """Collect the hints of a DML statement that apply to this dialect.

    Entries of ``stmt._hints`` keyed with ``"*"`` or the current dialect
    name are gathered into a table-to-hint mapping.  When the statement's
    own table has a hint, ``table_text`` is passed through
    ``format_from_hint_text()``.

    :return: a two-tuple of the hint mapping and the possibly updated
     table text.

    """
    dialect_hints = {}
    for (table, dialect), hint_text in stmt._hints.items():
        if dialect in ("*", self.dialect.name):
            dialect_hints[table] = hint_text

    if stmt.table in dialect_hints:
        table_text = self.format_from_hint_text(
            table_text, stmt.table, dialect_hints[stmt.table], True
        )
    return dialect_hints, table_text

5437 

# lookups for "insertmanyvalues sentinel columns".  each one maps a kind
# of column default (_SentinelDefaultCharacterization) to the backend
# capability (InsertmanyvaluesSentinelOpts) it requires.  there are two
# lookups: the first is used for non-autoincrement columns, the second
# for autoincrement columns.
_sentinel_col_non_autoinc_lookup = util.immutabledict(
    {
        # these three characterizations share the same capability value
        **dict.fromkeys(
            (
                _SentinelDefaultCharacterization.CLIENTSIDE,
                _SentinelDefaultCharacterization.SENTINEL_DEFAULT,
                _SentinelDefaultCharacterization.NONE,
            ),
            InsertmanyvaluesSentinelOpts._SUPPORTED_OR_NOT,
        ),
        _SentinelDefaultCharacterization.IDENTITY: (
            InsertmanyvaluesSentinelOpts.IDENTITY
        ),
        _SentinelDefaultCharacterization.SEQUENCE: (
            InsertmanyvaluesSentinelOpts.SEQUENCE
        ),
    }
)

# same as the non-autoincrement lookup, except that a column with no
# default needs the AUTOINCREMENT capability
_sentinel_col_autoinc_lookup = _sentinel_col_non_autoinc_lookup.union(
    {
        _SentinelDefaultCharacterization.NONE: (
            InsertmanyvaluesSentinelOpts.AUTOINCREMENT
        ),
    }
)

5469 

def _get_sentinel_column_for_table(
    self, table: Table
) -> Optional[Sequence[Column[Any]]]:
    """given a :class:`.Table`, return a usable sentinel column or
    columns for this dialect if any.

    Return None if no sentinel columns could be identified, or raise an
    error if a column was marked as a sentinel explicitly but isn't
    compatible with this dialect.

    """
    characteristics = table._sentinel_column_characteristics
    dialect_opts = self.dialect.insertmanyvalues_implicit_sentinel

    sent_cols = characteristics.columns
    if sent_cols is None:
        return None

    lookup = (
        self._sentinel_col_autoinc_lookup
        if characteristics.is_autoinc
        else self._sentinel_col_non_autoinc_lookup
    )
    # unknown characterizations get an empty bitmask and never match
    required = lookup.get(characteristics.default_characterization, 0)

    if dialect_opts & required:
        return sent_cols

    if not characteristics.is_explicit:
        return None

    # a column was explicitly marked as insert_sentinel=True,
    # however it is not compatible with this dialect.  they should
    # not indicate this column as a sentinel if they need to include
    # this dialect.

    # TODO: do we want non-primary key explicit sentinel cols
    # that can gracefully degrade for some backends?
    # insert_sentinel="degrade" perhaps.  not for the initial release.
    # I am hoping people are generally not dealing with this sentinel
    # business at all.

    # if is_explicit is True, there will be only one sentinel column.

    raise exc.InvalidRequestError(
        f"Column {sent_cols[0]} can't be explicitly "
        "marked as a sentinel column when using the "
        f"{self.dialect.name} dialect, as the "
        "particular type of default generation on this column is "
        "not currently compatible with this dialect's specific "
        f"INSERT..RETURNING syntax which can receive the "
        "server-generated value in "
        "a deterministic way.  To remove this error, remove "
        "insert_sentinel=True from primary key autoincrement "
        "columns; these columns are automatically used as "
        "sentinels for supported dialects in any case."
    )

5531 

def _deliver_insertmanyvalues_batches(
    self,
    statement: str,
    parameters: _DBAPIMultiExecuteParams,
    compiled_parameters: List[_MutableCoreSingleExecuteParams],
    generic_setinputsizes: Optional[_GenericSetInputSizesType],
    batch_size: int,
    sort_by_parameter_order: bool,
    schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
    """Yield the statement / parameter batches for an "insertmanyvalues"
    INSERT.

    This generator works in one of two modes.  In "row at a time" mode
    one :class:`._InsertManyValuesBatch` is yielded per parameter set and
    ``statement`` is used unchanged.  Otherwise the single VALUES
    expression in ``statement`` is replaced with one VALUES expression
    per row, and one batch is yielded for each slice of at most
    ``batch_size`` parameter sets.

    :param statement: the INSERT string; it is expected to contain
     ``(<single_values_expr>)`` which is replaced for each batch.
    :param parameters: the parameter sets, one per row.
    :param compiled_parameters: parameter sets consumed in step with
     ``parameters``; sentinel values are extracted from these using
     ``sentinel_param_keys`` when those are present.
    :param generic_setinputsizes: optional sequence of
     ``(key, len, type)`` entries; in batched mode it is repeated once
     per row of the batch with the row index appended to each key.
    :param batch_size: maximum number of rows per batch; lowered further
     when the dialect sets ``insertmanyvalues_max_parameters``.
    :param sort_by_parameter_order: passed along to every batch, and
     takes part in the decision to fall back to row at a time mode.
    :param schema_translate_map: when given, schema translation is
     applied to the VALUES expression and to each insert crud parameter
     expression before substitution.

    """
    imv = self._insertmanyvalues
    assert imv is not None

    if not imv.sentinel_param_keys:
        _sentinel_from_params = None
    else:
        _sentinel_from_params = operator.itemgetter(
            *imv.sentinel_param_keys
        )

    lenparams = len(parameters)
    if imv.is_default_expr and not self.dialect.supports_default_metavalue:
        # backend doesn't support
        # INSERT INTO table (pk_col) VALUES (DEFAULT), (DEFAULT), ...
        # at the moment this is basically SQL Server due to
        # not being able to use DEFAULT for identity column
        # just yield out that many single statements!  still
        # faster than a whole connection.execute() call ;)
        #
        # note we still are taking advantage of the fact that we know
        # we are using RETURNING.   The generalized approach of fetching
        # cursor.lastrowid etc. still goes through the more heavyweight
        # "ExecutionContext per statement" system as it isn't usable
        # as a generic "RETURNING" approach
        use_row_at_a_time = True
        downgraded = False
    elif not self.dialect.supports_multivalues_insert or (
        sort_by_parameter_order
        and self._result_columns
        and (
            imv.sentinel_columns is None
            or (
                imv.includes_upsert_behaviors
                and not imv.embed_values_counter
            )
        )
    ):
        # deterministic order was requested and the compiler could
        # not organize sentinel columns for this dialect/statement.
        # use row at a time.  Note: if embed_values_counter is True,
        # the counter itself provides the ordering capability we need,
        # so we can use batch mode even with upsert behaviors.
        use_row_at_a_time = True
        downgraded = True
    elif (
        imv.has_upsert_bound_parameters
        and not imv.embed_values_counter
        and self._result_columns
    ):
        # For upsert behaviors (ON CONFLICT DO UPDATE, etc.) with RETURNING
        # and parametrized bindparams in the SET clause, we must use
        # row-at-a-time. Batching multiple rows in a single statement
        # doesn't work when the SET clause contains bound parameters that
        # will receive different values per row, as there's only one SET
        # clause per statement. See issue #13130.
        use_row_at_a_time = True
        downgraded = True
    else:
        use_row_at_a_time = False
        downgraded = False

    if use_row_at_a_time:
        # one batch per parameter set; the statement and the
        # setinputsizes collection are passed through unchanged
        for batchnum, (param, compiled_param) in enumerate(
            cast(
                "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]",  # noqa: E501
                zip(parameters, compiled_parameters),
            ),
            1,
        ):
            yield _InsertManyValuesBatch(
                statement,
                param,
                generic_setinputsizes,
                [param],
                (
                    [_sentinel_from_params(compiled_param)]
                    if _sentinel_from_params
                    else []
                ),
                1,
                batchnum,
                lenparams,
                sort_by_parameter_order,
                downgraded,
            )
        return

    if schema_translate_map:
        rst = functools.partial(
            self.preparer._render_schema_translates,
            schema_translate_map=schema_translate_map,
        )
    else:
        rst = None

    imv_single_values_expr = imv.single_values_expr
    if rst:
        imv_single_values_expr = rst(imv_single_values_expr)

    # swap the single-row VALUES expression for a token that each batch
    # replaces with its own multi-row VALUES text
    executemany_values = f"({imv_single_values_expr})"
    statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")

    # Use optional insertmanyvalues_max_parameters
    # to further shrink the batch size so that there are no more than
    # insertmanyvalues_max_parameters params.
    # Currently used by SQL Server, which limits statements to 2100 bound
    # parameters (actually 2099).
    max_params = self.dialect.insertmanyvalues_max_parameters
    if max_params:
        total_num_of_params = len(self.bind_names)
        num_params_per_batch = len(imv.insert_crud_params)
        num_params_outside_of_batch = (
            total_num_of_params - num_params_per_batch
        )
        batch_size = min(
            batch_size,
            (
                (max_params - num_params_outside_of_batch)
                // num_params_per_batch
            ),
        )

    # mutable copies; slices are removed from the front as batches are
    # produced in the loop below
    batches = cast("List[Sequence[Any]]", list(parameters))
    compiled_batches = cast(
        "List[Sequence[Any]]", list(compiled_parameters)
    )

    processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
    batchnum = 1
    # ceiling division of lenparams by batch_size
    total_batches = lenparams // batch_size + (
        1 if lenparams % batch_size else 0
    )

    insert_crud_params = imv.insert_crud_params
    assert insert_crud_params is not None

    if rst:
        insert_crud_params = [
            (col, key, rst(expr), st)
            for col, key, expr, st in insert_crud_params
        ]

    escaped_bind_names: Mapping[str, str]
    expand_pos_lower_index = expand_pos_upper_index = 0

    if not self.positional:
        # named parameters: build a VALUES template in which each bind
        # name carries an "__EXECMANY_INDEX__" suffix, to be replaced
        # with the row index for every row of a batch
        if self.escaped_bind_names:
            escaped_bind_names = self.escaped_bind_names
        else:
            escaped_bind_names = {}

        all_keys = set(parameters[0])

        def apply_placeholders(keys, formatted):
            for key in keys:
                key = escaped_bind_names.get(key, key)
                formatted = formatted.replace(
                    self.bindtemplate % {"name": key},
                    self.bindtemplate
                    % {"name": f"{key}__EXECMANY_INDEX__"},
                )
            return formatted

        if imv.embed_values_counter:
            imv_values_counter = ", _IMV_VALUES_COUNTER"
        else:
            imv_values_counter = ""
        formatted_values_clause = f"""({', '.join(
            apply_placeholders(bind_keys, formatted)
            for _, _, formatted, bind_keys in insert_crud_params
        )}{imv_values_counter})"""

        keys_to_replace = all_keys.intersection(
            escaped_bind_names.get(key, key)
            for _, _, _, bind_keys in insert_crud_params
            for key in bind_keys
        )
        # parameters that are not part of the VALUES clause are taken
        # from the first parameter set and shared by every batch
        base_parameters = {
            key: parameters[0][key]
            for key in all_keys.difference(keys_to_replace)
        }

        executemany_values_w_comma = ""
    else:
        formatted_values_clause = ""
        keys_to_replace = set()
        base_parameters = {}

        if imv.embed_values_counter:
            executemany_values_w_comma = (
                f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
            )
        else:
            executemany_values_w_comma = f"({imv_single_values_expr}), "

        all_names_we_will_expand: Set[str] = set()
        for elem in imv.insert_crud_params:
            all_names_we_will_expand.update(elem[3])

        # get the start and end position in a particular list
        # of parameters where we will be doing the "expanding".
        # statements can have params on either side or both sides,
        # given RETURNING and CTEs
        if all_names_we_will_expand:
            positiontup = self.positiontup
            assert positiontup is not None

            all_expand_positions = {
                idx
                for idx, name in enumerate(positiontup)
                if name in all_names_we_will_expand
            }
            expand_pos_lower_index = min(all_expand_positions)
            expand_pos_upper_index = max(all_expand_positions) + 1
            # the expanded positions must form one contiguous run
            assert (
                len(all_expand_positions)
                == expand_pos_upper_index - expand_pos_lower_index
            )

        if self._numeric_binds:
            # turn numbered placeholders into "%s" so that they can be
            # renumbered per batch with string formatting below
            escaped = re.escape(self._numeric_binds_identifier_char)
            executemany_values_w_comma = re.sub(
                rf"{escaped}\d+", "%s", executemany_values_w_comma
            )

    while batches:
        batch = batches[0:batch_size]
        compiled_batch = compiled_batches[0:batch_size]

        # remove the rows just taken from the pending lists
        batches[0:batch_size] = []
        compiled_batches[0:batch_size] = []

        if batches:
            current_batch_size = batch_size
        else:
            # last batch; it may hold fewer than batch_size rows
            current_batch_size = len(batch)

        if generic_setinputsizes:
            # if setinputsizes is present, expand this collection to
            # suit the batch length as well
            # currently this will be mssql+pyodbc for internal dialects
            processed_setinputsizes = [
                (new_key, len_, typ)
                for new_key, len_, typ in (
                    (f"{key}_{index}", len_, typ)
                    for index in range(current_batch_size)
                    for key, len_, typ in generic_setinputsizes
                )
            ]

        replaced_parameters: Any
        if self.positional:
            num_ins_params = imv.num_positional_params_counted

            batch_iterator: Iterable[Sequence[Any]]
            extra_params_left: Sequence[Any]
            extra_params_right: Sequence[Any]

            if num_ins_params == len(batch[0]):
                # every parameter belongs to the VALUES clause
                extra_params_left = extra_params_right = ()
                batch_iterator = batch
            else:
                # parameters outside of VALUES are taken from the first
                # row only; each row contributes its VALUES slice
                extra_params_left = batch[0][:expand_pos_lower_index]
                extra_params_right = batch[0][expand_pos_upper_index:]
                batch_iterator = (
                    b[expand_pos_lower_index:expand_pos_upper_index]
                    for b in batch
                )

            # the [:-2] slices below strip the trailing ", " left by
            # the repeated "(...), " expression
            if imv.embed_values_counter:
                expanded_values_string = (
                    "".join(
                        executemany_values_w_comma.replace(
                            "_IMV_VALUES_COUNTER", str(i)
                        )
                        for i, _ in enumerate(batch)
                    )
                )[:-2]
            else:
                expanded_values_string = (
                    (executemany_values_w_comma * current_batch_size)
                )[:-2]

            if self._numeric_binds and num_ins_params > 0:
                # numeric will always number the parameters inside of
                # VALUES (and thus order self.positiontup) to be higher
                # than non-VALUES parameters, no matter where in the
                # statement those non-VALUES parameters appear (this is
                # ensured in _process_numeric by numbering first all
                # params that are not in _values_bindparam)
                # therefore all extra params are always
                # on the left side and numbered lower than the VALUES
                # parameters
                assert not extra_params_right

                start = expand_pos_lower_index + 1
                end = num_ins_params * (current_batch_size) + start

                # need to format here, since statement may contain
                # unescaped %, while values_string contains just (%s, %s)
                positions = tuple(
                    f"{self._numeric_binds_identifier_char}{i}"
                    for i in range(start, end)
                )
                expanded_values_string = expanded_values_string % positions

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__", expanded_values_string
            )

            replaced_parameters = tuple(
                itertools.chain.from_iterable(batch_iterator)
            )

            replaced_parameters = (
                extra_params_left
                + replaced_parameters
                + extra_params_right
            )

        else:
            replaced_values_clauses = []
            replaced_parameters = base_parameters.copy()

            for i, param in enumerate(batch):
                # "<key>__EXECMANY_INDEX__" becomes "<key>__<i>", which
                # matches the parameter keys generated below
                fmv = formatted_values_clause.replace(
                    "EXECMANY_INDEX__", str(i)
                )
                if imv.embed_values_counter:
                    fmv = fmv.replace("_IMV_VALUES_COUNTER", str(i))

                replaced_values_clauses.append(fmv)
                replaced_parameters.update(
                    {f"{key}__{i}": param[key] for key in keys_to_replace}
                )

            replaced_statement = statement.replace(
                "__EXECMANY_TOKEN__",
                ", ".join(replaced_values_clauses),
            )

        yield _InsertManyValuesBatch(
            replaced_statement,
            replaced_parameters,
            processed_setinputsizes,
            batch,
            (
                [_sentinel_from_params(cb) for cb in compiled_batch]
                if _sentinel_from_params
                else []
            ),
            current_batch_size,
            batchnum,
            total_batches,
            sort_by_parameter_order,
            False,
        )
        batchnum += 1

5900 

def visit_insert(
    self, insert_stmt, visited_bindparam=None, visiting_cte=None, **kw
):
    """Render an INSERT statement to a SQL string.

    The method pushes an entry for the statement onto ``self.stack``,
    collects the column / value pairs via ``crud._get_crud_params()``,
    assembles the ``INSERT ... INTO ...`` text piece by piece and pops
    the stack entry again before returning.

    :param insert_stmt: the INSERT construct; it is replaced by
     ``compile_state.statement`` right after the compile state is built.
    :param visited_bindparam: accepted in the signature, but any incoming
     value is discarded; the name is reset to ``None`` below and only
     re-created as a list for positional, non-CTE compilation.
    :param visiting_cte: when not ``None``, the statement is treated as
     non-toplevel and the value is forwarded through ``kw``.
    :param kw: passed along to the nested compilation calls.
    :return: the rendered statement text.
    :raises CompileError: when the dialect supports none of the "empty
     insert" forms and there are no parameters, when multi-row VALUES is
     not supported by the dialect, or when multi-row VALUES is combined
     with RETURNING and ``_sort_by_parameter_order``.

    As a side effect, ``self._insertmanyvalues`` is assigned in the
    branches where ``crud_params_struct.use_insertmanyvalues`` is set.

    """
    compile_state = insert_stmt._compile_state_factory(
        insert_stmt, self, **kw
    )
    insert_stmt = compile_state.statement

    # a statement visited as part of a CTE is never the toplevel statement
    if visiting_cte is not None:
        kw["visiting_cte"] = visiting_cte
        toplevel = False
    else:
        toplevel = not self.stack

    if toplevel:
        self.isinsert = True
        # only fill in compile state attributes that are not already set
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    self.stack.append(
        {
            "correlate_froms": set(),
            "asfrom_froms": set(),
            "selectable": insert_stmt,
        }
    )

    counted_bindparam = 0

    # reset any incoming "visited_bindparam" collection
    visited_bindparam = None

    # for positional, insertmanyvalues needs to know how many
    # bound parameters are in the VALUES sequence; there's no simple
    # rule because default expressions etc. can have zero or more
    # params inside them. After multiple attempts to figure this out,
    # this very simplistic "count after" works and is
    # likely the least amount of callcounts, though looks clumsy
    if self.positional and visiting_cte is None:
        # if we are inside a CTE, don't count parameters
        # here since they won't be for insertmanyvalues. keep
        # visited_bindparam at None so no counting happens.
        # see #9173
        visited_bindparam = []

    crud_params_struct = crud._get_crud_params(
        self,
        insert_stmt,
        compile_state,
        toplevel,
        visited_bindparam=visited_bindparam,
        **kw,
    )

    if self.positional and visited_bindparam is not None:
        counted_bindparam = len(visited_bindparam)
        if self._numeric_binds:
            # accumulate the VALUES bind parameters on the compiler,
            # extending an existing collection if one is present
            if self._values_bindparam is not None:
                self._values_bindparam += visited_bindparam
            else:
                self._values_bindparam = visited_bindparam

    crud_params_single = crud_params_struct.single_params

    # no parameters at all, and the dialect offers none of the ways
    # to render an INSERT without explicit values
    if (
        not crud_params_single
        and not self.dialect.supports_default_values
        and not self.dialect.supports_default_metavalue
        and not self.dialect.supports_empty_insert
    ):
        raise exc.CompileError(
            "The '%s' dialect with current database "
            "version settings does not support empty "
            "inserts." % self.dialect.name
        )

    if compile_state._has_multi_parameters:
        if not self.dialect.supports_multivalues_insert:
            raise exc.CompileError(
                "The '%s' dialect with current database "
                "version settings does not support "
                "in-place multirow inserts." % self.dialect.name
            )
        elif (
            self.implicit_returning or insert_stmt._returning
        ) and insert_stmt._sort_by_parameter_order:
            raise exc.CompileError(
                "RETURNING cannot be deterministically sorted when "
                "using an INSERT which includes multi-row values()."
            )
        # NOTE(review): both branches re-assign the same value that was
        # already assigned above; this looks redundant.
        crud_params_single = crud_params_struct.single_params
    else:
        crud_params_single = crud_params_struct.single_params

    preparer = self.preparer
    supports_default_values = self.dialect.supports_default_values

    text = "INSERT "

    if insert_stmt._prefixes:
        text += self._generate_prefixes(
            insert_stmt, insert_stmt._prefixes, **kw
        )

    text += "INTO "
    table_text = preparer.format_table(insert_stmt.table)

    if insert_stmt._hints:
        # the first element of the returned pair is not used for INSERT
        _, table_text = self._setup_crud_hints(insert_stmt, table_text)

    if insert_stmt._independent_ctes:
        self._dispatch_independent_ctes(insert_stmt, kw)

    text += table_text

    # render the column list; this is also rendered (possibly empty)
    # when the dialect does not support DEFAULT VALUES
    if crud_params_single or not supports_default_values:
        text += " (%s)" % ", ".join(
            [expr for _, expr, _, _ in crud_params_single]
        )

    # look for insertmanyvalues attributes that would have been configured
    # by crud.py as it scanned through the columns to be part of the
    # INSERT
    use_insertmanyvalues = crud_params_struct.use_insertmanyvalues
    named_sentinel_params: Optional[Sequence[str]] = None
    add_sentinel_cols = None
    implicit_sentinel = False

    returning_cols = self.implicit_returning or insert_stmt._returning
    if returning_cols:
        add_sentinel_cols = crud_params_struct.use_sentinel_columns
        if add_sentinel_cols is not None:
            assert use_insertmanyvalues

            # search for the sentinel column explicitly present
            # in the INSERT columns list, and additionally check that
            # this column has a bound parameter name set up that's in the
            # parameter list. If both of these cases are present, it means
            # we will have a client side value for the sentinel in each
            # parameter set.

            _params_by_col = {
                col: param_names
                for col, _, _, param_names in crud_params_single
            }
            named_sentinel_params = []
            for _add_sentinel_col in add_sentinel_cols:
                # any sentinel column missing from the INSERT columns,
                # or lacking its expected parameter name, means no
                # client-side sentinel values are available
                if _add_sentinel_col not in _params_by_col:
                    named_sentinel_params = None
                    break
                param_name = self._within_exec_param_key_getter(
                    _add_sentinel_col
                )
                if param_name not in _params_by_col[_add_sentinel_col]:
                    named_sentinel_params = None
                    break
                named_sentinel_params.append(param_name)

            if named_sentinel_params is None:
                # if we are not going to have a client side value for
                # the sentinel in the parameter set, that means it's
                # an autoincrement, an IDENTITY, or a server-side SQL
                # expression like nextval('seqname'). So this is
                # an "implicit" sentinel; we will look for it in
                # RETURNING
                # only, and then sort on it. For this case on PG,
                # SQL Server we have to use a special INSERT form
                # that guarantees the server side function lines up with
                # the entries in the VALUES.
                if (
                    self.dialect.insertmanyvalues_implicit_sentinel
                    & InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
                ):
                    implicit_sentinel = True
                else:
                    # here, we are not using a sentinel at all
                    # and we are likely the SQLite dialect.
                    # The first add_sentinel_col that we have should not
                    # be marked as "insert_sentinel=True". if it was,
                    # an error should have been raised in
                    # _get_sentinel_column_for_table.
                    assert not add_sentinel_cols[0]._insert_sentinel, (
                        "sentinel selection rules should have prevented "
                        "us from getting here for this dialect"
                    )

            # always put the sentinel columns last. even if they are
            # in the returning list already, they will be there twice
            # then.
            returning_cols = list(returning_cols) + list(add_sentinel_cols)

        returning_clause = self.returning_clause(
            insert_stmt,
            returning_cols,
            populate_result_map=toplevel,
        )

        # some dialects place RETURNING before the VALUES / SELECT part;
        # otherwise it is appended near the end of this method
        if self.returning_precedes_values:
            text += " " + returning_clause

    else:
        returning_clause = None

    if insert_stmt.select is not None:
        # placed here by crud.py
        select_text = self.process(
            self.stack[-1]["insert_from_select"], insert_into=True, **kw
        )

        if self.ctes and self.dialect.cte_follows_insert:
            # render the CTE clause between the INSERT and the SELECT
            nesting_level = len(self.stack) if not toplevel else None
            text += " %s%s" % (
                self._render_cte_clause(
                    nesting_level=nesting_level,
                    include_following_stack=True,
                ),
                select_text,
            )
        else:
            text += " %s" % select_text
    elif not crud_params_single and supports_default_values:
        text += " DEFAULT VALUES"
        if use_insertmanyvalues:
            # first positional argument marks this as the
            # "default expression" form
            self._insertmanyvalues = _InsertManyValues(
                True,
                self.dialect.default_metavalue_token,
                crud_params_single,
                counted_bindparam,
                sort_by_parameter_order=(
                    insert_stmt._sort_by_parameter_order
                ),
                includes_upsert_behaviors=(
                    insert_stmt._post_values_clause is not None
                ),
                sentinel_columns=add_sentinel_cols,
                num_sentinel_columns=(
                    len(add_sentinel_cols) if add_sentinel_cols else 0
                ),
                implicit_sentinel=implicit_sentinel,
            )
    elif compile_state._has_multi_parameters:
        # in-place multi-row VALUES: one parenthesized group per
        # parameter set
        text += " VALUES %s" % (
            ", ".join(
                "(%s)"
                % (", ".join(value for _, _, value, _ in crud_param_set))
                for crud_param_set in crud_params_struct.all_multi_params
            ),
        )
    elif use_insertmanyvalues:
        if (
            implicit_sentinel
            and (
                self.dialect.insertmanyvalues_implicit_sentinel
                & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
            )
            # this is checking if we have
            # INSERT INTO table (id) VALUES (DEFAULT).
            and not (crud_params_struct.is_default_metavalue_only)
        ):
            # if we have a sentinel column that is server generated,
            # then for selected backends render the VALUES list as a
            # subquery. This is the orderable form supported by
            # PostgreSQL and in fewer cases SQL Server
            embed_sentinel_value = True

            render_bind_casts = (
                self.dialect.insertmanyvalues_implicit_sentinel
                & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
            )

            add_sentinel_set = add_sentinel_cols or ()

            # VALUES expression without the sentinel columns
            insert_single_values_expr = ", ".join(
                [
                    value
                    for col, _, value, _ in crud_params_single
                    if col not in add_sentinel_set
                ]
            )

            # positional aliases p0, p1, ... for the non-sentinel
            # columns; the index is the position within
            # crud_params_single, so numbers may have gaps
            colnames = ", ".join(
                f"p{i}"
                for i, cp in enumerate(crud_params_single)
                if cp[0] not in add_sentinel_set
            )

            if render_bind_casts:
                # render casts for the SELECT list. For PG, we are
                # already rendering bind casts in the parameter list,
                # selectively for the more "tricky" types like ARRAY.
                # however, even for the "easy" types, if the parameter
                # is NULL for every entry, PG gives up and says
                # "it must be TEXT", which fails for other easy types
                # like ints. So we cast on this side too.
                colnames_w_cast = ", ".join(
                    (
                        self.render_bind_cast(
                            col.type,
                            col.type._unwrapped_dialect_impl(self.dialect),
                            f"p{i}",
                        )
                        if col not in add_sentinel_set
                        else expr
                    )
                    for i, (col, _, expr, _) in enumerate(
                        crud_params_single
                    )
                )
            else:
                colnames_w_cast = ", ".join(
                    (f"p{i}" if col not in add_sentinel_set else expr)
                    for i, (col, _, expr, _) in enumerate(
                        crud_params_single
                    )
                )

            insert_crud_params = [
                elem
                for elem in crud_params_single
                if elem[0] not in add_sentinel_set
            ]

            text += (
                f" SELECT {colnames_w_cast} FROM "
                f"(VALUES ({insert_single_values_expr})) "
                f"AS imp_sen({colnames}, sen_counter) "
                "ORDER BY sen_counter"
            )

        else:
            # otherwise, if no sentinel or backend doesn't support
            # orderable subquery form, use a plain VALUES list
            embed_sentinel_value = False
            insert_crud_params = crud_params_single
            insert_single_values_expr = ", ".join(
                [value for _, _, value, _ in crud_params_single]
            )

            text += f" VALUES ({insert_single_values_expr})"

        self._insertmanyvalues = _InsertManyValues(
            is_default_expr=False,
            single_values_expr=insert_single_values_expr,
            insert_crud_params=insert_crud_params,
            num_positional_params_counted=counted_bindparam,
            sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
            includes_upsert_behaviors=(
                insert_stmt._post_values_clause is not None
            ),
            sentinel_columns=add_sentinel_cols,
            num_sentinel_columns=(
                len(add_sentinel_cols) if add_sentinel_cols else 0
            ),
            sentinel_param_keys=named_sentinel_params,
            implicit_sentinel=implicit_sentinel,
            embed_values_counter=embed_sentinel_value,
        )

    else:
        # plain single-row VALUES, no insertmanyvalues bookkeeping
        insert_single_values_expr = ", ".join(
            [value for _, _, value, _ in crud_params_single]
        )

        text += f" VALUES ({insert_single_values_expr})"

    if insert_stmt._post_values_clause is not None:
        post_values_clause = self.process(
            insert_stmt._post_values_clause, **kw
        )
        # an empty rendering adds nothing to the statement
        if post_values_clause:
            text += " " + post_values_clause

    if returning_clause and not self.returning_precedes_values:
        text += " " + returning_clause

    # dialects whose CTEs do not follow the INSERT get the CTE clause
    # prepended to the whole statement
    if self.ctes and not self.dialect.cte_follows_insert:
        nesting_level = len(self.stack) if not toplevel else None
        text = (
            self._render_cte_clause(
                nesting_level=nesting_level,
                include_following_stack=True,
            )
            + text
        )

    self.stack.pop(-1)

    return text

6291 

def update_limit_clause(self, update_stmt):
    """Hook for rendering a LIMIT clause on an UPDATE statement.

    The base implementation renders nothing and returns ``None``;
    the MySQL dialect overrides this hook.

    """
    return None

6295 

def delete_limit_clause(self, delete_stmt):
    """Hook for rendering a LIMIT clause on a DELETE statement.

    The base implementation renders nothing and returns ``None``;
    the MySQL dialect overrides this hook.

    """
    return None

6299 

def update_tables_clause(self, update_stmt, from_table, extra_froms, **kw):
    """Render the initial table clause of an UPDATE statement.

    This is an overridable hook (MySQL overrides it).  The default
    dispatches ``from_table`` to this compiler with ``asfrom=True`` and
    ``iscrud=True`` in addition to the given keyword arguments.

    """
    kw.update(asfrom=True)
    dispatch = from_table._compiler_dispatch
    return dispatch(self, iscrud=True, **kw)

6309 

def update_from_clause(
    self, update_stmt, from_table, extra_froms, from_hints, **kw
):
    """Hook that renders the UPDATE..FROM clause.

    The base compiler has no generic rendering and always raises;
    MySQL and MSSQL override this.

    :raises NotImplementedError: always, in this base implementation.

    """
    message = (
        "This backend does not support multiple-table "
        "criteria within UPDATE"
    )
    raise NotImplementedError(message)

6323 

def visit_update(
    self,
    update_stmt: Update,
    visiting_cte: Optional[CTE] = None,
    **kw: Any,
) -> str:
    """Render an UPDATE statement to a SQL string.

    A stack entry for the statement is pushed for the duration of the
    compilation and popped before returning.  When cartesian-product
    linting is enabled a ``FromLinter`` is created and, if warnings are
    enabled too, asked to warn once the statement has been rendered.

    :param update_stmt: the UPDATE construct; replaced by
     ``compile_state.statement`` once the compile state is built.
    :param visiting_cte: when not ``None``, the statement is treated as
     non-toplevel and the value is forwarded through ``kw``.
    :return: the rendered statement text.

    """
    compile_state = update_stmt._compile_state_factory(
        update_stmt, self, **kw
    )
    if TYPE_CHECKING:
        assert isinstance(compile_state, UpdateDMLState)
    update_stmt = compile_state.statement  # type: ignore[assignment]

    if visiting_cte is None:
        toplevel = not self.stack
    else:
        kw["visiting_cte"] = visiting_cte
        toplevel = False

    if toplevel:
        self.isupdate = True
        # keep compile states that were already established
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    extra_froms = compile_state._extra_froms

    if bool(extra_froms):
        # main table might be a JOIN
        main_froms = set(_from_objects(update_stmt.table))
        render_extra_froms = [
            from_ for from_ in extra_froms if from_ not in main_froms
        ]
        correlate_froms = main_froms.union(extra_froms)
    else:
        render_extra_froms = []
        correlate_froms = {update_stmt.table}

    stack_entry = {
        "correlate_froms": correlate_froms,
        "asfrom_froms": correlate_froms,
        "selectable": update_stmt,
    }
    self.stack.append(stack_entry)

    sql = "UPDATE "

    if update_stmt._prefixes:
        sql += self._generate_prefixes(
            update_stmt, update_stmt._prefixes, **kw
        )

    # the table clause is compiled before the SET parameters
    table_text = self.update_tables_clause(
        update_stmt,
        update_stmt.table,
        render_extra_froms,
        from_linter=from_linter,
        **kw,
    )
    crud_params = crud._get_crud_params(
        self, update_stmt, compile_state, toplevel, **kw
    ).single_params

    if update_stmt._hints:
        dialect_hints, table_text = self._setup_crud_hints(
            update_stmt, table_text
        )
    else:
        dialect_hints = None

    if update_stmt._independent_ctes:
        self._dispatch_independent_ctes(update_stmt, kw)

    set_pairs = cast("List[Tuple[Any, str, str, Any]]", crud_params)
    sql += table_text
    sql += " SET "
    sql += ", ".join(expr + "=" + value for _, expr, value, _ in set_pairs)

    # RETURNING placed ahead of FROM / WHERE for dialects that need it
    if (
        self.implicit_returning or update_stmt._returning
    ) and self.returning_precedes_values:
        sql += " " + self.returning_clause(
            update_stmt,
            self.implicit_returning or update_stmt._returning,
            populate_result_map=toplevel,
        )

    if extra_froms:
        extra_from_text = self.update_from_clause(
            update_stmt,
            update_stmt.table,
            render_extra_froms,
            dialect_hints,
            from_linter=from_linter,
            **kw,
        )
        if extra_from_text:
            sql += " " + extra_from_text

    if update_stmt._where_criteria:
        where_text = self._generate_delimited_and_list(
            update_stmt._where_criteria, from_linter=from_linter, **kw
        )
        if where_text:
            sql += " WHERE " + where_text

    limit_clause = self.update_limit_clause(update_stmt)
    if limit_clause:
        sql += " " + limit_clause

    # RETURNING at the end for all other dialects
    if (
        self.implicit_returning or update_stmt._returning
    ) and not self.returning_precedes_values:
        sql += " " + self.returning_clause(
            update_stmt,
            self.implicit_returning or update_stmt._returning,
            populate_result_map=toplevel,
        )

    if self.ctes:
        nesting_level = None if toplevel else len(self.stack)
        sql = self._render_cte_clause(nesting_level=nesting_level) + sql

    if warn_linting:
        assert from_linter is not None
        from_linter.warn(stmt_type="UPDATE")

    self.stack.pop(-1)

    return sql  # type: ignore[no-any-return]

6471 

def delete_extra_from_clause(
    self, delete_stmt, from_table, extra_froms, from_hints, **kw
):
    """Hook that renders the extra FROM part of a DELETE statement.

    This can be used to implement DELETE..USING for example.  The base
    compiler has no generic rendering and always raises; MySQL and
    MSSQL override this.

    :raises NotImplementedError: always, in this base implementation.

    """
    message = (
        "This backend does not support multiple-table "
        "criteria within DELETE"
    )
    raise NotImplementedError(message)

6487 

def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
    """Render the table clause of a DELETE statement.

    Dispatches ``from_table`` to this compiler with ``asfrom=True`` and
    ``iscrud=True`` plus any additional keyword arguments.

    """
    dispatch = from_table._compiler_dispatch
    return dispatch(self, asfrom=True, iscrud=True, **kw)

6492 

def visit_delete(self, delete_stmt, visiting_cte=None, **kw):
    """Render a DELETE statement to a SQL string.

    A stack entry for the statement is pushed for the duration of the
    compilation and popped before returning.  When cartesian-product
    linting is enabled a ``FromLinter`` is created and, if warnings are
    enabled too, asked to warn once the statement has been rendered.

    :param delete_stmt: the DELETE construct; replaced by
     ``compile_state.statement`` once the compile state is built.
    :param visiting_cte: when not ``None``, the statement is treated as
     non-toplevel and the value is forwarded through ``kw``.
    :return: the rendered statement text.

    """
    compile_state = delete_stmt._compile_state_factory(
        delete_stmt, self, **kw
    )
    delete_stmt = compile_state.statement

    if visiting_cte is None:
        toplevel = not self.stack
    else:
        kw["visiting_cte"] = visiting_cte
        toplevel = False

    if toplevel:
        self.isdelete = True
        # keep compile states that were already established
        if not self.dml_compile_state:
            self.dml_compile_state = compile_state
        if not self.compile_state:
            self.compile_state = compile_state

    if self.linting & COLLECT_CARTESIAN_PRODUCTS:
        from_linter = FromLinter({}, set())
        warn_linting = self.linting & WARN_LINTING
        if toplevel:
            self.from_linter = from_linter
    else:
        from_linter = None
        warn_linting = False

    extra_froms = compile_state._extra_froms

    correlate_froms = {delete_stmt.table}.union(extra_froms)
    stack_entry = {
        "correlate_froms": correlate_froms,
        "asfrom_froms": correlate_froms,
        "selectable": delete_stmt,
    }
    self.stack.append(stack_entry)

    sql = "DELETE "

    if delete_stmt._prefixes:
        sql += self._generate_prefixes(
            delete_stmt, delete_stmt._prefixes, **kw
        )

    sql += "FROM "

    try:
        table_text = self.delete_table_clause(
            delete_stmt,
            delete_stmt.table,
            extra_froms,
            from_linter=from_linter,
        )
    except TypeError:
        # anticipate 3rd party dialects that don't include **kw
        # TODO: remove in 2.1
        table_text = self.delete_table_clause(
            delete_stmt, delete_stmt.table, extra_froms
        )
        if from_linter:
            # result is discarded; the table is processed again only
            # with the linter passed along
            self.process(delete_stmt.table, from_linter=from_linter)

    # called for its effect on the compiler; the return value is unused
    crud._get_crud_params(self, delete_stmt, compile_state, toplevel, **kw)

    if delete_stmt._hints:
        dialect_hints, table_text = self._setup_crud_hints(
            delete_stmt, table_text
        )
    else:
        dialect_hints = None

    if delete_stmt._independent_ctes:
        self._dispatch_independent_ctes(delete_stmt, kw)

    sql += table_text

    # RETURNING placed ahead of the extra FROM / WHERE for dialects
    # that need it
    if (
        self.implicit_returning or delete_stmt._returning
    ) and self.returning_precedes_values:
        sql += " " + self.returning_clause(
            delete_stmt,
            self.implicit_returning or delete_stmt._returning,
            populate_result_map=toplevel,
        )

    if extra_froms:
        extra_from_text = self.delete_extra_from_clause(
            delete_stmt,
            delete_stmt.table,
            extra_froms,
            dialect_hints,
            from_linter=from_linter,
            **kw,
        )
        if extra_from_text:
            sql += " " + extra_from_text

    if delete_stmt._where_criteria:
        where_text = self._generate_delimited_and_list(
            delete_stmt._where_criteria, from_linter=from_linter, **kw
        )
        if where_text:
            sql += " WHERE " + where_text

    limit_clause = self.delete_limit_clause(delete_stmt)
    if limit_clause:
        sql += " " + limit_clause

    # RETURNING at the end for all other dialects
    if (
        self.implicit_returning or delete_stmt._returning
    ) and not self.returning_precedes_values:
        sql += " " + self.returning_clause(
            delete_stmt,
            self.implicit_returning or delete_stmt._returning,
            populate_result_map=toplevel,
        )

    if self.ctes:
        nesting_level = None if toplevel else len(self.stack)
        sql = self._render_cte_clause(nesting_level=nesting_level) + sql

    if warn_linting:
        assert from_linter is not None
        from_linter.warn(stmt_type="DELETE")

    self.stack.pop(-1)

    return sql

6623 

def visit_savepoint(self, savepoint_stmt, **kw):
    """Render a ``SAVEPOINT <name>`` statement."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return "SAVEPOINT %s" % savepoint_name

6626 

def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
    """Render a ``ROLLBACK TO SAVEPOINT <name>`` statement."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return "ROLLBACK TO SAVEPOINT %s" % savepoint_name

6631 

def visit_release_savepoint(self, savepoint_stmt, **kw):
    """Render a ``RELEASE SAVEPOINT <name>`` statement."""
    savepoint_name = self.preparer.format_savepoint(savepoint_stmt)
    return "RELEASE SAVEPOINT %s" % savepoint_name

6636 

6637 

class StrSQLCompiler(SQLCompiler):
    """A :class:`.SQLCompiler` subclass that renders a small selection
    of non-standard SQL features into a string value.

    This compiler is what gets invoked when a Core expression element
    is stringified directly, i.e. without going through
    :meth:`_expression.ClauseElement.compile`.  It renders only a
    limited set of non-standard constructs, enough for basic
    stringification; more substantial custom or dialect-specific SQL
    constructs require calling
    :meth:`_expression.ClauseElement.compile` directly.

    .. seealso::

        :ref:`faq_sql_expression_string`

    """

    def _fallback_column_name(self, column):
        # fixed placeholder string used in place of a column name
        return "<name unknown>"

    @util.preload_module("sqlalchemy.engine.url")
    def visit_unsupported_compilation(self, element, err, **kw):
        """Try the element's own ``stringify_dialect`` before giving up.

        When the element names a dialect other than ``"default"``, a
        compiler for that dialect is created and used, unless that
        compiler is itself a :class:`.StrSQLCompiler`.  Otherwise the
        superclass implementation is invoked.

        """
        if element.stringify_dialect != "default":
            engine_url = util.preloaded.engine_url
            dialect_cls = engine_url.URL.create(
                element.stringify_dialect
            ).get_dialect()
            target_dialect = dialect_cls()

            delegate = target_dialect.statement_compiler(
                target_dialect, None, _supporting_against=self
            )
            if not isinstance(delegate, StrSQLCompiler):
                return delegate.process(element, **kw)

        return super().visit_unsupported_compilation(element, err)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render an index operation as ``left[right]``."""
        left_text = self.process(binary.left, **kw)
        right_text = self.process(binary.right, **kw)
        return "%s[%s]" % (left_text, right_text)

    def visit_json_getitem_op_binary(self, binary, operator, **kw):
        # rendered the same way as a plain getitem
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
        # rendered the same way as a plain getitem
        return self.visit_getitem_binary(binary, operator, **kw)

    def visit_sequence(self, sequence, **kw):
        """Render a descriptive placeholder naming the sequence."""
        sequence_name = self.preparer.format_sequence(sequence)
        return f"<next sequence value: {sequence_name}>"

    def returning_clause(
        self,
        stmt: UpdateBase,
        returning_cols: Sequence[_ColumnsClauseElement],
        *,
        populate_result_map: bool,
        **kw: Any,
    ) -> str:
        """Render a generic ``RETURNING col, col, ...`` clause."""
        labeled = []
        for col in base._select_iterables(returning_cols):
            labeled.append(
                self._label_select_column(None, col, True, False, {})
            )
        return "RETURNING " + ", ".join(labeled)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra tables of an UPDATE as ``FROM t1, t2``."""
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the extra tables of a DELETE as ``, t1, t2``."""
        kw["asfrom"] = True
        rendered = [
            extra._compiler_dispatch(self, fromhints=from_hints, **kw)
            for extra in extra_froms
        ]
        return ", " + ", ".join(rendered)

    def visit_empty_set_expr(self, element_types, **kw):
        # a SELECT whose WHERE clause is never true
        return "SELECT 1 WHERE 1!=1"

    def get_from_hint_text(self, table, text):
        # hints are shown in square brackets
        return "[%s]" % text

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        opstring = " <regexp> "
        return self._generate_generic_binary(binary, opstring, **kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        opstring = " <not regexp> "
        return self._generate_generic_binary(binary, opstring, **kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        left_text = binary.left._compiler_dispatch(self, **kw)
        right_text = binary.right._compiler_dispatch(self, **kw)
        return "<regexp replace>(%s, %s)" % (left_text, right_text)

    def visit_try_cast(self, cast, **kwargs):
        clause_text = cast.clause._compiler_dispatch(self, **kwargs)
        type_text = cast.typeclause._compiler_dispatch(self, **kwargs)
        return "TRY_CAST(%s AS %s)" % (clause_text, type_text)

6747 

6748 

6749class DDLCompiler(Compiled): 

6750 is_ddl = True 

6751 

# Typing-only stub: this ``__init__`` signature is visible to type
# checkers only (it sits under ``TYPE_CHECKING``) and has no body.
if TYPE_CHECKING:

    def __init__(
        self,
        dialect: Dialect,
        statement: ExecutableDDLElement,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        render_schema_translate: bool = ...,
        compile_kwargs: Mapping[str, Any] = ...,
    ): ...

6762 

@util.ro_memoized_property
def sql_compiler(self) -> SQLCompiler:
    """A statement compiler built from this compiler's dialect.

    It is created without a statement and shares this compiler's
    ``schema_translate_map``.

    """
    dialect = self.dialect
    return dialect.statement_compiler(
        dialect, None, schema_translate_map=self.schema_translate_map
    )

6768 

@util.memoized_property
def type_compiler(self):
    """The dialect's ``type_compiler_instance``."""
    dialect = self.dialect
    return dialect.type_compiler_instance

6772 

def construct_params(
    self,
    params: Optional[_CoreSingleExecuteParams] = None,
    extracted_parameters: Optional[Sequence[BindParameter[Any]]] = None,
    escape_names: bool = True,
) -> Optional[_MutableCoreSingleExecuteParams]:
    """Return ``None``; all arguments are accepted and ignored."""
    return None

6780 

def visit_ddl(self, ddl, **kwargs):
    """Render a DDL construct by %-interpolating its context.

    When the target is a :class:`.Table`, a copy of the context is
    filled with ``table``, ``schema`` and ``fullname`` entries unless
    those keys are already present.

    """
    # table events can substitute table and schema name
    context = ddl.context
    target = ddl.target
    if isinstance(target, schema.Table):
        # work on a copy so the construct's own context is not modified
        context = context.copy()

        preparer = self.preparer
        path = preparer.format_table_seq(target)
        table_name = path[-1]
        # a single-element path carries no schema component
        schema_name = "" if len(path) == 1 else path[0]

        context.setdefault("table", table_name)
        context.setdefault("schema", schema_name)
        context.setdefault("fullname", preparer.format_table(target))

    return self.sql_compiler.post_process_text(ddl.statement % context)

6799 

def visit_create_schema(self, create, **kw):
    """Render ``CREATE SCHEMA [IF NOT EXISTS] <name>``."""
    guard = "IF NOT EXISTS " if create.if_not_exists else ""
    schema_name = self.preparer.format_schema(create.element)
    return "CREATE SCHEMA " + guard + schema_name

6805 

def visit_drop_schema(self, drop, **kw):
    """Render ``DROP SCHEMA [IF EXISTS] <name> [CASCADE]``."""
    pieces = ["DROP SCHEMA "]
    if drop.if_exists:
        pieces.append("IF EXISTS ")
    pieces.append(self.preparer.format_schema(drop.element))
    if drop.cascade:
        pieces.append(" CASCADE")
    return "".join(pieces)

6814 

def visit_create_table(self, create, **kw):
    """Render a ``CREATE TABLE`` statement.

    Columns are rendered one per line, followed by the table-level
    constraints.  A :class:`.CompileError` raised while processing a
    column is re-raised with the table and column named in the message.

    """
    table = create.element

    ddl = "\nCREATE "
    if table._prefixes:
        ddl += " ".join(table._prefixes) + " "

    ddl += "TABLE "
    if create.if_not_exists:
        ddl += "IF NOT EXISTS "

    ddl += self.preparer.format_table(table) + " "

    suffix = self.create_table_suffix(table)
    if suffix:
        ddl += suffix + " "

    ddl += "("

    # the first rendered item follows a bare newline, later ones a comma
    sep = "\n"

    # if only one primary key, specify it along with the column
    seen_pk = False
    for create_column in create.columns:
        column = create_column.element
        try:
            rendered = self.process(
                create_column, first_pk=column.primary_key and not seen_pk
            )
            if rendered is not None:
                ddl += sep + "\t" + rendered
                sep = ", \n"
            if column.primary_key:
                seen_pk = True
        except exc.CompileError as ce:
            raise exc.CompileError(
                "(in table '%s', column '%s'): %s"
                % (table.description, column.name, ce.args[0])
            ) from ce

    constraints_text = self.create_table_constraints(
        table,
        _include_foreign_key_constraints=create.include_foreign_key_constraints,  # noqa
    )
    if constraints_text:
        ddl += sep + "\t" + constraints_text

    ddl += "\n)%s\n\n" % self.post_create_table(table)
    return ddl

6866 

def visit_create_column(self, create, first_pk=False, **kw):
    """Render one column of a ``CREATE TABLE`` statement.

    :return: the column specification followed by its column-level
     constraints, or ``None`` for a column flagged as ``system``.

    """
    column = create.element
    if column.system:
        return None

    spec = self.get_column_specification(column, first_pk=first_pk)
    constraint_text = " ".join(
        [self.process(constraint) for constraint in column.constraints]
    )
    if not constraint_text:
        return spec
    return spec + " " + constraint_text

6881 

def create_table_constraints(
    self, table, _include_foreign_key_constraints=None, **kw
):
    """Render the table-level constraints of ``table`` as one string.

    The primary key constraint, when truthy, is rendered first, followed
    by the remaining entries of ``table._sorted_constraints``.  When
    ``_include_foreign_key_constraints`` is given, foreign key constraints
    not contained in it are left out.  A constraint is also skipped when
    its ``_should_create_for_compiler()`` is false, when the dialect
    supports ALTER and the constraint has ``use_alter`` set, or when
    processing it yields ``None``.
    """
    # On some DB order is significant: visit PK first, then the
    # other constraints (engine.ReflectionTest.testbasic failed on FB2)
    ordered = []
    if table.primary_key:
        ordered.append(table.primary_key)

    all_fkcs = table.foreign_key_constraints
    if _include_foreign_key_constraints is None:
        omitted = set()
    else:
        omitted = all_fkcs.difference(_include_foreign_key_constraints)

    for candidate in table._sorted_constraints:
        if candidate is not table.primary_key and candidate not in omitted:
            ordered.append(candidate)

    rendered = []
    for constraint in ordered:
        if not constraint._should_create_for_compiler(self):
            continue
        # use_alter constraints are emitted separately via ALTER when the
        # dialect is able to do so.
        if self.dialect.supports_alter and getattr(
            constraint, "use_alter", False
        ):
            continue
        sql = self.process(constraint)
        if sql is not None:
            rendered.append(sql)

    return ", \n\t".join(rendered)

6918 

def visit_drop_table(self, drop, **kw):
    """Render ``DROP TABLE``, adding ``IF EXISTS`` when requested."""
    if_exists = "IF EXISTS " if drop.if_exists else ""
    table_name = self.preparer.format_table(drop.element)
    return "\nDROP TABLE " + if_exists + table_name

6924 

def visit_drop_view(self, drop, **kw):
    """Render ``DROP VIEW`` for the formatted name of ``drop.element``."""
    view_name = self.preparer.format_table(drop.element)
    return "\nDROP VIEW " + view_name

6927 

6928 def _verify_index_table(self, index: Index) -> None: 

6929 if index.table is None: 

6930 raise exc.CompileError( 

6931 "Index '%s' is not associated with any table." % index.name 

6932 ) 

6933 

def visit_create_index(
    self, create, include_schema=False, include_table_schema=True, **kw
):
    """Render ``CREATE [UNIQUE] INDEX [IF NOT EXISTS] <name> ON <table>``.

    The index expressions are compiled by the SQL compiler with
    ``include_table=False`` and ``literal_binds=True``.

    :param include_schema: passed to ``_prepared_index_name()`` to
     control schema qualification of the index name.
    :param include_table_schema: passed as ``use_schema`` when formatting
     the table name.
    :raises CompileError: when the index has no table or no name.
    """
    index = create.element
    self._verify_index_table(index)
    preparer = self.preparer

    unique = "UNIQUE " if index.unique else ""
    if index.name is None:
        raise exc.CompileError(
            "CREATE INDEX requires that the index have a name"
        )
    if_not_exists = "IF NOT EXISTS " if create.if_not_exists else ""

    index_name = self._prepared_index_name(
        index, include_schema=include_schema
    )
    table_name = preparer.format_table(
        index.table, use_schema=include_table_schema
    )
    expressions = ", ".join(
        self.sql_compiler.process(
            expr, include_table=False, literal_binds=True
        )
        for expr in index.expressions
    )
    return "CREATE %sINDEX %s%s ON %s (%s)" % (
        unique,
        if_not_exists,
        index_name,
        table_name,
        expressions,
    )

6965 

def visit_drop_index(self, drop, **kw):
    """Render ``DROP INDEX [IF EXISTS]`` with a schema-qualified name.

    :raises CompileError: when the index has no name.
    """
    index = drop.element
    if index.name is None:
        raise exc.CompileError(
            "DROP INDEX requires that the index have a name"
        )

    if_exists = "IF EXISTS " if drop.if_exists else ""
    index_name = self._prepared_index_name(index, include_schema=True)
    return "\nDROP INDEX " + if_exists + index_name

6978 

6979 def _prepared_index_name( 

6980 self, index: Index, include_schema: bool = False 

6981 ) -> str: 

6982 if index.table is not None: 

6983 effective_schema = self.preparer.schema_for_object(index.table) 

6984 else: 

6985 effective_schema = None 

6986 if include_schema and effective_schema: 

6987 schema_name = self.preparer.quote_schema(effective_schema) 

6988 else: 

6989 schema_name = None 

6990 

6991 index_name: str = self.preparer.format_index(index) 

6992 

6993 if schema_name: 

6994 index_name = schema_name + "." + index_name 

6995 return index_name 

6996 

def visit_add_constraint(self, create, **kw):
    """Render ``ALTER TABLE <table> ADD <constraint>``."""
    constraint = create.element
    table_name = self.preparer.format_table(constraint.table)
    constraint_sql = self.process(constraint)
    return "ALTER TABLE %s ADD %s" % (table_name, constraint_sql)

7002 

def visit_set_table_comment(self, create, **kw):
    """Render ``COMMENT ON TABLE <table> IS <literal>``.

    The comment of ``create.element`` is rendered as a string literal by
    the SQL compiler.
    """
    table = create.element
    table_name = self.preparer.format_table(table)
    comment_literal = self.sql_compiler.render_literal_value(
        table.comment, sqltypes.String()
    )
    return "COMMENT ON TABLE %s IS %s" % (table_name, comment_literal)

7010 

def visit_drop_table_comment(self, drop, **kw):
    """Render ``COMMENT ON TABLE <table> IS NULL``."""
    table_name = self.preparer.format_table(drop.element)
    return "COMMENT ON TABLE %s IS NULL" % table_name

7015 

def visit_set_column_comment(self, create, **kw):
    """Render ``COMMENT ON COLUMN <column> IS <literal>``.

    The column name is formatted with both its table and schema; the
    comment is rendered as a string literal by the SQL compiler.
    """
    column = create.element
    column_name = self.preparer.format_column(
        column, use_table=True, use_schema=True
    )
    comment_literal = self.sql_compiler.render_literal_value(
        column.comment, sqltypes.String()
    )
    return "COMMENT ON COLUMN %s IS %s" % (column_name, comment_literal)

7025 

def visit_drop_column_comment(self, drop, **kw):
    """Render ``COMMENT ON COLUMN <column> IS NULL``.

    The column name is formatted with both its table and schema, the
    same way ``visit_set_column_comment()`` formats it, so that setting
    and dropping a comment address the same column.
    """
    column_name = self.preparer.format_column(
        drop.element, use_table=True, use_schema=True
    )
    return "COMMENT ON COLUMN %s IS NULL" % column_name

7030 

def visit_set_constraint_comment(self, create, **kw):
    """Always raise: this compiler does not render constraint comments.

    :raises UnsupportedCompilationError: unconditionally.
    """
    element_type = type(create)
    raise exc.UnsupportedCompilationError(self, element_type)

7033 

def visit_drop_constraint_comment(self, drop, **kw):
    """Always raise: this compiler does not render constraint comments.

    :raises UnsupportedCompilationError: unconditionally.
    """
    element_type = type(drop)
    raise exc.UnsupportedCompilationError(self, element_type)

7036 

def get_identity_options(self, identity_options: IdentityOptions) -> str:
    """Render the option clauses of a sequence or identity construct.

    Each numeric option that is not ``None`` produces its clause
    (``INCREMENT BY``, ``START WITH``, ``MINVALUE``, ``MAXVALUE``,
    ``CACHE``).  ``NO MINVALUE`` / ``NO MAXVALUE`` are emitted only when
    the corresponding flag is true; an explicit ``False`` no longer
    renders the clause.  ``cycle`` renders ``CYCLE`` or ``NO CYCLE`` when
    it is not ``None``.

    :return: the clauses joined by single spaces, or an empty string.
    """
    opts = identity_options
    clauses = []

    if opts.increment is not None:
        clauses.append("INCREMENT BY %d" % opts.increment)
    if opts.start is not None:
        clauses.append("START WITH %d" % opts.start)
    if opts.minvalue is not None:
        clauses.append("MINVALUE %d" % opts.minvalue)
    if opts.maxvalue is not None:
        clauses.append("MAXVALUE %d" % opts.maxvalue)
    # These are boolean flags: only a true value asks for the clause.
    if opts.nominvalue:
        clauses.append("NO MINVALUE")
    if opts.nomaxvalue:
        clauses.append("NO MAXVALUE")
    if opts.cache is not None:
        clauses.append("CACHE %d" % opts.cache)
    if opts.cycle is not None:
        clauses.append("CYCLE" if opts.cycle else "NO CYCLE")

    return " ".join(clauses)

7056 

def visit_create_sequence(self, create, prefix=None, **kw):
    """Render ``CREATE SEQUENCE [IF NOT EXISTS] <name>`` plus options.

    :param prefix: optional text appended directly after the sequence
     name, before the identity options.
    """
    pieces = ["CREATE SEQUENCE "]
    if create.if_not_exists:
        pieces.append("IF NOT EXISTS ")
    pieces.append(self.preparer.format_sequence(create.element))

    if prefix:
        pieces.append(prefix)

    options = self.get_identity_options(create.element)
    if options:
        pieces.append(" " + options)
    return "".join(pieces)

7069 

def visit_drop_sequence(self, drop, **kw):
    """Render ``DROP SEQUENCE``, adding ``IF EXISTS`` when requested."""
    if_exists = "IF EXISTS " if drop.if_exists else ""
    sequence_name = self.preparer.format_sequence(drop.element)
    return "DROP SEQUENCE " + if_exists + sequence_name

7075 

def visit_drop_constraint(self, drop, **kw):
    """Render ``ALTER TABLE <table> DROP CONSTRAINT [IF EXISTS] <name>``.

    ``CASCADE`` is appended when ``drop.cascade`` is true.

    :raises CompileError: when the constraint has no name, or the
     preparer returns ``None`` for its formatted name.
    """
    constraint = drop.element

    formatted_name = None
    if constraint.name is not None:
        formatted_name = self.preparer.format_constraint(constraint)

    if formatted_name is None:
        raise exc.CompileError(
            "Can't emit DROP CONSTRAINT for constraint %r; "
            "it has no name" % drop.element
        )

    table_name = self.preparer.format_table(drop.element.table)
    if_exists = "IF EXISTS " if drop.if_exists else ""
    cascade = " CASCADE" if drop.cascade else ""
    return "ALTER TABLE %s DROP CONSTRAINT %s%s%s" % (
        table_name,
        if_exists,
        formatted_name,
        cascade,
    )

7094 

def get_column_specification(self, column, **kwargs):
    """Render the specification of one column for ``CREATE TABLE``.

    The result is the formatted column name and compiled type, followed
    where applicable by ``DEFAULT``, the computed clause, the identity
    clause (only when the dialect supports identity columns) and
    ``NOT NULL``.  ``NOT NULL`` is left out for a column whose identity
    clause is rendered.
    """
    dialect = self.dialect
    type_sql = dialect.type_compiler_instance.process(
        column.type, type_expression=column
    )
    pieces = [self.preparer.format_column(column) + " " + type_sql]

    default = self.get_column_default_string(column)
    if default is not None:
        pieces.append(" DEFAULT " + default)

    if column.computed is not None:
        pieces.append(" " + self.process(column.computed))

    if column.identity is not None and dialect.supports_identity_columns:
        pieces.append(" " + self.process(column.identity))

    if not column.nullable and not (
        column.identity and dialect.supports_identity_columns
    ):
        pieces.append(" NOT NULL")

    return "".join(pieces)

7121 

def create_table_suffix(self, table):
    """Return text placed between the table name and the opening paren.

    This base implementation contributes nothing; ``visit_create_table()``
    only adds the suffix when it is non-empty.
    """
    return ""

7124 

def post_create_table(self, table):
    """Return text placed right after the closing paren of CREATE TABLE.

    This base implementation contributes nothing.
    """
    return ""

7127 

def get_column_default_string(self, column: Column[Any]) -> Optional[str]:
    """Return the rendered server default of ``column``, if any.

    Only a ``server_default`` that is a ``DefaultClause`` is rendered;
    anything else yields ``None``.
    """
    server_default = column.server_default
    if not isinstance(server_default, schema.DefaultClause):
        return None
    return self.render_default_string(server_default.arg)

7133 

def render_default_string(self, default: Union[Visitable, str]) -> str:
    """Render a default value as SQL text.

    A plain string is rendered as a string literal; any other value is
    compiled by the SQL compiler with ``literal_binds=True``.
    """
    sql_compiler = self.sql_compiler
    if not isinstance(default, str):
        return sql_compiler.process(default, literal_binds=True)
    return sql_compiler.render_literal_value(default, sqltypes.STRINGTYPE)

7141 

def visit_table_or_column_check_constraint(self, constraint, **kw):
    """Dispatch a check constraint to the column- or table-level visitor.

    The choice is made from ``constraint.is_column_level``; keyword
    arguments are not forwarded.
    """
    if constraint.is_column_level:
        visitor = self.visit_column_check_constraint
    else:
        visitor = self.visit_check_constraint
    return visitor(constraint)

7147 

def visit_check_constraint(self, constraint, **kw):
    """Render a check constraint: preamble, body, then deferrability."""
    return (
        self.define_constraint_preamble(constraint, **kw)
        + self.define_check_body(constraint, **kw)
        + self.define_constraint_deferrability(constraint)
    )

7153 

def visit_column_check_constraint(self, constraint, **kw):
    """Render a column-level check constraint.

    The output is the constraint preamble, the ``CHECK`` body and the
    deferrability clause, in that order.
    """
    return (
        self.define_constraint_preamble(constraint, **kw)
        + self.define_check_body(constraint, **kw)
        + self.define_constraint_deferrability(constraint)
    )

7159 

def visit_primary_key_constraint(
    self, constraint: PrimaryKeyConstraint, **kw: Any
) -> str:
    """Render a primary key constraint, or ``""`` when it is empty.

    A non-empty constraint is rendered as preamble, ``PRIMARY KEY``
    body and deferrability clause.
    """
    if not len(constraint):
        return ""
    return (
        self.define_constraint_preamble(constraint, **kw)
        + self.define_primary_key_body(constraint, **kw)
        + self.define_constraint_deferrability(constraint)
    )

7169 

def visit_foreign_key_constraint(
    self, constraint: ForeignKeyConstraint, **kw: Any
) -> str:
    """Render a foreign key constraint.

    The clauses appear in this order: preamble, ``FOREIGN KEY`` body,
    ``MATCH``, ``ON DELETE`` / ``ON UPDATE`` and deferrability.
    """
    clauses = (
        self.define_constraint_preamble(constraint, **kw),
        self.define_foreign_key_body(constraint, **kw),
        self.define_constraint_match(constraint),
        self.define_constraint_cascades(constraint),
        self.define_constraint_deferrability(constraint),
    )
    return "".join(clauses)

7179 

def define_constraint_remote_table(self, constraint, table, preparer):
    """Format the remote table clause of a CREATE CONSTRAINT clause.

    This implementation returns ``preparer.format_table(table)`` and
    does not use ``constraint``.
    """
    remote_name = preparer.format_table(table)
    return remote_name

7184 

def visit_unique_constraint(
    self, constraint: UniqueConstraint, **kw: Any
) -> str:
    """Render a unique constraint, or ``""`` when it is empty.

    A non-empty constraint is rendered as preamble, ``UNIQUE`` body and
    deferrability clause.
    """
    if not len(constraint):
        return ""
    return (
        self.define_constraint_preamble(constraint, **kw)
        + self.define_unique_body(constraint, **kw)
        + self.define_constraint_deferrability(constraint)
    )

7194 

def define_constraint_preamble(
    self, constraint: Constraint, **kw: Any
) -> str:
    """Return ``"CONSTRAINT <name> "`` for a named constraint.

    An empty string is returned when the constraint has no name or the
    preparer returns ``None`` for its formatted name.
    """
    if constraint.name is None:
        return ""
    formatted_name = self.preparer.format_constraint(constraint)
    if formatted_name is None:
        return ""
    return "CONSTRAINT %s " % formatted_name

7204 

def define_primary_key_body(
    self, constraint: PrimaryKeyConstraint, **kw: Any
) -> str:
    """Render ``PRIMARY KEY (<col>, ...)`` with quoted column names.

    When the constraint is flagged ``_implicit_generated`` the columns
    are taken from ``columns_autoinc_first``; otherwise from ``columns``.
    """
    if constraint._implicit_generated:
        columns = constraint.columns_autoinc_first
    else:
        columns = constraint.columns

    quoted = ", ".join(self.preparer.quote(col.name) for col in columns)
    return "PRIMARY KEY (%s)" % quoted

7219 

def define_foreign_key_body(
    self, constraint: ForeignKeyConstraint, **kw: Any
) -> str:
    """Render ``FOREIGN KEY(<cols>) REFERENCES <table> (<cols>)``.

    The referenced table is the table of the column targeted by the
    first element of the constraint; it is formatted through
    ``define_constraint_remote_table()``.
    """
    preparer = self.preparer
    remote_table = list(constraint.elements)[0].column.table

    local_columns = ", ".join(
        preparer.quote(fk.parent.name) for fk in constraint.elements
    )
    remote_name = self.define_constraint_remote_table(
        constraint, remote_table, preparer
    )
    remote_columns = ", ".join(
        preparer.quote(fk.column.name) for fk in constraint.elements
    )
    return "FOREIGN KEY(%s) REFERENCES %s (%s)" % (
        local_columns,
        remote_name,
        remote_columns,
    )

7237 

def define_unique_body(
    self, constraint: UniqueConstraint, **kw: Any
) -> str:
    """Render ``UNIQUE <distinct>(<col>, ...)`` with quoted column names.

    ``<distinct>`` is whatever ``define_unique_constraint_distinct()``
    returns for the constraint.
    """
    distinct = self.define_unique_constraint_distinct(constraint, **kw)
    columns = ", ".join(self.preparer.quote(col.name) for col in constraint)
    return "UNIQUE %s(%s)" % (distinct, columns)

7246 

def define_check_body(self, constraint: CheckConstraint, **kw: Any) -> str:
    """Render ``CHECK (<expression>)``.

    The expression is compiled by the SQL compiler with
    ``include_table=False`` and ``literal_binds=True``.
    """
    expression = self.sql_compiler.process(
        constraint.sqltext, include_table=False, literal_binds=True
    )
    return "CHECK (%s)" % expression

7252 

def define_unique_constraint_distinct(
    self, constraint: UniqueConstraint, **kw: Any
) -> str:
    """Return text placed between ``UNIQUE`` and the column list.

    This base implementation contributes nothing.
    """
    return ""

7257 

def define_constraint_cascades(
    self, constraint: ForeignKeyConstraint
) -> str:
    """Render the ``ON DELETE`` and ``ON UPDATE`` clauses, if configured.

    Each clause is produced only when the matching attribute of the
    constraint is not ``None``; ``ON DELETE`` comes first.
    """
    clauses = []
    if constraint.ondelete is not None:
        clauses.append(self.define_constraint_ondelete_cascade(constraint))
    if constraint.onupdate is not None:
        clauses.append(self.define_constraint_onupdate_cascade(constraint))
    return "".join(clauses)

7268 

def define_constraint_ondelete_cascade(
    self, constraint: ForeignKeyConstraint
) -> str:
    """Render the ``ON DELETE`` clause of a foreign key constraint.

    The phrase is checked against ``FK_ON_DELETE`` by the preparer's
    ``validate_sql_phrase()`` before it is rendered.
    """
    phrase = self.preparer.validate_sql_phrase(
        constraint.ondelete, FK_ON_DELETE
    )
    return " ON DELETE %s" % phrase

7275 

def define_constraint_onupdate_cascade(
    self, constraint: ForeignKeyConstraint
) -> str:
    """Render the ``ON UPDATE`` clause of a foreign key constraint.

    The phrase is checked against ``FK_ON_UPDATE`` by the preparer's
    ``validate_sql_phrase()`` before it is rendered.
    """
    phrase = self.preparer.validate_sql_phrase(
        constraint.onupdate, FK_ON_UPDATE
    )
    return " ON UPDATE %s" % phrase

7282 

def define_constraint_deferrability(self, constraint: Constraint) -> str:
    """Render the ``[NOT] DEFERRABLE`` and ``INITIALLY`` clauses.

    ``deferrable`` of ``None`` renders nothing for the first clause.
    The ``initially`` phrase is checked against ``FK_INITIALLY`` by the
    preparer's ``validate_sql_phrase()`` before it is rendered.
    """
    clauses = []

    deferrable = constraint.deferrable
    if deferrable is not None:
        clauses.append(" DEFERRABLE" if deferrable else " NOT DEFERRABLE")

    if constraint.initially is not None:
        clauses.append(
            " INITIALLY %s"
            % self.preparer.validate_sql_phrase(
                constraint.initially, FK_INITIALLY
            )
        )
    return "".join(clauses)

7295 

def define_constraint_match(self, constraint: ForeignKeyConstraint) -> str:
    """Render the ``MATCH`` clause, or ``""`` when ``match`` is ``None``.

    The value of ``constraint.match`` is interpolated as given.
    """
    if constraint.match is None:
        return ""
    return " MATCH %s" % constraint.match

7301 

def visit_computed_column(self, generated, **kw):
    """Render ``GENERATED ALWAYS AS (<expression>)`` for a computed column.

    ``STORED`` is appended when ``persisted`` is exactly ``True`` and
    ``VIRTUAL`` when it is exactly ``False``; any other value adds
    nothing.
    """
    expression = self.sql_compiler.process(
        generated.sqltext, include_table=False, literal_binds=True
    )
    clause = "GENERATED ALWAYS AS (%s)" % expression

    persisted = generated.persisted
    if persisted is True:
        return clause + " STORED"
    if persisted is False:
        return clause + " VIRTUAL"
    return clause

7311 

def visit_identity_column(self, identity, **kw):
    """Render ``GENERATED {ALWAYS | BY DEFAULT} AS IDENTITY``.

    Non-empty identity options are appended in parentheses.
    """
    mode = "ALWAYS" if identity.always else "BY DEFAULT"
    clause = "GENERATED %s AS IDENTITY" % (mode,)

    options = self.get_identity_options(identity)
    if not options:
        return clause
    return clause + " (%s)" % options

7320 

7321 

class GenericTypeCompiler(TypeCompiler):
    """Render type specification strings using generic SQL type names.

    The upper-case ``visit_XYZ`` methods return the SQL text for one
    specific type; the lower-case ``visit_xyz`` methods delegate to one
    of the upper-case methods.
    """

    def visit_FLOAT(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return "FLOAT"

    def visit_DOUBLE(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return "DOUBLE"

    def visit_DOUBLE_PRECISION(
        self, type_: sqltypes.DOUBLE_PRECISION[Any], **kw: Any
    ) -> str:
        return "DOUBLE PRECISION"

    def visit_REAL(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return "REAL"

    def visit_NUMERIC(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        """Render ``NUMERIC`` with precision and scale when they are set."""
        if type_.precision is None:
            return "NUMERIC"
        if type_.scale is None:
            return f"NUMERIC({type_.precision})"
        return f"NUMERIC({type_.precision}, {type_.scale})"

    def visit_DECIMAL(self, type_: sqltypes.DECIMAL[Any], **kw: Any) -> str:
        """Render ``DECIMAL`` with precision and scale when they are set."""
        if type_.precision is None:
            return "DECIMAL"
        if type_.scale is None:
            return f"DECIMAL({type_.precision})"
        return f"DECIMAL({type_.precision}, {type_.scale})"

    def visit_INTEGER(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return "INTEGER"

    def visit_SMALLINT(self, type_: sqltypes.SmallInteger, **kw: Any) -> str:
        return "SMALLINT"

    def visit_BIGINT(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return "BIGINT"

    def visit_TIMESTAMP(self, type_: sqltypes.TIMESTAMP, **kw: Any) -> str:
        return "TIMESTAMP"

    def visit_DATETIME(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return "DATETIME"

    def visit_DATE(self, type_: sqltypes.Date, **kw: Any) -> str:
        return "DATE"

    def visit_TIME(self, type_: sqltypes.Time, **kw: Any) -> str:
        return "TIME"

    def visit_CLOB(self, type_: sqltypes.CLOB, **kw: Any) -> str:
        return "CLOB"

    def visit_NCLOB(self, type_: sqltypes.Text, **kw: Any) -> str:
        return "NCLOB"

    def _render_string_type(
        self, name: str, length: Optional[int], collation: Optional[str]
    ) -> str:
        """Render ``name`` with an optional length and ``COLLATE`` clause.

        A falsy ``length`` or ``collation`` is left out; the collation
        is wrapped in double quotes.
        """
        pieces = [name]
        if length:
            pieces.append(f"({length})")
        if collation:
            pieces.append(f' COLLATE "{collation}"')
        return "".join(pieces)

    def visit_CHAR(self, type_: sqltypes.CHAR, **kw: Any) -> str:
        return self._render_string_type("CHAR", type_.length, type_.collation)

    def visit_NCHAR(self, type_: sqltypes.NCHAR, **kw: Any) -> str:
        return self._render_string_type("NCHAR", type_.length, type_.collation)

    def visit_VARCHAR(self, type_: sqltypes.String, **kw: Any) -> str:
        return self._render_string_type(
            "VARCHAR", type_.length, type_.collation
        )

    def visit_NVARCHAR(self, type_: sqltypes.NVARCHAR, **kw: Any) -> str:
        return self._render_string_type(
            "NVARCHAR", type_.length, type_.collation
        )

    def visit_TEXT(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self._render_string_type("TEXT", type_.length, type_.collation)

    def visit_UUID(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        return "UUID"

    def visit_BLOB(self, type_: sqltypes.LargeBinary, **kw: Any) -> str:
        return "BLOB"

    def visit_BINARY(self, type_: sqltypes.BINARY, **kw: Any) -> str:
        """Render ``BINARY``, with the length when it is truthy."""
        if type_.length:
            return "BINARY" + "(%d)" % type_.length
        return "BINARY"

    def visit_VARBINARY(self, type_: sqltypes.VARBINARY, **kw: Any) -> str:
        """Render ``VARBINARY``, with the length when it is truthy."""
        if type_.length:
            return "VARBINARY" + "(%d)" % type_.length
        return "VARBINARY"

    def visit_BOOLEAN(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return "BOOLEAN"

    def visit_uuid(self, type_: sqltypes.Uuid[Any], **kw: Any) -> str:
        """Render ``UUID`` when native UUIDs apply, else ``CHAR(32)``.

        The native form is used only when both the type requests it and
        the dialect supports it.
        """
        if type_.native_uuid and self.dialect.supports_native_uuid:
            return self.visit_UUID(type_, **kw)
        return self._render_string_type("CHAR", length=32, collation=None)

    def visit_large_binary(
        self, type_: sqltypes.LargeBinary, **kw: Any
    ) -> str:
        return self.visit_BLOB(type_, **kw)

    def visit_boolean(self, type_: sqltypes.Boolean, **kw: Any) -> str:
        return self.visit_BOOLEAN(type_, **kw)

    def visit_time(self, type_: sqltypes.Time, **kw: Any) -> str:
        return self.visit_TIME(type_, **kw)

    def visit_datetime(self, type_: sqltypes.DateTime, **kw: Any) -> str:
        return self.visit_DATETIME(type_, **kw)

    def visit_date(self, type_: sqltypes.Date, **kw: Any) -> str:
        return self.visit_DATE(type_, **kw)

    def visit_big_integer(self, type_: sqltypes.BigInteger, **kw: Any) -> str:
        return self.visit_BIGINT(type_, **kw)

    def visit_small_integer(
        self, type_: sqltypes.SmallInteger, **kw: Any
    ) -> str:
        return self.visit_SMALLINT(type_, **kw)

    def visit_integer(self, type_: sqltypes.Integer, **kw: Any) -> str:
        return self.visit_INTEGER(type_, **kw)

    def visit_real(self, type_: sqltypes.REAL[Any], **kw: Any) -> str:
        return self.visit_REAL(type_, **kw)

    def visit_float(self, type_: sqltypes.Float[Any], **kw: Any) -> str:
        return self.visit_FLOAT(type_, **kw)

    def visit_double(self, type_: sqltypes.Double[Any], **kw: Any) -> str:
        return self.visit_DOUBLE(type_, **kw)

    def visit_numeric(self, type_: sqltypes.Numeric[Any], **kw: Any) -> str:
        return self.visit_NUMERIC(type_, **kw)

    def visit_string(self, type_: sqltypes.String, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_unicode(self, type_: sqltypes.Unicode, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_text(self, type_: sqltypes.Text, **kw: Any) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_unicode_text(
        self, type_: sqltypes.UnicodeText, **kw: Any
    ) -> str:
        return self.visit_TEXT(type_, **kw)

    def visit_enum(self, type_: sqltypes.Enum, **kw: Any) -> str:
        return self.visit_VARCHAR(type_, **kw)

    def visit_null(self, type_, **kw):
        """Refuse to render DDL for a null type.

        :raises CompileError: unconditionally.
        """
        raise exc.CompileError(
            "Can't generate DDL for %r; "
            "did you forget to specify a "
            "type on this Column?" % type_
        )

    def visit_type_decorator(
        self, type_: TypeDecorator[Any], **kw: Any
    ) -> str:
        """Render the type returned by ``type_.type_engine()``."""
        underlying = type_.type_engine(self.dialect)
        return self.process(underlying, **kw)

    def visit_user_defined(
        self, type_: UserDefinedType[Any], **kw: Any
    ) -> str:
        """Render a user-defined type through its ``get_col_spec()``."""
        return type_.get_col_spec(**kw)

7509 

7510 

class StrSQLTypeCompiler(GenericTypeCompiler):
    """Type compiler that falls back to a name or ``repr()`` for types
    it has no dedicated visitor for, rather than failing.
    """

    def process(self, type_, **kw):
        """Compile ``type_``, tolerating objects without a dispatcher."""
        try:
            dispatch = type_._compiler_dispatch
        except AttributeError:
            return self._visit_unknown(type_, **kw)
        return dispatch(self, **kw)

    def __getattr__(self, key):
        # Any visit_* method that is not defined resolves to the
        # catch-all visitor; other missing attributes fail as usual.
        if not key.startswith("visit_"):
            raise AttributeError(key)
        return self._visit_unknown

    def _visit_unknown(self, type_, **kw):
        """Return the class name if it is all upper-case, else the repr."""
        class_name = type_.__class__.__name__
        if class_name == class_name.upper():
            return class_name
        return repr(type_)

    def visit_null(self, type_, **kw):
        return "NULL"

    def visit_user_defined(self, type_, **kw):
        """Use ``get_col_spec()`` when the type has it, else its repr."""
        try:
            get_col_spec = type_.get_col_spec
        except AttributeError:
            return repr(type_)
        return get_col_spec(**kw)

7542 

7543 

7544class _SchemaForObjectCallable(Protocol): 

7545 def __call__(self, __obj: Any) -> str: ... 

7546 

7547 

class _BindNameForColProtocol(Protocol):
    """Callable taking a ``ColumnClause`` as ``col`` and returning a string."""

    def __call__(self, col: ColumnClause[Any]) -> str: ...

7550 

7551 

7552class IdentifierPreparer: 

7553 """Handle quoting and case-folding of identifiers based on options.""" 

7554 

7555 reserved_words = RESERVED_WORDS 

7556 

7557 legal_characters = LEGAL_CHARACTERS 

7558 

7559 illegal_initial_characters = ILLEGAL_INITIAL_CHARACTERS 

7560 

7561 initial_quote: str 

7562 

7563 final_quote: str 

7564 

7565 _strings: MutableMapping[str, str] 

7566 

7567 schema_for_object: _SchemaForObjectCallable = operator.attrgetter("schema") 

7568 """Return the .schema attribute for an object. 

7569 

7570 For the default IdentifierPreparer, the schema for an object is always 

7571 the value of the ".schema" attribute. if the preparer is replaced 

7572 with one that has a non-empty schema_translate_map, the value of the 

7573 ".schema" attribute is rendered a symbol that will be converted to a 

7574 real schema name from the mapping post-compile. 

7575 

7576 """ 

7577 

7578 _includes_none_schema_translate: bool = False 

7579 

7580 def __init__( 

7581 self, 

7582 dialect: Dialect, 

7583 initial_quote: str = '"', 

7584 final_quote: Optional[str] = None, 

7585 escape_quote: str = '"', 

7586 quote_case_sensitive_collations: bool = True, 

7587 omit_schema: bool = False, 

7588 ): 

7589 """Construct a new ``IdentifierPreparer`` object. 

7590 

7591 initial_quote 

7592 Character that begins a delimited identifier. 

7593 

7594 final_quote 

7595 Character that ends a delimited identifier. Defaults to 

7596 `initial_quote`. 

7597 

7598 omit_schema 

7599 Prevent prepending schema name. Useful for databases that do 

7600 not support schemae. 

7601 """ 

7602 

7603 self.dialect = dialect 

7604 self.initial_quote = initial_quote 

7605 self.final_quote = final_quote or self.initial_quote 

7606 self.escape_quote = escape_quote 

7607 self.escape_to_quote = self.escape_quote * 2 

7608 self.omit_schema = omit_schema 

7609 self.quote_case_sensitive_collations = quote_case_sensitive_collations 

7610 self._strings = {} 

7611 self._double_percents = self.dialect.paramstyle in ( 

7612 "format", 

7613 "pyformat", 

7614 ) 

7615 

7616 def _with_schema_translate(self, schema_translate_map): 

7617 prep = self.__class__.__new__(self.__class__) 

7618 prep.__dict__.update(self.__dict__) 

7619 

7620 includes_none = None in schema_translate_map 

7621 

7622 def symbol_getter(obj): 

7623 name = obj.schema 

7624 if obj._use_schema_map and (name is not None or includes_none): 

7625 if name is not None and ("[" in name or "]" in name): 

7626 raise exc.CompileError( 

7627 "Square bracket characters ([]) not supported " 

7628 "in schema translate name '%s'" % name 

7629 ) 

7630 return quoted_name( 

7631 "__[SCHEMA_%s]" % (name or "_none"), quote=False 

7632 ) 

7633 else: 

7634 return obj.schema 

7635 

7636 prep.schema_for_object = symbol_getter 

7637 prep._includes_none_schema_translate = includes_none 

7638 return prep 

7639 

7640 def _render_schema_translates( 

7641 self, statement: str, schema_translate_map: SchemaTranslateMapType 

7642 ) -> str: 

7643 d = schema_translate_map 

7644 if None in d: 

7645 if not self._includes_none_schema_translate: 

7646 raise exc.InvalidRequestError( 

7647 "schema translate map which previously did not have " 

7648 "`None` present as a key now has `None` present; compiled " 

7649 "statement may lack adequate placeholders. Please use " 

7650 "consistent keys in successive " 

7651 "schema_translate_map dictionaries." 

7652 ) 

7653 

7654 d["_none"] = d[None] # type: ignore[index] 

7655 

7656 def replace(m): 

7657 name = m.group(2) 

7658 if name in d: 

7659 effective_schema = d[name] 

7660 else: 

7661 if name in (None, "_none"): 

7662 raise exc.InvalidRequestError( 

7663 "schema translate map which previously had `None` " 

7664 "present as a key now no longer has it present; don't " 

7665 "know how to apply schema for compiled statement. " 

7666 "Please use consistent keys in successive " 

7667 "schema_translate_map dictionaries." 

7668 ) 

7669 effective_schema = name 

7670 

7671 if not effective_schema: 

7672 effective_schema = self.dialect.default_schema_name 

7673 if not effective_schema: 

7674 # TODO: no coverage here 

7675 raise exc.CompileError( 

7676 "Dialect has no default schema name; can't " 

7677 "use None as dynamic schema target." 

7678 ) 

7679 return self.quote_schema(effective_schema) 

7680 

7681 return re.sub(r"(__\[SCHEMA_([^\]]+)\])", replace, statement) 

7682 

7683 def _escape_identifier(self, value: str) -> str: 

7684 """Escape an identifier. 

7685 

7686 Subclasses should override this to provide database-dependent 

7687 escaping behavior. 

7688 """ 

7689 

7690 value = value.replace(self.escape_quote, self.escape_to_quote) 

7691 if self._double_percents: 

7692 value = value.replace("%", "%%") 

7693 return value 

7694 

7695 def _unescape_identifier(self, value: str) -> str: 

7696 """Canonicalize an escaped identifier. 

7697 

7698 Subclasses should override this to provide database-dependent 

7699 unescaping behavior that reverses _escape_identifier. 

7700 """ 

7701 

7702 return value.replace(self.escape_to_quote, self.escape_quote) 

7703 

def validate_sql_phrase(self, element, reg):
    """keyword sequence filter.

    a filter for elements that are intended to represent keyword sequences,
    such as "INITIALLY", "INITIALLY DEFERRED", etc.   no special characters
    should be present.

    ``element`` is returned unchanged when it is ``None`` or matches the
    compiled pattern ``reg``; otherwise ``CompileError`` is raised.

    .. versionadded:: 1.3

    """
    if element is None or reg.match(element):
        return element

    raise exc.CompileError(
        "Unexpected SQL phrase: %r (matching against %r)"
        % (element, reg.pattern)
    )

7721 

def quote_identifier(self, value: str) -> str:
    """Quote an identifier.

    The escaped value is wrapped in the preparer's initial and final
    quote characters.

    Subclasses should override this to provide database-dependent
    quoting behavior.
    """
    opening = self.initial_quote
    escaped = self._escape_identifier(value)
    return opening + escaped + self.final_quote

7734 

7735 def _requires_quotes(self, value: str) -> bool: 

7736 """Return True if the given identifier requires quoting.""" 

7737 lc_value = value.lower() 

7738 return ( 

7739 lc_value in self.reserved_words 

7740 or value[0] in self.illegal_initial_characters 

7741 or not self.legal_characters.match(str(value)) 

7742 or (lc_value != value) 

7743 ) 

7744 

7745 def _requires_quotes_illegal_chars(self, value): 

7746 """Return True if the given identifier requires quoting, but 

7747 not taking case convention into account.""" 

7748 return not self.legal_characters.match(str(value)) 

7749 

def quote_schema(self, schema: str, force: Any = None) -> str:
    """Conditionally quote a schema name.

    The name is quoted if it is a reserved word, contains quote-necessary
    characters, or is an instance of :class:`.quoted_name` which includes
    ``quote`` set to ``True``.

    This implementation delegates to :meth:`.IdentifierPreparer.quote`.
    Subclasses can override this to provide database-dependent
    quoting behavior for schema names.

    :param schema: string schema name
    :param force: unused; passing a value emits a deprecation warning

        .. deprecated:: 0.9

            The :paramref:`.IdentifierPreparer.quote_schema.force`
            parameter is deprecated and will be removed in a future
            release.  This flag has no effect on the behavior of the
            :meth:`.IdentifierPreparer.quote` method; please refer to
            :class:`.quoted_name`.

    """
    if force is not None:
        # not using the util.deprecated_params() decorator in this
        # case because of the additional function call overhead on this
        # very performance-critical spot.
        message = (
            "The IdentifierPreparer.quote_schema.force parameter is "
            "deprecated and will be removed in a future release.  This "
            "flag has no effect on the behavior of the "
            "IdentifierPreparer.quote method; please refer to "
            "quoted_name()."
        )
        # deprecated 0.9. warning from 1.3
        util.warn_deprecated(message, version="0.9")

    return self.quote(schema)

7788 

def quote(self, ident: str, force: Any = None) -> str:
    """Conditionally quote an identifier.

    The identifier is quoted if it is a reserved word, contains
    quote-necessary characters, or is an instance of
    :class:`.quoted_name` which includes ``quote`` set to ``True``.

    When the identifier carries no ``quote`` preference, the decision is
    computed once and cached per identifier.

    Subclasses can override this to provide database-dependent
    quoting behavior for identifier names.

    :param ident: string identifier
    :param force: unused; passing a value emits a deprecation warning

        .. deprecated:: 0.9

            The :paramref:`.IdentifierPreparer.quote.force`
            parameter is deprecated and will be removed in a future
            release.  This flag has no effect on the behavior of the
            :meth:`.IdentifierPreparer.quote` method; please refer to
            :class:`.quoted_name`.

    """
    if force is not None:
        # not using the util.deprecated_params() decorator in this
        # case because of the additional function call overhead on this
        # very performance-critical spot.
        message = (
            "The IdentifierPreparer.quote.force parameter is "
            "deprecated and will be removed in a future release.  This "
            "flag has no effect on the behavior of the "
            "IdentifierPreparer.quote method; please refer to "
            "quoted_name()."
        )
        # deprecated 0.9. warning from 1.3
        util.warn_deprecated(message, version="0.9")

    # Only the identifier's own "quote" attribute decides from here on.
    quote_flag = getattr(ident, "quote", None)

    if quote_flag is not None:
        if quote_flag:
            return self.quote_identifier(ident)
        return ident

    cache = self._strings
    if ident in cache:
        return cache[ident]

    if self._requires_quotes(ident):
        cache[ident] = self.quote_identifier(ident)
    else:
        cache[ident] = ident
    return cache[ident]

7840 

def format_collation(self, collation_name):
    """Render a collation name.

    The name goes through :meth:`quote` only when the preparer's
    ``quote_case_sensitive_collations`` flag is truthy; otherwise it is
    returned untouched.
    """
    if not self.quote_case_sensitive_collations:
        return collation_name
    return self.quote(collation_name)

7846 

def format_sequence(
    self, sequence: schema.Sequence, use_schema: bool = True
) -> str:
    """Render a quoted sequence name, schema-qualified when applicable.

    :param sequence: object whose ``name`` is quoted and whose schema is
     looked up through ``schema_for_object()``.
    :param use_schema: when false, the schema prefix is never added.
    :return: ``<schema>.<name>`` when the preparer does not omit schemas,
     ``use_schema`` is true and the effective schema is not ``None``;
     otherwise just the quoted name.
    """
    rendered = self.quote(sequence.name)
    effective_schema = self.schema_for_object(sequence)

    if self.omit_schema or not use_schema or effective_schema is None:
        return rendered
    return self.quote_schema(effective_schema) + "." + rendered

7861 

def format_label(
    self, label: Label[Any], name: Optional[str] = None
) -> str:
    """Quote the name to render for a label.

    ``name`` is used when it is truthy; otherwise ``label.name`` is used.
    """
    effective_name = name if name else label.name
    return self.quote(effective_name)

7866 

def format_alias(
    self, alias: Optional[AliasedReturnsRows], name: Optional[str] = None
) -> str:
    """Quote an alias name.

    An explicit ``name`` (anything other than ``None``) takes precedence;
    only when it is absent is ``alias.name`` consulted, in which case
    ``alias`` must not be ``None``.
    """
    if name is not None:
        return self.quote(name)

    assert alias is not None
    return self.quote(alias.name)

7875 

def format_savepoint(self, savepoint, name=None):
    """Render a savepoint identifier, quoting it only if required.

    ``name`` is used when truthy, otherwise ``savepoint.ident``.
    """
    # No known dialect actually needs the savepoint name to be quoted;
    # the check is kept to support potential third party use cases.
    ident = name or savepoint.ident
    return (
        self.quote_identifier(ident)
        if self._requires_quotes(ident)
        else ident
    )

7884 

@util.preload_module("sqlalchemy.sql.naming")
def format_constraint(
    self, constraint: Union[Constraint, Index], _alembic_quote: bool = True
) -> Optional[str]:
    """Render the name of a constraint or index.

    When the constraint's name is the ``_NONE_NAME`` marker, the name is
    obtained from ``naming._constraint_name_for_table()``; if that yields
    ``None`` this method returns ``None`` as well.  The resulting name is
    then handed to the index or constraint flavor of the
    truncate-and-render helpers, chosen by ``constraint.__visit_name__``.

    :param constraint: the constraint or index being named.
    :param _alembic_quote: forwarded unchanged to the rendering helper.
    :return: the rendered name, or ``None`` when no name is available.
    """
    naming = util.preloaded.sql_naming

    name = constraint.name
    if name is _NONE_NAME:
        name = naming._constraint_name_for_table(
            constraint, constraint.table
        )
        if name is None:
            return None

    assert name is not None

    if constraint.__visit_name__ == "index":
        render = self.truncate_and_render_index_name
    else:
        render = self.truncate_and_render_constraint_name
    return render(name, _alembic_quote=_alembic_quote)

7910 

def truncate_and_render_index_name(
    self, name: str, _alembic_quote: bool = True
) -> str:
    """Render an index name within the dialect's length limit.

    The limit is ``dialect.max_index_name_length`` when that is truthy,
    falling back to ``dialect.max_identifier_length``.
    """
    # The limit is looked up on every call rather than stored, so that
    # ad-hoc changes to dialect.max_identifier_length etc. are picked up
    # even though the IdentifierPreparer is long lived.
    dialect = self.dialect
    limit = dialect.max_index_name_length or dialect.max_identifier_length
    return self._truncate_and_render_maxlen_name(
        name, limit, _alembic_quote
    )

7924 

def truncate_and_render_constraint_name(
    self, name: str, _alembic_quote: bool = True
) -> str:
    """Render a constraint name within the dialect's length limit.

    The limit is ``dialect.max_constraint_name_length`` when that is
    truthy, falling back to ``dialect.max_identifier_length``.
    """
    # The limit is looked up on every call rather than stored, so that
    # ad-hoc changes to dialect.max_identifier_length etc. are picked up
    # even though the IdentifierPreparer is long lived.
    dialect = self.dialect
    limit = (
        dialect.max_constraint_name_length or dialect.max_identifier_length
    )
    return self._truncate_and_render_maxlen_name(
        name, limit, _alembic_quote
    )

7938 

def _truncate_and_render_maxlen_name(
    self, name: str, max_: int, _alembic_quote: bool
) -> str:
    """Shorten or validate ``name`` against ``max_`` and render it.

    * A name that is not an ``elements._truncated_label`` is passed to
      ``dialect.validate_identifier()`` and otherwise left as is.
    * A ``_truncated_label`` longer than ``max_`` is cut to its first
      ``max_ - 8`` characters, followed by ``"_"`` and the last four
      characters of ``util.md5_hex(name)``.

    :param name: the candidate identifier.
    :param max_: maximum length allowed for the identifier.
    :param _alembic_quote: when false the name is returned without
     being passed through :meth:`quote`.
    """
    if not isinstance(name, elements._truncated_label):
        self.dialect.validate_identifier(name)
    elif len(name) > max_:
        name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:]

    return self.quote(name) if _alembic_quote else name

7952 

def format_index(self, index: Index) -> str:
    """Render an index name via :meth:`format_constraint`.

    The rendered name is asserted to be non-``None`` before it is
    returned.
    """
    rendered = self.format_constraint(index)
    assert rendered is not None
    return rendered

7957 

def format_table(
    self,
    table: FromClause,
    use_schema: bool = True,
    name: Optional[str] = None,
) -> str:
    """Prepare a quoted table name, schema-qualified when applicable.

    :param table: the table-like object; its ``name`` is used when
     ``name`` is not given, and its schema is looked up through
     ``schema_for_object()``.
    :param use_schema: when false, the schema prefix is never added.
    :param name: optional name to render in place of ``table.name``.
    :return: ``<schema>.<name>`` when the preparer does not omit schemas,
     ``use_schema`` is true and the effective schema is truthy;
     otherwise just the quoted name.
    """
    if name is None:
        if TYPE_CHECKING:
            assert isinstance(table, NamedFromClause)
        name = table.name

    quoted = self.quote(name)
    effective_schema = self.schema_for_object(table)

    if self.omit_schema or not use_schema or not effective_schema:
        return quoted
    return self.quote_schema(effective_schema) + "." + quoted

7977 

def format_schema(self, name):
    """Prepare a schema name by passing it through :meth:`quote`."""
    quoted = self.quote(name)
    return quoted

7982 

def format_label_name(
    self,
    name,
    anon_map=None,
):
    """Prepare a quoted label name.

    When ``anon_map`` is given and ``name`` is an
    ``elements._truncated_label``, the map is applied to the name via
    ``name.apply_map()`` before quoting.
    """
    if anon_map is not None:
        if isinstance(name, elements._truncated_label):
            name = name.apply_map(anon_map)

    return self.quote(name)

7996 

def format_column(
    self,
    column: ColumnElement[Any],
    use_table: bool = False,
    name: Optional[str] = None,
    table_name: Optional[str] = None,
    use_schema: bool = False,
    anon_map: Optional[Mapping[str, Any]] = None,
) -> str:
    """Prepare a quoted column name, optionally table-qualified.

    :param column: the column; ``column.name`` is used when ``name`` is
     not given, and ``column.table`` when ``use_table`` is true.
    :param use_table: prefix the result with the formatted table name
     and a dot.
    :param name: optional name to render in place of ``column.name``.
    :param table_name: forwarded to :meth:`format_table` as ``name``.
    :param use_schema: forwarded to :meth:`format_table`.
    :param anon_map: when given and the name is an
     ``elements._truncated_label``, applied to the name via
     ``apply_map()`` before rendering.
    :return: the rendered column reference.
    """
    if name is None:
        name = column.name
        assert name is not None

    if anon_map is not None:
        if isinstance(name, elements._truncated_label):
            name = name.apply_map(anon_map)

    # literal textual elements get stuck into ColumnClause a lot, and
    # those shouldn't get quoted
    is_literal = getattr(column, "is_literal", False)

    if not use_table:
        return name if is_literal else self.quote(name)

    qualifier = self.format_table(
        column.table, use_schema=use_schema, name=table_name
    )
    column_part = name if is_literal else self.quote(name)
    return qualifier + "." + column_part

8042 

def format_table_seq(self, table, use_schema=True):
    """Format table name and schema as a tuple.

    :return: ``(schema, table)`` when the preparer does not omit schemas,
     ``use_schema`` is true and the effective schema is truthy;
     otherwise the one-element tuple ``(table,)``.
    """
    # A dialect whose fully qualified references have more levels
    # ('database', 'owner', etc.) could override this and return a
    # longer sequence.
    effective_schema = self.schema_for_object(table)

    if self.omit_schema or not use_schema or not effective_schema:
        return (self.format_table(table, use_schema=False),)

    schema_part = self.quote_schema(effective_schema)
    table_part = self.format_table(table, use_schema=False)
    return (schema_part, table_part)

8059 

@util.memoized_property
def _r_identifiers(self):
    """Compiled regex that picks apart dotted identifier strings.

    Each match is one dot-separated component, captured in one of two
    groups: group 1 holds the inside of a component wrapped in this
    preparer's initial/final quote characters (escaped final quotes
    allowed within), group 2 holds an unquoted run of non-dot
    characters.
    """
    initial = re.escape(self.initial_quote)
    final = re.escape(self.final_quote)
    escaped_final = re.escape(self._escape_identifier(self.final_quote))

    pattern = (
        r"(?:"
        r"(?:%(initial)s((?:%(escaped)s|[^%(final)s])+)%(final)s"
        r"|([^\.]+))(?=\.|$))+"
    ) % {"initial": initial, "final": final, "escaped": escaped_final}
    return re.compile(pattern)

8077 

def unformat_identifiers(self, identifiers: str) -> Sequence[str]:
    """Unpack 'schema.table.column'-like strings into components.

    Every match of ``self._r_identifiers`` contributes one component:
    the first captured group when it is non-empty, otherwise the second.
    Each component is passed through ``_unescape_identifier()``.
    """
    matches = self._r_identifiers.findall(identifiers)
    return [
        self._unescape_identifier(quoted or bare)
        for quoted, bare in matches
    ]