1import sqlalchemy as sa
2from sqlalchemy.ext import compiler
3from sqlalchemy.schema import DDLElement, PrimaryKeyConstraint
4from sqlalchemy.sql.expression import ClauseElement, Executable
5
6from sqlalchemy_utils.functions import get_columns
7
8
9class CreateView(DDLElement):
10 def __init__(self, name, selectable, materialized=False, replace=False):
11 if materialized and replace:
12 raise ValueError('Cannot use CREATE OR REPLACE with materialized views')
13 self.name = name
14 self.selectable = selectable
15 self.materialized = materialized
16 self.replace = replace
17
18
19@compiler.compiles(CreateView)
20def compile_create_materialized_view(element, compiler, **kw):
21 return 'CREATE {}{}VIEW {} AS {}'.format(
22 'OR REPLACE ' if element.replace else '',
23 'MATERIALIZED ' if element.materialized else '',
24 compiler.dialect.identifier_preparer.quote(element.name),
25 compiler.sql_compiler.process(element.selectable, literal_binds=True),
26 )
27
28
29class DropView(DDLElement):
30 def __init__(self, name, materialized=False, cascade=True):
31 self.name = name
32 self.materialized = materialized
33 self.cascade = cascade
34
35
36@compiler.compiles(DropView)
37def compile_drop_materialized_view(element, compiler, **kw):
38 return 'DROP {}VIEW IF EXISTS {} {}'.format(
39 'MATERIALIZED ' if element.materialized else '',
40 compiler.dialect.identifier_preparer.quote(element.name),
41 'CASCADE' if element.cascade else '',
42 )
43
44
45def create_table_from_selectable(
46 name, selectable, indexes=None, metadata=None, aliases=None, **kwargs
47):
48 if indexes is None:
49 indexes = []
50 if metadata is None:
51 metadata = sa.MetaData()
52 if aliases is None:
53 aliases = {}
54 args = [
55 sa.Column(
56 c.name, c.type, key=aliases.get(c.name, c.name), primary_key=c.primary_key
57 )
58 for c in get_columns(selectable)
59 ] + indexes
60 table = sa.Table(name, metadata, *args, **kwargs)
61
62 if not any([c.primary_key for c in get_columns(selectable)]):
63 table.append_constraint(
64 PrimaryKeyConstraint(*[c.name for c in get_columns(selectable)])
65 )
66 return table
67
68
69def create_materialized_view(name, selectable, metadata, indexes=None, aliases=None):
70 """Create a view on a given metadata
71
72 :param name: The name of the view to create.
73 :param selectable: An SQLAlchemy selectable e.g. a select() statement.
74 :param metadata:
75 An SQLAlchemy Metadata instance that stores the features of the
76 database being described.
77 :param indexes: An optional list of SQLAlchemy Index instances.
78 :param aliases:
79 An optional dictionary containing with keys as column names and values
80 as column aliases.
81
82 Same as for ``create_view`` except that a ``CREATE MATERIALIZED VIEW``
83 statement is emitted instead of a ``CREATE VIEW``.
84
85 """
86 table = create_table_from_selectable(
87 name=name,
88 selectable=selectable,
89 indexes=indexes,
90 metadata=None,
91 aliases=aliases,
92 )
93
94 sa.event.listen(
95 metadata, 'after_create', CreateView(name, selectable, materialized=True)
96 )
97
98 @sa.event.listens_for(metadata, 'after_create')
99 def create_indexes(target, connection, **kw):
100 for idx in table.indexes:
101 idx.create(connection)
102
103 sa.event.listen(metadata, 'before_drop', DropView(name, materialized=True))
104 return table
105
106
107def create_view(
108 name,
109 selectable,
110 metadata,
111 cascade_on_drop=True,
112 replace=False,
113):
114 """Create a view on a given metadata
115
116 :param name: The name of the view to create.
117 :param selectable: An SQLAlchemy selectable e.g. a select() statement.
118 :param metadata:
119 An SQLAlchemy Metadata instance that stores the features of the
120 database being described.
121 :param cascade_on_drop: If ``True`` the view will be dropped with
122 ``CASCADE``, deleting all dependent objects as well.
123 :param replace: If ``True`` the view will be created with ``OR REPLACE``,
124 replacing an existing view with the same name.
125
126 The process for creating a view is similar to the standard way that a
127 table is constructed, except that a selectable is provided instead of
128 a set of columns. The view is created once a ``CREATE`` statement is
129 executed against the supplied metadata (e.g. ``metadata.create_all(..)``),
130 and dropped when a ``DROP`` is executed against the metadata.
131
132 To create a view that performs basic filtering on a table. ::
133
134 metadata = MetaData()
135 users = Table('users', metadata,
136 Column('id', Integer, primary_key=True),
137 Column('name', String),
138 Column('fullname', String),
139 Column('premium_user', Boolean, default=False),
140 )
141
142 premium_members = select(users).where(users.c.premium_user == True)
143 create_view('premium_users', premium_members, metadata)
144
145 metadata.create_all(engine) # View is created at this point
146
147 """
148 table = create_table_from_selectable(
149 name=name, selectable=selectable, metadata=None
150 )
151
152 sa.event.listen(
153 metadata,
154 'after_create',
155 CreateView(name, selectable, replace=replace),
156 )
157
158 @sa.event.listens_for(metadata, 'after_create')
159 def create_indexes(target, connection, **kw):
160 for idx in table.indexes:
161 idx.create(connection)
162
163 sa.event.listen(metadata, 'before_drop', DropView(name, cascade=cascade_on_drop))
164 return table
165
166
167class RefreshMaterializedView(Executable, ClauseElement):
168 inherit_cache = True
169
170 def __init__(self, name, concurrently):
171 self.name = name
172 self.concurrently = concurrently
173
174
175@compiler.compiles(RefreshMaterializedView)
176def compile_refresh_materialized_view(element, compiler):
177 return 'REFRESH MATERIALIZED VIEW {concurrently}{name}'.format(
178 concurrently='CONCURRENTLY ' if element.concurrently else '',
179 name=compiler.dialect.identifier_preparer.quote(element.name),
180 )
181
182
183def refresh_materialized_view(session, name, concurrently=False):
184 """Refreshes an already existing materialized view
185
186 :param session: An SQLAlchemy Session instance.
187 :param name: The name of the materialized view to refresh.
188 :param concurrently:
189 Optional flag that causes the ``CONCURRENTLY`` parameter
190 to be specified when the materialized view is refreshed.
191 """
192 # Since session.execute() bypasses autoflush, we must manually flush in
193 # order to include newly-created/modified objects in the refresh.
194 session.flush()
195 session.execute(RefreshMaterializedView(name, concurrently))