Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/coercions.py: 50%

495 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# sql/coercions.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8import numbers 

9import re 

10 

11from . import operators 

12from . import roles 

13from . import visitors 

14from .base import ExecutableOption 

15from .base import Options 

16from .traversals import HasCacheKey 

17from .visitors import Visitable 

18from .. import exc 

19from .. import inspection 

20from .. import util 

21from ..util import collections_abc 

22 

23 

24elements = None 

25lambdas = None 

26schema = None 

27selectable = None 

28sqltypes = None 

29traversals = None 

30 

31 

32def _is_literal(element): 

33 """Return whether or not the element is a "literal" in the context 

34 of a SQL expression construct. 

35 

36 """ 

37 

38 return ( 

39 not isinstance( 

40 element, 

41 (Visitable, schema.SchemaEventTarget), 

42 ) 

43 and not hasattr(element, "__clause_element__") 

44 ) 

45 

46 

47def _deep_is_literal(element): 

48 """Return whether or not the element is a "literal" in the context 

49 of a SQL expression construct. 

50 

51 does a deeper more esoteric check than _is_literal. is used 

52 for lambda elements that have to distinguish values that would 

53 be bound vs. not without any context. 

54 

55 """ 

56 

57 if isinstance(element, collections_abc.Sequence) and not isinstance( 

58 element, str 

59 ): 

60 for elem in element: 

61 if not _deep_is_literal(elem): 

62 return False 

63 else: 

64 return True 

65 

66 return ( 

67 not isinstance( 

68 element, 

69 ( 

70 Visitable, 

71 schema.SchemaEventTarget, 

72 HasCacheKey, 

73 Options, 

74 util.langhelpers._symbol, 

75 ), 

76 ) 

77 and not hasattr(element, "__clause_element__") 

78 and ( 

79 not isinstance(element, type) 

80 or not issubclass(element, HasCacheKey) 

81 ) 

82 ) 

83 

84 

85def _document_text_coercion(paramname, meth_rst, param_rst): 

86 return util.add_parameter_text( 

87 paramname, 

88 ( 

89 ".. warning:: " 

90 "The %s argument to %s can be passed as a Python string argument, " 

91 "which will be treated " 

92 "as **trusted SQL text** and rendered as given. **DO NOT PASS " 

93 "UNTRUSTED INPUT TO THIS PARAMETER**." 

94 ) 

95 % (param_rst, meth_rst), 

96 ) 

97 

98 

99def _expression_collection_was_a_list(attrname, fnname, args): 

100 if args and isinstance(args[0], (list, set, dict)) and len(args) == 1: 

101 if isinstance(args[0], list): 

102 util.warn_deprecated_20( 

103 'The "%s" argument to %s(), when referring to a sequence ' 

104 "of items, is now passed as a series of positional " 

105 "elements, rather than as a list. " % (attrname, fnname) 

106 ) 

107 return args[0] 

108 else: 

109 return args 

110 

111 

112def expect( 

113 role, 

114 element, 

115 apply_propagate_attrs=None, 

116 argname=None, 

117 post_inspect=False, 

118 **kw 

119): 

120 if ( 

121 role.allows_lambda 

122 # note callable() will not invoke a __getattr__() method, whereas 

123 # hasattr(obj, "__call__") will. by keeping the callable() check here 

124 # we prevent most needless calls to hasattr() and therefore 

125 # __getattr__(), which is present on ColumnElement. 

126 and callable(element) 

127 and hasattr(element, "__code__") 

128 ): 

129 return lambdas.LambdaElement( 

130 element, 

131 role, 

132 lambdas.LambdaOptions(**kw), 

133 apply_propagate_attrs=apply_propagate_attrs, 

134 ) 

135 

136 # major case is that we are given a ClauseElement already, skip more 

137 # elaborate logic up front if possible 

138 impl = _impl_lookup[role] 

139 

140 original_element = element 

141 

142 if not isinstance( 

143 element, 

144 ( 

145 elements.ClauseElement, 

146 schema.SchemaItem, 

147 schema.FetchedValue, 

148 lambdas.PyWrapper, 

149 ), 

150 ): 

151 resolved = None 

152 

153 if impl._resolve_literal_only: 

154 resolved = impl._literal_coercion(element, **kw) 

155 else: 

156 

157 original_element = element 

158 

159 is_clause_element = False 

160 

161 # this is a special performance optimization for ORM 

162 # joins used by JoinTargetImpl that we don't go through the 

163 # work of creating __clause_element__() when we only need the 

164 # original QueryableAttribute, as the former will do clause 

165 # adaption and all that which is just thrown away here. 

166 if ( 

167 impl._skip_clauseelement_for_target_match 

168 and isinstance(element, role) 

169 and hasattr(element, "__clause_element__") 

170 ): 

171 is_clause_element = True 

172 else: 

173 while hasattr(element, "__clause_element__"): 

174 is_clause_element = True 

175 

176 if not getattr(element, "is_clause_element", False): 

177 element = element.__clause_element__() 

178 else: 

179 break 

180 

181 if not is_clause_element: 

182 if impl._use_inspection: 

183 insp = inspection.inspect(element, raiseerr=False) 

184 if insp is not None: 

185 if post_inspect: 

186 insp._post_inspect 

187 try: 

188 resolved = insp.__clause_element__() 

189 except AttributeError: 

190 impl._raise_for_expected(original_element, argname) 

191 

192 if resolved is None: 

193 resolved = impl._literal_coercion( 

194 element, argname=argname, **kw 

195 ) 

196 else: 

197 resolved = element 

198 elif isinstance(element, lambdas.PyWrapper): 

199 resolved = element._sa__py_wrapper_literal(**kw) 

200 else: 

201 resolved = element 

202 if ( 

203 apply_propagate_attrs is not None 

204 and not apply_propagate_attrs._propagate_attrs 

205 and resolved._propagate_attrs 

206 ): 

207 apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs 

208 

209 if impl._role_class in resolved.__class__.__mro__: 

210 if impl._post_coercion: 

211 resolved = impl._post_coercion( 

212 resolved, 

213 argname=argname, 

214 original_element=original_element, 

215 **kw 

216 ) 

217 return resolved 

218 else: 

219 return impl._implicit_coercions( 

220 original_element, resolved, argname=argname, **kw 

221 ) 

222 

223 

224def expect_as_key(role, element, **kw): 

225 kw["as_key"] = True 

226 return expect(role, element, **kw) 

227 

228 

229def expect_col_expression_collection(role, expressions): 

230 for expr in expressions: 

231 strname = None 

232 column = None 

233 

234 resolved = expect(role, expr) 

235 if isinstance(resolved, util.string_types): 

236 strname = resolved = expr 

237 else: 

238 cols = [] 

239 visitors.traverse(resolved, {}, {"column": cols.append}) 

240 if cols: 

241 column = cols[0] 

242 add_element = column if column is not None else strname 

243 yield resolved, column, strname, add_element 

244 

245 

246class RoleImpl(object): 

247 __slots__ = ("_role_class", "name", "_use_inspection") 

248 

249 def _literal_coercion(self, element, **kw): 

250 raise NotImplementedError() 

251 

252 _post_coercion = None 

253 _resolve_literal_only = False 

254 _skip_clauseelement_for_target_match = False 

255 

256 def __init__(self, role_class): 

257 self._role_class = role_class 

258 self.name = role_class._role_name 

259 self._use_inspection = issubclass(role_class, roles.UsesInspection) 

260 

261 def _implicit_coercions(self, element, resolved, argname=None, **kw): 

262 self._raise_for_expected(element, argname, resolved) 

263 

264 def _raise_for_expected( 

265 self, 

266 element, 

267 argname=None, 

268 resolved=None, 

269 advice=None, 

270 code=None, 

271 err=None, 

272 ): 

273 if resolved is not None and resolved is not element: 

274 got = "%r object resolved from %r object" % (resolved, element) 

275 else: 

276 got = repr(element) 

277 

278 if argname: 

279 msg = "%s expected for argument %r; got %s." % ( 

280 self.name, 

281 argname, 

282 got, 

283 ) 

284 else: 

285 msg = "%s expected, got %s." % (self.name, got) 

286 

287 if advice: 

288 msg += " " + advice 

289 

290 util.raise_(exc.ArgumentError(msg, code=code), replace_context=err) 

291 

292 

293class _Deannotate(object): 

294 __slots__ = () 

295 

296 def _post_coercion(self, resolved, **kw): 

297 from .util import _deep_deannotate 

298 

299 return _deep_deannotate(resolved) 

300 

301 

302class _StringOnly(object): 

303 __slots__ = () 

304 

305 _resolve_literal_only = True 

306 

307 

308class _ReturnsStringKey(object): 

309 __slots__ = () 

310 

311 def _implicit_coercions( 

312 self, original_element, resolved, argname=None, **kw 

313 ): 

314 if isinstance(original_element, util.string_types): 

315 return original_element 

316 else: 

317 self._raise_for_expected(original_element, argname, resolved) 

318 

319 def _literal_coercion(self, element, **kw): 

320 return element 

321 

322 

323class _ColumnCoercions(object): 

324 __slots__ = () 

325 

326 def _warn_for_scalar_subquery_coercion(self): 

327 util.warn( 

328 "implicitly coercing SELECT object to scalar subquery; " 

329 "please use the .scalar_subquery() method to produce a scalar " 

330 "subquery.", 

331 ) 

332 

333 def _implicit_coercions( 

334 self, original_element, resolved, argname=None, **kw 

335 ): 

336 if not getattr(resolved, "is_clause_element", False): 

337 self._raise_for_expected(original_element, argname, resolved) 

338 elif resolved._is_select_statement: 

339 self._warn_for_scalar_subquery_coercion() 

340 return resolved.scalar_subquery() 

341 elif resolved._is_from_clause and isinstance( 

342 resolved, selectable.Subquery 

343 ): 

344 self._warn_for_scalar_subquery_coercion() 

345 return resolved.element.scalar_subquery() 

346 elif self._role_class.allows_lambda and resolved._is_lambda_element: 

347 return resolved 

348 else: 

349 self._raise_for_expected(original_element, argname, resolved) 

350 

351 

352def _no_text_coercion( 

353 element, argname=None, exc_cls=exc.ArgumentError, extra=None, err=None 

354): 

355 util.raise_( 

356 exc_cls( 

357 "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be " 

358 "explicitly declared as text(%(expr)r)" 

359 % { 

360 "expr": util.ellipses_string(element), 

361 "argname": "for argument %s" % (argname,) if argname else "", 

362 "extra": "%s " % extra if extra else "", 

363 } 

364 ), 

365 replace_context=err, 

366 ) 

367 

368 

369class _NoTextCoercion(object): 

370 __slots__ = () 

371 

372 def _literal_coercion(self, element, argname=None, **kw): 

373 if isinstance(element, util.string_types) and issubclass( 

374 elements.TextClause, self._role_class 

375 ): 

376 _no_text_coercion(element, argname) 

377 else: 

378 self._raise_for_expected(element, argname) 

379 

380 

381class _CoerceLiterals(object): 

382 __slots__ = () 

383 _coerce_consts = False 

384 _coerce_star = False 

385 _coerce_numerics = False 

386 

387 def _text_coercion(self, element, argname=None): 

388 return _no_text_coercion(element, argname) 

389 

390 def _literal_coercion(self, element, argname=None, **kw): 

391 if isinstance(element, util.string_types): 

392 if self._coerce_star and element == "*": 

393 return elements.ColumnClause("*", is_literal=True) 

394 else: 

395 return self._text_coercion(element, argname, **kw) 

396 

397 if self._coerce_consts: 

398 if element is None: 

399 return elements.Null() 

400 elif element is False: 

401 return elements.False_() 

402 elif element is True: 

403 return elements.True_() 

404 

405 if self._coerce_numerics and isinstance(element, (numbers.Number)): 

406 return elements.ColumnClause(str(element), is_literal=True) 

407 

408 self._raise_for_expected(element, argname) 

409 

410 

411class LiteralValueImpl(RoleImpl): 

412 _resolve_literal_only = True 

413 

414 def _implicit_coercions( 

415 self, element, resolved, argname, type_=None, **kw 

416 ): 

417 if not _is_literal(resolved): 

418 self._raise_for_expected( 

419 element, resolved=resolved, argname=argname, **kw 

420 ) 

421 

422 return elements.BindParameter(None, element, type_=type_, unique=True) 

423 

424 def _literal_coercion(self, element, argname=None, type_=None, **kw): 

425 return element 

426 

427 

428class _SelectIsNotFrom(object): 

429 __slots__ = () 

430 

431 def _raise_for_expected(self, element, argname=None, resolved=None, **kw): 

432 if isinstance(element, roles.SelectStatementRole) or isinstance( 

433 resolved, roles.SelectStatementRole 

434 ): 

435 advice = ( 

436 "To create a " 

437 "FROM clause from a %s object, use the .subquery() method." 

438 % (resolved.__class__ if resolved is not None else element,) 

439 ) 

440 code = "89ve" 

441 else: 

442 advice = code = None 

443 

444 return super(_SelectIsNotFrom, self)._raise_for_expected( 

445 element, 

446 argname=argname, 

447 resolved=resolved, 

448 advice=advice, 

449 code=code, 

450 **kw 

451 ) 

452 

453 

454class HasCacheKeyImpl(RoleImpl): 

455 __slots__ = () 

456 

457 def _implicit_coercions( 

458 self, original_element, resolved, argname=None, **kw 

459 ): 

460 if isinstance(original_element, traversals.HasCacheKey): 

461 return original_element 

462 else: 

463 self._raise_for_expected(original_element, argname, resolved) 

464 

465 def _literal_coercion(self, element, **kw): 

466 return element 

467 

468 

469class ExecutableOptionImpl(RoleImpl): 

470 __slots__ = () 

471 

472 def _implicit_coercions( 

473 self, original_element, resolved, argname=None, **kw 

474 ): 

475 if isinstance(original_element, ExecutableOption): 

476 return original_element 

477 else: 

478 self._raise_for_expected(original_element, argname, resolved) 

479 

480 def _literal_coercion(self, element, **kw): 

481 return element 

482 

483 

484class ExpressionElementImpl(_ColumnCoercions, RoleImpl): 

485 __slots__ = () 

486 

487 def _literal_coercion( 

488 self, element, name=None, type_=None, argname=None, is_crud=False, **kw 

489 ): 

490 if ( 

491 element is None 

492 and not is_crud 

493 and (type_ is None or not type_.should_evaluate_none) 

494 ): 

495 # TODO: there's no test coverage now for the 

496 # "should_evaluate_none" part of this, as outside of "crud" this 

497 # codepath is not normally used except in some special cases 

498 return elements.Null() 

499 else: 

500 try: 

501 return elements.BindParameter( 

502 name, element, type_, unique=True, _is_crud=is_crud 

503 ) 

504 except exc.ArgumentError as err: 

505 self._raise_for_expected(element, err=err) 

506 

507 def _raise_for_expected(self, element, argname=None, resolved=None, **kw): 

508 if isinstance(element, roles.AnonymizedFromClauseRole): 

509 advice = ( 

510 "To create a " 

511 "column expression from a FROM clause row " 

512 "as a whole, use the .table_valued() method." 

513 ) 

514 else: 

515 advice = None 

516 

517 return super(ExpressionElementImpl, self)._raise_for_expected( 

518 element, argname=argname, resolved=resolved, advice=advice, **kw 

519 ) 

520 

521 

522class BinaryElementImpl(ExpressionElementImpl, RoleImpl): 

523 

524 __slots__ = () 

525 

526 def _literal_coercion( 

527 self, element, expr, operator, bindparam_type=None, argname=None, **kw 

528 ): 

529 try: 

530 return expr._bind_param(operator, element, type_=bindparam_type) 

531 except exc.ArgumentError as err: 

532 self._raise_for_expected(element, err=err) 

533 

534 def _post_coercion(self, resolved, expr, bindparam_type=None, **kw): 

535 if resolved.type._isnull and not expr.type._isnull: 

536 resolved = resolved._with_binary_element_type( 

537 bindparam_type if bindparam_type is not None else expr.type 

538 ) 

539 return resolved 

540 

541 

542class InElementImpl(RoleImpl): 

543 __slots__ = () 

544 

545 def _implicit_coercions( 

546 self, original_element, resolved, argname=None, **kw 

547 ): 

548 if resolved._is_from_clause: 

549 if ( 

550 isinstance(resolved, selectable.Alias) 

551 and resolved.element._is_select_statement 

552 ): 

553 self._warn_for_implicit_coercion(resolved) 

554 return self._post_coercion(resolved.element, **kw) 

555 else: 

556 self._warn_for_implicit_coercion(resolved) 

557 return self._post_coercion(resolved.select(), **kw) 

558 else: 

559 self._raise_for_expected(original_element, argname, resolved) 

560 

561 def _warn_for_implicit_coercion(self, elem): 

562 util.warn( 

563 "Coercing %s object into a select() for use in IN(); " 

564 "please pass a select() construct explicitly" 

565 % (elem.__class__.__name__) 

566 ) 

567 

568 def _literal_coercion(self, element, expr, operator, **kw): 

569 if isinstance(element, collections_abc.Iterable) and not isinstance( 

570 element, util.string_types 

571 ): 

572 non_literal_expressions = {} 

573 element = list(element) 

574 for o in element: 

575 if not _is_literal(o): 

576 if not isinstance(o, operators.ColumnOperators): 

577 self._raise_for_expected(element, **kw) 

578 else: 

579 non_literal_expressions[o] = o 

580 elif o is None: 

581 non_literal_expressions[o] = elements.Null() 

582 

583 if non_literal_expressions: 

584 return elements.ClauseList( 

585 *[ 

586 non_literal_expressions[o] 

587 if o in non_literal_expressions 

588 else expr._bind_param(operator, o) 

589 for o in element 

590 ] 

591 ) 

592 else: 

593 return expr._bind_param(operator, element, expanding=True) 

594 

595 else: 

596 self._raise_for_expected(element, **kw) 

597 

598 def _post_coercion(self, element, expr, operator, **kw): 

599 if element._is_select_statement: 

600 # for IN, we are doing scalar_subquery() coercion without 

601 # a warning 

602 return element.scalar_subquery() 

603 elif isinstance(element, elements.ClauseList): 

604 assert not len(element.clauses) == 0 

605 return element.self_group(against=operator) 

606 

607 elif isinstance(element, elements.BindParameter): 

608 element = element._clone(maintain_key=True) 

609 element.expanding = True 

610 element.expand_op = operator 

611 

612 return element 

613 else: 

614 return element 

615 

616 

617class OnClauseImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl): 

618 __slots__ = () 

619 

620 _coerce_consts = True 

621 

622 def _implicit_coercions( 

623 self, original_element, resolved, argname=None, legacy=False, **kw 

624 ): 

625 if legacy and isinstance(resolved, str): 

626 return resolved 

627 else: 

628 return super(OnClauseImpl, self)._implicit_coercions( 

629 original_element, 

630 resolved, 

631 argname=argname, 

632 legacy=legacy, 

633 **kw 

634 ) 

635 

636 def _text_coercion(self, element, argname=None, legacy=False): 

637 if legacy and isinstance(element, str): 

638 util.warn_deprecated_20( 

639 "Using strings to indicate relationship names in " 

640 "Query.join() is deprecated and will be removed in " 

641 "SQLAlchemy 2.0. Please use the class-bound attribute " 

642 "directly." 

643 ) 

644 return element 

645 

646 return super(OnClauseImpl, self)._text_coercion(element, argname) 

647 

648 def _post_coercion(self, resolved, original_element=None, **kw): 

649 # this is a hack right now as we want to use coercion on an 

650 # ORM InstrumentedAttribute, but we want to return the object 

651 # itself if it is one, not its clause element. 

652 # ORM context _join and _legacy_join() would need to be improved 

653 # to look for annotations in a clause element form. 

654 if isinstance(original_element, roles.JoinTargetRole): 

655 return original_element 

656 return resolved 

657 

658 

659class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl): 

660 __slots__ = () 

661 

662 _coerce_consts = True 

663 

664 def _text_coercion(self, element, argname=None): 

665 return _no_text_coercion(element, argname) 

666 

667 

668class StatementOptionImpl(_CoerceLiterals, RoleImpl): 

669 __slots__ = () 

670 

671 _coerce_consts = True 

672 

673 def _text_coercion(self, element, argname=None): 

674 return elements.TextClause(element) 

675 

676 

677class ColumnArgumentImpl(_NoTextCoercion, RoleImpl): 

678 __slots__ = () 

679 

680 

681class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl): 

682 __slots__ = () 

683 

684 

685class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl): 

686 __slots__ = () 

687 

688 def _text_coercion(self, element, argname=None): 

689 return elements.ColumnClause(element) 

690 

691 

692class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole): 

693 

694 __slots__ = () 

695 

696 _coerce_consts = True 

697 

698 def _text_coercion(self, element, argname=None): 

699 return elements._textual_label_reference(element) 

700 

701 

702class OrderByImpl(ByOfImpl, RoleImpl): 

703 __slots__ = () 

704 

705 def _post_coercion(self, resolved, **kw): 

706 if ( 

707 isinstance(resolved, self._role_class) 

708 and resolved._order_by_label_element is not None 

709 ): 

710 return elements._label_reference(resolved) 

711 else: 

712 return resolved 

713 

714 

715class GroupByImpl(ByOfImpl, RoleImpl): 

716 __slots__ = () 

717 

718 def _implicit_coercions( 

719 self, original_element, resolved, argname=None, **kw 

720 ): 

721 if isinstance(resolved, roles.StrictFromClauseRole): 

722 return elements.ClauseList(*resolved.c) 

723 else: 

724 return resolved 

725 

726 

727class DMLColumnImpl(_ReturnsStringKey, RoleImpl): 

728 __slots__ = () 

729 

730 def _post_coercion(self, element, as_key=False, **kw): 

731 if as_key: 

732 return element.key 

733 else: 

734 return element 

735 

736 

737class ConstExprImpl(RoleImpl): 

738 __slots__ = () 

739 

740 def _literal_coercion(self, element, argname=None, **kw): 

741 if element is None: 

742 return elements.Null() 

743 elif element is False: 

744 return elements.False_() 

745 elif element is True: 

746 return elements.True_() 

747 else: 

748 self._raise_for_expected(element, argname) 

749 

750 

751class TruncatedLabelImpl(_StringOnly, RoleImpl): 

752 __slots__ = () 

753 

754 def _implicit_coercions( 

755 self, original_element, resolved, argname=None, **kw 

756 ): 

757 if isinstance(original_element, util.string_types): 

758 return resolved 

759 else: 

760 self._raise_for_expected(original_element, argname, resolved) 

761 

762 def _literal_coercion(self, element, argname=None, **kw): 

763 """coerce the given value to :class:`._truncated_label`. 

764 

765 Existing :class:`._truncated_label` and 

766 :class:`._anonymous_label` objects are passed 

767 unchanged. 

768 """ 

769 

770 if isinstance(element, elements._truncated_label): 

771 return element 

772 else: 

773 return elements._truncated_label(element) 

774 

775 

776class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl): 

777 

778 __slots__ = () 

779 

780 _coerce_consts = True 

781 

782 def _text_coercion(self, element, argname=None): 

783 # see #5754 for why we can't easily deprecate this coercion. 

784 # essentially expressions like postgresql_where would have to be 

785 # text() as they come back from reflection and we don't want to 

786 # have text() elements wired into the inspection dictionaries. 

787 return elements.TextClause(element) 

788 

789 

790class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl): 

791 __slots__ = () 

792 

793 

794class DDLReferredColumnImpl(DDLConstraintColumnImpl): 

795 __slots__ = () 

796 

797 

798class LimitOffsetImpl(RoleImpl): 

799 __slots__ = () 

800 

801 def _implicit_coercions(self, element, resolved, argname=None, **kw): 

802 if resolved is None: 

803 return None 

804 else: 

805 self._raise_for_expected(element, argname, resolved) 

806 

807 def _literal_coercion(self, element, name, type_, **kw): 

808 if element is None: 

809 return None 

810 else: 

811 value = util.asint(element) 

812 return selectable._OffsetLimitParam( 

813 name, value, type_=type_, unique=True 

814 ) 

815 

816 

817class LabeledColumnExprImpl(ExpressionElementImpl): 

818 __slots__ = () 

819 

820 def _implicit_coercions( 

821 self, original_element, resolved, argname=None, **kw 

822 ): 

823 if isinstance(resolved, roles.ExpressionElementRole): 

824 return resolved.label(None) 

825 else: 

826 new = super(LabeledColumnExprImpl, self)._implicit_coercions( 

827 original_element, resolved, argname=argname, **kw 

828 ) 

829 if isinstance(new, roles.ExpressionElementRole): 

830 return new.label(None) 

831 else: 

832 self._raise_for_expected(original_element, argname, resolved) 

833 

834 

835class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl): 

836 __slots__ = () 

837 

838 _coerce_consts = True 

839 _coerce_numerics = True 

840 _coerce_star = True 

841 

842 _guess_straight_column = re.compile(r"^\w\S*$", re.I) 

843 

844 def _text_coercion(self, element, argname=None): 

845 element = str(element) 

846 

847 guess_is_literal = not self._guess_straight_column.match(element) 

848 raise exc.ArgumentError( 

849 "Textual column expression %(column)r %(argname)sshould be " 

850 "explicitly declared with text(%(column)r), " 

851 "or use %(literal_column)s(%(column)r) " 

852 "for more specificity" 

853 % { 

854 "column": util.ellipses_string(element), 

855 "argname": "for argument %s" % (argname,) if argname else "", 

856 "literal_column": "literal_column" 

857 if guess_is_literal 

858 else "column", 

859 } 

860 ) 

861 

862 

863class ReturnsRowsImpl(RoleImpl): 

864 __slots__ = () 

865 

866 

867class StatementImpl(_CoerceLiterals, RoleImpl): 

868 __slots__ = () 

869 

870 def _post_coercion(self, resolved, original_element, argname=None, **kw): 

871 if resolved is not original_element and not isinstance( 

872 original_element, util.string_types 

873 ): 

874 # use same method as Connection uses; this will later raise 

875 # ObjectNotExecutableError 

876 try: 

877 original_element._execute_on_connection 

878 except AttributeError: 

879 util.warn_deprecated( 

880 "Object %r should not be used directly in a SQL statement " 

881 "context, such as passing to methods such as " 

882 "session.execute(). This usage will be disallowed in a " 

883 "future release. " 

884 "Please use Core select() / update() / delete() etc. " 

885 "with Session.execute() and other statement execution " 

886 "methods." % original_element, 

887 "1.4", 

888 ) 

889 

890 return resolved 

891 

892 def _implicit_coercions( 

893 self, original_element, resolved, argname=None, **kw 

894 ): 

895 if resolved._is_lambda_element: 

896 return resolved 

897 else: 

898 return super(StatementImpl, self)._implicit_coercions( 

899 original_element, resolved, argname=argname, **kw 

900 ) 

901 

902 def _text_coercion(self, element, argname=None): 

903 util.warn_deprecated_20( 

904 "Using plain strings to indicate SQL statements without using " 

905 "the text() construct is " 

906 "deprecated and will be removed in version 2.0. Ensure plain " 

907 "SQL statements are passed using the text() construct." 

908 ) 

909 return elements.TextClause(element) 

910 

911 

912class SelectStatementImpl(_NoTextCoercion, RoleImpl): 

913 __slots__ = () 

914 

915 def _implicit_coercions( 

916 self, original_element, resolved, argname=None, **kw 

917 ): 

918 if resolved._is_text_clause: 

919 return resolved.columns() 

920 else: 

921 self._raise_for_expected(original_element, argname, resolved) 

922 

923 

924class HasCTEImpl(ReturnsRowsImpl): 

925 __slots__ = () 

926 

927 

928class IsCTEImpl(RoleImpl): 

929 __slots__ = () 

930 

931 

932class JoinTargetImpl(RoleImpl): 

933 __slots__ = () 

934 

935 _skip_clauseelement_for_target_match = True 

936 

937 def _literal_coercion(self, element, legacy=False, **kw): 

938 if isinstance(element, str): 

939 return element 

940 

941 def _implicit_coercions( 

942 self, original_element, resolved, argname=None, legacy=False, **kw 

943 ): 

944 if isinstance(original_element, roles.JoinTargetRole): 

945 # note that this codepath no longer occurs as of 

946 # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match 

947 # were set to False. 

948 return original_element 

949 elif legacy and isinstance(resolved, str): 

950 util.warn_deprecated_20( 

951 "Using strings to indicate relationship names in " 

952 "Query.join() is deprecated and will be removed in " 

953 "SQLAlchemy 2.0. Please use the class-bound attribute " 

954 "directly." 

955 ) 

956 return resolved 

957 elif legacy and isinstance(resolved, roles.WhereHavingRole): 

958 return resolved 

959 elif legacy and resolved._is_select_statement: 

960 util.warn_deprecated( 

961 "Implicit coercion of SELECT and textual SELECT " 

962 "constructs into FROM clauses is deprecated; please call " 

963 ".subquery() on any Core select or ORM Query object in " 

964 "order to produce a subquery object.", 

965 version="1.4", 

966 ) 

967 # TODO: doing _implicit_subquery here causes tests to fail, 

968 # how was this working before? probably that ORM 

969 # join logic treated it as a select and subquery would happen 

970 # in _ORMJoin->Join 

971 return resolved 

972 else: 

973 self._raise_for_expected(original_element, argname, resolved) 

974 

975 

976class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl): 

977 __slots__ = () 

978 

979 def _implicit_coercions( 

980 self, 

981 original_element, 

982 resolved, 

983 argname=None, 

984 explicit_subquery=False, 

985 allow_select=True, 

986 **kw 

987 ): 

988 if resolved._is_select_statement: 

989 if explicit_subquery: 

990 return resolved.subquery() 

991 elif allow_select: 

992 util.warn_deprecated( 

993 "Implicit coercion of SELECT and textual SELECT " 

994 "constructs into FROM clauses is deprecated; please call " 

995 ".subquery() on any Core select or ORM Query object in " 

996 "order to produce a subquery object.", 

997 version="1.4", 

998 ) 

999 return resolved._implicit_subquery 

1000 elif resolved._is_text_clause: 

1001 return resolved 

1002 else: 

1003 self._raise_for_expected(original_element, argname, resolved) 

1004 

1005 def _post_coercion(self, element, deannotate=False, **kw): 

1006 if deannotate: 

1007 return element._deannotate() 

1008 else: 

1009 return element 

1010 

1011 

1012class StrictFromClauseImpl(FromClauseImpl): 

1013 __slots__ = () 

1014 

1015 def _implicit_coercions( 

1016 self, 

1017 original_element, 

1018 resolved, 

1019 argname=None, 

1020 allow_select=False, 

1021 **kw 

1022 ): 

1023 if resolved._is_select_statement and allow_select: 

1024 util.warn_deprecated( 

1025 "Implicit coercion of SELECT and textual SELECT constructs " 

1026 "into FROM clauses is deprecated; please call .subquery() " 

1027 "on any Core select or ORM Query object in order to produce a " 

1028 "subquery object.", 

1029 version="1.4", 

1030 ) 

1031 return resolved._implicit_subquery 

1032 else: 

1033 self._raise_for_expected(original_element, argname, resolved) 

1034 

1035 

1036class AnonymizedFromClauseImpl(StrictFromClauseImpl): 

1037 __slots__ = () 

1038 

1039 def _post_coercion(self, element, flat=False, name=None, **kw): 

1040 assert name is None 

1041 

1042 return element._anonymous_fromclause(flat=flat) 

1043 

1044 

1045class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl): 

1046 __slots__ = () 

1047 

1048 def _post_coercion(self, element, **kw): 

1049 if "dml_table" in element._annotations: 

1050 return element._annotations["dml_table"] 

1051 else: 

1052 return element 

1053 

1054 

1055class DMLSelectImpl(_NoTextCoercion, RoleImpl): 

1056 __slots__ = () 

1057 

1058 def _implicit_coercions( 

1059 self, original_element, resolved, argname=None, **kw 

1060 ): 

1061 if resolved._is_from_clause: 

1062 if ( 

1063 isinstance(resolved, selectable.Alias) 

1064 and resolved.element._is_select_statement 

1065 ): 

1066 return resolved.element 

1067 else: 

1068 return resolved.select() 

1069 else: 

1070 self._raise_for_expected(original_element, argname, resolved) 

1071 

1072 

1073class CompoundElementImpl(_NoTextCoercion, RoleImpl): 

1074 __slots__ = () 

1075 

1076 def _raise_for_expected(self, element, argname=None, resolved=None, **kw): 

1077 if isinstance(element, roles.FromClauseRole): 

1078 if element._is_subquery: 

1079 advice = ( 

1080 "Use the plain select() object without " 

1081 "calling .subquery() or .alias()." 

1082 ) 

1083 else: 

1084 advice = ( 

1085 "To SELECT from any FROM clause, use the .select() method." 

1086 ) 

1087 else: 

1088 advice = None 

1089 return super(CompoundElementImpl, self)._raise_for_expected( 

1090 element, argname=argname, resolved=resolved, advice=advice, **kw 

1091 ) 

1092 

1093 

1094_impl_lookup = {} 

1095 

1096 

1097for name in dir(roles): 

1098 cls = getattr(roles, name) 

1099 if name.endswith("Role"): 

1100 name = name.replace("Role", "Impl") 

1101 if name in globals(): 

1102 impl = globals()[name](cls) 

1103 _impl_lookup[cls] = impl