1import sqlalchemy as sa
2import sqlalchemy.orm
3from sqlalchemy.sql.util import ClauseAdapter
4
5from .chained_join import chained_join # noqa
6
7
8def path_to_relationships(path, cls):
9 relationships = []
10 for path_name in path.split('.'):
11 rel = getattr(cls, path_name)
12 relationships.append(rel)
13 cls = rel.mapper.class_
14 return relationships
15
16
17def adapt_expr(expr, *selectables):
18 for selectable in selectables:
19 expr = ClauseAdapter(selectable).traverse(expr)
20 return expr
21
22
23def inverse_join(selectable, left_alias, right_alias, relationship):
24 if relationship.property.secondary is not None:
25 secondary_alias = sa.alias(relationship.property.secondary)
26 return selectable.join(
27 secondary_alias,
28 adapt_expr(
29 relationship.property.secondaryjoin,
30 sa.inspect(left_alias).selectable,
31 secondary_alias,
32 ),
33 ).join(
34 right_alias,
35 adapt_expr(
36 relationship.property.primaryjoin,
37 sa.inspect(right_alias).selectable,
38 secondary_alias,
39 ),
40 )
41 else:
42 join = sa.orm.join(right_alias, left_alias, relationship)
43 onclause = join.onclause
44 return selectable.join(right_alias, onclause)
45
46
47def relationship_to_correlation(relationship, alias):
48 if relationship.property.secondary is not None:
49 return adapt_expr(
50 relationship.property.primaryjoin,
51 alias,
52 )
53 else:
54 return sa.orm.join(relationship.parent, alias, relationship).onclause
55
56
57def chained_inverse_join(relationships, leaf_model):
58 selectable = sa.inspect(leaf_model).selectable
59 aliases = [leaf_model]
60 for index, relationship in enumerate(relationships[1:]):
61 aliases.append(sa.orm.aliased(relationship.mapper.class_))
62 selectable = inverse_join(
63 selectable, aliases[index], aliases[index + 1], relationships[index]
64 )
65
66 if relationships[-1].property.secondary is not None:
67 secondary_alias = sa.alias(relationships[-1].property.secondary)
68 selectable = selectable.join(
69 secondary_alias,
70 adapt_expr(
71 relationships[-1].property.secondaryjoin,
72 secondary_alias,
73 sa.inspect(aliases[-1]).selectable,
74 ),
75 )
76 aliases.append(secondary_alias)
77 return selectable, aliases
78
79
80def select_correlated_expression(
81 root_model, expr, path, leaf_model, from_obj=None, order_by=None, correlate=True
82):
83 relationships = list(reversed(path_to_relationships(path, root_model)))
84
85 query = sa.select(expr)
86
87 join_expr, aliases = chained_inverse_join(relationships, leaf_model)
88
89 if order_by:
90 query = query.order_by(
91 *[
92 adapt_expr(o, *(sa.inspect(alias).selectable for alias in aliases))
93 for o in order_by
94 ]
95 )
96
97 condition = relationship_to_correlation(relationships[-1], aliases[-1])
98
99 if from_obj is not None:
100 condition = adapt_expr(condition, from_obj)
101
102 query = query.select_from(join_expr.selectable)
103
104 if correlate:
105 query = query.correlate(from_obj if from_obj is not None else root_model)
106 return query.where(condition)