1import itertools
2import os
3from collections.abc import Mapping, Sequence
4from copy import copy
5
6import sqlalchemy as sa
7from sqlalchemy.engine.url import make_url
8from sqlalchemy.exc import OperationalError, ProgrammingError
9
10from ..utils import starts_with
11from .orm import quote
12
13
14def escape_like(string, escape_char='*'):
15 """
16 Escape the string parameter used in SQL LIKE expressions.
17
18 ::
19
20 from sqlalchemy_utils import escape_like
21
22
23 query = session.query(User).filter(
24 User.name.ilike(escape_like('John'))
25 )
26
27
28 :param string: a string to escape
29 :param escape_char: escape character
30 """
31 return (
32 string.replace(escape_char, escape_char * 2)
33 .replace('%', escape_char + '%')
34 .replace('_', escape_char + '_')
35 )
36
37
38def json_sql(value, scalars_to_json=True):
39 """
40 Convert python data structures to PostgreSQL specific SQLAlchemy JSON
41 constructs. This function is extremly useful if you need to build
42 PostgreSQL JSON on python side.
43
44 .. note::
45
46 This function needs PostgreSQL >= 9.4
47
48 Scalars are converted to to_json SQLAlchemy function objects
49
50 ::
51
52 json_sql(1) # Equals SQL: to_json(1)
53
54 json_sql('a') # to_json('a')
55
56
57 Mappings are converted to json_build_object constructs
58
59 ::
60
61 json_sql({'a': 'c', '2': 5}) # json_build_object('a', 'c', '2', 5)
62
63
64 Sequences (other than strings) are converted to json_build_array constructs
65
66 ::
67
68 json_sql([1, 2, 3]) # json_build_array(1, 2, 3)
69
70
71 You can also nest these data structures
72
73 ::
74
75 json_sql({'a': [1, 2, 3]})
76 # json_build_object('a', json_build_array[1, 2, 3])
77
78
79 :param value:
80 value to be converted to SQLAlchemy PostgreSQL function constructs
81 """
82 if scalars_to_json:
83
84 def scalar_convert(a):
85 return sa.func.to_json(sa.text(a))
86 else:
87 scalar_convert = sa.text
88
89 if isinstance(value, Mapping):
90 return sa.func.json_build_object(
91 *(
92 json_sql(v, scalars_to_json=False)
93 for v in itertools.chain(*value.items())
94 )
95 )
96 elif isinstance(value, str):
97 return scalar_convert(f"'{value}'")
98 elif isinstance(value, Sequence):
99 return sa.func.json_build_array(
100 *(json_sql(v, scalars_to_json=False) for v in value)
101 )
102 elif isinstance(value, (int, float)):
103 return scalar_convert(str(value))
104 return value
105
106
107def jsonb_sql(value, scalars_to_jsonb=True):
108 """
109 Convert python data structures to PostgreSQL specific SQLAlchemy JSONB
110 constructs. This function is extremly useful if you need to build
111 PostgreSQL JSONB on python side.
112
113 .. note::
114
115 This function needs PostgreSQL >= 9.4
116
117 Scalars are converted to to_jsonb SQLAlchemy function objects
118
119 ::
120
121 jsonb_sql(1) # Equals SQL: to_jsonb(1)
122
123 jsonb_sql('a') # to_jsonb('a')
124
125
126 Mappings are converted to jsonb_build_object constructs
127
128 ::
129
130 jsonb_sql({'a': 'c', '2': 5}) # jsonb_build_object('a', 'c', '2', 5)
131
132
133 Sequences (other than strings) converted to jsonb_build_array constructs
134
135 ::
136
137 jsonb_sql([1, 2, 3]) # jsonb_build_array(1, 2, 3)
138
139
140 You can also nest these data structures
141
142 ::
143
144 jsonb_sql({'a': [1, 2, 3]})
145 # jsonb_build_object('a', jsonb_build_array[1, 2, 3])
146
147
148 :param value:
149 value to be converted to SQLAlchemy PostgreSQL function constructs
150 :boolean jsonbb:
151 Flag to alternatively convert the return with a to_jsonb construct
152 """
153 if scalars_to_jsonb:
154
155 def scalar_convert(a):
156 return sa.func.to_jsonb(sa.text(a))
157 else:
158 scalar_convert = sa.text
159
160 if isinstance(value, Mapping):
161 return sa.func.jsonb_build_object(
162 *(
163 jsonb_sql(v, scalars_to_jsonb=False)
164 for v in itertools.chain(*value.items())
165 )
166 )
167 elif isinstance(value, str):
168 return scalar_convert(f"'{value}'")
169 elif isinstance(value, Sequence):
170 return sa.func.jsonb_build_array(
171 *(jsonb_sql(v, scalars_to_jsonb=False) for v in value)
172 )
173 elif isinstance(value, (int, float)):
174 return scalar_convert(str(value))
175 return value
176
177
178def has_index(column_or_constraint):
179 """
180 Return whether or not given column or the columns of given foreign key
181 constraint have an index. A column has an index if it has a single column
182 index or it is the first column in compound column index.
183
184 A foreign key constraint has an index if the constraint columns are the
185 first columns in compound column index.
186
187 :param column_or_constraint:
188 SQLAlchemy Column object or SA ForeignKeyConstraint object
189
190 .. versionadded: 0.26.2
191
192 .. versionchanged: 0.30.18
193 Added support for foreign key constaints.
194
195 ::
196
197 from sqlalchemy_utils import has_index
198
199
200 class Article(Base):
201 __tablename__ = 'article'
202 id = sa.Column(sa.Integer, primary_key=True)
203 title = sa.Column(sa.String(100))
204 is_published = sa.Column(sa.Boolean, index=True)
205 is_deleted = sa.Column(sa.Boolean)
206 is_archived = sa.Column(sa.Boolean)
207
208 __table_args__ = (
209 sa.Index('my_index', is_deleted, is_archived),
210 )
211
212
213 table = Article.__table__
214
215 has_index(table.c.is_published) # True
216 has_index(table.c.is_deleted) # True
217 has_index(table.c.is_archived) # False
218
219
220 Also supports primary key indexes
221
222 ::
223
224 from sqlalchemy_utils import has_index
225
226
227 class ArticleTranslation(Base):
228 __tablename__ = 'article_translation'
229 id = sa.Column(sa.Integer, primary_key=True)
230 locale = sa.Column(sa.String(10), primary_key=True)
231 title = sa.Column(sa.String(100))
232
233
234 table = ArticleTranslation.__table__
235
236 has_index(table.c.locale) # False
237 has_index(table.c.id) # True
238
239
240 This function supports foreign key constraints as well
241
242 ::
243
244
245 class User(Base):
246 __tablename__ = 'user'
247 first_name = sa.Column(sa.Unicode(255), primary_key=True)
248 last_name = sa.Column(sa.Unicode(255), primary_key=True)
249
250 class Article(Base):
251 __tablename__ = 'article'
252 id = sa.Column(sa.Integer, primary_key=True)
253 author_first_name = sa.Column(sa.Unicode(255))
254 author_last_name = sa.Column(sa.Unicode(255))
255 __table_args__ = (
256 sa.ForeignKeyConstraint(
257 [author_first_name, author_last_name],
258 [User.first_name, User.last_name]
259 ),
260 sa.Index(
261 'my_index',
262 author_first_name,
263 author_last_name
264 )
265 )
266
267 table = Article.__table__
268 constraint = list(table.foreign_keys)[0].constraint
269
270 has_index(constraint) # True
271 """
272 table = column_or_constraint.table
273 if not isinstance(table, sa.Table):
274 raise TypeError(
275 'Only columns belonging to Table objects are supported. Given '
276 'column belongs to %r.' % table
277 )
278 primary_keys = table.primary_key.columns.values()
279 if isinstance(column_or_constraint, sa.ForeignKeyConstraint):
280 columns = list(column_or_constraint.columns.values())
281 else:
282 columns = [column_or_constraint]
283
284 return (primary_keys and starts_with(primary_keys, columns)) or any(
285 starts_with(index.columns.values(), columns) for index in table.indexes
286 )
287
288
289def has_unique_index(column_or_constraint):
290 """
291 Return whether or not given column or given foreign key constraint has a
292 unique index.
293
294 A column has a unique index if it has a single column primary key index or
295 it has a single column UniqueConstraint.
296
297 A foreign key constraint has a unique index if the columns of the
298 constraint are the same as the columns of table primary key or the coluns
299 of any unique index or any unique constraint of the given table.
300
301 :param column: SQLAlchemy Column object
302
303 .. versionadded: 0.27.1
304
305 .. versionchanged: 0.30.18
306 Added support for foreign key constaints.
307
308 Fixed support for unique indexes (previously only worked for unique
309 constraints)
310
311 ::
312
313 from sqlalchemy_utils import has_unique_index
314
315
316 class Article(Base):
317 __tablename__ = 'article'
318 id = sa.Column(sa.Integer, primary_key=True)
319 title = sa.Column(sa.String(100))
320 is_published = sa.Column(sa.Boolean, unique=True)
321 is_deleted = sa.Column(sa.Boolean)
322 is_archived = sa.Column(sa.Boolean)
323
324
325 table = Article.__table__
326
327 has_unique_index(table.c.is_published) # True
328 has_unique_index(table.c.is_deleted) # False
329 has_unique_index(table.c.id) # True
330
331
332 This function supports foreign key constraints as well
333
334 ::
335
336
337 class User(Base):
338 __tablename__ = 'user'
339 first_name = sa.Column(sa.Unicode(255), primary_key=True)
340 last_name = sa.Column(sa.Unicode(255), primary_key=True)
341
342 class Article(Base):
343 __tablename__ = 'article'
344 id = sa.Column(sa.Integer, primary_key=True)
345 author_first_name = sa.Column(sa.Unicode(255))
346 author_last_name = sa.Column(sa.Unicode(255))
347 __table_args__ = (
348 sa.ForeignKeyConstraint(
349 [author_first_name, author_last_name],
350 [User.first_name, User.last_name]
351 ),
352 sa.Index(
353 'my_index',
354 author_first_name,
355 author_last_name,
356 unique=True
357 )
358 )
359
360 table = Article.__table__
361 constraint = list(table.foreign_keys)[0].constraint
362
363 has_unique_index(constraint) # True
364
365
366 :raises TypeError: if given column does not belong to a Table object
367 """
368 table = column_or_constraint.table
369 if not isinstance(table, sa.Table):
370 raise TypeError(
371 'Only columns belonging to Table objects are supported. Given '
372 'column belongs to %r.' % table
373 )
374 primary_keys = list(table.primary_key.columns.values())
375 if isinstance(column_or_constraint, sa.ForeignKeyConstraint):
376 columns = list(column_or_constraint.columns.values())
377 else:
378 columns = [column_or_constraint]
379
380 return (
381 (columns == primary_keys)
382 or any(
383 columns == list(constraint.columns.values())
384 for constraint in table.constraints
385 if isinstance(constraint, sa.sql.schema.UniqueConstraint)
386 )
387 or any(
388 columns == list(index.columns.values())
389 for index in table.indexes
390 if index.unique
391 )
392 )
393
394
395def is_auto_assigned_date_column(column):
396 """
397 Returns whether or not given SQLAlchemy Column object's is auto assigned
398 DateTime or Date.
399
400 :param column: SQLAlchemy Column object
401 """
402 return (
403 isinstance(column.type, sa.DateTime) or isinstance(column.type, sa.Date)
404 ) and (
405 column.default
406 or column.server_default
407 or column.onupdate
408 or column.server_onupdate
409 )
410
411
412def _set_url_database(url: sa.engine.url.URL, database):
413 """Set the database of an engine URL.
414
415 :param url: A SQLAlchemy engine URL.
416 :param database: New database to set.
417
418 """
419 if hasattr(url, '_replace'):
420 # Cannot use URL.set() as database may need to be set to None.
421 ret = url._replace(database=database)
422 else: # SQLAlchemy <1.4
423 url = copy(url)
424 url.database = database
425 ret = url
426 assert ret.database == database, ret
427 return ret
428
429
430def _get_scalar_result(engine, sql):
431 with engine.connect() as conn:
432 return conn.scalar(sql)
433
434
435def _sqlite_file_exists(database):
436 if not os.path.isfile(database) or os.path.getsize(database) < 100:
437 return False
438
439 with open(database, 'rb') as f:
440 header = f.read(100)
441
442 return header[:16] == b'SQLite format 3\x00'
443
444
445def database_exists(url):
446 """Check if a database exists.
447
448 :param url: A SQLAlchemy engine URL.
449
450 Performs backend-specific testing to quickly determine if a database
451 exists on the server. ::
452
453 database_exists('postgresql://postgres@localhost/name') #=> False
454 create_database('postgresql://postgres@localhost/name')
455 database_exists('postgresql://postgres@localhost/name') #=> True
456
457 Supports checking against a constructed URL as well. ::
458
459 engine = create_engine('postgresql://postgres@localhost/name')
460 database_exists(engine.url) #=> False
461 create_database(engine.url)
462 database_exists(engine.url) #=> True
463
464 """
465
466 url = make_url(url)
467 database = url.database
468 dialect_name = url.get_dialect().name
469 engine = None
470 try:
471 if dialect_name == 'postgresql':
472 text = "SELECT 1 FROM pg_database WHERE datname='%s'" % database
473 for db in (database, 'postgres', 'template1', 'template0', None):
474 url = _set_url_database(url, database=db)
475 engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
476 try:
477 return bool(_get_scalar_result(engine, sa.text(text)))
478 except (ProgrammingError, OperationalError):
479 pass
480 return False
481
482 elif dialect_name == 'mysql':
483 url = _set_url_database(url, database=None)
484 engine = sa.create_engine(url)
485 text = (
486 'SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA '
487 "WHERE SCHEMA_NAME = '%s'" % database
488 )
489 return bool(_get_scalar_result(engine, sa.text(text)))
490
491 elif dialect_name == 'sqlite':
492 url = _set_url_database(url, database=None)
493 engine = sa.create_engine(url)
494 if database:
495 return database == ':memory:' or _sqlite_file_exists(database)
496 else:
497 # The default SQLAlchemy database is in memory, and :memory: is
498 # not required, thus we should support that use case.
499 return True
500 elif dialect_name == 'mssql':
501 text = "SELECT 1 FROM sys.databases WHERE name = '%s'" % database
502 url = _set_url_database(url, database='master')
503 engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
504 try:
505 return bool(_get_scalar_result(engine, sa.text(text)))
506 except (ProgrammingError, OperationalError):
507 return False
508 else:
509 text = 'SELECT 1'
510 try:
511 engine = sa.create_engine(url)
512 return bool(_get_scalar_result(engine, sa.text(text)))
513 except (ProgrammingError, OperationalError):
514 return False
515 finally:
516 if engine:
517 engine.dispose()
518
519
520def create_database(url, encoding='utf8', template=None):
521 """Issue the appropriate CREATE DATABASE statement.
522
523 :param url: A SQLAlchemy engine URL.
524 :param encoding: The encoding to create the database as.
525 :param template:
526 The name of the template from which to create the new database. At the
527 moment only supported by PostgreSQL driver.
528
529 To create a database, you can pass a simple URL that would have
530 been passed to ``create_engine``. ::
531
532 create_database('postgresql://postgres@localhost/name')
533
534 You may also pass the url from an existing engine. ::
535
536 create_database(engine.url)
537
538 Has full support for mysql, postgres, and sqlite. In theory,
539 other database engines should be supported.
540 """
541
542 url = make_url(url)
543 database = url.database
544 dialect_name = url.get_dialect().name
545 dialect_driver = url.get_dialect().driver
546
547 if dialect_name == 'postgresql':
548 url = _set_url_database(url, database='postgres')
549 elif dialect_name == 'mssql':
550 url = _set_url_database(url, database='master')
551 elif dialect_name == 'cockroachdb':
552 url = _set_url_database(url, database='defaultdb')
553 elif not dialect_name == 'sqlite':
554 url = _set_url_database(url, database=None)
555
556 if (dialect_name == 'mssql' and dialect_driver in {'pymssql', 'pyodbc'}) or (
557 dialect_name == 'postgresql'
558 and dialect_driver
559 in {'asyncpg', 'pg8000', 'psycopg', 'psycopg2', 'psycopg2cffi'}
560 ):
561 engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
562 else:
563 engine = sa.create_engine(url)
564
565 if dialect_name == 'postgresql':
566 if not template:
567 template = 'template1'
568
569 with engine.begin() as conn:
570 text = "CREATE DATABASE {} ENCODING '{}' TEMPLATE {}".format(
571 quote(conn, database), encoding, quote(conn, template)
572 )
573 conn.execute(sa.text(text))
574
575 elif dialect_name == 'mysql':
576 with engine.begin() as conn:
577 text = "CREATE DATABASE {} CHARACTER SET = '{}'".format(
578 quote(conn, database), encoding
579 )
580 conn.execute(sa.text(text))
581
582 elif dialect_name == 'sqlite' and database != ':memory:':
583 if database:
584 with engine.begin() as conn:
585 conn.execute(sa.text('CREATE TABLE DB(id int)'))
586 conn.execute(sa.text('DROP TABLE DB'))
587
588 else:
589 with engine.begin() as conn:
590 text = f'CREATE DATABASE {quote(conn, database)}'
591 conn.execute(sa.text(text))
592
593 engine.dispose()
594
595
596def drop_database(url):
597 """Issue the appropriate DROP DATABASE statement.
598
599 :param url: A SQLAlchemy engine URL.
600
601 Works similar to the :func:`create_database` method in that both url text
602 and a constructed url are accepted.
603
604 ::
605
606 drop_database('postgresql://postgres@localhost/name')
607 drop_database(engine.url)
608
609 """
610
611 url = make_url(url)
612 database = url.database
613 dialect_name = url.get_dialect().name
614 dialect_driver = url.get_dialect().driver
615
616 if dialect_name == 'postgresql':
617 url = _set_url_database(url, database='postgres')
618 elif dialect_name == 'mssql':
619 url = _set_url_database(url, database='master')
620 elif dialect_name == 'cockroachdb':
621 url = _set_url_database(url, database='defaultdb')
622 elif not dialect_name == 'sqlite':
623 url = _set_url_database(url, database=None)
624
625 if (dialect_name == 'mssql' and dialect_driver in {'pymssql', 'pyodbc'}) or (
626 dialect_name == 'postgresql'
627 and dialect_driver
628 in {'asyncpg', 'pg8000', 'psycopg', 'psycopg2', 'psycopg2cffi'}
629 ):
630 engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
631 else:
632 engine = sa.create_engine(url)
633
634 if dialect_name == 'sqlite' and database != ':memory:':
635 if database:
636 os.remove(database)
637 else:
638 with engine.begin() as conn:
639 text = f'DROP DATABASE {quote(conn, database)}'
640 conn.execute(sa.text(text))
641
642 engine.dispose()