1"""
2CompositeType provides means to interact with
3`PostgreSQL composite types`_. Currently this type features:
4
5* Easy attribute access to composite type fields
6* Supports SQLAlchemy TypeDecorator types
7* Ability to include composite types as part of PostgreSQL arrays
8* Type creation and dropping
9
10Installation
11^^^^^^^^^^^^
12
13CompositeType automatically attaches `before_create` and `after_drop` DDL
14listeners. These listeners create and drop the composite type in the
15database. This means it works out of the box in your test environment where
16you create the tables on each test run.
17
18When you already have your database set up you should call
19:func:`register_composites` after you've set up all models.
20
21::
22
23 register_composites(conn)
24
25
26
27Usage
28^^^^^
29
30::
31
32 from collections import OrderedDict
33
34 import sqlalchemy as sa
35 from sqlalchemy_utils import CompositeType, CurrencyType
36
37
38 class Account(Base):
39 __tablename__ = 'account'
40 id = sa.Column(sa.Integer, primary_key=True)
41 balance = sa.Column(
42 CompositeType(
43 'money_type',
44 [
45 sa.Column('currency', CurrencyType),
46 sa.Column('amount', sa.Integer)
47 ]
48 )
49 )
50
51
52Creation
53~~~~~~~~
54When creating CompositeType, you can either pass in a tuple or a dictionary.
55
56::
57 account1 = Account()
58 account1.balance = ('USD', 15)
59
60 account2 = Account()
61 account2.balance = {'currency': 'USD', 'amount': 15}
62
63 session.add(account1)
64 session.add(account2)
65 session.commit()
66
67
68Accessing fields
69^^^^^^^^^^^^^^^^
70
71CompositeType provides attribute access to underlying fields. In the following
72example we find all accounts with balance amount more than 5000.
73
74
75::
76
77 session.query(Account).filter(Account.balance.amount > 5000)
78
79
80Arrays of composites
81^^^^^^^^^^^^^^^^^^^^
82
83::
84
85 from sqlalchemy.dialects.postgresql import ARRAY
86
87
88 class Account(Base):
89 __tablename__ = 'account'
90 id = sa.Column(sa.Integer, primary_key=True)
91 balances = sa.Column(
92 ARRAY(
93 CompositeType(
94 'money_type',
95 [
96 sa.Column('currency', CurrencyType),
97 sa.Column('amount', sa.Integer)
98 ]
99 ),
100 dimensions=1
101 )
102 )
103
104
105.. _PostgreSQL composite types:
106 https://www.postgresql.org/docs/current/rowtypes.html
107
108
109Related links:
110
111https://schinckel.net/2014/09/24/using-postgres-composite-types-in-django/
112"""
113
114from collections import namedtuple
115
116import sqlalchemy as sa
117from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
118from sqlalchemy.ext.compiler import compiles
119from sqlalchemy.schema import _CreateDropBase
120from sqlalchemy.sql.expression import FunctionElement
121from sqlalchemy.types import SchemaType, to_instance, TypeDecorator, UserDefinedType
122
123from .. import ImproperlyConfigured
124
125psycopg2 = None
126CompositeCaster = None
127adapt = None
128AsIs = None
129register_adapter = None
130try:
131 import psycopg2
132 from psycopg2.extensions import adapt, AsIs, register_adapter
133 from psycopg2.extras import CompositeCaster
134except ImportError:
135 pass
136
137
138class CompositeElement(FunctionElement):
139 """
140 Instances of this class wrap a Postgres composite type.
141 """
142
143 def __init__(self, base, field, type_):
144 self.name = field
145 self.type = to_instance(type_)
146
147 super().__init__(base)
148
149
150@compiles(CompositeElement)
151def _compile_pgelem(expr, compiler, **kw):
152 return f'({compiler.process(expr.clauses, **kw)}).{expr.name}'
153
154
155# TODO: Make the registration work on connection level instead of global level
156registered_composites = {}
157
158
159class CompositeType(UserDefinedType, SchemaType):
160 """
161 Represents a PostgreSQL composite type.
162
163 :param name:
164 Name of the composite type.
165 :param columns:
166 List of columns that this composite type consists of
167 """
168
169 python_type = tuple
170
171 class comparator_factory(UserDefinedType.Comparator):
172 def __getattr__(self, key):
173 try:
174 type_ = self.type.typemap[key]
175 except KeyError:
176 raise KeyError(f"Type '{self.name}' doesn't have an attribute: '{key}'")
177
178 return CompositeElement(self.expr, key, type_)
179
180 def __init__(self, name, columns, quote=None, **kwargs):
181 if psycopg2 is None:
182 raise ImproperlyConfigured(
183 "'psycopg2' package is required in order to use CompositeType."
184 )
185 SchemaType.__init__(self, name=name, quote=quote)
186 self.columns = columns
187 if name in registered_composites:
188 self.type_cls = registered_composites[name].type_cls
189 else:
190 self.type_cls = namedtuple(self.name, [c.name for c in columns])
191 registered_composites[name] = self
192
193 class Caster(CompositeCaster):
194 def make(obj, values):
195 return self.type_cls(*values)
196
197 self.caster = Caster
198 attach_composite_listeners()
199
200 def get_col_spec(self):
201 return self.name
202
203 def bind_processor(self, dialect):
204 def process(value):
205 if value is None:
206 return None
207
208 processed_value = []
209 for i, column in enumerate(self.columns):
210 current_value = (
211 value.get(column.name) if isinstance(value, dict) else value[i]
212 )
213
214 if isinstance(column.type, TypeDecorator):
215 processed_value.append(
216 column.type.process_bind_param(current_value, dialect)
217 )
218 else:
219 processed_value.append(current_value)
220 return self.type_cls(*processed_value)
221
222 return process
223
224 def result_processor(self, dialect, coltype):
225 def process(value):
226 if value is None:
227 return None
228 cls = value.__class__
229 kwargs = {}
230 for column in self.columns:
231 if isinstance(column.type, TypeDecorator):
232 kwargs[column.name] = column.type.process_result_value(
233 getattr(value, column.name), dialect
234 )
235 else:
236 kwargs[column.name] = getattr(value, column.name)
237 return cls(**kwargs)
238
239 return process
240
241 def create(self, bind=None, checkfirst=None):
242 if not checkfirst or not bind.dialect.has_type(
243 bind, self.name, schema=self.schema
244 ):
245 bind.execute(CreateCompositeType(self))
246
247 def drop(self, bind=None, checkfirst=True):
248 if checkfirst and bind.dialect.has_type(bind, self.name, schema=self.schema):
249 bind.execute(DropCompositeType(self))
250
251
252def register_psycopg2_composite(dbapi_connection, composite):
253 psycopg2.extras.register_composite(
254 composite.name, dbapi_connection, globally=True, factory=composite.caster
255 )
256
257 def adapt_composite(value):
258 dialect = PGDialect_psycopg2()
259 adapted = [
260 adapt(
261 getattr(value, column.name)
262 if not isinstance(column.type, TypeDecorator)
263 else column.type.process_bind_param(
264 getattr(value, column.name), dialect
265 )
266 )
267 for column in composite.columns
268 ]
269 for value in adapted:
270 if hasattr(value, 'prepare'):
271 value.prepare(dbapi_connection)
272 values = [
273 value.getquoted().decode(dbapi_connection.encoding) for value in adapted
274 ]
275 return AsIs(
276 '({})::{}'.format(
277 ', '.join(values), dialect.identifier_preparer.quote(composite.name)
278 )
279 )
280
281 register_adapter(composite.type_cls, adapt_composite)
282
283
284def get_driver_connection(connection):
285 try:
286 # SQLAlchemy 2.0
287 return connection.connection.driver_connection
288 except AttributeError:
289 return connection.connection.connection
290
291
292def before_create(target, connection, **kw):
293 for name, composite in registered_composites.items():
294 composite.create(connection, checkfirst=True)
295 register_psycopg2_composite(get_driver_connection(connection), composite)
296
297
298def after_drop(target, connection, **kw):
299 for name, composite in registered_composites.items():
300 composite.drop(connection, checkfirst=True)
301
302
303def register_composites(connection):
304 for name, composite in registered_composites.items():
305 register_psycopg2_composite(get_driver_connection(connection), composite)
306
307
308def attach_composite_listeners():
309 listeners = [
310 (sa.MetaData, 'before_create', before_create),
311 (sa.MetaData, 'after_drop', after_drop),
312 ]
313 for listener in listeners:
314 if not sa.event.contains(*listener):
315 sa.event.listen(*listener)
316
317
318def remove_composite_listeners():
319 listeners = [
320 (sa.MetaData, 'before_create', before_create),
321 (sa.MetaData, 'after_drop', after_drop),
322 ]
323 for listener in listeners:
324 if sa.event.contains(*listener):
325 sa.event.remove(*listener)
326
327
328class CreateCompositeType(_CreateDropBase):
329 pass
330
331
332@compiles(CreateCompositeType)
333def _visit_create_composite_type(create, compiler, **kw):
334 type_ = create.element
335 fields = ', '.join(
336 '{name} {type}'.format(
337 name=column.name,
338 type=compiler.dialect.type_compiler.process(to_instance(column.type)),
339 )
340 for column in type_.columns
341 )
342
343 return 'CREATE TYPE {name} AS ({fields})'.format(
344 name=compiler.preparer.format_type(type_), fields=fields
345 )
346
347
348class DropCompositeType(_CreateDropBase):
349 pass
350
351
352@compiles(DropCompositeType)
353def _visit_drop_composite_type(drop, compiler, **kw):
354 type_ = drop.element
355
356 return f'DROP TYPE {compiler.preparer.format_type(type_)}'