Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy_utils/functions/database.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

177 statements  

1import itertools 

2import os 

3from collections.abc import Mapping, Sequence 

4from copy import copy 

5 

6import sqlalchemy as sa 

7from sqlalchemy.engine.url import make_url 

8from sqlalchemy.exc import OperationalError, ProgrammingError 

9 

10from ..utils import starts_with 

11from .orm import quote 

12 

13 

def escape_like(string, escape_char='*'):
    """
    Escape the wildcard characters of a string used in SQL LIKE expressions.

    Every occurrence of ``escape_char`` is doubled first; afterwards each
    ``%`` and each ``_`` is prefixed with ``escape_char``.

    ::

        from sqlalchemy_utils import escape_like


        query = session.query(User).filter(
            User.name.ilike(escape_like('John'))
        )


    :param string: a string to escape
    :param escape_char: escape character
    """
    # The escape character itself has to be handled before the wildcards,
    # otherwise the prefixes added below would be doubled as well.
    escaped = string.replace(escape_char, escape_char * 2)
    for wildcard in ('%', '_'):
        escaped = escaped.replace(wildcard, escape_char + wildcard)
    return escaped

36 

37 

def json_sql(value, scalars_to_json=True):
    """
    Convert python data structures to PostgreSQL specific SQLAlchemy JSON
    constructs. This function is extremely useful if you need to build
    PostgreSQL JSON on python side.

    .. note::

        This function needs PostgreSQL >= 9.4

    Scalars are converted to to_json SQLAlchemy function objects

    ::

        json_sql(1)     # Equals SQL: to_json(1)

        json_sql('a')   # to_json('a')


    Mappings are converted to json_build_object constructs

    ::

        json_sql({'a': 'c', '2': 5})  # json_build_object('a', 'c', '2', 5)


    Sequences (other than strings) are converted to json_build_array constructs

    ::

        json_sql([1, 2, 3])  # json_build_array(1, 2, 3)


    You can also nest these data structures

    ::

        json_sql({'a': [1, 2, 3]})
        # json_build_object('a', json_build_array(1, 2, 3))


    Strings are rendered inline as SQL string literals. Embedded single
    quotes are doubled and colons are escaped, so that a value such as
    ``"it's :here"`` yields a valid literal instead of broken SQL or an
    unintended bind parameter.

    :param value:
        value to be converted to SQLAlchemy PostgreSQL function constructs
    :param scalars_to_json:
        when true (the default) string and number scalars are wrapped in
        ``to_json``; otherwise they are returned as plain text clauses.
        Members of mappings and sequences are always converted with this
        flag turned off.
    :returns:
        a SQLAlchemy construct for mappings, sequences, strings, ints and
        floats; any other value is returned unchanged.
    """
    if scalars_to_json:

        def scalar_convert(a):
            return sa.func.to_json(sa.text(a))

    else:
        scalar_convert = sa.text

    if isinstance(value, Mapping):
        # Flatten the (key, value) pairs into key1, value1, key2, value2, ...
        return sa.func.json_build_object(
            *(
                json_sql(v, scalars_to_json=False)
                for v in itertools.chain.from_iterable(value.items())
            )
        )
    elif isinstance(value, str):
        # The string ends up verbatim inside sa.text(): double the single
        # quotes so the literal is not terminated early, and backslash-escape
        # colons so sa.text() does not parse ":name" as a bind parameter.
        literal = value.replace("'", "''").replace(':', '\\:')
        return scalar_convert(f"'{literal}'")
    elif isinstance(value, Sequence):
        return sa.func.json_build_array(
            *(json_sql(v, scalars_to_json=False) for v in value)
        )
    elif isinstance(value, (int, float)):
        return scalar_convert(str(value))
    return value

105 

106 

def jsonb_sql(value, scalars_to_jsonb=True):
    """
    Convert python data structures to PostgreSQL specific SQLAlchemy JSONB
    constructs. This function is extremely useful if you need to build
    PostgreSQL JSONB on python side.

    .. note::

        This function needs PostgreSQL >= 9.4

    Scalars are converted to to_jsonb SQLAlchemy function objects

    ::

        jsonb_sql(1)     # Equals SQL: to_jsonb(1)

        jsonb_sql('a')   # to_jsonb('a')


    Mappings are converted to jsonb_build_object constructs

    ::

        jsonb_sql({'a': 'c', '2': 5})  # jsonb_build_object('a', 'c', '2', 5)


    Sequences (other than strings) converted to jsonb_build_array constructs

    ::

        jsonb_sql([1, 2, 3])  # jsonb_build_array(1, 2, 3)


    You can also nest these data structures

    ::

        jsonb_sql({'a': [1, 2, 3]})
        # jsonb_build_object('a', jsonb_build_array(1, 2, 3))


    Strings are rendered inline as SQL string literals. Embedded single
    quotes are doubled and colons are escaped, so that a value such as
    ``"it's :here"`` yields a valid literal instead of broken SQL or an
    unintended bind parameter.

    :param value:
        value to be converted to SQLAlchemy PostgreSQL function constructs
    :param scalars_to_jsonb:
        when true (the default) string and number scalars are wrapped in
        ``to_jsonb``; otherwise they are returned as plain text clauses.
        Members of mappings and sequences are always converted with this
        flag turned off.
    :returns:
        a SQLAlchemy construct for mappings, sequences, strings, ints and
        floats; any other value is returned unchanged.
    """
    if scalars_to_jsonb:

        def scalar_convert(a):
            return sa.func.to_jsonb(sa.text(a))

    else:
        scalar_convert = sa.text

    if isinstance(value, Mapping):
        # Flatten the (key, value) pairs into key1, value1, key2, value2, ...
        return sa.func.jsonb_build_object(
            *(
                jsonb_sql(v, scalars_to_jsonb=False)
                for v in itertools.chain.from_iterable(value.items())
            )
        )
    elif isinstance(value, str):
        # The string ends up verbatim inside sa.text(): double the single
        # quotes so the literal is not terminated early, and backslash-escape
        # colons so sa.text() does not parse ":name" as a bind parameter.
        literal = value.replace("'", "''").replace(':', '\\:')
        return scalar_convert(f"'{literal}'")
    elif isinstance(value, Sequence):
        return sa.func.jsonb_build_array(
            *(jsonb_sql(v, scalars_to_jsonb=False) for v in value)
        )
    elif isinstance(value, (int, float)):
        return scalar_convert(str(value))
    return value

176 

177 

def has_index(column_or_constraint):
    """
    Tell whether the given column, or the columns of the given foreign key
    constraint, are covered by an index.

    The given columns count as indexed when they are the leading columns of
    the table's primary key or the leading columns of any index of the table.
    For a single column this means it has its own index or is the first
    column of a compound one.

    :param column_or_constraint:
        SQLAlchemy Column object or SA ForeignKeyConstraint object

    :raises TypeError:
        if the given object does not belong to a ``sa.Table``

    .. versionadded:: 0.26.2

    .. versionchanged:: 0.30.18
        Added support for foreign key constraints.

    ::

        from sqlalchemy_utils import has_index


        class Article(Base):
            __tablename__ = 'article'
            id = sa.Column(sa.Integer, primary_key=True)
            title = sa.Column(sa.String(100))
            is_published = sa.Column(sa.Boolean, index=True)
            is_deleted = sa.Column(sa.Boolean)
            is_archived = sa.Column(sa.Boolean)

            __table_args__ = (
                sa.Index('my_index', is_deleted, is_archived),
            )


        table = Article.__table__

        has_index(table.c.is_published)  # True
        has_index(table.c.is_deleted)    # True
        has_index(table.c.is_archived)   # False


    Also supports primary key indexes

    ::

        from sqlalchemy_utils import has_index


        class ArticleTranslation(Base):
            __tablename__ = 'article_translation'
            id = sa.Column(sa.Integer, primary_key=True)
            locale = sa.Column(sa.String(10), primary_key=True)
            title = sa.Column(sa.String(100))


        table = ArticleTranslation.__table__

        has_index(table.c.locale)   # False
        has_index(table.c.id)       # True


    This function supports foreign key constraints as well

    ::


        class User(Base):
            __tablename__ = 'user'
            first_name = sa.Column(sa.Unicode(255), primary_key=True)
            last_name = sa.Column(sa.Unicode(255), primary_key=True)

        class Article(Base):
            __tablename__ = 'article'
            id = sa.Column(sa.Integer, primary_key=True)
            author_first_name = sa.Column(sa.Unicode(255))
            author_last_name = sa.Column(sa.Unicode(255))
            __table_args__ = (
                sa.ForeignKeyConstraint(
                    [author_first_name, author_last_name],
                    [User.first_name, User.last_name]
                ),
                sa.Index(
                    'my_index',
                    author_first_name,
                    author_last_name
                )
            )

        table = Article.__table__
        constraint = list(table.foreign_keys)[0].constraint

        has_index(constraint)  # True
    """
    owner = column_or_constraint.table
    if not isinstance(owner, sa.Table):
        raise TypeError(
            'Only columns belonging to Table objects are supported. Given '
            'column belongs to %r.' % owner
        )

    # Normalise the argument to a list of columns to look for.
    if isinstance(column_or_constraint, sa.ForeignKeyConstraint):
        wanted = list(column_or_constraint.columns.values())
    else:
        wanted = [column_or_constraint]

    # The primary key acts as an index over its leading columns.
    pk_columns = owner.primary_key.columns.values()
    if pk_columns and starts_with(pk_columns, wanted):
        return True

    return any(
        starts_with(candidate.columns.values(), wanted)
        for candidate in owner.indexes
    )

287 

288 

def has_unique_index(column_or_constraint):
    """
    Tell whether the given column, or the given foreign key constraint, is
    covered by a unique index.

    The given columns count as uniquely indexed when they are exactly the
    columns of the table's primary key, exactly the columns of one of the
    table's unique constraints, or exactly the columns of one of the table's
    unique indexes.

    :param column_or_constraint:
        SQLAlchemy Column object or SA ForeignKeyConstraint object

    .. versionadded:: 0.27.1

    .. versionchanged:: 0.30.18
        Added support for foreign key constraints.

        Fixed support for unique indexes (previously only worked for unique
        constraints)

    ::

        from sqlalchemy_utils import has_unique_index


        class Article(Base):
            __tablename__ = 'article'
            id = sa.Column(sa.Integer, primary_key=True)
            title = sa.Column(sa.String(100))
            is_published = sa.Column(sa.Boolean, unique=True)
            is_deleted = sa.Column(sa.Boolean)
            is_archived = sa.Column(sa.Boolean)


        table = Article.__table__

        has_unique_index(table.c.is_published)  # True
        has_unique_index(table.c.is_deleted)    # False
        has_unique_index(table.c.id)            # True


    This function supports foreign key constraints as well

    ::


        class User(Base):
            __tablename__ = 'user'
            first_name = sa.Column(sa.Unicode(255), primary_key=True)
            last_name = sa.Column(sa.Unicode(255), primary_key=True)

        class Article(Base):
            __tablename__ = 'article'
            id = sa.Column(sa.Integer, primary_key=True)
            author_first_name = sa.Column(sa.Unicode(255))
            author_last_name = sa.Column(sa.Unicode(255))
            __table_args__ = (
                sa.ForeignKeyConstraint(
                    [author_first_name, author_last_name],
                    [User.first_name, User.last_name]
                ),
                sa.Index(
                    'my_index',
                    author_first_name,
                    author_last_name,
                    unique=True
                )
            )

        table = Article.__table__
        constraint = list(table.foreign_keys)[0].constraint

        has_unique_index(constraint)  # True


    :raises TypeError: if given column does not belong to a Table object
    """
    owner = column_or_constraint.table
    if not isinstance(owner, sa.Table):
        raise TypeError(
            'Only columns belonging to Table objects are supported. Given '
            'column belongs to %r.' % owner
        )

    # Normalise the argument to a list of columns to look for.
    if isinstance(column_or_constraint, sa.ForeignKeyConstraint):
        wanted = list(column_or_constraint.columns.values())
    else:
        wanted = [column_or_constraint]

    # Unlike has_index() this requires an exact match, not just a prefix.
    if wanted == list(owner.primary_key.columns.values()):
        return True

    unique_constraints = (
        candidate
        for candidate in owner.constraints
        if isinstance(candidate, sa.sql.schema.UniqueConstraint)
    )
    if any(
        wanted == list(candidate.columns.values())
        for candidate in unique_constraints
    ):
        return True

    return any(
        wanted == list(candidate.columns.values())
        for candidate in owner.indexes
        if candidate.unique
    )

393 

394 

def is_auto_assigned_date_column(column):
    """
    Tell whether the given SQLAlchemy Column is an auto assigned DateTime or
    Date column.

    The column qualifies when its type is a ``sa.DateTime`` or ``sa.Date``
    and at least one of ``default``, ``server_default``, ``onupdate`` or
    ``server_onupdate`` is set.

    :param column: SQLAlchemy Column object
    :returns:
        ``False`` when the column type is neither DateTime nor Date;
        otherwise the first truthy value among the four attributes above, or
        the (falsy) value of ``server_onupdate`` when none of them is set.
        The result is meant to be used for its truthiness.
    """
    if not isinstance(column.type, (sa.DateTime, sa.Date)):
        return False
    return (
        column.default
        or column.server_default
        or column.onupdate
        or column.server_onupdate
    )

410 

411 

def _set_url_database(url: sa.engine.url.URL, database):
    """Return an engine URL that points at ``database``.

    :param url: A SQLAlchemy engine URL.
    :param database: New database to set; may be ``None``.
    :returns: The URL with its ``database`` attribute equal to ``database``.

    """
    if not hasattr(url, '_replace'):  # SQLAlchemy <1.4
        # Work on a copy so the URL passed by the caller is left untouched.
        ret = copy(url)
        ret.database = database
    else:
        # Cannot use URL.set() as database may need to be set to None.
        ret = url._replace(database=database)
    assert ret.database == database, ret
    return ret

428 

429 

430def _get_scalar_result(engine, sql): 

431 with engine.connect() as conn: 

432 return conn.scalar(sql) 

433 

434 

435def _sqlite_file_exists(database): 

436 if not os.path.isfile(database) or os.path.getsize(database) < 100: 

437 return False 

438 

439 with open(database, 'rb') as f: 

440 header = f.read(100) 

441 

442 return header[:16] == b'SQLite format 3\x00' 

443 

444 

def database_exists(url):
    """Check if a database exists.

    :param url: A SQLAlchemy engine URL.
    :returns: ``True`` if the database is found, ``False`` otherwise.

    Performs backend-specific testing to quickly determine if a database
    exists on the server. ::

        database_exists('postgresql://postgres@localhost/name')  #=> False
        create_database('postgresql://postgres@localhost/name')
        database_exists('postgresql://postgres@localhost/name')  #=> True

    Supports checking against a constructed URL as well. ::

        engine = create_engine('postgresql://postgres@localhost/name')
        database_exists(engine.url)  #=> False
        create_database(engine.url)
        database_exists(engine.url)  #=> True

    The database name is passed to the server as a bound parameter, so names
    containing quotes are looked up literally.

    """

    url = make_url(url)
    database = url.database
    dialect_name = url.get_dialect().name
    engine = None
    try:
        if dialect_name == 'postgresql':
            query = sa.text(
                'SELECT 1 FROM pg_database WHERE datname = :database'
            ).bindparams(database=database)
            # Try several databases to connect through; the first one that
            # accepts the connection and the query decides the answer.
            for db in (database, 'postgres', 'template1', 'template0', None):
                url = _set_url_database(url, database=db)
                engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
                try:
                    return bool(_get_scalar_result(engine, query))
                except (ProgrammingError, OperationalError):
                    # Release this attempt's engine before the next one
                    # replaces it; only the last engine reaches ``finally``.
                    engine.dispose()
            return False

        elif dialect_name == 'mysql':
            url = _set_url_database(url, database=None)
            engine = sa.create_engine(url)
            query = sa.text(
                'SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA '
                'WHERE SCHEMA_NAME = :database'
            ).bindparams(database=database)
            return bool(_get_scalar_result(engine, query))

        elif dialect_name == 'sqlite':
            # Purely a filesystem check, no engine is needed.
            if database:
                return database == ':memory:' or _sqlite_file_exists(database)
            else:
                # The default SQLAlchemy database is in memory, and :memory: is
                # not required, thus we should support that use case.
                return True
        elif dialect_name == 'mssql':
            query = sa.text(
                'SELECT 1 FROM sys.databases WHERE name = :database'
            ).bindparams(database=database)
            url = _set_url_database(url, database='master')
            engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
            try:
                return bool(_get_scalar_result(engine, query))
            except (ProgrammingError, OperationalError):
                return False
        else:
            # Unknown backend: the database is considered to exist when a
            # trivial query can be run against the URL as given.
            try:
                engine = sa.create_engine(url)
                return bool(_get_scalar_result(engine, sa.text('SELECT 1')))
            except (ProgrammingError, OperationalError):
                return False
    finally:
        if engine:
            engine.dispose()

518 

519 

def create_database(url, encoding='utf8', template=None):
    """Issue the appropriate CREATE DATABASE statement.

    :param url: A SQLAlchemy engine URL.
    :param encoding: The encoding to create the database as.
    :param template:
        The name of the template from which to create the new database. At the
        moment only supported by PostgreSQL driver.

    To create a database, you can pass a simple URL that would have
    been passed to ``create_engine``. ::

        create_database('postgresql://postgres@localhost/name')

    You may also pass the url from an existing engine. ::

        create_database(engine.url)

    Has full support for mysql, postgres, and sqlite. In theory,
    other database engines should be supported.

    For SQLite nothing is issued when the URL names no database or names
    ``:memory:``. The engine used for the statement is disposed even when
    the statement fails.
    """

    url = make_url(url)
    database = url.database
    dialect_name = url.get_dialect().name
    dialect_driver = url.get_dialect().driver

    # Connect through a maintenance database (or none at all), since the
    # database named in the URL is the one about to be created.
    if dialect_name == 'postgresql':
        url = _set_url_database(url, database='postgres')
    elif dialect_name == 'mssql':
        url = _set_url_database(url, database='master')
    elif dialect_name == 'cockroachdb':
        url = _set_url_database(url, database='defaultdb')
    elif not dialect_name == 'sqlite':
        url = _set_url_database(url, database=None)

    if (dialect_name == 'mssql' and dialect_driver in {'pymssql', 'pyodbc'}) or (
        dialect_name == 'postgresql'
        and dialect_driver
        in {'asyncpg', 'pg8000', 'psycopg', 'psycopg2', 'psycopg2cffi'}
    ):
        engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
    else:
        engine = sa.create_engine(url)

    try:
        if dialect_name == 'postgresql':
            if not template:
                template = 'template1'

            with engine.begin() as conn:
                text = "CREATE DATABASE {} ENCODING '{}' TEMPLATE {}".format(
                    quote(conn, database), encoding, quote(conn, template)
                )
                conn.execute(sa.text(text))

        elif dialect_name == 'mysql':
            with engine.begin() as conn:
                text = "CREATE DATABASE {} CHARACTER SET = '{}'".format(
                    quote(conn, database), encoding
                )
                conn.execute(sa.text(text))

        elif dialect_name == 'sqlite':
            # SQLite has no CREATE DATABASE statement. In-memory databases
            # need no setup; for a file, creating and dropping a throwaway
            # table presumably forces the database file to be written.
            if database and database != ':memory:':
                with engine.begin() as conn:
                    conn.execute(sa.text('CREATE TABLE DB(id int)'))
                    conn.execute(sa.text('DROP TABLE DB'))

        else:
            with engine.begin() as conn:
                text = f'CREATE DATABASE {quote(conn, database)}'
                conn.execute(sa.text(text))
    finally:
        engine.dispose()

594 

595 

def drop_database(url):
    """Issue the appropriate DROP DATABASE statement.

    :param url: A SQLAlchemy engine URL.

    Works similar to the :func:`create_database` method in that both url text
    and a constructed url are accepted.

    ::

        drop_database('postgresql://postgres@localhost/name')
        drop_database(engine.url)

    For SQLite the database file is removed from disk; nothing is done when
    the URL names no database or names ``:memory:``. The engine used for the
    statement is disposed even when the statement fails.

    """

    url = make_url(url)
    database = url.database
    dialect_name = url.get_dialect().name
    dialect_driver = url.get_dialect().driver

    # Connect through a maintenance database (or none at all), since the
    # database named in the URL is the one about to be dropped.
    if dialect_name == 'postgresql':
        url = _set_url_database(url, database='postgres')
    elif dialect_name == 'mssql':
        url = _set_url_database(url, database='master')
    elif dialect_name == 'cockroachdb':
        url = _set_url_database(url, database='defaultdb')
    elif not dialect_name == 'sqlite':
        url = _set_url_database(url, database=None)

    if (dialect_name == 'mssql' and dialect_driver in {'pymssql', 'pyodbc'}) or (
        dialect_name == 'postgresql'
        and dialect_driver
        in {'asyncpg', 'pg8000', 'psycopg', 'psycopg2', 'psycopg2cffi'}
    ):
        engine = sa.create_engine(url, isolation_level='AUTOCOMMIT')
    else:
        engine = sa.create_engine(url)

    try:
        if dialect_name == 'sqlite':
            # SQLite has no DROP DATABASE statement: a file-backed database
            # is deleted from disk, an in-memory one needs no action.
            if database and database != ':memory:':
                os.remove(database)
        else:
            with engine.begin() as conn:
                text = f'DROP DATABASE {quote(conn, database)}'
                conn.execute(sa.text(text))
    finally:
        engine.dispose()