Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy_utils/types/pg_composite.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

133 statements  

1""" 

2CompositeType provides means to interact with 

3`PostgreSQL composite types`_. Currently this type features: 

4 

5* Easy attribute access to composite type fields 

6* Supports SQLAlchemy TypeDecorator types 

7* Ability to include composite types as part of PostgreSQL arrays 

8* Type creation and dropping 

9 

10Installation 

11^^^^^^^^^^^^ 

12 

13CompositeType automatically attaches `before_create` and `after_drop` DDL 

14listeners. These listeners create and drop the composite type in the 

15database. This means it works out of the box in your test environment where 

16you create the tables on each test run. 

17 

18When you already have your database set up you should call 

19:func:`register_composites` after you've set up all models. 

20 

21:: 

22 

23 register_composites(conn) 

24 

25 

26 

27Usage 

28^^^^^ 

29 

30:: 

31 

32 from collections import OrderedDict 

33 

34 import sqlalchemy as sa 

35 from sqlalchemy_utils import CompositeType, CurrencyType 

36 

37 

38 class Account(Base): 

39 __tablename__ = 'account' 

40 id = sa.Column(sa.Integer, primary_key=True) 

41 balance = sa.Column( 

42 CompositeType( 

43 'money_type', 

44 [ 

45 sa.Column('currency', CurrencyType), 

46 sa.Column('amount', sa.Integer) 

47 ] 

48 ) 

49 ) 

50 

51 

52Creation 

53~~~~~~~~ 

54When assigning a value to a CompositeType column, you can pass in either a tuple or a dictionary. 

55 

56:: 

57 account1 = Account() 

58 account1.balance = ('USD', 15) 

59 

60 account2 = Account() 

61 account2.balance = {'currency': 'USD', 'amount': 15} 

62 

63 session.add(account1) 

64 session.add(account2) 

65 session.commit() 

66 

67 

68Accessing fields 

69^^^^^^^^^^^^^^^^ 

70 

71CompositeType provides attribute access to underlying fields. In the following 

72example we find all accounts with balance amount more than 5000. 

73 

74 

75:: 

76 

77 session.query(Account).filter(Account.balance.amount > 5000) 

78 

79 

80Arrays of composites 

81^^^^^^^^^^^^^^^^^^^^ 

82 

83:: 

84 

85 from sqlalchemy.dialects.postgresql import ARRAY 

86 

87 

88 class Account(Base): 

89 __tablename__ = 'account' 

90 id = sa.Column(sa.Integer, primary_key=True) 

91 balances = sa.Column( 

92 ARRAY( 

93 CompositeType( 

94 'money_type', 

95 [ 

96 sa.Column('currency', CurrencyType), 

97 sa.Column('amount', sa.Integer) 

98 ] 

99 ), 

100 dimensions=1 

101 ) 

102 ) 

103 

104 

105.. _PostgreSQL composite types: 

106 https://www.postgresql.org/docs/current/rowtypes.html 

107 

108 

109Related links: 

110 

111https://schinckel.net/2014/09/24/using-postgres-composite-types-in-django/ 

112""" 

113 

114from collections import namedtuple 

115 

116import sqlalchemy as sa 

117from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2 

118from sqlalchemy.ext.compiler import compiles 

119from sqlalchemy.schema import _CreateDropBase 

120from sqlalchemy.sql.expression import FunctionElement 

121from sqlalchemy.types import SchemaType, to_instance, TypeDecorator, UserDefinedType 

122 

123from .. import ImproperlyConfigured 

124 

# psycopg2 is an optional dependency: every name imported from it is
# pre-bound to None so the module itself can always be imported.
# CompositeType.__init__ checks ``psycopg2 is None`` and raises
# ImproperlyConfigured when the driver is missing.
psycopg2 = CompositeCaster = adapt = AsIs = register_adapter = None
try:
    import psycopg2
    from psycopg2.extensions import adapt, AsIs, register_adapter
    from psycopg2.extras import CompositeCaster
except ImportError:
    # Leave the None placeholders bound above in place.
    pass

136 

137 

class CompositeElement(FunctionElement):
    """
    SQL expression element for one field of a Postgres composite value.

    :param base:
        Expression handed to ``FunctionElement.__init__``; it becomes the
        clause the field is read from.
    :param field:
        Name of the composite field; stored as ``self.name``.
    :param type_:
        Type of the field, given as a class or an instance; it is
        normalised with ``to_instance`` and stored as ``self.type``.
    """

    def __init__(self, base, field, type_):
        field_type = to_instance(type_)
        self.name = field
        self.type = field_type

        super().__init__(base)

148 

149 

@compiles(CompositeElement)
def _compile_pgelem(expr, compiler, **kw):
    """Render a CompositeElement as ``(<compiled clauses>).<field name>``."""
    base_sql = compiler.process(expr.clauses, **kw)
    return '({}).{}'.format(base_sql, expr.name)

153 

154 

# Module-wide registry of CompositeType instances, keyed by composite type
# name. CompositeType.__init__ writes to it; the DDL listeners and
# register_composites() iterate over it.
# TODO: Make the registration work on connection level instead of global level
registered_composites = {}

157 

158 

class CompositeType(UserDefinedType, SchemaType):
    """
    Represents a PostgreSQL composite type.

    Constructing an instance records it in the module-level
    ``registered_composites`` mapping under ``name`` and makes sure the
    MetaData DDL listeners are attached.

    :param name:
        Name of the composite type.
    :param columns:
        List of columns that this composite type consists of
    :param quote:
        Forwarded to ``SchemaType.__init__``.
    :param kwargs:
        Accepted for call compatibility; not used by this class.
    :raises ImproperlyConfigured:
        If the ``psycopg2`` package could not be imported.
    """

    python_type = tuple

    class comparator_factory(UserDefinedType.Comparator):
        def __getattr__(self, key):
            """
            Return a :class:`CompositeElement` for the composite field
            named ``key``.

            The field type is looked up in the owning type's ``columns``
            list.

            :raises AttributeError:
                If the composite type has no column called ``key``.
                ``__getattr__`` must signal a missing attribute with
                AttributeError so that ``hasattr`` and
                ``getattr(obj, name, default)`` keep working.
            """
            composite = self.type
            for column in composite.columns:
                if column.name == key:
                    return CompositeElement(self.expr, key, column.type)

            # The comparator has no ``name`` of its own; report the name of
            # the composite type it belongs to.
            raise AttributeError(
                f"Type '{composite.name}' doesn't have an attribute: '{key}'"
            )

    def __init__(self, name, columns, quote=None, **kwargs):
        if psycopg2 is None:
            raise ImproperlyConfigured(
                "'psycopg2' package is required in order to use CompositeType."
            )
        SchemaType.__init__(self, name=name, quote=quote)
        self.columns = columns
        if name in registered_composites:
            # Reuse the namedtuple class of the composite already registered
            # under this name.
            self.type_cls = registered_composites[name].type_cls
        else:
            self.type_cls = namedtuple(self.name, [c.name for c in columns])
        registered_composites[name] = self

        class Caster(CompositeCaster):
            # ``self`` below is the enclosing CompositeType (closure), not
            # the caster instance, which arrives as ``caster``.
            def make(caster, values):
                return self.type_cls(*values)

        self.caster = Caster
        attach_composite_listeners()

    def get_col_spec(self):
        """Return the type name used as the column specification."""
        return self.name

    def bind_processor(self, dialect):
        """
        Return a callable converting a bound Python value into an instance
        of ``self.type_cls``.

        The callable accepts ``None`` (returned unchanged), a ``dict`` keyed
        by column name (missing keys become ``None``), or an indexable
        sequence in column order. Fields whose column type is a
        ``TypeDecorator`` are passed through its ``process_bind_param``.
        """
        def process(value):
            if value is None:
                return None

            is_mapping = isinstance(value, dict)
            processed_value = []
            for i, column in enumerate(self.columns):
                current_value = value.get(column.name) if is_mapping else value[i]
                if isinstance(column.type, TypeDecorator):
                    current_value = column.type.process_bind_param(
                        current_value, dialect
                    )
                processed_value.append(current_value)
            return self.type_cls(*processed_value)

        return process

    def result_processor(self, dialect, coltype):
        """
        Return a callable post-processing a composite value from a result
        row.

        ``None`` is returned unchanged. Otherwise each field is read by
        column name, fields whose column type is a ``TypeDecorator`` are
        passed through its ``process_result_value``, and a new instance of
        the incoming value's own class is built from the results.
        """
        def process(value):
            if value is None:
                return None
            cls = value.__class__
            kwargs = {}
            for column in self.columns:
                field_value = getattr(value, column.name)
                if isinstance(column.type, TypeDecorator):
                    field_value = column.type.process_result_value(
                        field_value, dialect
                    )
                kwargs[column.name] = field_value
            return cls(**kwargs)

        return process

    def create(self, bind=None, checkfirst=None):
        """
        Execute ``CreateCompositeType`` for this type on ``bind``.

        With a truthy ``checkfirst`` the statement is executed only when
        ``bind.dialect.has_type`` reports that the type does not exist;
        otherwise it is executed unconditionally.
        """
        if not checkfirst or not bind.dialect.has_type(
            bind, self.name, schema=self.schema
        ):
            bind.execute(CreateCompositeType(self))

    def drop(self, bind=None, checkfirst=True):
        """
        Execute ``DropCompositeType`` for this type on ``bind``.

        With a truthy ``checkfirst`` the statement is executed only when
        ``bind.dialect.has_type`` reports that the type exists. With
        ``checkfirst=False`` it is executed unconditionally.
        """
        if not checkfirst or bind.dialect.has_type(
            bind, self.name, schema=self.schema
        ):
            bind.execute(DropCompositeType(self))

250 

251 

def register_psycopg2_composite(dbapi_connection, composite):
    """
    Register ``composite`` with psycopg2 in both directions.

    Reading: ``psycopg2.extras.register_composite`` is called globally with
    ``composite.caster`` as the factory. Writing: an adapter is registered
    for ``composite.type_cls`` that renders a value as
    ``(<field>, ...)::<type name>``.

    :param dbapi_connection:
        Driver-level connection; used for the registration, for preparing
        the adapted fields and for its ``encoding``.
    :param composite:
        Object providing ``name``, ``columns``, ``caster`` and ``type_cls``.
    """
    psycopg2.extras.register_composite(
        composite.name,
        dbapi_connection,
        globally=True,
        factory=composite.caster,
    )

    def adapt_composite(value):
        dialect = PGDialect_psycopg2()

        adapted = []
        for column in composite.columns:
            field_value = getattr(value, column.name)
            if isinstance(column.type, TypeDecorator):
                field_value = column.type.process_bind_param(field_value, dialect)
            adapted.append(adapt(field_value))

        # Some adapters need the connection before they can quote.
        for item in adapted:
            if hasattr(item, 'prepare'):
                item.prepare(dbapi_connection)

        quoted_fields = [
            item.getquoted().decode(dbapi_connection.encoding) for item in adapted
        ]
        row_literal = ', '.join(quoted_fields)
        type_name = dialect.identifier_preparer.quote(composite.name)
        return AsIs('({})::{}'.format(row_literal, type_name))

    register_adapter(composite.type_cls, adapt_composite)

282 

283 

def get_driver_connection(connection):
    """
    Return the driver-level connection underneath ``connection``.

    ``connection.connection.driver_connection`` is tried first (the
    SQLAlchemy 2.0 spelling); if that lookup raises AttributeError,
    ``connection.connection.connection`` is returned instead.
    """
    try:
        # SQLAlchemy 2.0
        driver_connection = connection.connection.driver_connection
    except AttributeError:
        driver_connection = connection.connection.connection
    return driver_connection

290 

291 

def before_create(target, connection, **kw):
    """
    ``before_create`` DDL listener.

    For every registered composite: create the type on ``connection``
    (checking for existence first), then register it with psycopg2 on the
    underlying driver connection.
    """
    for composite in registered_composites.values():
        composite.create(connection, checkfirst=True)
        driver_connection = get_driver_connection(connection)
        register_psycopg2_composite(driver_connection, composite)

296 

297 

def after_drop(target, connection, **kw):
    """
    ``after_drop`` DDL listener: drop every registered composite type on
    ``connection``, checking for existence first.
    """
    for composite in registered_composites.values():
        composite.drop(connection, checkfirst=True)

301 

302 

def register_composites(connection):
    """
    Register every composite in ``registered_composites`` with psycopg2,
    using the driver-level connection underneath ``connection``.
    """
    for composite in registered_composites.values():
        driver_connection = get_driver_connection(connection)
        register_psycopg2_composite(driver_connection, composite)

306 

307 

def attach_composite_listeners():
    """
    Attach the ``before_create`` and ``after_drop`` handlers to
    ``sa.MetaData``, skipping any that are already registered.
    """
    handlers = (
        ('before_create', before_create),
        ('after_drop', after_drop),
    )
    for event_name, handler in handlers:
        if sa.event.contains(sa.MetaData, event_name, handler):
            continue
        sa.event.listen(sa.MetaData, event_name, handler)

316 

317 

def remove_composite_listeners():
    """
    Detach the ``before_create`` and ``after_drop`` handlers from
    ``sa.MetaData``; handlers that are not registered are left alone.
    """
    handlers = (
        ('before_create', before_create),
        ('after_drop', after_drop),
    )
    for event_name, handler in handlers:
        if not sa.event.contains(sa.MetaData, event_name, handler):
            continue
        sa.event.remove(sa.MetaData, event_name, handler)

326 

327 

class CreateCompositeType(_CreateDropBase):
    """DDL element compiled to a ``CREATE TYPE ... AS (...)`` statement."""

330 

331 

@compiles(CreateCompositeType)
def _visit_create_composite_type(create, compiler, **kw):
    """
    Compile ``CREATE TYPE <name> AS (<field> <type>, ...)`` for the
    composite type held in ``create.element``.
    """
    composite = create.element

    field_defs = []
    for column in composite.columns:
        sql_type = compiler.dialect.type_compiler.process(to_instance(column.type))
        # NOTE(review): the field name is interpolated as-is; it is not run
        # through the preparer's identifier quoting.
        field_defs.append('{} {}'.format(column.name, sql_type))
    fields = ', '.join(field_defs)

    type_name = compiler.preparer.format_type(composite)
    return 'CREATE TYPE {} AS ({})'.format(type_name, fields)

346 

347 

class DropCompositeType(_CreateDropBase):
    """DDL element compiled to a ``DROP TYPE ...`` statement."""

350 

351 

@compiles(DropCompositeType)
def _visit_drop_composite_type(drop, compiler, **kw):
    """Compile ``DROP TYPE <name>`` for the composite type in ``drop.element``."""
    type_name = compiler.preparer.format_type(drop.element)
    return 'DROP TYPE {}'.format(type_name)