Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy_utils/view.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1import sqlalchemy as sa 

2from sqlalchemy.ext import compiler 

3from sqlalchemy.schema import DDLElement, PrimaryKeyConstraint 

4from sqlalchemy.sql.expression import ClauseElement, Executable 

5 

6from sqlalchemy_utils.functions import get_columns 

7 

8 

9class CreateView(DDLElement): 

10 def __init__(self, name, selectable, materialized=False, replace=False): 

11 if materialized and replace: 

12 raise ValueError('Cannot use CREATE OR REPLACE with materialized views') 

13 self.name = name 

14 self.selectable = selectable 

15 self.materialized = materialized 

16 self.replace = replace 

17 

18 

19@compiler.compiles(CreateView) 

20def compile_create_materialized_view(element, compiler, **kw): 

21 return 'CREATE {}{}VIEW {} AS {}'.format( 

22 'OR REPLACE ' if element.replace else '', 

23 'MATERIALIZED ' if element.materialized else '', 

24 compiler.dialect.identifier_preparer.quote(element.name), 

25 compiler.sql_compiler.process(element.selectable, literal_binds=True), 

26 ) 

27 

28 

29class DropView(DDLElement): 

30 def __init__(self, name, materialized=False, cascade=True): 

31 self.name = name 

32 self.materialized = materialized 

33 self.cascade = cascade 

34 

35 

36@compiler.compiles(DropView) 

37def compile_drop_materialized_view(element, compiler, **kw): 

38 return 'DROP {}VIEW IF EXISTS {} {}'.format( 

39 'MATERIALIZED ' if element.materialized else '', 

40 compiler.dialect.identifier_preparer.quote(element.name), 

41 'CASCADE' if element.cascade else '', 

42 ) 

43 

44 

45def create_table_from_selectable( 

46 name, selectable, indexes=None, metadata=None, aliases=None, **kwargs 

47): 

48 if indexes is None: 

49 indexes = [] 

50 if metadata is None: 

51 metadata = sa.MetaData() 

52 if aliases is None: 

53 aliases = {} 

54 args = [ 

55 sa.Column( 

56 c.name, c.type, key=aliases.get(c.name, c.name), primary_key=c.primary_key 

57 ) 

58 for c in get_columns(selectable) 

59 ] + indexes 

60 table = sa.Table(name, metadata, *args, **kwargs) 

61 

62 if not any([c.primary_key for c in get_columns(selectable)]): 

63 table.append_constraint( 

64 PrimaryKeyConstraint(*[c.name for c in get_columns(selectable)]) 

65 ) 

66 return table 

67 

68 

69def create_materialized_view(name, selectable, metadata, indexes=None, aliases=None): 

70 """Create a view on a given metadata 

71 

72 :param name: The name of the view to create. 

73 :param selectable: An SQLAlchemy selectable e.g. a select() statement. 

74 :param metadata: 

75 An SQLAlchemy Metadata instance that stores the features of the 

76 database being described. 

77 :param indexes: An optional list of SQLAlchemy Index instances. 

78 :param aliases: 

79 An optional dictionary containing with keys as column names and values 

80 as column aliases. 

81 

82 Same as for ``create_view`` except that a ``CREATE MATERIALIZED VIEW`` 

83 statement is emitted instead of a ``CREATE VIEW``. 

84 

85 """ 

86 table = create_table_from_selectable( 

87 name=name, 

88 selectable=selectable, 

89 indexes=indexes, 

90 metadata=None, 

91 aliases=aliases, 

92 ) 

93 

94 sa.event.listen( 

95 metadata, 'after_create', CreateView(name, selectable, materialized=True) 

96 ) 

97 

98 @sa.event.listens_for(metadata, 'after_create') 

99 def create_indexes(target, connection, **kw): 

100 for idx in table.indexes: 

101 idx.create(connection) 

102 

103 sa.event.listen(metadata, 'before_drop', DropView(name, materialized=True)) 

104 return table 

105 

106 

107def create_view( 

108 name, 

109 selectable, 

110 metadata, 

111 cascade_on_drop=True, 

112 replace=False, 

113): 

114 """Create a view on a given metadata 

115 

116 :param name: The name of the view to create. 

117 :param selectable: An SQLAlchemy selectable e.g. a select() statement. 

118 :param metadata: 

119 An SQLAlchemy Metadata instance that stores the features of the 

120 database being described. 

121 :param cascade_on_drop: If ``True`` the view will be dropped with 

122 ``CASCADE``, deleting all dependent objects as well. 

123 :param replace: If ``True`` the view will be created with ``OR REPLACE``, 

124 replacing an existing view with the same name. 

125 

126 The process for creating a view is similar to the standard way that a 

127 table is constructed, except that a selectable is provided instead of 

128 a set of columns. The view is created once a ``CREATE`` statement is 

129 executed against the supplied metadata (e.g. ``metadata.create_all(..)``), 

130 and dropped when a ``DROP`` is executed against the metadata. 

131 

132 To create a view that performs basic filtering on a table. :: 

133 

134 metadata = MetaData() 

135 users = Table('users', metadata, 

136 Column('id', Integer, primary_key=True), 

137 Column('name', String), 

138 Column('fullname', String), 

139 Column('premium_user', Boolean, default=False), 

140 ) 

141 

142 premium_members = select(users).where(users.c.premium_user == True) 

143 create_view('premium_users', premium_members, metadata) 

144 

145 metadata.create_all(engine) # View is created at this point 

146 

147 """ 

148 table = create_table_from_selectable( 

149 name=name, selectable=selectable, metadata=None 

150 ) 

151 

152 sa.event.listen( 

153 metadata, 

154 'after_create', 

155 CreateView(name, selectable, replace=replace), 

156 ) 

157 

158 @sa.event.listens_for(metadata, 'after_create') 

159 def create_indexes(target, connection, **kw): 

160 for idx in table.indexes: 

161 idx.create(connection) 

162 

163 sa.event.listen(metadata, 'before_drop', DropView(name, cascade=cascade_on_drop)) 

164 return table 

165 

166 

167class RefreshMaterializedView(Executable, ClauseElement): 

168 inherit_cache = True 

169 

170 def __init__(self, name, concurrently): 

171 self.name = name 

172 self.concurrently = concurrently 

173 

174 

175@compiler.compiles(RefreshMaterializedView) 

176def compile_refresh_materialized_view(element, compiler): 

177 return 'REFRESH MATERIALIZED VIEW {concurrently}{name}'.format( 

178 concurrently='CONCURRENTLY ' if element.concurrently else '', 

179 name=compiler.dialect.identifier_preparer.quote(element.name), 

180 ) 

181 

182 

183def refresh_materialized_view(session, name, concurrently=False): 

184 """Refreshes an already existing materialized view 

185 

186 :param session: An SQLAlchemy Session instance. 

187 :param name: The name of the materialized view to refresh. 

188 :param concurrently: 

189 Optional flag that causes the ``CONCURRENTLY`` parameter 

190 to be specified when the materialized view is refreshed. 

191 """ 

192 # Since session.execute() bypasses autoflush, we must manually flush in 

193 # order to include newly-created/modified objects in the refresh. 

194 session.flush() 

195 session.execute(RefreshMaterializedView(name, concurrently))