
# dialects/oracle/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r"""
.. dialect:: oracle
    :name: Oracle Database
    :normal_support: 11+
    :best_effort: 9+


Auto Increment Behavior
-----------------------

SQLAlchemy Table objects which include integer primary keys are usually
assumed to have "autoincrementing" behavior, meaning they can generate their
own primary key values upon INSERT. For use within Oracle Database, two
options are available: IDENTITY columns (Oracle Database 12 and above only)
or the association of a SEQUENCE with the column.

Specifying GENERATED AS IDENTITY (Oracle Database 12 and above)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Starting from version 12, Oracle Database can make use of identity columns
using the :class:`_sql.Identity` construct to specify the autoincrementing
behavior::

    t = Table(
        "mytable",
        metadata,
        Column("id", Integer, Identity(start=3), primary_key=True),
        Column(...),
        ...,
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

    CREATE TABLE mytable (
        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
        ...,
        PRIMARY KEY (id)
    )

The :class:`_schema.Identity` object supports many options to control the
"autoincrementing" behavior of the column, such as the starting value and the
increment value. In addition to the standard options, Oracle Database
supports setting :paramref:`_schema.Identity.always` to ``None`` to use the
default generated mode, rendering GENERATED AS IDENTITY in the DDL. It also
supports setting :paramref:`_schema.Identity.on_null` to ``True`` to specify
ON NULL in conjunction with a "BY DEFAULT" identity column.
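
The effect of these options can be observed without a live database by
compiling the DDL against the Oracle dialect. This is a hedged sketch; the
table and column names are invented for the example:

```python
from sqlalchemy import Column, Identity, Integer, MetaData, Table
from sqlalchemy.dialects import oracle
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# hypothetical table combining ON NULL with a BY DEFAULT identity column
t = Table(
    "mytable",
    metadata,
    Column(
        "id", Integer, Identity(always=False, on_null=True), primary_key=True
    ),
)

# compile CREATE TABLE for the Oracle dialect without connecting
ddl = str(CreateTable(t).compile(dialect=oracle.dialect()))
print(ddl)
```

The rendered DDL should contain a GENERATED BY DEFAULT ON NULL AS IDENTITY
clause for the ``id`` column.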

Using a SEQUENCE (all Oracle Database versions)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Older versions of Oracle Database had no "autoincrement" feature: SQLAlchemy
relies upon sequences to produce these values. With the older Oracle Database
versions, *a sequence must always be explicitly specified to enable
autoincrement*. This is divergent from the majority of documentation
examples, which assume the usage of an autoincrement-capable database. To
specify sequences, use the sqlalchemy.schema.Sequence object, which is passed
to a Column construct::

    t = Table(
        "mytable",
        metadata,
        Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
        Column(...),
        ...,
    )

This step is also required when using table reflection, i.e.
autoload_with=engine::

    t = Table(
        "mytable",
        metadata,
        Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
        autoload_with=engine,
    )

.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
   in a :class:`_schema.Column` to specify the option of an autoincrementing
   column.

.. _oracle_isolation_level:

Transaction Isolation Level / Autocommit
----------------------------------------

Oracle Database supports "READ COMMITTED" and "SERIALIZABLE" modes of
isolation. The AUTOCOMMIT isolation level is also supported by the
python-oracledb and cx_Oracle dialects.

To set using per-connection execution options::

    connection = engine.connect()
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")

For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle Database dialects set
the level at the session level using ``ALTER SESSION``, which is reverted
back to its default setting when the connection is returned to the
connection pool.

Valid values for ``isolation_level`` include:

* ``READ COMMITTED``
* ``AUTOCOMMIT``
* ``SERIALIZABLE``

.. note:: The implementation for the
   :meth:`_engine.Connection.get_isolation_level` method as implemented by
   the Oracle Database dialects necessarily forces the start of a
   transaction using the Oracle Database
   DBMS_TRANSACTION.LOCAL_TRANSACTION_ID function; otherwise no level is
   normally readable.

   Additionally, the :meth:`_engine.Connection.get_isolation_level` method
   will raise an exception if the ``v$transaction`` view is not available
   due to permissions or other reasons, which is a common occurrence in
   Oracle Database installations.

   The python-oracledb and cx_Oracle dialects attempt to call the
   :meth:`_engine.Connection.get_isolation_level` method when the dialect
   makes its first connection to the database in order to acquire the
   "default" isolation level. This default level is necessary so that the
   level can be reset on a connection after it has been temporarily modified
   using the :meth:`_engine.Connection.execution_options` method. In the
   common event that the :meth:`_engine.Connection.get_isolation_level`
   method raises an exception due to ``v$transaction`` not being readable,
   as well as any other database-related failure, the level is assumed to be
   "READ COMMITTED". No warning is emitted for this initial first-connect
   condition as it is expected to be a common restriction on Oracle
   databases.

.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_Oracle
   dialect as well as the notion of a default isolation level

.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
   reading of the isolation level.

.. versionchanged:: 1.3.22 In the event that the default isolation
   level cannot be read due to permissions on the v$transaction view as
   is common in Oracle installations, the default isolation level is
   hardcoded to "READ COMMITTED", which was the behavior prior to 1.3.21.

.. seealso::

    :ref:`dbapi_autocommit`

Identifier Casing
-----------------

In Oracle Database, the data dictionary represents all case insensitive
identifier names using UPPERCASE text. This is in contradiction to the
expectations of SQLAlchemy, which assume a case insensitive name is
represented as lowercase text.

As an example of case insensitive identifier names, consider the following
table:

.. sourcecode:: sql

    CREATE TABLE MyTable (Identifier INTEGER PRIMARY KEY)

If you were to ask Oracle Database for information about this table, the
table name would be reported as ``MYTABLE`` and the column name would
be reported as ``IDENTIFIER``. Compare to most other databases such as
PostgreSQL and MySQL which would report these names as ``mytable`` and
``identifier``. The names are **not quoted, therefore are case
insensitive**. The special casing of ``MyTable`` and ``Identifier`` would
only be maintained if they were quoted in the table definition:

.. sourcecode:: sql

    CREATE TABLE "MyTable" ("Identifier" INTEGER PRIMARY KEY)

When constructing a SQLAlchemy :class:`.Table` object, **an all lowercase
name is considered to be case insensitive**. So the following table assumes
case insensitive names::

    Table("mytable", metadata, Column("identifier", Integer, primary_key=True))

Whereas when mixed case or UPPERCASE names are used, case sensitivity is
assumed::

    Table("MyTable", metadata, Column("Identifier", Integer, primary_key=True))

A similar situation occurs at the database driver level when emitting a
textual SQL SELECT statement and looking at column names in the DBAPI
``cursor.description`` attribute. A database like PostgreSQL will normalize
case insensitive names to be lowercase::

    >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test")
    >>> pg_connection = pg_engine.connect()
    >>> result = pg_connection.exec_driver_sql("SELECT 1 AS SomeName")
    >>> result.cursor.description
    (Column(name='somename', type_code=23),)

Whereas Oracle Database normalizes them to UPPERCASE::

    >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
    >>> oracle_connection = oracle_engine.connect()
    >>> result = oracle_connection.exec_driver_sql(
    ...     "SELECT 1 AS SomeName FROM DUAL"
    ... )
    >>> result.cursor.description
    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]

In order to achieve cross-database parity for the two cases of a. table
reflection and b. textual-only SQL statement round trips, SQLAlchemy
performs a step called **name normalization** when using the Oracle Database
dialects. This process may also apply to other third party dialects that
have similar UPPERCASE handling of case insensitive names.

When using name normalization, SQLAlchemy attempts to detect if a name is
case insensitive by checking if all characters are UPPERCASE letters only;
if so, then it assumes this is a case insensitive name and it is delivered
as a lowercase name.
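
The detection rule can be sketched in plain Python. This is an illustrative
approximation only, not the dialect's actual implementation:

```python
def sketch_normalize_name(name: str) -> str:
    # illustrative approximation: an all-UPPERCASE name is assumed to be
    # case insensitive and is delivered as lowercase; any other casing is
    # preserved as-is and treated as case sensitive
    if name.upper() == name and name.lower() != name:
        return name.lower()
    return name


print(sketch_normalize_name("SOMENAME"))  # somename
print(sketch_normalize_name("SomeName"))  # SomeName
print(sketch_normalize_name("mytable"))  # mytable
```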

For table reflection, a table name that is seen represented as all UPPERCASE
in Oracle Database's catalog tables will be assumed to have a case
insensitive name. This is what allows the ``Table`` definition to use lower
case names and be equally compatible from a reflection point of view on
Oracle Database and all other databases such as PostgreSQL and MySQL::

    # matches a table created with CREATE TABLE mytable
    Table("mytable", metadata, autoload_with=some_engine)

Above, the all lowercase name ``"mytable"`` is case insensitive; it will
match a table reported by PostgreSQL as ``"mytable"`` and a table reported
by Oracle Database as ``"MYTABLE"``. If name normalization were not present,
it would not be possible for the above :class:`.Table` definition to be
introspectable in a cross-database way, since we are dealing with a case
insensitive name that is not reported by each database in the same way.

Case sensitivity can be forced on in this case, such as if we wanted to
represent the quoted tablename ``"MYTABLE"`` with that exact casing, most
simply by using that casing directly, which will be seen as a case sensitive
name::

    # matches a table created with CREATE TABLE "MYTABLE"
    Table("MYTABLE", metadata, autoload_with=some_engine)

For the unusual case of a quoted all-lowercase name, the
:class:`.quoted_name` construct may be used::

    from sqlalchemy import quoted_name

    # matches a table created with CREATE TABLE "mytable"
    Table(
        quoted_name("mytable", quote=True), metadata, autoload_with=some_engine
    )

Name normalization also takes place when handling result sets from **purely
textual SQL strings** that have no other :class:`.Table` or :class:`.Column`
metadata associated with them. This includes SQL strings executed using
:meth:`.Connection.exec_driver_sql` and SQL strings executed using the
:func:`.text` construct which do not include :class:`.Column` metadata.

Returning to the Oracle Database SELECT statement, we see that even though
``cursor.description`` reports the column name as ``SOMENAME``, SQLAlchemy
name normalizes this to ``somename``::

    >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
    >>> oracle_connection = oracle_engine.connect()
    >>> result = oracle_connection.exec_driver_sql(
    ...     "SELECT 1 AS SomeName FROM DUAL"
    ... )
    >>> result.cursor.description
    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
    >>> result.keys()
    RMKeyView(['somename'])

The single scenario where the above behavior produces inaccurate results is
when using an all-uppercase, quoted name. SQLAlchemy has no way to determine
whether a particular name in ``cursor.description`` was quoted, and is
therefore case sensitive, or was not quoted, and should be name
normalized::

    >>> result = oracle_connection.exec_driver_sql(
    ...     'SELECT 1 AS "SOMENAME" FROM DUAL'
    ... )
    >>> result.cursor.description
    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
    >>> result.keys()
    RMKeyView(['somename'])

For this case, a new feature will be available in SQLAlchemy 2.1 to disable
the name normalization behavior in specific cases.

.. _oracle_max_identifier_lengths:

Maximum Identifier Lengths
--------------------------

SQLAlchemy is sensitive to the maximum identifier length supported by Oracle
Database. This affects generated SQL label names as well as the generation
of constraint names, particularly in the case where the constraint naming
convention feature described at :ref:`constraint_naming_conventions` is
being used.

Oracle Database 12.2 increased the default maximum identifier length from 30
to 128. As of SQLAlchemy 1.4, the default maximum identifier length for the
Oracle Database dialects is 128 characters. Upon first connection, the
maximum length actually supported by the database is obtained. In all cases,
setting the :paramref:`_sa.create_engine.max_identifier_length` parameter
will bypass this change and the value given will be used as is::

    engine = create_engine(
        "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1",
        max_identifier_length=30,
    )

If :paramref:`_sa.create_engine.max_identifier_length` is not set, the
oracledb dialect internally uses the ``max_identifier_length`` attribute
available on driver connections since python-oracledb version 2.5. When
using an older driver version, or when using the cx_Oracle dialect,
SQLAlchemy will instead attempt to use the query ``SELECT value FROM
v$parameter WHERE name = 'compatible'`` upon first connect in order to
determine the effective compatibility version of the database. The
"compatibility" version is a version number that is independent of the
actual database version. It is used to assist database migration. It is
configured by an Oracle Database initialization parameter. The compatibility
version then determines the maximum allowed identifier length for the
database. If the V$ view is not available, the database version information
is used instead.

The maximum identifier length comes into play both when generating
anonymized SQL labels in SELECT statements, but more crucially when
generating constraint names from a naming convention. It is this area that
has created the need for SQLAlchemy to change this default conservatively.
For example, the following naming convention produces two very different
constraint names based on the identifier length::

    from sqlalchemy import Column
    from sqlalchemy import Index
    from sqlalchemy import Integer
    from sqlalchemy import MetaData
    from sqlalchemy import Table
    from sqlalchemy.dialects import oracle
    from sqlalchemy.schema import CreateIndex

    m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})

    t = Table(
        "t",
        m,
        Column("some_column_name_1", Integer),
        Column("some_column_name_2", Integer),
        Column("some_column_name_3", Integer),
    )

    ix = Index(
        None,
        t.c.some_column_name_1,
        t.c.some_column_name_2,
        t.c.some_column_name_3,
    )

    oracle_dialect = oracle.dialect(max_identifier_length=30)
    print(CreateIndex(ix).compile(dialect=oracle_dialect))

With an identifier length of 30, the above CREATE INDEX looks like:

.. sourcecode:: sql

    CREATE INDEX ix_some_column_name_1s_70cd ON t
    (some_column_name_1, some_column_name_2, some_column_name_3)

However with a length of 128, it becomes:

.. sourcecode:: sql

    CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
    (some_column_name_1, some_column_name_2, some_column_name_3)

Applications which have run versions of SQLAlchemy prior to 1.4 on Oracle
Database version 12.2 or greater are therefore subject to the scenario of a
database migration that wishes to "DROP CONSTRAINT" on a name that was
previously generated with the shorter length. This migration will fail when
the identifier length is changed without the name of the index or constraint
first being adjusted. Such applications are strongly advised to make use of
:paramref:`_sa.create_engine.max_identifier_length` in order to maintain
control of the generation of truncated names, and to fully review and test
all database migrations in a staging environment when changing this value to
ensure that the impact of this change has been mitigated.

.. versionchanged:: 1.4 the default max_identifier_length for Oracle
   Database is 128 characters, which is adjusted down to 30 upon first
   connect if the Oracle Database, or its compatibility setting, are lower
   than version 12.2.

LIMIT/OFFSET/FETCH Support
--------------------------

Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make
use of ``FETCH FIRST N ROWS / OFFSET N ROWS`` syntax assuming Oracle
Database 12c or above, and assuming the SELECT statement is not embedded
within a compound statement like UNION. This syntax is also available
directly by using the :meth:`_sql.Select.fetch` method.

.. versionchanged:: 2.0 the Oracle Database dialects now use ``FETCH FIRST N
   ROWS / OFFSET N ROWS`` for all :meth:`_sql.Select.limit` and
   :meth:`_sql.Select.offset` usage including within the ORM and legacy
   :class:`_orm.Query`. To force the legacy behavior using window functions,
   specify the ``enable_offset_fetch=False`` dialect parameter to
   :func:`_sa.create_engine`.

The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle Database
version by passing ``enable_offset_fetch=False`` to
:func:`_sa.create_engine`, which will force the use of "legacy" mode that
makes use of window functions. This mode is also selected automatically when
using a version of Oracle Database prior to 12c.

When using legacy mode, or when a :class:`.Select` statement with
limit/offset is embedded in a compound statement, an emulated approach for
LIMIT / OFFSET based on window functions is used, which involves creation of
a subquery using ``ROW_NUMBER`` that is prone to performance issues as well
as SQL construction issues for complex statements. However, this approach is
supported by all Oracle Database versions. See notes below.

Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with
the ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods
on an Oracle Database version prior to 12c, the following notes apply:

* SQLAlchemy currently makes use of ROWNUM to achieve
  LIMIT/OFFSET; the exact methodology is taken from
  https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .

* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
  the usage of this optimization directive, specify ``optimize_limits=True``
  to :func:`_sa.create_engine`.

  .. versionchanged:: 1.4

     The Oracle Database dialect renders limit/offset integer values using a
     "post compile" scheme which renders the integer directly before passing
     the statement to the cursor for execution. The ``use_binds_for_limits``
     flag no longer has an effect.

     .. seealso::

        :ref:`change_4808`.

.. _oracle_returning:

RETURNING Support
-----------------

Oracle Database supports RETURNING fully for INSERT, UPDATE and DELETE
statements that are invoked with a single collection of bound parameters
(that is, a ``cursor.execute()`` style statement; SQLAlchemy does not
generally support RETURNING with :term:`executemany` statements). Multiple
rows may be returned as well.

.. versionchanged:: 2.0 the Oracle Database backend has full support for
   RETURNING on parity with other backends.
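
As a quick sketch (table and column names invented for the example), the
RETURNING rendering can be inspected by compiling an INSERT against the
Oracle dialect, with no database connection required:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, insert
from sqlalchemy.dialects import oracle

t = Table(
    "some_table",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("data", String(50)),
)

# an INSERT that returns the generated primary key value
stmt = insert(t).values(data="some value").returning(t.c.id)
sql = str(stmt.compile(dialect=oracle.dialect()))
print(sql)
```

The rendered statement should include a RETURNING clause for the ``id``
column.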


ON UPDATE CASCADE
-----------------

Oracle Database doesn't have native ON UPDATE CASCADE functionality. A
trigger-based solution is available at
https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html

When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
cascading updates - specify ForeignKey objects using the
``deferrable=True, initially="deferred"`` keyword arguments,
and specify ``passive_updates=False`` on each relationship().
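
A minimal sketch of the ForeignKey portion of that configuration, using
hypothetical ``parent`` / ``child`` tables (in an ORM mapping, the
corresponding ``relationship()`` would additionally pass
``passive_updates=False``):

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table

metadata = MetaData()

parent = Table("parent", metadata, Column("id", Integer, primary_key=True))

child = Table(
    "child",
    metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "parent_id",
        Integer,
        # deferred FK enforcement allows dependent rows to be updated
        # within the same transaction as the parent primary key change
        ForeignKey("parent.id", deferrable=True, initially="deferred"),
    ),
)

fk = list(child.c.parent_id.foreign_keys)[0]
print(fk.deferrable, fk.initially)
```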

Oracle Database 8 Compatibility
-------------------------------

.. warning:: The status of Oracle Database 8 compatibility is not known for
   SQLAlchemy 2.0.

When Oracle Database 8 is detected, the dialect internally configures itself
to the following behaviors:

* the use_ansi flag is set to False. This has the effect of converting all
  JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
  makes use of Oracle Database's (+) operator.

* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
  the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are
  issued instead. This is because these types don't seem to work correctly
  on Oracle 8 even though they are available. The
  :class:`~sqlalchemy.types.NVARCHAR` and
  :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
  NVARCHAR2 and NCLOB.


Synonym/DBLINK Reflection
-------------------------

When using reflection with Table objects, the dialect can optionally search
for tables indicated by synonyms, either in local or remote schemas or
accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True``
as a keyword argument to the :class:`_schema.Table` construct::

    some_table = Table(
        "some_table", autoload_with=some_engine, oracle_resolve_synonyms=True
    )

When this flag is set, the given name (such as ``some_table`` above) will be
searched not just in the ``ALL_TABLES`` view, but also within the
``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
name. If the synonym is located and refers to a DBLINK, the Oracle Database
dialects know how to locate the table's information using DBLINK syntax
(e.g. ``@dblink``).

``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
accepted, including methods such as :meth:`_schema.MetaData.reflect` and
:meth:`_reflection.Inspector.get_columns`.

If synonyms are not in use, this flag should be left disabled.

.. _oracle_constraint_reflection:

Constraint Reflection
---------------------

The Oracle Database dialects can return information about foreign key,
unique, and CHECK constraints, as well as indexes on tables.

Raw information regarding these constraints can be acquired using
:meth:`_reflection.Inspector.get_foreign_keys`,
:meth:`_reflection.Inspector.get_unique_constraints`,
:meth:`_reflection.Inspector.get_check_constraints`, and
:meth:`_reflection.Inspector.get_indexes`.

.. versionchanged:: 1.2 The Oracle Database dialect can now reflect UNIQUE
   and CHECK constraints.

When using reflection at the :class:`_schema.Table` level, the
:class:`_schema.Table` will also include these constraints.

Note the following caveats:

* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
  Oracle Database builds a special "IS NOT NULL" constraint for columns that
  specify "NOT NULL". This constraint is **not** returned by default; to
  include the "IS NOT NULL" constraints, pass the flag
  ``include_all=True``::

      from sqlalchemy import create_engine, inspect

      engine = create_engine(
          "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
      )
      inspector = inspect(engine)
      all_check_constraints = inspector.get_check_constraints(
          "some_table", include_all=True
      )

* in most cases, when reflecting a :class:`_schema.Table`, a UNIQUE
  constraint will **not** be available as a :class:`.UniqueConstraint`
  object, as Oracle Database mirrors unique constraints with a UNIQUE index
  in most cases (the exception seems to be when two or more unique
  constraints represent the same columns); the :class:`_schema.Table` will
  instead represent these using :class:`.Index` with the ``unique=True``
  flag set.

* Oracle Database creates an implicit index for the primary key of a table;
  this index is **excluded** from all index results.

* the list of columns reflected for an index will not include column names
  that start with SYS_NC.


Table names with SYSTEM/SYSAUX tablespaces
-------------------------------------------

The :meth:`_reflection.Inspector.get_table_names` and
:meth:`_reflection.Inspector.get_temp_table_names`
methods each return a list of table names for the current engine. These
methods are also part of the reflection which occurs within an operation
such as :meth:`_schema.MetaData.reflect`. By default, these operations
exclude the ``SYSTEM`` and ``SYSAUX`` tablespaces from the operation. In
order to change this, the default list of tablespaces excluded can be
changed at the engine level using the ``exclude_tablespaces`` parameter::

    # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
    e = create_engine(
        "oracle+oracledb://scott:tiger@localhost:1521/?service_name=freepdb1",
        exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"],
    )


.. _oracle_float_support:

FLOAT / DOUBLE Support and Behaviors
------------------------------------

The SQLAlchemy :class:`.Float` and :class:`.Double` datatypes are generic
datatypes that resolve to the "least surprising" datatype for a given
backend. For Oracle Database, this means they resolve to the ``FLOAT`` and
``DOUBLE`` types::

    >>> from sqlalchemy import cast, literal, Float
    >>> from sqlalchemy.dialects import oracle
    >>> float_datatype = Float()
    >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
    CAST(:param_1 AS FLOAT)

Oracle Database's ``FLOAT`` / ``DOUBLE`` datatypes are aliases for
``NUMBER``. Oracle Database stores ``NUMBER`` values with full precision,
not floating point precision, which means that ``FLOAT`` / ``DOUBLE`` do not
actually behave like native FP values. Oracle Database instead offers
special datatypes ``BINARY_FLOAT`` and ``BINARY_DOUBLE`` to deliver real
4- and 8-byte FP values.

SQLAlchemy supports these datatypes directly using :class:`.BINARY_FLOAT`
and :class:`.BINARY_DOUBLE`. To use the :class:`.Float` or :class:`.Double`
datatypes in a database agnostic way, while allowing Oracle Database
backends to utilize one of these types, use the
:meth:`.TypeEngine.with_variant` method to set up a variant::

    >>> from sqlalchemy import cast, literal, Float
    >>> from sqlalchemy.dialects import oracle
    >>> float_datatype = Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
    >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
    CAST(:param_1 AS BINARY_FLOAT)

E.g. to use this datatype in a :class:`.Table` definition::

    my_table = Table(
        "my_table",
        metadata,
        Column(
            "fp_data", Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
        ),
    )


DateTime Compatibility
----------------------

Oracle Database has no datatype known as ``DATETIME``; it instead has only
``DATE``, which can actually store a date and time value. For this reason,
the Oracle Database dialects provide a type :class:`_oracle.DATE` which is a
subclass of :class:`.DateTime`. This type has no special behavior, and is
only present as a "marker" for this type; additionally, when a database
column is reflected and the type is reported as ``DATE``, the
time-supporting :class:`_oracle.DATE` type is used.
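
The subclass relationship described above can be verified directly, which is
why a reflected ``DATE`` column round-trips Python ``datetime`` values
including the time portion:

```python
from sqlalchemy import DateTime
from sqlalchemy.dialects import oracle

# oracle.DATE is a marker subclass of the generic DateTime type
print(issubclass(oracle.DATE, DateTime))
```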


.. _oracle_table_options:

Oracle Database Table Options
-----------------------------

The CREATE TABLE phrase supports the following options with the Oracle
Database dialects in conjunction with the :class:`_schema.Table` construct:

* ``ON COMMIT``::

    Table(
        "some_table",
        metadata,
        ...,
        prefixes=["GLOBAL TEMPORARY"],
        oracle_on_commit="PRESERVE ROWS",
    )

* ``COMPRESS``::

    Table(
        "mytable", metadata, Column("data", String(32)), oracle_compress=True
    )

    Table("mytable", metadata, Column("data", String(32)), oracle_compress=6)

  The ``oracle_compress`` parameter accepts either an integer compression
  level, or ``True`` to use the default compression level.

* ``TABLESPACE``::

    Table("mytable", metadata, ..., oracle_tablespace="EXAMPLE_TABLESPACE")

  The ``oracle_tablespace`` parameter specifies the tablespace in which the
  table is to be created. This is useful when you want to create a table in
  a tablespace other than the default tablespace of the user.

  .. versionadded:: 2.0.37

.. _oracle_index_options:

Oracle Database Specific Index Options
--------------------------------------

Bitmap Indexes
~~~~~~~~~~~~~~

You can specify the ``oracle_bitmap`` parameter to create a bitmap index
instead of a B-tree index::

    Index("my_index", my_table.c.data, oracle_bitmap=True)

Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will
not check for such limitations; only the database will.


Index compression
~~~~~~~~~~~~~~~~~

Oracle Database has a more efficient storage mode for indexes containing
lots of repeated values. Use the ``oracle_compress`` parameter to turn on
key compression::

    Index("my_index", my_table.c.data, oracle_compress=True)

    Index(
        "my_index",
        my_table.c.data1,
        my_table.c.data2,
        unique=True,
        oracle_compress=1,
    )

The ``oracle_compress`` parameter accepts either an integer specifying the
number of prefix columns to compress, or ``True`` to use the default (all
columns for non-unique indexes, all but the last column for unique indexes).

724 

725.. _oracle_vector_datatype: 

726 

727VECTOR Datatype 

728--------------- 

729 

730Oracle Database 23ai introduced a new VECTOR datatype for artificial intelligence 

731and machine learning search operations. The VECTOR datatype is a homogeneous array 

732of 8-bit signed integers, 8-bit unsigned integers (binary), 32-bit floating-point 

733numbers, or 64-bit floating-point numbers. 

734 

735A vector's storage type can be either DENSE or SPARSE. A dense vector contains 

736meaningful values in most or all of its dimensions. In contrast, a sparse vector 

737has non-zero values in only a few dimensions, with the majority being zero. 

738 

739Sparse vectors are represented by the total number of vector dimensions, an array 

740of indices, and an array of values where each value’s location in the vector is 

741indicated by the corresponding indices array position. All other vector values are 

742treated as zero. 

743 
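As an illustration of this representation in plain Python (the ``sparse_to_dense`` helper below is illustrative only, and not part of SQLAlchemy or python-oracledb), a sparse triple of dimension count, index array, and value array expands to a dense vector like this:

```python
import array


def sparse_to_dense(num_dims, indices, values):
    """Expand a (dimensions, indices, values) sparse triple to a dense list.

    Positions not named in ``indices`` are treated as zero.
    """
    dense = [0.0] * num_dims
    for position, value in zip(indices, values):
        dense[position] = value
    return dense


# a 10-dimension sparse vector with non-zero values at positions 1 and 2
dense = sparse_to_dense(10, [1, 2], array.array("d", [23.45, 221.22]))
```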

744The storage formats that can be used with sparse vectors are float32, float64, and 

745int8. Note that the binary storage format cannot be used with sparse vectors. 

746 

747Sparse vectors are supported when you are using Oracle Database 23.7 or later. 

748 

749.. seealso:: 

750 

751 `Using VECTOR Data 

752 <https://python-oracledb.readthedocs.io/en/latest/user_guide/vector_data_type.html>`_ - in the documentation 

753 for the :ref:`oracledb` driver. 

754 

755.. versionadded:: 2.0.41 - Added VECTOR datatype 

756 

757.. versionadded:: 2.0.43 - Added DENSE/SPARSE support 

758 

759CREATE TABLE support for VECTOR 

760~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

761 

762With the :class:`.VECTOR` datatype, you can specify the number of dimensions, 

763the storage format, and the storage type for the data. Valid values for the 

764storage format are enum members of :class:`.VectorStorageFormat`. Valid values 

765for the storage type are enum members of :class:`.VectorStorageType`. If 

766storage type is not specified, a DENSE vector is created by default. 

767 

768To create a table that includes a :class:`.VECTOR` column:: 

769 

770 from sqlalchemy.dialects.oracle import ( 

771 VECTOR, 

772 VectorStorageFormat, 

773 VectorStorageType, 

774 ) 

775 

776 t = Table( 

777 "t1", 

778 metadata, 

779 Column("id", Integer, primary_key=True), 

780 Column( 

781 "embedding", 

782 VECTOR( 

783 dim=3, 

784 storage_format=VectorStorageFormat.FLOAT32, 

785 storage_type=VectorStorageType.SPARSE, 

786 ), 

787 ), 

788 Column(...), 

789 ..., 

790 ) 

791 

792Vectors can also be defined with an arbitrary number of dimensions and formats. 

793This allows you to specify vectors of different dimensions with the various 

794storage formats described above.

795 

796**Examples** 

797 

798* In this case, the storage format is flexible, allowing data of any storage

 799 format, such as INT8 or BINARY, to be inserted::

800 

801 vector_col: Mapped[array.array] = mapped_column(VECTOR(dim=3)) 

802 

803* The dimension is flexible in this case, meaning that a vector of any

 804 dimension can be used::

805 

806 vector_col: Mapped[array.array] = mapped_column( 

807 VECTOR(storage_format=VectorStorageFormat.INT8)

808 ) 

809 

810* Both the dimensions and the storage format are flexible. This creates a DENSE vector::

811 

812 vector_col: Mapped[array.array] = mapped_column(VECTOR) 

813 

814* To create a SPARSE vector with flexible dimensions and storage format,

815 use the :attr:`.VectorStorageType.SPARSE` storage type:: 

816 

817 vector_col: Mapped[array.array] = mapped_column( 

818 VECTOR(storage_type=VectorStorageType.SPARSE) 

819 ) 

820 

821Python Datatypes for VECTOR 

822~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

823 

824VECTOR data can be inserted using Python list or Python ``array.array()`` objects. 

825Python arrays of type FLOAT (32-bit), DOUBLE (64-bit), INT (8-bit signed integers), 

826or BINARY (8-bit unsigned integers) are used as bind values when inserting 

827VECTOR columns:: 

828 

829 from sqlalchemy import insert, select 

830 

831 with engine.begin() as conn: 

832 conn.execute( 

833 insert(t1), 

834 {"id": 1, "embedding": [1, 2, 3]}, 

835 ) 

836 
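In terms of ``array.array`` typecodes, the typed forms mentioned above correspond to ``'f'`` (32-bit float), ``'d'`` (64-bit float), ``'b'`` (8-bit signed integer), and ``'B'`` (8-bit unsigned integer). A minimal sketch of constructing such bind values (the variable names are illustrative only):

```python
import array

# typed bind values matching each VECTOR storage format
float32_vec = array.array("f", [1.0, 2.0, 3.0])  # FLOAT32
float64_vec = array.array("d", [1.0, 2.0, 3.0])  # FLOAT64
int8_vec = array.array("b", [1, -2, 3])  # INT8 (8-bit signed)
binary_vec = array.array("B", [1, 0, 255])  # BINARY (8-bit unsigned)
```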

837Data can be inserted into a sparse vector using the :class:`_oracle.SparseVector` 

838class, creating an object consisting of the number of dimensions, an array of indices, and a 

839corresponding array of values:: 

840 

841 import array

 from sqlalchemy import insert, select

 from sqlalchemy.dialects.oracle import SparseVector

843 

844 sparse_val = SparseVector(10, [1, 2], array.array("d", [23.45, 221.22])) 

845 

846 with engine.begin() as conn: 

847 conn.execute( 

848 insert(t1), 

849 {"id": 1, "embedding": sparse_val}, 

850 ) 

851 

852VECTOR Indexes 

853~~~~~~~~~~~~~~ 

854 

855The VECTOR feature supports an Oracle-specific parameter ``oracle_vector`` 

856on the :class:`.Index` construct, which allows the construction of VECTOR 

857indexes. 

858 

859SPARSE vectors cannot be used in the creation of vector indexes. 

860 

861To utilize VECTOR indexing, set the ``oracle_vector`` parameter to ``True`` to use

862the default values provided by Oracle. HNSW is the default indexing method:: 

863 

864 from sqlalchemy import Index 

865 

866 Index( 

867 "vector_index", 

868 t1.c.embedding, 

869 oracle_vector=True, 

870 ) 

871 

872The full range of parameters for vector indexes is available by using the

873:class:`.VectorIndexConfig` dataclass in place of a boolean; this dataclass 

874allows full configuration of the index:: 

875 

876 Index( 

877 "hnsw_vector_index", 

878 t1.c.embedding, 

879 oracle_vector=VectorIndexConfig( 

880 index_type=VectorIndexType.HNSW, 

881 distance=VectorDistanceType.COSINE, 

882 accuracy=90, 

883 hnsw_neighbors=5, 

884 hnsw_efconstruction=20, 

885 parallel=10, 

886 ), 

887 ) 

888 

889 Index( 

890 "ivf_vector_index", 

891 t1.c.embedding, 

892 oracle_vector=VectorIndexConfig( 

893 index_type=VectorIndexType.IVF, 

894 distance=VectorDistanceType.DOT, 

895 accuracy=90, 

896 ivf_neighbor_partitions=5, 

897 ), 

898 ) 

899 

900For a complete explanation of these parameters, see the Oracle Database documentation linked

901below. 

902 

903.. seealso:: 

904 

905 `CREATE VECTOR INDEX <https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B396C369-54BB-4098-A0DD-7C54B3A0D66F>`_ - in the Oracle documentation 

906 

907 

908 

909Similarity Searching 

910~~~~~~~~~~~~~~~~~~~~ 

911 

912When using the :class:`_oracle.VECTOR` datatype with a :class:`.Column` or similar 

913ORM mapped construct, additional comparison functions are available, including: 

914 

915* ``l2_distance`` 

916* ``cosine_distance`` 

917* ``inner_product`` 

918 

919Example Usage:: 

920 

921 result = connection.execute(

 922 select(t1).order_by(t1.c.embedding.l2_distance([2, 3, 4])).limit(3)

 923 )

 924

 925 for row in result:

 926 print(row.id, row.embedding)

927 

928FETCH APPROXIMATE support 

929~~~~~~~~~~~~~~~~~~~~~~~~~ 

930 

931Approximate vector search can only be performed when all syntax and semantic

932rules are satisfied, the corresponding vector index is available, and the

933query optimizer chooses to use it. If any of these conditions is not met, an

934approximate search is not performed, and the query instead returns exact

935results.

936 

937To enable approximate searching during similarity searches on VECTORS, the 

938``oracle_fetch_approximate`` parameter may be used with the :meth:`.Select.fetch` 

939clause to add ``FETCH APPROX`` to the SELECT statement:: 

940 

941 select(users_table).fetch(5, oracle_fetch_approximate=True) 

942 

943""" # noqa 

944 

945from __future__ import annotations 

946 

947from collections import defaultdict 

948from dataclasses import fields 

949from functools import lru_cache 

950from functools import wraps 

951import re 

952 

953from . import dictionary 

954from .types import _OracleBoolean 

955from .types import _OracleDate 

956from .types import BFILE 

957from .types import BINARY_DOUBLE 

958from .types import BINARY_FLOAT 

959from .types import DATE 

960from .types import FLOAT 

961from .types import INTERVAL 

962from .types import LONG 

963from .types import NCLOB 

964from .types import NUMBER 

965from .types import NVARCHAR2 # noqa 

966from .types import OracleRaw # noqa 

967from .types import RAW 

968from .types import ROWID # noqa 

969from .types import TIMESTAMP 

970from .types import VARCHAR2 # noqa 

971from .vector import VECTOR 

972from .vector import VectorIndexConfig 

973from .vector import VectorIndexType 

974from ... import Computed 

975from ... import exc 

976from ... import schema as sa_schema 

977from ... import sql 

978from ... import util 

979from ...engine import default 

980from ...engine import ObjectKind 

981from ...engine import ObjectScope 

982from ...engine import reflection 

983from ...engine.reflection import ReflectionDefaults 

984from ...sql import and_ 

985from ...sql import bindparam 

986from ...sql import compiler 

987from ...sql import expression 

988from ...sql import func 

989from ...sql import null 

990from ...sql import or_ 

991from ...sql import select 

992from ...sql import selectable as sa_selectable 

993from ...sql import sqltypes 

994from ...sql import util as sql_util 

995from ...sql import visitors 

996from ...sql.visitors import InternalTraversal 

997from ...types import BLOB 

998from ...types import CHAR 

999from ...types import CLOB 

1000from ...types import DOUBLE_PRECISION 

1001from ...types import INTEGER 

1002from ...types import NCHAR 

1003from ...types import NVARCHAR 

1004from ...types import REAL 

1005from ...types import VARCHAR 

1006 

1007RESERVED_WORDS = set( 

1008 "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN " 

1009 "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED " 

1010 "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE " 

1011 "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE " 

1012 "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES " 

1013 "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS " 

1014 "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER " 

1015 "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR " 

1016 "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split() 

1017) 

1018 

1019NO_ARG_FNS = set( 

1020 "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split() 

1021) 

1022 

1023 

1024colspecs = { 

1025 sqltypes.Boolean: _OracleBoolean, 

1026 sqltypes.Interval: INTERVAL, 

1027 sqltypes.DateTime: DATE, 

1028 sqltypes.Date: _OracleDate, 

1029} 

1030 

1031ischema_names = { 

1032 "VARCHAR2": VARCHAR, 

1033 "NVARCHAR2": NVARCHAR, 

1034 "CHAR": CHAR, 

1035 "NCHAR": NCHAR, 

1036 "DATE": DATE, 

1037 "NUMBER": NUMBER, 

1038 "BLOB": BLOB, 

1039 "BFILE": BFILE, 

1040 "CLOB": CLOB, 

1041 "NCLOB": NCLOB, 

1042 "TIMESTAMP": TIMESTAMP, 

1043 "TIMESTAMP WITH TIME ZONE": TIMESTAMP, 

1044 "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP, 

1045 "INTERVAL DAY TO SECOND": INTERVAL, 

1046 "RAW": RAW, 

1047 "FLOAT": FLOAT, 

1048 "DOUBLE PRECISION": DOUBLE_PRECISION, 

1049 "REAL": REAL, 

1050 "LONG": LONG, 

1051 "BINARY_DOUBLE": BINARY_DOUBLE, 

1052 "BINARY_FLOAT": BINARY_FLOAT, 

1053 "ROWID": ROWID, 

1054 "VECTOR": VECTOR, 

1055} 

1056 

1057 

1058class OracleTypeCompiler(compiler.GenericTypeCompiler): 

1059 # Note: 

1060 # Oracle DATE == DATETIME 

1061 # Oracle does not allow milliseconds in DATE 

1062 # Oracle does not support TIME columns 

1063 

1064 def visit_datetime(self, type_, **kw): 

1065 return self.visit_DATE(type_, **kw) 

1066 

1067 def visit_float(self, type_, **kw): 

1068 return self.visit_FLOAT(type_, **kw) 

1069 

1070 def visit_double(self, type_, **kw): 

1071 return self.visit_DOUBLE_PRECISION(type_, **kw) 

1072 

1073 def visit_unicode(self, type_, **kw): 

1074 if self.dialect._use_nchar_for_unicode: 

1075 return self.visit_NVARCHAR2(type_, **kw) 

1076 else: 

1077 return self.visit_VARCHAR2(type_, **kw) 

1078 

1079 def visit_INTERVAL(self, type_, **kw): 

1080 return "INTERVAL DAY%s TO SECOND%s" % ( 

1081 type_.day_precision is not None 

1082 and "(%d)" % type_.day_precision 

1083 or "", 

1084 type_.second_precision is not None 

1085 and "(%d)" % type_.second_precision 

1086 or "", 

1087 ) 

1088 

1089 def visit_LONG(self, type_, **kw): 

1090 return "LONG" 

1091 

1092 def visit_TIMESTAMP(self, type_, **kw): 

1093 if getattr(type_, "local_timezone", False): 

1094 return "TIMESTAMP WITH LOCAL TIME ZONE" 

1095 elif type_.timezone: 

1096 return "TIMESTAMP WITH TIME ZONE" 

1097 else: 

1098 return "TIMESTAMP" 

1099 

1100 def visit_DOUBLE_PRECISION(self, type_, **kw): 

1101 return self._generate_numeric(type_, "DOUBLE PRECISION", **kw) 

1102 

1103 def visit_BINARY_DOUBLE(self, type_, **kw): 

1104 return self._generate_numeric(type_, "BINARY_DOUBLE", **kw) 

1105 

1106 def visit_BINARY_FLOAT(self, type_, **kw): 

1107 return self._generate_numeric(type_, "BINARY_FLOAT", **kw) 

1108 

1109 def visit_FLOAT(self, type_, **kw): 

1110 kw["_requires_binary_precision"] = True 

1111 return self._generate_numeric(type_, "FLOAT", **kw) 

1112 

1113 def visit_NUMBER(self, type_, **kw): 

1114 return self._generate_numeric(type_, "NUMBER", **kw) 

1115 

1116 def _generate_numeric( 

1117 self, 

1118 type_, 

1119 name, 

1120 precision=None, 

1121 scale=None, 

1122 _requires_binary_precision=False, 

1123 **kw, 

1124 ): 

1125 if precision is None: 

1126 precision = getattr(type_, "precision", None) 

1127 

1128 if _requires_binary_precision: 

1129 binary_precision = getattr(type_, "binary_precision", None) 

1130 

1131 if precision and binary_precision is None: 

1132 # https://www.oracletutorial.com/oracle-basics/oracle-float/ 

1133 estimated_binary_precision = int(precision / 0.30103) 

1134 raise exc.ArgumentError( 

1135 "Oracle Database FLOAT types use 'binary precision', " 

1136 "which does not convert cleanly from decimal " 

1137 "'precision'. Please specify " 

1138 "this type with a separate Oracle Database variant, such " 

1139 f"as {type_.__class__.__name__}(precision={precision})." 

1140 f"with_variant(oracle.FLOAT" 

1141 f"(binary_precision=" 

1142 f"{estimated_binary_precision}), 'oracle'), so that the " 

1143 "Oracle Database specific 'binary_precision' may be " 

1144 "specified accurately." 

1145 ) 

1146 else: 

1147 precision = binary_precision 

1148 

1149 if scale is None: 

1150 scale = getattr(type_, "scale", None) 

1151 

1152 if precision is None: 

1153 return name 

1154 elif scale is None: 

1155 n = "%(name)s(%(precision)s)" 

1156 return n % {"name": name, "precision": precision} 

1157 else: 

1158 n = "%(name)s(%(precision)s, %(scale)s)" 

1159 return n % {"name": name, "precision": precision, "scale": scale} 

1160 

1161 def visit_string(self, type_, **kw): 

1162 return self.visit_VARCHAR2(type_, **kw) 

1163 

1164 def visit_VARCHAR2(self, type_, **kw): 

1165 return self._visit_varchar(type_, "", "2") 

1166 

1167 def visit_NVARCHAR2(self, type_, **kw): 

1168 return self._visit_varchar(type_, "N", "2") 

1169 

1170 visit_NVARCHAR = visit_NVARCHAR2 

1171 

1172 def visit_VARCHAR(self, type_, **kw): 

1173 return self._visit_varchar(type_, "", "") 

1174 

1175 def _visit_varchar(self, type_, n, num): 

1176 if not type_.length: 

1177 return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n} 

1178 elif not n and self.dialect._supports_char_length: 

1179 varchar = "VARCHAR%(two)s(%(length)s CHAR)" 

1180 return varchar % {"length": type_.length, "two": num} 

1181 else: 

1182 varchar = "%(n)sVARCHAR%(two)s(%(length)s)" 

1183 return varchar % {"length": type_.length, "two": num, "n": n} 

1184 

1185 def visit_text(self, type_, **kw): 

1186 return self.visit_CLOB(type_, **kw) 

1187 

1188 def visit_unicode_text(self, type_, **kw): 

1189 if self.dialect._use_nchar_for_unicode: 

1190 return self.visit_NCLOB(type_, **kw) 

1191 else: 

1192 return self.visit_CLOB(type_, **kw) 

1193 

1194 def visit_large_binary(self, type_, **kw): 

1195 return self.visit_BLOB(type_, **kw) 

1196 

1197 def visit_big_integer(self, type_, **kw): 

1198 return self.visit_NUMBER(type_, precision=19, **kw) 

1199 

1200 def visit_boolean(self, type_, **kw): 

1201 return self.visit_SMALLINT(type_, **kw) 

1202 

1203 def visit_RAW(self, type_, **kw): 

1204 if type_.length: 

1205 return "RAW(%(length)s)" % {"length": type_.length} 

1206 else: 

1207 return "RAW" 

1208 

1209 def visit_ROWID(self, type_, **kw): 

1210 return "ROWID" 

1211 

1212 def visit_VECTOR(self, type_, **kw): 

1213 dim = type_.dim if type_.dim is not None else "*" 

1214 storage_format = ( 

1215 type_.storage_format.value 

1216 if type_.storage_format is not None 

1217 else "*" 

1218 ) 

1219 storage_type = ( 

1220 type_.storage_type.value if type_.storage_type is not None else "*" 

1221 ) 

1222 return f"VECTOR({dim},{storage_format},{storage_type})" 

1223 

1224 

1225class OracleCompiler(compiler.SQLCompiler): 

1226 """Oracle compiler modifies the lexical structure of Select 

1227 statements to work under non-ANSI configured Oracle databases, if 

1228 the use_ansi flag is False. 

1229 """ 

1230 

1231 compound_keywords = util.update_copy( 

1232 compiler.SQLCompiler.compound_keywords, 

1233 {expression.CompoundSelect.EXCEPT: "MINUS"}, 

1234 ) 

1235 

1236 def __init__(self, *args, **kwargs): 

1237 self.__wheres = {} 

1238 super().__init__(*args, **kwargs) 

1239 

1240 def visit_mod_binary(self, binary, operator, **kw): 

1241 return "mod(%s, %s)" % ( 

1242 self.process(binary.left, **kw), 

1243 self.process(binary.right, **kw), 

1244 ) 

1245 

1246 def visit_now_func(self, fn, **kw): 

1247 return "CURRENT_TIMESTAMP" 

1248 

1249 def visit_char_length_func(self, fn, **kw): 

1250 return "LENGTH" + self.function_argspec(fn, **kw) 

1251 

1252 def visit_match_op_binary(self, binary, operator, **kw): 

1253 return "CONTAINS (%s, %s)" % ( 

1254 self.process(binary.left), 

1255 self.process(binary.right), 

1256 ) 

1257 

1258 def visit_true(self, expr, **kw): 

1259 return "1" 

1260 

1261 def visit_false(self, expr, **kw): 

1262 return "0" 

1263 

1264 def get_cte_preamble(self, recursive): 

1265 return "WITH" 

1266 

1267 def get_select_hint_text(self, byfroms): 

1268 return " ".join("/*+ %s */" % text for table, text in byfroms.items()) 

1269 

1270 def function_argspec(self, fn, **kw): 

1271 if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS: 

1272 return compiler.SQLCompiler.function_argspec(self, fn, **kw) 

1273 else: 

1274 return "" 

1275 

1276 def visit_function(self, func, **kw): 

1277 text = super().visit_function(func, **kw) 

1278 if kw.get("asfrom", False) and func.name.lower() != "table": 

1279 text = "TABLE (%s)" % text 

1280 return text 

1281 

1282 def visit_table_valued_column(self, element, **kw): 

1283 text = super().visit_table_valued_column(element, **kw) 

1284 text = text + ".COLUMN_VALUE" 

1285 return text 

1286 

1287 def default_from(self): 

1288 """Called when a ``SELECT`` statement has no froms, 

1289 and no ``FROM`` clause is to be appended. 

1290 

1291 The Oracle compiler tacks a "FROM DUAL" to the statement. 

1292 """ 

1293 

1294 return " FROM DUAL" 

1295 

1296 def visit_join(self, join, from_linter=None, **kwargs): 

1297 if self.dialect.use_ansi: 

1298 return compiler.SQLCompiler.visit_join( 

1299 self, join, from_linter=from_linter, **kwargs 

1300 ) 

1301 else: 

1302 if from_linter: 

1303 from_linter.edges.add((join.left, join.right)) 

1304 

1305 kwargs["asfrom"] = True 

1306 if isinstance(join.right, expression.FromGrouping): 

1307 right = join.right.element 

1308 else: 

1309 right = join.right 

1310 return ( 

1311 self.process(join.left, from_linter=from_linter, **kwargs) 

1312 + ", " 

1313 + self.process(right, from_linter=from_linter, **kwargs) 

1314 ) 

1315 

1316 def _get_nonansi_join_whereclause(self, froms): 

1317 clauses = [] 

1318 

1319 def visit_join(join): 

1320 if join.isouter: 

1321 # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354 

1322 # "apply the outer join operator (+) to all columns of B in 

1323 # the join condition in the WHERE clause" - that is, 

1324 # unconditionally regardless of operator or the other side 

1325 def visit_binary(binary): 

1326 if isinstance( 

1327 binary.left, expression.ColumnClause 

1328 ) and join.right.is_derived_from(binary.left.table): 

1329 binary.left = _OuterJoinColumn(binary.left) 

1330 elif isinstance( 

1331 binary.right, expression.ColumnClause 

1332 ) and join.right.is_derived_from(binary.right.table): 

1333 binary.right = _OuterJoinColumn(binary.right) 

1334 

1335 clauses.append( 

1336 visitors.cloned_traverse( 

1337 join.onclause, {}, {"binary": visit_binary} 

1338 ) 

1339 ) 

1340 else: 

1341 clauses.append(join.onclause) 

1342 

1343 for j in join.left, join.right: 

1344 if isinstance(j, expression.Join): 

1345 visit_join(j) 

1346 elif isinstance(j, expression.FromGrouping): 

1347 visit_join(j.element) 

1348 

1349 for f in froms: 

1350 if isinstance(f, expression.Join): 

1351 visit_join(f) 

1352 

1353 if not clauses: 

1354 return None 

1355 else: 

1356 return sql.and_(*clauses) 

1357 

1358 def visit_outer_join_column(self, vc, **kw): 

1359 return self.process(vc.column, **kw) + "(+)" 

1360 

1361 def visit_sequence(self, seq, **kw): 

1362 return self.preparer.format_sequence(seq) + ".nextval" 

1363 

1364 def get_render_as_alias_suffix(self, alias_name_text): 

1365 """Oracle doesn't like ``FROM table AS alias``""" 

1366 

1367 return " " + alias_name_text 

1368 

1369 def returning_clause( 

1370 self, stmt, returning_cols, *, populate_result_map, **kw 

1371 ): 

1372 columns = [] 

1373 binds = [] 

1374 

1375 for i, column in enumerate( 

1376 expression._select_iterables(returning_cols) 

1377 ): 

1378 if ( 

1379 self.isupdate 

1380 and isinstance(column, sa_schema.Column) 

1381 and isinstance(column.server_default, Computed) 

1382 and not self.dialect._supports_update_returning_computed_cols 

1383 ): 

1384 util.warn( 

1385 "Computed columns don't work with Oracle Database UPDATE " 

1386 "statements that use RETURNING; the value of the column " 

1387 "*before* the UPDATE takes place is returned. It is " 

1388 "advised to not use RETURNING with an Oracle Database " 

1389 "computed column. Consider setting implicit_returning " 

1390 "to False on the Table object in order to avoid implicit " 

1391 "RETURNING clauses from being generated for this Table." 

1392 ) 

1393 if column.type._has_column_expression: 

1394 col_expr = column.type.column_expression(column) 

1395 else: 

1396 col_expr = column 

1397 

1398 outparam = sql.outparam("ret_%d" % i, type_=column.type) 

1399 self.binds[outparam.key] = outparam 

1400 binds.append( 

1401 self.bindparam_string(self._truncate_bindparam(outparam)) 

1402 ) 

1403 

1404 # has_out_parameters would in a normal case be set to True 

1405 # as a result of the compiler visiting an outparam() object. 

1406 # in this case, the above outparam() objects are not being 

1407 # visited. Ensure the statement itself didn't have other 

1408 # outparam() objects independently. 

1409 # technically, this could be supported, but as it would be 

1410 # a very strange use case without a clear rationale, disallow it 

1411 if self.has_out_parameters: 

1412 raise exc.InvalidRequestError( 

1413 "Using explicit outparam() objects with " 

1414 "UpdateBase.returning() in the same Core DML statement " 

1415 "is not supported in the Oracle Database dialects." 

1416 ) 

1417 

1418 self._oracle_returning = True 

1419 

1420 columns.append(self.process(col_expr, within_columns_clause=False)) 

1421 if populate_result_map: 

1422 self._add_to_result_map( 

1423 getattr(col_expr, "name", col_expr._anon_name_label), 

1424 getattr(col_expr, "name", col_expr._anon_name_label), 

1425 ( 

1426 column, 

1427 getattr(column, "name", None), 

1428 getattr(column, "key", None), 

1429 ), 

1430 column.type, 

1431 ) 

1432 

1433 return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds) 

1434 

1435 def _row_limit_clause(self, select, **kw): 

1436 """Oracle Database 12c supports OFFSET/FETCH operators 

1437 Use them instead of a subquery with row_number.

1438 

1439 """ 

1440 

1441 if ( 

1442 select._fetch_clause is not None 

1443 or not self.dialect._supports_offset_fetch 

1444 ): 

1445 return super()._row_limit_clause( 

1446 select, use_literal_execute_for_simple_int=True, **kw 

1447 ) 

1448 else: 

1449 return self.fetch_clause( 

1450 select, 

1451 fetch_clause=self._get_limit_or_fetch(select), 

1452 use_literal_execute_for_simple_int=True, 

1453 **kw, 

1454 ) 

1455 

1456 def _get_limit_or_fetch(self, select): 

1457 if select._fetch_clause is None: 

1458 return select._limit_clause 

1459 else: 

1460 return select._fetch_clause 

1461 

1462 def fetch_clause( 

1463 self, 

1464 select, 

1465 fetch_clause=None, 

1466 require_offset=False, 

1467 use_literal_execute_for_simple_int=False, 

1468 **kw, 

1469 ): 

1470 text = super().fetch_clause( 

1471 select, 

1472 fetch_clause=fetch_clause, 

1473 require_offset=require_offset, 

1474 use_literal_execute_for_simple_int=( 

1475 use_literal_execute_for_simple_int 

1476 ), 

1477 **kw, 

1478 ) 

1479 

1480 if select.dialect_options["oracle"]["fetch_approximate"]: 

1481 text = re.sub("FETCH FIRST", "FETCH APPROX FIRST", text) 

1482 

1483 return text 

1484 

1485 def translate_select_structure(self, select_stmt, **kwargs): 

1486 select = select_stmt 

1487 

1488 if not getattr(select, "_oracle_visit", None): 

1489 if not self.dialect.use_ansi: 

1490 froms = self._display_froms_for_select( 

1491 select, kwargs.get("asfrom", False) 

1492 ) 

1493 whereclause = self._get_nonansi_join_whereclause(froms) 

1494 if whereclause is not None: 

1495 select = select.where(whereclause) 

1496 select._oracle_visit = True 

1497 

1498 # if fetch is used this is not needed 

1499 if ( 

1500 select._has_row_limiting_clause 

1501 and not self.dialect._supports_offset_fetch 

1502 and select._fetch_clause is None 

1503 ): 

1504 limit_clause = select._limit_clause 

1505 offset_clause = select._offset_clause 

1506 

1507 if select._simple_int_clause(limit_clause): 

1508 limit_clause = limit_clause.render_literal_execute() 

1509 

1510 if select._simple_int_clause(offset_clause): 

1511 offset_clause = offset_clause.render_literal_execute() 

1512 

1513 # currently using form at: 

1514 # https://blogs.oracle.com/oraclemagazine/\ 

1515 # on-rownum-and-limiting-results 

1516 

1517 orig_select = select 

1518 select = select._generate() 

1519 select._oracle_visit = True 

1520 

1521 # add expressions to accommodate FOR UPDATE OF 

1522 for_update = select._for_update_arg 

1523 if for_update is not None and for_update.of: 

1524 for_update = for_update._clone() 

1525 for_update._copy_internals() 

1526 

1527 for elem in for_update.of: 

1528 if not select.selected_columns.contains_column(elem): 

1529 select = select.add_columns(elem) 

1530 

1531 # Wrap the middle select and add the hint 

1532 inner_subquery = select.alias() 

1533 limitselect = sql.select( 

1534 *[ 

1535 c 

1536 for c in inner_subquery.c 

1537 if orig_select.selected_columns.corresponding_column(c) 

1538 is not None 

1539 ] 

1540 ) 

1541 

1542 if ( 

1543 limit_clause is not None 

1544 and self.dialect.optimize_limits 

1545 and select._simple_int_clause(limit_clause) 

1546 ): 

1547 limitselect = limitselect.prefix_with( 

1548 expression.text( 

1549 "/*+ FIRST_ROWS(%s) */" 

1550 % self.process(limit_clause, **kwargs) 

1551 ) 

1552 ) 

1553 

1554 limitselect._oracle_visit = True 

1555 limitselect._is_wrapper = True 

1556 

1557 # add expressions to accommodate FOR UPDATE OF 

1558 if for_update is not None and for_update.of: 

1559 adapter = sql_util.ClauseAdapter(inner_subquery) 

1560 for_update.of = [ 

1561 adapter.traverse(elem) for elem in for_update.of 

1562 ] 

1563 

1564 # If needed, add the limiting clause 

1565 if limit_clause is not None: 

1566 if select._simple_int_clause(limit_clause) and ( 

1567 offset_clause is None 

1568 or select._simple_int_clause(offset_clause) 

1569 ): 

1570 max_row = limit_clause 

1571 

1572 if offset_clause is not None: 

1573 max_row = max_row + offset_clause 

1574 

1575 else: 

1576 max_row = limit_clause 

1577 

1578 if offset_clause is not None: 

1579 max_row = max_row + offset_clause 

1580 limitselect = limitselect.where( 

1581 sql.literal_column("ROWNUM") <= max_row 

1582 ) 

1583 

1584 # If needed, add the ora_rn, and wrap again with offset. 

1585 if offset_clause is None: 

1586 limitselect._for_update_arg = for_update 

1587 select = limitselect 

1588 else: 

1589 limitselect = limitselect.add_columns( 

1590 sql.literal_column("ROWNUM").label("ora_rn") 

1591 ) 

1592 limitselect._oracle_visit = True 

1593 limitselect._is_wrapper = True 

1594 

1595 if for_update is not None and for_update.of: 

1596 limitselect_cols = limitselect.selected_columns 

1597 for elem in for_update.of: 

1598 if ( 

1599 limitselect_cols.corresponding_column(elem) 

1600 is None 

1601 ): 

1602 limitselect = limitselect.add_columns(elem) 

1603 

1604 limit_subquery = limitselect.alias() 

1605 origselect_cols = orig_select.selected_columns 

1606 offsetselect = sql.select( 

1607 *[ 

1608 c 

1609 for c in limit_subquery.c 

1610 if origselect_cols.corresponding_column(c) 

1611 is not None 

1612 ] 

1613 ) 

1614 

1615 offsetselect._oracle_visit = True 

1616 offsetselect._is_wrapper = True 

1617 

1618 if for_update is not None and for_update.of: 

1619 adapter = sql_util.ClauseAdapter(limit_subquery) 

1620 for_update.of = [ 

1621 adapter.traverse(elem) for elem in for_update.of 

1622 ] 

1623 

1624 offsetselect = offsetselect.where( 

1625 sql.literal_column("ora_rn") > offset_clause 

1626 ) 

1627 

1628 offsetselect._for_update_arg = for_update 

1629 select = offsetselect 

1630 

1631 return select 

1632 

1633 def limit_clause(self, select, **kw): 

1634 return "" 

1635 

1636 def visit_empty_set_expr(self, type_, **kw): 

1637 return "SELECT 1 FROM DUAL WHERE 1!=1" 

1638 

1639 def for_update_clause(self, select, **kw): 

1640 if self.is_subquery(): 

1641 return "" 

1642 

1643 tmp = " FOR UPDATE" 

1644 

1645 if select._for_update_arg.of: 

1646 tmp += " OF " + ", ".join( 

1647 self.process(elem, **kw) for elem in select._for_update_arg.of 

1648 ) 

1649 

1650 if select._for_update_arg.nowait: 

1651 tmp += " NOWAIT" 

1652 if select._for_update_arg.skip_locked: 

1653 tmp += " SKIP LOCKED" 

1654 

1655 return tmp 

1656 

1657 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1658 return "DECODE(%s, %s, 0, 1) = 1" % ( 

1659 self.process(binary.left), 

1660 self.process(binary.right), 

1661 ) 

1662 

1663 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

1664 return "DECODE(%s, %s, 0, 1) = 0" % ( 

1665 self.process(binary.left), 

1666 self.process(binary.right), 

1667 ) 

1668 

1669 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1670 string = self.process(binary.left, **kw) 

1671 pattern = self.process(binary.right, **kw) 

1672 flags = binary.modifiers["flags"] 

1673 if flags is None: 

1674 return "REGEXP_LIKE(%s, %s)" % (string, pattern) 

1675 else: 

1676 return "REGEXP_LIKE(%s, %s, %s)" % ( 

1677 string, 

1678 pattern, 

1679 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

1680 ) 

1681 

1682 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1683 return "NOT %s" % self.visit_regexp_match_op_binary( 

1684 binary, operator, **kw 

1685 ) 

1686 

1687 def visit_regexp_replace_op_binary(self, binary, operator, **kw): 

1688 string = self.process(binary.left, **kw) 

1689 pattern_replace = self.process(binary.right, **kw) 

1690 flags = binary.modifiers["flags"] 

1691 if flags is None: 

1692 return "REGEXP_REPLACE(%s, %s)" % ( 

1693 string, 

1694 pattern_replace, 

1695 ) 

1696 else: 

1697 return "REGEXP_REPLACE(%s, %s, %s)" % ( 

1698 string, 

1699 pattern_replace, 

1700 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

1701 ) 

1702 

1703 def visit_aggregate_strings_func(self, fn, **kw): 

1704 return "LISTAGG%s" % self.function_argspec(fn, **kw) 

1705 

1706 def _visit_bitwise(self, binary, fn_name, custom_right=None, **kw): 

1707 left = self.process(binary.left, **kw) 

1708 right = self.process( 

1709 custom_right if custom_right is not None else binary.right, **kw 

1710 ) 

1711 return f"{fn_name}({left}, {right})" 

1712 

1713 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1714 return self._visit_bitwise(binary, "BITXOR", **kw) 

1715 

1716 def visit_bitwise_or_op_binary(self, binary, operator, **kw): 

1717 return self._visit_bitwise(binary, "BITOR", **kw) 

1718 

1719 def visit_bitwise_and_op_binary(self, binary, operator, **kw): 

1720 return self._visit_bitwise(binary, "BITAND", **kw) 

1721 

1722 def visit_bitwise_rshift_op_binary(self, binary, operator, **kw): 

1723 raise exc.CompileError("Cannot compile bitwise_rshift in oracle") 

1724 

1725 def visit_bitwise_lshift_op_binary(self, binary, operator, **kw): 

1726 raise exc.CompileError("Cannot compile bitwise_lshift in oracle") 

1727 

1728 def visit_bitwise_not_op_unary_operator(self, element, operator, **kw): 

1729 raise exc.CompileError("Cannot compile bitwise_not in oracle") 

1730 

1731 

1732class OracleDDLCompiler(compiler.DDLCompiler): 

1733 

1734 def _build_vector_index_config( 

1735 self, vector_index_config: VectorIndexConfig 

1736 ) -> str: 

1737 parts = [] 

1738 sql_param_name = { 

1739 "hnsw_neighbors": "neighbors", 

1740 "hnsw_efconstruction": "efconstruction", 

1741 "ivf_neighbor_partitions": "neighbor partitions", 

1742 "ivf_sample_per_partition": "sample_per_partition", 

1743 "ivf_min_vectors_per_partition": "min_vectors_per_partition", 

1744 } 

1745 if vector_index_config.index_type == VectorIndexType.HNSW: 

1746 parts.append("ORGANIZATION INMEMORY NEIGHBOR GRAPH") 

1747 elif vector_index_config.index_type == VectorIndexType.IVF: 

1748 parts.append("ORGANIZATION NEIGHBOR PARTITIONS") 

1749 if vector_index_config.distance is not None: 

1750 parts.append(f"DISTANCE {vector_index_config.distance.value}") 

1751 

1752 if vector_index_config.accuracy is not None: 

1753 parts.append( 

1754 f"WITH TARGET ACCURACY {vector_index_config.accuracy}" 

1755 ) 

1756 

1757 parameters_str = [f"type {vector_index_config.index_type.name}"] 

1758 prefix = vector_index_config.index_type.name.lower() + "_" 

1759 

1760 for field in fields(vector_index_config): 

1761 if field.name.startswith(prefix): 

1762 key = sql_param_name.get(field.name) 

1763 value = getattr(vector_index_config, field.name) 

1764 if value is not None: 

1765 parameters_str.append(f"{key} {value}") 

1766 

1767 parameters_str = ", ".join(parameters_str) 

1768 parts.append(f"PARAMETERS ({parameters_str})") 

1769 

1770 if vector_index_config.parallel is not None: 

1771 parts.append(f"PARALLEL {vector_index_config.parallel}") 

1772 

1773 return " ".join(parts) 

1774 
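The builder above assembles the ``PARAMETERS (...)`` clause by scanning the dataclass fields whose names start with the index type's prefix (``hnsw_`` or ``ivf_``). A simplified standalone sketch of that step; unlike the real method it merely strips the prefix instead of mapping field names through the ``sql_param_name`` table, and the option keys used here are illustrative:

```python
def vector_parameters_clause(index_type: str, options: dict) -> str:
    """Assemble a PARAMETERS clause from prefix-matched, non-None options."""
    prefix = index_type.lower() + "_"
    parts = [f"type {index_type}"]
    for key, value in options.items():
        # only options belonging to this index type apply; None means unset
        if key.startswith(prefix) and value is not None:
            parts.append(f"{key[len(prefix):]} {value}")
    return "PARAMETERS (%s)" % ", ".join(parts)


print(vector_parameters_clause("HNSW", {"hnsw_neighbors": 32, "ivf_x": 1}))
# -> PARAMETERS (type HNSW, neighbors 32)
```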

1775 def define_constraint_cascades(self, constraint): 

1776 text = "" 

1777 if constraint.ondelete is not None: 

1778 text += " ON DELETE %s" % constraint.ondelete 

1779 

1780 # Oracle has no ON UPDATE CASCADE; 

1781 # it's only available via triggers 

1782 # https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html 

1783 if constraint.onupdate is not None: 

1784 util.warn( 

1785 "Oracle Database does not contain native UPDATE CASCADE " 

1786 "functionality - onupdates will not be rendered for foreign " 

1787 "keys. Consider using deferrable=True, initially='deferred' " 

1788 "or triggers." 

1789 ) 

1790 

1791 return text 

1792 

1793 def visit_drop_table_comment(self, drop, **kw): 

1794 return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table( 

1795 drop.element 

1796 ) 

1797 

1798 def visit_create_index(self, create, **kw): 

1799 index = create.element 

1800 self._verify_index_table(index) 

1801 preparer = self.preparer 

1802 text = "CREATE " 

1803 if index.unique: 

1804 text += "UNIQUE " 

1805 if index.dialect_options["oracle"]["bitmap"]: 

1806 text += "BITMAP " 

1807 vector_options = index.dialect_options["oracle"]["vector"] 

1808 if vector_options: 

1809 text += "VECTOR " 

1810 text += "INDEX %s ON %s (%s)" % ( 

1811 self._prepared_index_name(index, include_schema=True), 

1812 preparer.format_table(index.table, use_schema=True), 

1813 ", ".join( 

1814 self.sql_compiler.process( 

1815 expr, include_table=False, literal_binds=True 

1816 ) 

1817 for expr in index.expressions 

1818 ), 

1819 ) 

1820 if index.dialect_options["oracle"]["compress"] is not False: 

1821 if index.dialect_options["oracle"]["compress"] is True: 

1822 text += " COMPRESS" 

1823 else: 

1824 text += " COMPRESS %d" % ( 

1825 index.dialect_options["oracle"]["compress"] 

1826 ) 

1827 if vector_options: 

1828 if vector_options is True: 

1829 vector_options = VectorIndexConfig() 

1830 

1831 text += " " + self._build_vector_index_config(vector_options) 

1832 return text 

1833 

1834 def post_create_table(self, table): 

1835 table_opts = [] 

1836 opts = table.dialect_options["oracle"] 

1837 

1838 if opts["on_commit"]: 

1839 on_commit_options = opts["on_commit"].replace("_", " ").upper() 

1840 table_opts.append("\n ON COMMIT %s" % on_commit_options) 

1841 

1842 if opts["compress"]: 

1843 if opts["compress"] is True: 

1844 table_opts.append("\n COMPRESS") 

1845 else: 

1846 table_opts.append("\n COMPRESS FOR %s" % (opts["compress"])) 

1847 if opts["tablespace"]: 

1848 table_opts.append( 

1849 "\n TABLESPACE %s" % self.preparer.quote(opts["tablespace"]) 

1850 ) 

1851 return "".join(table_opts) 

1852 

1853 def get_identity_options(self, identity_options): 

1854 text = super().get_identity_options(identity_options) 

1855 text = text.replace("NO MINVALUE", "NOMINVALUE") 

1856 text = text.replace("NO MAXVALUE", "NOMAXVALUE") 

1857 text = text.replace("NO CYCLE", "NOCYCLE") 

1858 if identity_options.order is not None: 

1859 text += " ORDER" if identity_options.order else " NOORDER" 

1860 return text.strip() 

1861 
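``get_identity_options`` above rewrites the standard-SQL spellings emitted by the base DDLCompiler into Oracle's run-together keywords, then appends the Oracle-only ORDER/NOORDER flag. A minimal standalone sketch of that normalization (the function name is illustrative, not the dialect's API):

```python
def to_oracle_identity_options(text: str, order=None) -> str:
    """Normalize standard identity-option DDL to Oracle's spellings."""
    # Oracle spells these options without a space
    for standard, oracle in (
        ("NO MINVALUE", "NOMINVALUE"),
        ("NO MAXVALUE", "NOMAXVALUE"),
        ("NO CYCLE", "NOCYCLE"),
    ):
        text = text.replace(standard, oracle)
    # ORDER / NOORDER is Oracle-specific, so the base compiler never emits it
    if order is not None:
        text += " ORDER" if order else " NOORDER"
    return text.strip()


print(to_oracle_identity_options("START WITH 3 NO CYCLE", order=True))
# -> START WITH 3 NOCYCLE ORDER
```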

1862 def visit_computed_column(self, generated, **kw): 

1863 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( 

1864 generated.sqltext, include_table=False, literal_binds=True 

1865 ) 

1866 if generated.persisted is True: 

1867 raise exc.CompileError( 

1868 "Oracle Database computed columns do not support 'stored' " 

1869 "persistence; set the 'persisted' flag to None or False for " 

1870 "Oracle Database support." 

1871 ) 

1872 elif generated.persisted is False: 

1873 text += " VIRTUAL" 

1874 return text 

1875 

1876 def visit_identity_column(self, identity, **kw): 

1877 if identity.always is None: 

1878 kind = "" 

1879 else: 

1880 kind = "ALWAYS" if identity.always else "BY DEFAULT" 

1881 text = "GENERATED %s" % kind 

1882 if identity.on_null: 

1883 text += " ON NULL" 

1884 text += " AS IDENTITY" 

1885 options = self.get_identity_options(identity) 

1886 if options: 

1887 text += " (%s)" % options 

1888 return text 

1889 

1890 

1891class OracleIdentifierPreparer(compiler.IdentifierPreparer): 

1892 reserved_words = {x.lower() for x in RESERVED_WORDS} 

1893 illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union( 

1894 ["_", "$"] 

1895 ) 

1896 

1897 def _bindparam_requires_quotes(self, value): 

1898 """Return True if the given identifier requires quoting.""" 

1899 lc_value = value.lower() 

1900 return ( 

1901 lc_value in self.reserved_words 

1902 or value[0] in self.illegal_initial_characters 

1903 or not self.legal_characters.match(str(value)) 

1904 ) 

1905 

1906 def format_savepoint(self, savepoint): 

1907 name = savepoint.ident.lstrip("_") 

1908 return super().format_savepoint(savepoint, name) 

1909 
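``format_savepoint`` above strips leading underscores before formatting because Oracle identifiers may not begin with ``_``. A tiny illustration of the renaming step (the savepoint name below is hypothetical):

```python
def oracle_savepoint_ident(ident: str) -> str:
    # Oracle identifiers cannot start with "_", so any leading
    # underscores are removed before the name is quoted/formatted.
    return ident.lstrip("_")


print(oracle_savepoint_ident("_my_savepoint"))  # -> my_savepoint
```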

1910 

1911class OracleExecutionContext(default.DefaultExecutionContext): 

1912 def fire_sequence(self, seq, type_): 

1913 return self._execute_scalar( 

1914 "SELECT " 

1915 + self.identifier_preparer.format_sequence(seq) 

1916 + ".nextval FROM DUAL", 

1917 type_, 

1918 ) 

1919 

1920 def pre_exec(self): 

1921 if self.statement and "_oracle_dblink" in self.execution_options: 

1922 self.statement = self.statement.replace( 

1923 dictionary.DB_LINK_PLACEHOLDER, 

1924 self.execution_options["_oracle_dblink"], 

1925 ) 

1926 

1927 

1928class OracleDialect(default.DefaultDialect): 

1929 name = "oracle" 

1930 supports_statement_cache = True 

1931 supports_alter = True 

1932 max_identifier_length = 128 

1933 

1934 _supports_offset_fetch = True 

1935 

1936 insert_returning = True 

1937 update_returning = True 

1938 delete_returning = True 

1939 

1940 div_is_floordiv = False 

1941 

1942 supports_simple_order_by_label = False 

1943 cte_follows_insert = True 

1944 returns_native_bytes = True 

1945 

1946 supports_sequences = True 

1947 sequences_optional = False 

1948 postfetch_lastrowid = False 

1949 

1950 default_paramstyle = "named" 

1951 colspecs = colspecs 

1952 ischema_names = ischema_names 

1953 requires_name_normalize = True 

1954 

1955 supports_comments = True 

1956 

1957 supports_default_values = False 

1958 supports_default_metavalue = True 

1959 supports_empty_insert = False 

1960 supports_identity_columns = True 

1961 

1962 statement_compiler = OracleCompiler 

1963 ddl_compiler = OracleDDLCompiler 

1964 type_compiler_cls = OracleTypeCompiler 

1965 preparer = OracleIdentifierPreparer 

1966 execution_ctx_cls = OracleExecutionContext 

1967 

1968 reflection_options = ("oracle_resolve_synonyms",) 

1969 

1970 _use_nchar_for_unicode = False 

1971 

1972 construct_arguments = [ 

1973 ( 

1974 sa_schema.Table, 

1975 { 

1976 "resolve_synonyms": False, 

1977 "on_commit": None, 

1978 "compress": False, 

1979 "tablespace": None, 

1980 }, 

1981 ), 

1982 ( 

1983 sa_schema.Index, 

1984 { 

1985 "bitmap": False, 

1986 "compress": False, 

1987 "vector": False, 

1988 }, 

1989 ), 

1990 (sa_selectable.Select, {"fetch_approximate": False}), 

1991 (sa_selectable.CompoundSelect, {"fetch_approximate": False}), 

1992 ] 

1993 

1994 @util.deprecated_params( 

1995 use_binds_for_limits=( 

1996 "1.4", 

1997 "The ``use_binds_for_limits`` Oracle Database dialect parameter " 

1998 "is deprecated. The dialect now renders LIMIT / OFFSET integers " 

1999 "inline in all cases using a post-compilation hook, so that the " 

2000 "value is still represented by a 'bound parameter' on the Core " 

2001 "Expression side.", 

2002 ) 

2003 ) 

2004 def __init__( 

2005 self, 

2006 use_ansi=True, 

2007 optimize_limits=False, 

2008 use_binds_for_limits=None, 

2009 use_nchar_for_unicode=False, 

2010 exclude_tablespaces=("SYSTEM", "SYSAUX"), 

2011 enable_offset_fetch=True, 

2012 **kwargs, 

2013 ): 

2014 default.DefaultDialect.__init__(self, **kwargs) 

2015 self._use_nchar_for_unicode = use_nchar_for_unicode 

2016 self.use_ansi = use_ansi 

2017 self.optimize_limits = optimize_limits 

2018 self.exclude_tablespaces = exclude_tablespaces 

2019 self.enable_offset_fetch = self._supports_offset_fetch = ( 

2020 enable_offset_fetch 

2021 ) 

2022 

2023 def initialize(self, connection): 

2024 super().initialize(connection) 

2025 

2026 # Oracle 8i has RETURNING: 

2027 # https://docs.oracle.com/cd/A87860_01/doc/index.htm 

2028 

2029 # so does Oracle8: 

2030 # https://docs.oracle.com/cd/A64702_01/doc/index.htm 

2031 

2032 if self._is_oracle_8: 

2033 self.colspecs = self.colspecs.copy() 

2034 self.colspecs.pop(sqltypes.Interval) 

2035 self.use_ansi = False 

2036 

2037 self.supports_identity_columns = self.server_version_info >= (12,) 

2038 self._supports_offset_fetch = ( 

2039 self.enable_offset_fetch and self.server_version_info >= (12,) 

2040 ) 

2041 

2042 def _get_effective_compat_server_version_info(self, connection): 

2043 # dialect does not need compat levels below 12.2, so don't query 

2044 # in those cases 

2045 

2046 if self.server_version_info < (12, 2): 

2047 return self.server_version_info 

2048 try: 

2049 compat = connection.exec_driver_sql( 

2050 "SELECT value FROM v$parameter WHERE name = 'compatible'" 

2051 ).scalar() 

2052 except exc.DBAPIError: 

2053 compat = None 

2054 

2055 if compat: 

2056 try: 

2057 return tuple(int(x) for x in compat.split(".")) 

2058 except: 

2059 return self.server_version_info 

2060 else: 

2061 return self.server_version_info 

2062 
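The method above turns Oracle's ``compatible`` initialization parameter (a dotted string such as ``"19.0.0"``) into a version tuple, falling back to the server version when the query fails or the value does not parse. The parsing step in isolation (helper name is mine; the real code catches any exception, this sketch narrows to ``ValueError``):

```python
def parse_compat_version(compat, fallback):
    """Parse a 'compatible' parameter string like '12.2.0.1' into a tuple."""
    if not compat:
        return fallback
    try:
        return tuple(int(x) for x in compat.split("."))
    except ValueError:
        # unparseable value: fall back to the reported server version
        return fallback


print(parse_compat_version("12.2.0.1", (19, 3)))  # -> (12, 2, 0, 1)
print(parse_compat_version("garbage", (19, 3)))   # -> (19, 3)
```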

2063 @property 

2064 def _is_oracle_8(self): 

2065 return self.server_version_info and self.server_version_info < (9,) 

2066 

2067 @property 

2068 def _supports_table_compression(self): 

2069 return self.server_version_info and self.server_version_info >= (10, 1) 

2070 

2071 @property 

2072 def _supports_table_compress_for(self): 

2073 return self.server_version_info and self.server_version_info >= (11,) 

2074 

2075 @property 

2076 def _supports_char_length(self): 

2077 return not self._is_oracle_8 

2078 

2079 @property 

2080 def _supports_update_returning_computed_cols(self): 

2081 # on version 18 this error is no longer present, while it happens on 11; 

2082 # it may also work on versions before 18 

2083 return self.server_version_info and self.server_version_info >= (18,) 

2084 

2085 @property 

2086 def _supports_except_all(self): 

2087 return self.server_version_info and self.server_version_info >= (21,) 

2088 

2089 def do_release_savepoint(self, connection, name): 

2090 # Oracle does not support RELEASE SAVEPOINT 

2091 pass 

2092 

2093 def _check_max_identifier_length(self, connection): 

2094 if self._get_effective_compat_server_version_info(connection) < ( 

2095 12, 

2096 2, 

2097 ): 

2098 return 30 

2099 else: 

2100 # use the default 

2101 return None 

2102 

2103 def get_isolation_level_values(self, dbapi_connection): 

2104 return ["READ COMMITTED", "SERIALIZABLE"] 

2105 

2106 def get_default_isolation_level(self, dbapi_conn): 

2107 try: 

2108 return self.get_isolation_level(dbapi_conn) 

2109 except NotImplementedError: 

2110 raise 

2111 except: 

2112 return "READ COMMITTED" 

2113 

2114 def _execute_reflection( 

2115 self, connection, query, dblink, returns_long, params=None 

2116 ): 

2117 if dblink and not dblink.startswith("@"): 

2118 dblink = f"@{dblink}" 

2119 execution_options = { 

2120 # handle db links 

2121 "_oracle_dblink": dblink or "", 

2122 # override any schema translate map 

2123 "schema_translate_map": None, 

2124 } 

2125 

2126 if dblink and returns_long: 

2127 # Oracle seems to error with 

2128 # "ORA-00997: illegal use of LONG datatype" when returning 

2129 # LONG columns via a dblink in a query with bind params 

2130 # This type seems to be very hard to cast into something else 

2131 # so it seems easier to just use bind param in this case 

2132 def visit_bindparam(bindparam): 

2133 bindparam.literal_execute = True 

2134 

2135 query = visitors.cloned_traverse( 

2136 query, {}, {"bindparam": visit_bindparam} 

2137 ) 

2138 return connection.execute( 

2139 query, params, execution_options=execution_options 

2140 ) 

2141 

2142 @util.memoized_property 

2143 def _has_table_query(self): 

2144 # materialized views are returned by all_tables 

2145 tables = ( 

2146 select( 

2147 dictionary.all_tables.c.table_name, 

2148 dictionary.all_tables.c.owner, 

2149 ) 

2150 .union_all( 

2151 select( 

2152 dictionary.all_views.c.view_name.label("table_name"), 

2153 dictionary.all_views.c.owner, 

2154 ) 

2155 ) 

2156 .subquery("tables_and_views") 

2157 ) 

2158 

2159 query = select(tables.c.table_name).where( 

2160 tables.c.table_name == bindparam("table_name"), 

2161 tables.c.owner == bindparam("owner"), 

2162 ) 

2163 return query 

2164 

2165 @reflection.cache 

2166 def has_table( 

2167 self, connection, table_name, schema=None, dblink=None, **kw 

2168 ): 

2169 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2170 self._ensure_has_table_connection(connection) 

2171 

2172 if not schema: 

2173 schema = self.default_schema_name 

2174 

2175 params = { 

2176 "table_name": self.denormalize_name(table_name), 

2177 "owner": self.denormalize_schema_name(schema), 

2178 } 

2179 cursor = self._execute_reflection( 

2180 connection, 

2181 self._has_table_query, 

2182 dblink, 

2183 returns_long=False, 

2184 params=params, 

2185 ) 

2186 return bool(cursor.scalar()) 

2187 

2188 @reflection.cache 

2189 def has_sequence( 

2190 self, connection, sequence_name, schema=None, dblink=None, **kw 

2191 ): 

2192 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2193 if not schema: 

2194 schema = self.default_schema_name 

2195 

2196 query = select(dictionary.all_sequences.c.sequence_name).where( 

2197 dictionary.all_sequences.c.sequence_name 

2198 == self.denormalize_schema_name(sequence_name), 

2199 dictionary.all_sequences.c.sequence_owner 

2200 == self.denormalize_schema_name(schema), 

2201 ) 

2202 

2203 cursor = self._execute_reflection( 

2204 connection, query, dblink, returns_long=False 

2205 ) 

2206 return bool(cursor.scalar()) 

2207 

2208 def _get_default_schema_name(self, connection): 

2209 return self.normalize_name( 

2210 connection.exec_driver_sql( 

2211 "select sys_context( 'userenv', 'current_schema' ) from dual" 

2212 ).scalar() 

2213 ) 

2214 

2215 def denormalize_schema_name(self, name): 

2216 # look for quoted_name 

2217 force = getattr(name, "quote", None) 

2218 if force is None and name == "public": 

2219 # look for case insensitive, no quoting specified, "public" 

2220 return "PUBLIC" 

2221 return super().denormalize_name(name) 

2222 

2223 @reflection.flexi_cache( 

2224 ("schema", InternalTraversal.dp_string), 

2225 ("filter_names", InternalTraversal.dp_string_list), 

2226 ("dblink", InternalTraversal.dp_string), 

2227 ) 

2228 def _get_synonyms(self, connection, schema, filter_names, dblink, **kw): 

2229 owner = self.denormalize_schema_name( 

2230 schema or self.default_schema_name 

2231 ) 

2232 

2233 has_filter_names, params = self._prepare_filter_names(filter_names) 

2234 query = select( 

2235 dictionary.all_synonyms.c.synonym_name, 

2236 dictionary.all_synonyms.c.table_name, 

2237 dictionary.all_synonyms.c.table_owner, 

2238 dictionary.all_synonyms.c.db_link, 

2239 ).where(dictionary.all_synonyms.c.owner == owner) 

2240 if has_filter_names: 

2241 query = query.where( 

2242 dictionary.all_synonyms.c.synonym_name.in_( 

2243 params["filter_names"] 

2244 ) 

2245 ) 

2246 result = self._execute_reflection( 

2247 connection, query, dblink, returns_long=False 

2248 ).mappings() 

2249 return result.all() 

2250 

2251 @lru_cache() 

2252 def _all_objects_query( 

2253 self, owner, scope, kind, has_filter_names, has_mat_views 

2254 ): 

2255 query = ( 

2256 select(dictionary.all_objects.c.object_name) 

2257 .select_from(dictionary.all_objects) 

2258 .where(dictionary.all_objects.c.owner == owner) 

2259 ) 

2260 

2261 # NOTE: materialized views are listed in all_objects twice; 

2262 # once as MATERIALIZED VIEW and once as TABLE 

2263 if kind is ObjectKind.ANY: 

2264 # materialized views are also listed as tables, so there is no 

2265 # need to add them to the in_. 

2266 query = query.where( 

2267 dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW")) 

2268 ) 

2269 else: 

2270 object_type = [] 

2271 if ObjectKind.VIEW in kind: 

2272 object_type.append("VIEW") 

2273 if ( 

2274 ObjectKind.MATERIALIZED_VIEW in kind 

2275 and ObjectKind.TABLE not in kind 

2276 ): 

2277 # materialized views are also listed as tables, so there is no 

2278 # need to add them to the in_ if tables are also being selected. 

2279 object_type.append("MATERIALIZED VIEW") 

2280 if ObjectKind.TABLE in kind: 

2281 object_type.append("TABLE") 

2282 if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind: 

2283 # materialized views are also listed as tables, 

2284 # so they need to be filtered out 

2285 # EXCEPT ALL / MINUS profiles as faster than using 

2286 # NOT EXISTS or NOT IN with a subquery, but it's in 

2287 # general faster to get the mat view names and exclude 

2288 # them only when needed 

2289 query = query.where( 

2290 dictionary.all_objects.c.object_name.not_in( 

2291 bindparam("mat_views") 

2292 ) 

2293 ) 

2294 query = query.where( 

2295 dictionary.all_objects.c.object_type.in_(object_type) 

2296 ) 

2297 

2298 # handles scope 

2299 if scope is ObjectScope.DEFAULT: 

2300 query = query.where(dictionary.all_objects.c.temporary == "N") 

2301 elif scope is ObjectScope.TEMPORARY: 

2302 query = query.where(dictionary.all_objects.c.temporary == "Y") 

2303 

2304 if has_filter_names: 

2305 query = query.where( 

2306 dictionary.all_objects.c.object_name.in_( 

2307 bindparam("filter_names") 

2308 ) 

2309 ) 

2310 return query 

2311 

2312 @reflection.flexi_cache( 

2313 ("schema", InternalTraversal.dp_string), 

2314 ("scope", InternalTraversal.dp_plain_obj), 

2315 ("kind", InternalTraversal.dp_plain_obj), 

2316 ("filter_names", InternalTraversal.dp_string_list), 

2317 ("dblink", InternalTraversal.dp_string), 

2318 ) 

2319 def _get_all_objects( 

2320 self, connection, schema, scope, kind, filter_names, dblink, **kw 

2321 ): 

2322 owner = self.denormalize_schema_name( 

2323 schema or self.default_schema_name 

2324 ) 

2325 

2326 has_filter_names, params = self._prepare_filter_names(filter_names) 

2327 has_mat_views = False 

2328 if ( 

2329 ObjectKind.TABLE in kind 

2330 and ObjectKind.MATERIALIZED_VIEW not in kind 

2331 ): 

2332 # see note in _all_objects_query 

2333 mat_views = self.get_materialized_view_names( 

2334 connection, schema, dblink, _normalize=False, **kw 

2335 ) 

2336 if mat_views: 

2337 params["mat_views"] = mat_views 

2338 has_mat_views = True 

2339 

2340 query = self._all_objects_query( 

2341 owner, scope, kind, has_filter_names, has_mat_views 

2342 ) 

2343 

2344 result = self._execute_reflection( 

2345 connection, query, dblink, returns_long=False, params=params 

2346 ).scalars() 

2347 

2348 return result.all() 

2349 

2350 def _handle_synonyms_decorator(fn): 

2351 @wraps(fn) 

2352 def wrapper(self, *args, **kwargs): 

2353 return self._handle_synonyms(fn, *args, **kwargs) 

2354 

2355 return wrapper 

2356 

2357 def _handle_synonyms(self, fn, connection, *args, **kwargs): 

2358 if not kwargs.get("oracle_resolve_synonyms", False): 

2359 return fn(self, connection, *args, **kwargs) 

2360 

2361 original_kw = kwargs.copy() 

2362 schema = kwargs.pop("schema", None) 

2363 result = self._get_synonyms( 

2364 connection, 

2365 schema=schema, 

2366 filter_names=kwargs.pop("filter_names", None), 

2367 dblink=kwargs.pop("dblink", None), 

2368 info_cache=kwargs.get("info_cache", None), 

2369 ) 

2370 

2371 dblinks_owners = defaultdict(dict) 

2372 for row in result: 

2373 key = row["db_link"], row["table_owner"] 

2374 tn = self.normalize_name(row["table_name"]) 

2375 dblinks_owners[key][tn] = row["synonym_name"] 

2376 

2377 if not dblinks_owners: 

2378 # No synonym, do the plain thing 

2379 return fn(self, connection, *args, **original_kw) 

2380 

2381 data = {} 

2382 for (dblink, table_owner), mapping in dblinks_owners.items(): 

2383 call_kw = { 

2384 **original_kw, 

2385 "schema": table_owner, 

2386 "dblink": self.normalize_name(dblink), 

2387 "filter_names": mapping.keys(), 

2388 } 

2389 call_result = fn(self, connection, *args, **call_kw) 

2390 for (_, tn), value in call_result: 

2391 synonym_name = self.normalize_name(mapping[tn]) 

2392 data[(schema, synonym_name)] = value 

2393 return data.items() 

2394 

2395 @reflection.cache 

2396 def get_schema_names(self, connection, dblink=None, **kw): 

2397 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2398 query = select(dictionary.all_users.c.username).order_by( 

2399 dictionary.all_users.c.username 

2400 ) 

2401 result = self._execute_reflection( 

2402 connection, query, dblink, returns_long=False 

2403 ).scalars() 

2404 return [self.normalize_name(row) for row in result] 

2405 

2406 @reflection.cache 

2407 def get_table_names(self, connection, schema=None, dblink=None, **kw): 

2408 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2409 # note that table_names() isn't loading DBLINKed or synonym'ed tables 

2410 if schema is None: 

2411 schema = self.default_schema_name 

2412 

2413 den_schema = self.denormalize_schema_name(schema) 

2414 if kw.get("oracle_resolve_synonyms", False): 

2415 tables = ( 

2416 select( 

2417 dictionary.all_tables.c.table_name, 

2418 dictionary.all_tables.c.owner, 

2419 dictionary.all_tables.c.iot_name, 

2420 dictionary.all_tables.c.duration, 

2421 dictionary.all_tables.c.tablespace_name, 

2422 ) 

2423 .union_all( 

2424 select( 

2425 dictionary.all_synonyms.c.synonym_name.label( 

2426 "table_name" 

2427 ), 

2428 dictionary.all_synonyms.c.owner, 

2429 dictionary.all_tables.c.iot_name, 

2430 dictionary.all_tables.c.duration, 

2431 dictionary.all_tables.c.tablespace_name, 

2432 ) 

2433 .select_from(dictionary.all_tables) 

2434 .join( 

2435 dictionary.all_synonyms, 

2436 and_( 

2437 dictionary.all_tables.c.table_name 

2438 == dictionary.all_synonyms.c.table_name, 

2439 dictionary.all_tables.c.owner 

2440 == func.coalesce( 

2441 dictionary.all_synonyms.c.table_owner, 

2442 dictionary.all_synonyms.c.owner, 

2443 ), 

2444 ), 

2445 ) 

2446 ) 

2447 .subquery("available_tables") 

2448 ) 

2449 else: 

2450 tables = dictionary.all_tables 

2451 

2452 query = select(tables.c.table_name) 

2453 if self.exclude_tablespaces: 

2454 query = query.where( 

2455 func.coalesce( 

2456 tables.c.tablespace_name, "no tablespace" 

2457 ).not_in(self.exclude_tablespaces) 

2458 ) 

2459 query = query.where( 

2460 tables.c.owner == den_schema, 

2461 tables.c.iot_name.is_(null()), 

2462 tables.c.duration.is_(null()), 

2463 ) 

2464 

2465 # remove materialized views 

2466 mat_query = select( 

2467 dictionary.all_mviews.c.mview_name.label("table_name") 

2468 ).where(dictionary.all_mviews.c.owner == den_schema) 

2469 

2470 query = ( 

2471 query.except_all(mat_query) 

2472 if self._supports_except_all 

2473 else query.except_(mat_query) 

2474 ) 

2475 

2476 result = self._execute_reflection( 

2477 connection, query, dblink, returns_long=False 

2478 ).scalars() 

2479 return [self.normalize_name(row) for row in result] 

2480 

2481 @reflection.cache 

2482 def get_temp_table_names(self, connection, dblink=None, **kw): 

2483 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2484 schema = self.denormalize_schema_name(self.default_schema_name) 

2485 

2486 query = select(dictionary.all_tables.c.table_name) 

2487 if self.exclude_tablespaces: 

2488 query = query.where( 

2489 func.coalesce( 

2490 dictionary.all_tables.c.tablespace_name, "no tablespace" 

2491 ).not_in(self.exclude_tablespaces) 

2492 ) 

2493 query = query.where( 

2494 dictionary.all_tables.c.owner == schema, 

2495 dictionary.all_tables.c.iot_name.is_(null()), 

2496 dictionary.all_tables.c.duration.is_not(null()), 

2497 ) 

2498 

2499 result = self._execute_reflection( 

2500 connection, query, dblink, returns_long=False 

2501 ).scalars() 

2502 return [self.normalize_name(row) for row in result] 

2503 

2504 @reflection.cache 

2505 def get_materialized_view_names( 

2506 self, connection, schema=None, dblink=None, _normalize=True, **kw 

2507 ): 

2508 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2509 if not schema: 

2510 schema = self.default_schema_name 

2511 

2512 query = select(dictionary.all_mviews.c.mview_name).where( 

2513 dictionary.all_mviews.c.owner 

2514 == self.denormalize_schema_name(schema) 

2515 ) 

2516 result = self._execute_reflection( 

2517 connection, query, dblink, returns_long=False 

2518 ).scalars() 

2519 if _normalize: 

2520 return [self.normalize_name(row) for row in result] 

2521 else: 

2522 return result.all() 

2523 

2524 @reflection.cache 

2525 def get_view_names(self, connection, schema=None, dblink=None, **kw): 

2526 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2527 if not schema: 

2528 schema = self.default_schema_name 

2529 

2530 query = select(dictionary.all_views.c.view_name).where( 

2531 dictionary.all_views.c.owner 

2532 == self.denormalize_schema_name(schema) 

2533 ) 

2534 result = self._execute_reflection( 

2535 connection, query, dblink, returns_long=False 

2536 ).scalars() 

2537 return [self.normalize_name(row) for row in result] 

2538 

2539 @reflection.cache 

2540 def get_sequence_names(self, connection, schema=None, dblink=None, **kw): 

2541 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2542 if not schema: 

2543 schema = self.default_schema_name 

2544 query = select(dictionary.all_sequences.c.sequence_name).where( 

2545 dictionary.all_sequences.c.sequence_owner 

2546 == self.denormalize_schema_name(schema) 

2547 ) 

2548 

2549 result = self._execute_reflection( 

2550 connection, query, dblink, returns_long=False 

2551 ).scalars() 

2552 return [self.normalize_name(row) for row in result] 

2553 

2554 def _value_or_raise(self, data, table, schema): 

2555 table = self.normalize_name(str(table)) 

2556 try: 

2557 return dict(data)[(schema, table)] 

2558 except KeyError: 

2559 raise exc.NoSuchTableError( 

2560 f"{schema}.{table}" if schema else table 

2561 ) from None 

2562 

2563 def _prepare_filter_names(self, filter_names): 

2564 if filter_names: 

2565 fn = [self.denormalize_name(name) for name in filter_names] 

2566 return True, {"filter_names": fn} 

2567 else: 

2568 return False, {} 

2569 

2570 @reflection.cache 

2571 def get_table_options(self, connection, table_name, schema=None, **kw): 

2572 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

2573 ``oracle_resolve_synonyms`` to resolve names to synonyms 

2574 """ 

2575 data = self.get_multi_table_options( 

2576 connection, 

2577 schema=schema, 

2578 filter_names=[table_name], 

2579 scope=ObjectScope.ANY, 

2580 kind=ObjectKind.ANY, 

2581 **kw, 

2582 ) 

2583 return self._value_or_raise(data, table_name, schema) 

2584 

    @lru_cache()
    def _table_options_query(
        self, owner, scope, kind, has_filter_names, has_mat_views
    ):
        query = select(
            dictionary.all_tables.c.table_name,
            (
                dictionary.all_tables.c.compression
                if self._supports_table_compression
                else sql.null().label("compression")
            ),
            (
                dictionary.all_tables.c.compress_for
                if self._supports_table_compress_for
                else sql.null().label("compress_for")
            ),
            dictionary.all_tables.c.tablespace_name,
        ).where(dictionary.all_tables.c.owner == owner)
        if has_filter_names:
            query = query.where(
                dictionary.all_tables.c.table_name.in_(
                    bindparam("filter_names")
                )
            )
        if scope is ObjectScope.DEFAULT:
            query = query.where(dictionary.all_tables.c.duration.is_(null()))
        elif scope is ObjectScope.TEMPORARY:
            query = query.where(
                dictionary.all_tables.c.duration.is_not(null())
            )

        if (
            has_mat_views
            and ObjectKind.TABLE in kind
            and ObjectKind.MATERIALIZED_VIEW not in kind
        ):
            # can't use EXCEPT ALL / MINUS here because we don't have an
            # excludable row vs. the query above.
            # outerjoin + where null works better on Oracle 21, but 11 does
            # not like it at all; this is the next best thing

            query = query.where(
                dictionary.all_tables.c.table_name.not_in(
                    bindparam("mat_views")
                )
            )
        elif (
            ObjectKind.TABLE not in kind
            and ObjectKind.MATERIALIZED_VIEW in kind
        ):
            query = query.where(
                dictionary.all_tables.c.table_name.in_(bindparam("mat_views"))
            )
        return query

    @_handle_synonyms_decorator
    def get_multi_table_options(
        self,
        connection,
        *,
        schema,
        filter_names,
        scope,
        kind,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )

        has_filter_names, params = self._prepare_filter_names(filter_names)
        has_mat_views = False

        if (
            ObjectKind.TABLE in kind
            and ObjectKind.MATERIALIZED_VIEW not in kind
        ):
            # see note in _table_options_query
            mat_views = self.get_materialized_view_names(
                connection, schema, dblink, _normalize=False, **kw
            )
            if mat_views:
                params["mat_views"] = mat_views
                has_mat_views = True
        elif (
            ObjectKind.TABLE not in kind
            and ObjectKind.MATERIALIZED_VIEW in kind
        ):
            mat_views = self.get_materialized_view_names(
                connection, schema, dblink, _normalize=False, **kw
            )
            params["mat_views"] = mat_views

        options = {}
        default = ReflectionDefaults.table_options

        if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind:
            query = self._table_options_query(
                owner, scope, kind, has_filter_names, has_mat_views
            )
            result = self._execute_reflection(
                connection, query, dblink, returns_long=False, params=params
            )

            for table, compression, compress_for, tablespace in result:
                data = default()
                if compression == "ENABLED":
                    data["oracle_compress"] = compress_for
                if tablespace:
                    data["oracle_tablespace"] = tablespace
                options[(schema, self.normalize_name(table))] = data
        if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
            # add the views (no temporary views)
            for view in self.get_view_names(connection, schema, dblink, **kw):
                if not filter_names or view in filter_names:
                    options[(schema, view)] = default()

        return options.items()

    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """

        data = self.get_multi_columns(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    def _run_batches(
        self, connection, query, dblink, returns_long, mappings, all_objects
    ):
        each_batch = 500
        batches = list(all_objects)
        while batches:
            batch = batches[0:each_batch]
            batches[0:each_batch] = []

            result = self._execute_reflection(
                connection,
                query,
                dblink,
                returns_long=returns_long,
                params={"all_objects": batch},
            )
            if mappings:
                yield from result.mappings()
            else:
                yield from result
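The batching above drains the list of object names 500 at a time, so that the `all_objects` IN-list bind stays small (Oracle caps an IN list at 1000 expressions). A minimal standalone sketch of just that slicing, separate from the dialect:

```python
# Standalone sketch of the chunking used by _run_batches: consume the
# name list 500 entries at a time, yielding one batch per query run.
def chunked(all_objects, each_batch=500):
    batches = list(all_objects)
    while batches:
        batch = batches[0:each_batch]
        batches[0:each_batch] = []
        yield batch


# e.g. 1200 names are consumed as batches of 500, 500 and 200
sizes = [len(b) for b in chunked(range(1200))]
```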

    @lru_cache()
    def _column_query(self, owner):
        all_cols = dictionary.all_tab_cols
        all_comments = dictionary.all_col_comments
        all_ids = dictionary.all_tab_identity_cols

        if self.server_version_info >= (12,):
            add_cols = (
                all_cols.c.default_on_null,
                sql.case(
                    (all_ids.c.table_name.is_(None), sql.null()),
                    else_=all_ids.c.generation_type
                    + ","
                    + all_ids.c.identity_options,
                ).label("identity_options"),
            )
            join_identity_cols = True
        else:
            add_cols = (
                sql.null().label("default_on_null"),
                sql.null().label("identity_options"),
            )
            join_identity_cols = False

        # NOTE: Oracle cannot create tables/views without columns, and a
        # table cannot have all of its columns hidden:
        # ORA-54039: table must have at least one column that is not invisible
        # all_tab_cols returns data for tables/views/mat-views.
        # all_tab_cols does not return recycled tables

        query = (
            select(
                all_cols.c.table_name,
                all_cols.c.column_name,
                all_cols.c.data_type,
                all_cols.c.char_length,
                all_cols.c.data_precision,
                all_cols.c.data_scale,
                all_cols.c.nullable,
                all_cols.c.data_default,
                all_comments.c.comments,
                all_cols.c.virtual_column,
                *add_cols,
            ).select_from(all_cols)
            # NOTE: all_col_comments has a row for each column even if no
            # comment is present, so an inner join could be performed, but
            # there seems to be no difference compared to an outer join
            .outerjoin(
                all_comments,
                and_(
                    all_cols.c.table_name == all_comments.c.table_name,
                    all_cols.c.column_name == all_comments.c.column_name,
                    all_cols.c.owner == all_comments.c.owner,
                ),
            )
        )
        if join_identity_cols:
            query = query.outerjoin(
                all_ids,
                and_(
                    all_cols.c.table_name == all_ids.c.table_name,
                    all_cols.c.column_name == all_ids.c.column_name,
                    all_cols.c.owner == all_ids.c.owner,
                ),
            )

        query = query.where(
            all_cols.c.table_name.in_(bindparam("all_objects")),
            all_cols.c.hidden_column == "NO",
            all_cols.c.owner == owner,
        ).order_by(all_cols.c.table_name, all_cols.c.column_id)
        return query

    @_handle_synonyms_decorator
    def get_multi_columns(
        self,
        connection,
        *,
        schema,
        filter_names,
        scope,
        kind,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )
        query = self._column_query(owner)

        if (
            filter_names
            and kind is ObjectKind.ANY
            and scope is ObjectScope.ANY
        ):
            all_objects = [self.denormalize_name(n) for n in filter_names]
        else:
            all_objects = self._get_all_objects(
                connection, schema, scope, kind, filter_names, dblink, **kw
            )

        columns = defaultdict(list)

        # all_tab_cols.data_default is LONG
        result = self._run_batches(
            connection,
            query,
            dblink,
            returns_long=True,
            mappings=True,
            all_objects=all_objects,
        )

        def maybe_int(value):
            if isinstance(value, float) and value.is_integer():
                return int(value)
            else:
                return value

        remove_size = re.compile(r"\(\d+\)")

        for row_dict in result:
            table_name = self.normalize_name(row_dict["table_name"])
            orig_colname = row_dict["column_name"]
            colname = self.normalize_name(orig_colname)
            coltype = row_dict["data_type"]
            precision = maybe_int(row_dict["data_precision"])

            if coltype == "NUMBER":
                scale = maybe_int(row_dict["data_scale"])
                if precision is None and scale == 0:
                    coltype = INTEGER()
                else:
                    coltype = NUMBER(precision, scale)
            elif coltype == "FLOAT":
                # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
                if precision == 126:
                    # The DOUBLE PRECISION datatype is a floating-point
                    # number with binary precision 126.
                    coltype = DOUBLE_PRECISION()
                elif precision == 63:
                    # The REAL datatype is a floating-point number with a
                    # binary precision of 63, or 18 decimal.
                    coltype = REAL()
                else:
                    # non-standard precision
                    coltype = FLOAT(binary_precision=precision)

            elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
                char_length = maybe_int(row_dict["char_length"])
                coltype = self.ischema_names.get(coltype)(char_length)
            elif "WITH TIME ZONE" in coltype:
                coltype = TIMESTAMP(timezone=True)
            elif "WITH LOCAL TIME ZONE" in coltype:
                coltype = TIMESTAMP(local_timezone=True)
            else:
                coltype = re.sub(remove_size, "", coltype)
                try:
                    coltype = self.ischema_names[coltype]
                except KeyError:
                    util.warn(
                        "Did not recognize type '%s' of column '%s'"
                        % (coltype, colname)
                    )
                    coltype = sqltypes.NULLTYPE

            default = row_dict["data_default"]
            if row_dict["virtual_column"] == "YES":
                computed = dict(sqltext=default)
                default = None
            else:
                computed = None

            identity_options = row_dict["identity_options"]
            if identity_options is not None:
                identity = self._parse_identity_options(
                    identity_options, row_dict["default_on_null"]
                )
                default = None
            else:
                identity = None

            cdict = {
                "name": colname,
                "type": coltype,
                "nullable": row_dict["nullable"] == "Y",
                "default": default,
                "comment": row_dict["comments"],
            }
            if orig_colname.lower() == orig_colname:
                cdict["quote"] = True
            if computed is not None:
                cdict["computed"] = computed
            if identity is not None:
                cdict["identity"] = identity

            columns[(schema, table_name)].append(cdict)

        # NOTE: default not needed since all tables have columns
        # default = ReflectionDefaults.columns
        # return (
        #     (key, value if value else default())
        #     for key, value in columns.items()
        # )
        return columns.items()
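The `maybe_int` helper in the method above narrows the floats that the Oracle dictionary views report for numeric metadata (precision, scale, char length) back to `int` when they are integral. A standalone copy for illustration:

```python
# Copy of the maybe_int coercion used in get_multi_columns: dictionary
# views report numeric metadata as floats; integral floats become ints,
# everything else (including None) passes through unchanged.
def maybe_int(value):
    if isinstance(value, float) and value.is_integer():
        return int(value)
    else:
        return value
```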

    def _parse_identity_options(self, identity_options, default_on_null):
        # identity_options is a string that starts with 'ALWAYS,' or
        # 'BY DEFAULT,' and continues with
        # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
        # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
        # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
        parts = [p.strip() for p in identity_options.split(",")]
        identity = {
            "always": parts[0] == "ALWAYS",
            "on_null": default_on_null == "YES",
        }

        for part in parts[1:]:
            option, value = part.split(":")
            value = value.strip()

            if "START WITH" in option:
                identity["start"] = int(value)
            elif "INCREMENT BY" in option:
                identity["increment"] = int(value)
            elif "MAX_VALUE" in option:
                identity["maxvalue"] = int(value)
            elif "MIN_VALUE" in option:
                identity["minvalue"] = int(value)
            elif "CYCLE_FLAG" in option:
                identity["cycle"] = value == "Y"
            elif "CACHE_SIZE" in option:
                identity["cache"] = int(value)
            elif "ORDER_FLAG" in option:
                identity["order"] = value == "Y"
        return identity
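Given a string in the comma-separated format documented in the comment above, the parsing splits on commas, takes the first token as the generation type, and turns each `NAME: value` pair into a dict entry. A trimmed, self-contained sketch (the sample string is hypothetical and only a few options are handled):

```python
# Trimmed sketch of the _parse_identity_options logic above, applied to
# a hypothetical value in the ALL_TAB_IDENTITY_COLS "options" format.
def parse_identity_options(identity_options):
    parts = [p.strip() for p in identity_options.split(",")]
    identity = {"always": parts[0] == "ALWAYS"}
    for part in parts[1:]:
        option, value = part.split(":")
        value = value.strip()
        if "START WITH" in option:
            identity["start"] = int(value)
        elif "INCREMENT BY" in option:
            identity["increment"] = int(value)
        elif "CYCLE_FLAG" in option:
            identity["cycle"] = value == "Y"
    return identity


sample = "ALWAYS, START WITH: 3, INCREMENT BY: 1, CYCLE_FLAG: N"
parsed = parse_identity_options(sample)
```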

    @reflection.cache
    def get_table_comment(self, connection, table_name, schema=None, **kw):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        data = self.get_multi_table_comment(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @lru_cache()
    def _comment_query(self, owner, scope, kind, has_filter_names):
        # NOTE: all_tab_comments / all_mview_comments have a row for all
        # objects even if they don't have comments
        queries = []
        if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind:
            # all_tab_comments also returns plain views
            tbl_view = select(
                dictionary.all_tab_comments.c.table_name,
                dictionary.all_tab_comments.c.comments,
            ).where(
                dictionary.all_tab_comments.c.owner == owner,
                dictionary.all_tab_comments.c.table_name.not_like("BIN$%"),
            )
            if ObjectKind.VIEW not in kind:
                tbl_view = tbl_view.where(
                    dictionary.all_tab_comments.c.table_type == "TABLE"
                )
            elif ObjectKind.TABLE not in kind:
                tbl_view = tbl_view.where(
                    dictionary.all_tab_comments.c.table_type == "VIEW"
                )
            queries.append(tbl_view)
        if ObjectKind.MATERIALIZED_VIEW in kind:
            mat_view = select(
                dictionary.all_mview_comments.c.mview_name.label("table_name"),
                dictionary.all_mview_comments.c.comments,
            ).where(
                dictionary.all_mview_comments.c.owner == owner,
                dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"),
            )
            queries.append(mat_view)
        if len(queries) == 1:
            query = queries[0]
        else:
            union = sql.union_all(*queries).subquery("tables_and_views")
            query = select(union.c.table_name, union.c.comments)

        name_col = query.selected_columns.table_name

        if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
            temp = "Y" if scope is ObjectScope.TEMPORARY else "N"
            # need distinct since materialized views are also listed as
            # tables in all_objects
            query = query.distinct().join(
                dictionary.all_objects,
                and_(
                    dictionary.all_objects.c.owner == owner,
                    dictionary.all_objects.c.object_name == name_col,
                    dictionary.all_objects.c.temporary == temp,
                ),
            )
        if has_filter_names:
            query = query.where(name_col.in_(bindparam("filter_names")))
        return query

    @_handle_synonyms_decorator
    def get_multi_table_comment(
        self,
        connection,
        *,
        schema,
        filter_names,
        scope,
        kind,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )
        has_filter_names, params = self._prepare_filter_names(filter_names)
        query = self._comment_query(owner, scope, kind, has_filter_names)

        result = self._execute_reflection(
            connection, query, dblink, returns_long=False, params=params
        )
        default = ReflectionDefaults.table_comment
        # materialized views by default seem to have a comment like
        # "snapshot table for snapshot owner.mat_view_name"
        ignore_mat_view = "snapshot table for snapshot "
        return (
            (
                (schema, self.normalize_name(table)),
                (
                    {"text": comment}
                    if comment is not None
                    and not comment.startswith(ignore_mat_view)
                    else default()
                ),
            )
            for table, comment in result
        )

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        data = self.get_multi_indexes(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @lru_cache()
    def _index_query(self, owner):
        return (
            select(
                dictionary.all_ind_columns.c.table_name,
                dictionary.all_ind_columns.c.index_name,
                dictionary.all_ind_columns.c.column_name,
                dictionary.all_indexes.c.index_type,
                dictionary.all_indexes.c.uniqueness,
                dictionary.all_indexes.c.compression,
                dictionary.all_indexes.c.prefix_length,
                dictionary.all_ind_columns.c.descend,
                dictionary.all_ind_expressions.c.column_expression,
            )
            .select_from(dictionary.all_ind_columns)
            .join(
                dictionary.all_indexes,
                sql.and_(
                    dictionary.all_ind_columns.c.index_name
                    == dictionary.all_indexes.c.index_name,
                    dictionary.all_ind_columns.c.index_owner
                    == dictionary.all_indexes.c.owner,
                ),
            )
            .outerjoin(
                # NOTE: this adds about 20% to the query time. Using a
                # case expression with a scalar subquery only when needed,
                # on the assumption that most indexes are not
                # expression-based, would be faster, but Oracle does not
                # like that with the LONG datatype. It errors with:
                # ORA-00997: illegal use of LONG datatype
                dictionary.all_ind_expressions,
                sql.and_(
                    dictionary.all_ind_expressions.c.index_name
                    == dictionary.all_ind_columns.c.index_name,
                    dictionary.all_ind_expressions.c.index_owner
                    == dictionary.all_ind_columns.c.index_owner,
                    dictionary.all_ind_expressions.c.column_position
                    == dictionary.all_ind_columns.c.column_position,
                ),
            )
            .where(
                dictionary.all_indexes.c.table_owner == owner,
                dictionary.all_indexes.c.table_name.in_(
                    bindparam("all_objects")
                ),
            )
            .order_by(
                dictionary.all_ind_columns.c.index_name,
                dictionary.all_ind_columns.c.column_position,
            )
        )

    @reflection.flexi_cache(
        ("schema", InternalTraversal.dp_string),
        ("dblink", InternalTraversal.dp_string),
        ("all_objects", InternalTraversal.dp_string_list),
    )
    def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )

        query = self._index_query(owner)

        pks = {
            row_dict["constraint_name"]
            for row_dict in self._get_all_constraint_rows(
                connection, schema, dblink, all_objects, **kw
            )
            if row_dict["constraint_type"] == "P"
        }

        # all_ind_expressions.column_expression is LONG
        result = self._run_batches(
            connection,
            query,
            dblink,
            returns_long=True,
            mappings=True,
            all_objects=all_objects,
        )

        return [
            row_dict
            for row_dict in result
            if row_dict["index_name"] not in pks
        ]

    @_handle_synonyms_decorator
    def get_multi_indexes(
        self,
        connection,
        *,
        schema,
        filter_names,
        scope,
        kind,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        all_objects = self._get_all_objects(
            connection, schema, scope, kind, filter_names, dblink, **kw
        )

        uniqueness = {"NONUNIQUE": False, "UNIQUE": True}
        enabled = {"DISABLED": False, "ENABLED": True}
        is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"}

        indexes = defaultdict(dict)

        for row_dict in self._get_indexes_rows(
            connection, schema, dblink, all_objects, **kw
        ):
            index_name = self.normalize_name(row_dict["index_name"])
            table_name = self.normalize_name(row_dict["table_name"])
            table_indexes = indexes[(schema, table_name)]

            if index_name not in table_indexes:
                table_indexes[index_name] = index_dict = {
                    "name": index_name,
                    "column_names": [],
                    "dialect_options": {},
                    "unique": uniqueness.get(row_dict["uniqueness"], False),
                }
                do = index_dict["dialect_options"]
                if row_dict["index_type"] in is_bitmap:
                    do["oracle_bitmap"] = True
                if enabled.get(row_dict["compression"], False):
                    do["oracle_compress"] = row_dict["prefix_length"]

            else:
                index_dict = table_indexes[index_name]

            expr = row_dict["column_expression"]
            if expr is not None:
                index_dict["column_names"].append(None)
                if "expressions" in index_dict:
                    index_dict["expressions"].append(expr)
                else:
                    index_dict["expressions"] = index_dict["column_names"][:-1]
                    index_dict["expressions"].append(expr)

                if row_dict["descend"].lower() != "asc":
                    assert row_dict["descend"].lower() == "desc"
                    cs = index_dict.setdefault("column_sorting", {})
                    cs[expr] = ("desc",)
            else:
                assert row_dict["descend"].lower() == "asc"
                cn = self.normalize_name(row_dict["column_name"])
                index_dict["column_names"].append(cn)
                if "expressions" in index_dict:
                    index_dict["expressions"].append(cn)

        default = ReflectionDefaults.indexes

        return (
            (key, list(indexes[key].values()) if key in indexes else default())
            for key in (
                (schema, self.normalize_name(obj_name))
                for obj_name in all_objects
            )
        )

    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        data = self.get_multi_pk_constraint(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @lru_cache()
    def _constraint_query(self, owner):
        local = dictionary.all_cons_columns.alias("local")
        remote = dictionary.all_cons_columns.alias("remote")
        return (
            select(
                dictionary.all_constraints.c.table_name,
                dictionary.all_constraints.c.constraint_type,
                dictionary.all_constraints.c.constraint_name,
                local.c.column_name.label("local_column"),
                remote.c.table_name.label("remote_table"),
                remote.c.column_name.label("remote_column"),
                remote.c.owner.label("remote_owner"),
                dictionary.all_constraints.c.search_condition,
                dictionary.all_constraints.c.delete_rule,
            )
            .select_from(dictionary.all_constraints)
            .join(
                local,
                and_(
                    local.c.owner == dictionary.all_constraints.c.owner,
                    dictionary.all_constraints.c.constraint_name
                    == local.c.constraint_name,
                ),
            )
            .outerjoin(
                remote,
                and_(
                    dictionary.all_constraints.c.r_owner == remote.c.owner,
                    dictionary.all_constraints.c.r_constraint_name
                    == remote.c.constraint_name,
                    or_(
                        remote.c.position.is_(sql.null()),
                        local.c.position == remote.c.position,
                    ),
                ),
            )
            .where(
                dictionary.all_constraints.c.owner == owner,
                dictionary.all_constraints.c.table_name.in_(
                    bindparam("all_objects")
                ),
                dictionary.all_constraints.c.constraint_type.in_(
                    ("R", "P", "U", "C")
                ),
            )
            .order_by(
                dictionary.all_constraints.c.constraint_name, local.c.position
            )
        )

    @reflection.flexi_cache(
        ("schema", InternalTraversal.dp_string),
        ("dblink", InternalTraversal.dp_string),
        ("all_objects", InternalTraversal.dp_string_list),
    )
    def _get_all_constraint_rows(
        self, connection, schema, dblink, all_objects, **kw
    ):
        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )
        query = self._constraint_query(owner)

        # since the result is cached a list must be created
        values = list(
            self._run_batches(
                connection,
                query,
                dblink,
                returns_long=False,
                mappings=True,
                all_objects=all_objects,
            )
        )
        return values

    @_handle_synonyms_decorator
    def get_multi_pk_constraint(
        self,
        connection,
        *,
        scope,
        schema,
        filter_names,
        kind,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        all_objects = self._get_all_objects(
            connection, schema, scope, kind, filter_names, dblink, **kw
        )

        primary_keys = defaultdict(dict)
        default = ReflectionDefaults.pk_constraint

        for row_dict in self._get_all_constraint_rows(
            connection, schema, dblink, all_objects, **kw
        ):
            if row_dict["constraint_type"] != "P":
                continue
            table_name = self.normalize_name(row_dict["table_name"])
            constraint_name = self.normalize_name(row_dict["constraint_name"])
            column_name = self.normalize_name(row_dict["local_column"])

            table_pk = primary_keys[(schema, table_name)]
            if not table_pk:
                table_pk["name"] = constraint_name
                table_pk["constrained_columns"] = [column_name]
            else:
                table_pk["constrained_columns"].append(column_name)

        return (
            (key, primary_keys[key] if key in primary_keys else default())
            for key in (
                (schema, self.normalize_name(obj_name))
                for obj_name in all_objects
            )
        )
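The accumulation above relies on the rows arriving ordered by constraint name and column position, so a composite primary key builds up one column per row. A self-contained sketch run on two hypothetical constraint rows:

```python
# Sketch of the primary-key accumulation in get_multi_pk_constraint,
# applied to hypothetical rows describing a composite primary key.
from collections import defaultdict

rows = [
    {"table_name": "t1", "constraint_name": "pk_t1", "local_column": "id"},
    {"table_name": "t1", "constraint_name": "pk_t1", "local_column": "ver"},
]
primary_keys = defaultdict(dict)
for row_dict in rows:
    # key on (schema, table); schema is None here for brevity
    table_pk = primary_keys[(None, row_dict["table_name"])]
    if not table_pk:
        table_pk["name"] = row_dict["constraint_name"]
        table_pk["constrained_columns"] = [row_dict["local_column"]]
    else:
        table_pk["constrained_columns"].append(row_dict["local_column"])
```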

    @reflection.cache
    def get_foreign_keys(
        self,
        connection,
        table_name,
        schema=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        data = self.get_multi_foreign_keys(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @_handle_synonyms_decorator
    def get_multi_foreign_keys(
        self,
        connection,
        *,
        scope,
        schema,
        filter_names,
        kind,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        all_objects = self._get_all_objects(
            connection, schema, scope, kind, filter_names, dblink, **kw
        )

        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)

        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )

        all_remote_owners = set()
        fkeys = defaultdict(dict)

        for row_dict in self._get_all_constraint_rows(
            connection, schema, dblink, all_objects, **kw
        ):
            if row_dict["constraint_type"] != "R":
                continue

            table_name = self.normalize_name(row_dict["table_name"])
            constraint_name = self.normalize_name(row_dict["constraint_name"])
            table_fkey = fkeys[(schema, table_name)]

            assert constraint_name is not None

            local_column = self.normalize_name(row_dict["local_column"])
            remote_table = self.normalize_name(row_dict["remote_table"])
            remote_column = self.normalize_name(row_dict["remote_column"])
            remote_owner_orig = row_dict["remote_owner"]
            remote_owner = self.normalize_name(remote_owner_orig)
            if remote_owner_orig is not None:
                all_remote_owners.add(remote_owner_orig)

            if remote_table is None:
                # ticket 363
                if dblink and not dblink.startswith("@"):
                    dblink = f"@{dblink}"
                util.warn(
                    "Got 'None' querying 'table_name' from "
                    f"all_cons_columns{dblink or ''} - does the user have "
                    "proper rights to the table?"
                )
                continue

            if constraint_name not in table_fkey:
                table_fkey[constraint_name] = fkey = {
                    "name": constraint_name,
                    "constrained_columns": [],
                    "referred_schema": None,
                    "referred_table": remote_table,
                    "referred_columns": [],
                    "options": {},
                }

                if resolve_synonyms:
                    # will be removed below
                    fkey["_ref_schema"] = remote_owner

                if schema is not None or remote_owner_orig != owner:
                    fkey["referred_schema"] = remote_owner

                delete_rule = row_dict["delete_rule"]
                if delete_rule != "NO ACTION":
                    fkey["options"]["ondelete"] = delete_rule

            else:
                fkey = table_fkey[constraint_name]

            fkey["constrained_columns"].append(local_column)
            fkey["referred_columns"].append(remote_column)

        if resolve_synonyms and all_remote_owners:
            query = select(
                dictionary.all_synonyms.c.owner,
                dictionary.all_synonyms.c.table_name,
                dictionary.all_synonyms.c.table_owner,
                dictionary.all_synonyms.c.synonym_name,
            ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners))

            result = self._execute_reflection(
                connection, query, dblink, returns_long=False
            ).mappings()

            remote_owners_lut = {}
            for row in result:
                synonym_owner = self.normalize_name(row["owner"])
                table_name = self.normalize_name(row["table_name"])

                remote_owners_lut[(synonym_owner, table_name)] = (
                    row["table_owner"],
                    row["synonym_name"],
                )

            empty = (None, None)
            for table_fkeys in fkeys.values():
                for table_fkey in table_fkeys.values():
                    key = (
                        table_fkey.pop("_ref_schema"),
                        table_fkey["referred_table"],
                    )
                    remote_owner, syn_name = remote_owners_lut.get(key, empty)
                    if syn_name:
                        sn = self.normalize_name(syn_name)
                        table_fkey["referred_table"] = sn
                        if schema is not None or remote_owner != owner:
                            ro = self.normalize_name(remote_owner)
                            table_fkey["referred_schema"] = ro
                        else:
                            table_fkey["referred_schema"] = None
        default = ReflectionDefaults.foreign_keys

        return (
            (key, list(fkeys[key].values()) if key in fkeys else default())
            for key in (
                (schema, self.normalize_name(obj_name))
                for obj_name in all_objects
            )
        )

    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        data = self.get_multi_unique_constraints(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

3590 @_handle_synonyms_decorator 

3591 def get_multi_unique_constraints( 

3592 self, 

3593 connection, 

3594 *, 

3595 scope, 

3596 schema, 

3597 filter_names, 

3598 kind, 

3599 dblink=None, 

3600 **kw, 

3601 ): 

3602 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3603 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3604 """ 

3605 all_objects = self._get_all_objects( 

3606 connection, schema, scope, kind, filter_names, dblink, **kw 

3607 ) 

3608 

3609 unique_cons = defaultdict(dict) 

3610 

3611 index_names = { 

3612 row_dict["index_name"] 

3613 for row_dict in self._get_indexes_rows( 

3614 connection, schema, dblink, all_objects, **kw 

3615 ) 

3616 } 

3617 

3618 for row_dict in self._get_all_constraint_rows( 

3619 connection, schema, dblink, all_objects, **kw 

3620 ): 

3621 if row_dict["constraint_type"] != "U": 

3622 continue 

3623 table_name = self.normalize_name(row_dict["table_name"]) 

3624 constraint_name_orig = row_dict["constraint_name"] 

3625 constraint_name = self.normalize_name(constraint_name_orig) 

3626 column_name = self.normalize_name(row_dict["local_column"]) 

3627 table_uc = unique_cons[(schema, table_name)] 

3628 

3629 assert constraint_name is not None 

3630 

3631 if constraint_name not in table_uc: 

3632 table_uc[constraint_name] = uc = { 

3633 "name": constraint_name, 

3634 "column_names": [], 

3635 "duplicates_index": ( 

3636 constraint_name 

3637 if constraint_name_orig in index_names 

3638 else None 

3639 ), 

3640 } 

3641 else: 

3642 uc = table_uc[constraint_name] 

3643 

3644 uc["column_names"].append(column_name) 

3645 

3646 default = ReflectionDefaults.unique_constraints 

3647 

3648 return ( 

3649 ( 

3650 key, 

3651 ( 

3652 list(unique_cons[key].values()) 

3653 if key in unique_cons 

3654 else default() 

3655 ), 

3656 ) 

3657 for key in ( 

3658 (schema, self.normalize_name(obj_name)) 

3659 for obj_name in all_objects 

3660 ) 

3661 ) 
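The method above groups constraint rows into a ``defaultdict`` keyed by ``(schema, table_name)``, then pairs every *requested* key with either its collected constraints or a fresh default, so tables with no unique constraints still appear in the output. A runnable sketch of that grouping idiom with plain data (the table, constraint, and column names are made up for illustration):

```python
from collections import defaultdict

# Fake constraint rows, as if read from ALL_CONS_COLUMNS: two rows for
# the same two-column constraint on t1, one single-column row for t2.
rows = [
    {"table_name": "t1", "constraint_name": "uq_a", "local_column": "x"},
    {"table_name": "t1", "constraint_name": "uq_a", "local_column": "y"},
    {"table_name": "t2", "constraint_name": "uq_b", "local_column": "z"},
]

unique_cons = defaultdict(dict)
for row in rows:
    # One inner dict per table; one entry per constraint name, so
    # multi-column constraints accumulate their columns in order.
    table_uc = unique_cons[(None, row["table_name"])]
    uc = table_uc.setdefault(
        row["constraint_name"],
        {"name": row["constraint_name"], "column_names": []},
    )
    uc["column_names"].append(row["local_column"])

# Requested tables that produced no rows fall back to an empty default
# (standing in for ReflectionDefaults.unique_constraints()).
result = {
    key: list(unique_cons[key].values()) if key in unique_cons else []
    for key in [(None, "t1"), (None, "t2"), (None, "t3")]
}
print(result[(None, "t1")][0]["column_names"])  # ['x', 'y']
print(result[(None, "t3")])  # []
```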

    @reflection.cache
    def get_view_definition(
        self,
        connection,
        view_name,
        schema=None,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        if kw.get("oracle_resolve_synonyms", False):
            synonyms = self._get_synonyms(
                connection, schema, filter_names=[view_name], dblink=dblink
            )
            if synonyms:
                assert len(synonyms) == 1
                row_dict = synonyms[0]
                dblink = self.normalize_name(row_dict["db_link"])
                schema = row_dict["table_owner"]
                view_name = row_dict["table_name"]

        name = self.denormalize_name(view_name)
        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )
        query = (
            select(dictionary.all_views.c.text)
            .where(
                dictionary.all_views.c.view_name == name,
                dictionary.all_views.c.owner == owner,
            )
            .union_all(
                select(dictionary.all_mviews.c.query).where(
                    dictionary.all_mviews.c.mview_name == name,
                    dictionary.all_mviews.c.owner == owner,
                )
            )
        )

        rp = self._execute_reflection(
            connection, query, dblink, returns_long=False
        ).scalar()
        if rp is None:
            raise exc.NoSuchTableError(
                f"{schema}.{view_name}" if schema else view_name
            )
        else:
            return rp

    @reflection.cache
    def get_check_constraints(
        self, connection, table_name, schema=None, include_all=False, **kw
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        data = self.get_multi_check_constraints(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            include_all=include_all,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @_handle_synonyms_decorator
    def get_multi_check_constraints(
        self,
        connection,
        *,
        schema,
        filter_names,
        dblink=None,
        scope,
        kind,
        include_all=False,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        all_objects = self._get_all_objects(
            connection, schema, scope, kind, filter_names, dblink, **kw
        )

        not_null = re.compile(r"..+?. IS NOT NULL$")

        check_constraints = defaultdict(list)

        for row_dict in self._get_all_constraint_rows(
            connection, schema, dblink, all_objects, **kw
        ):
            if row_dict["constraint_type"] != "C":
                continue
            table_name = self.normalize_name(row_dict["table_name"])
            constraint_name = self.normalize_name(row_dict["constraint_name"])
            search_condition = row_dict["search_condition"]

            table_checks = check_constraints[(schema, table_name)]
            if constraint_name is not None and (
                include_all or not not_null.match(search_condition)
            ):
                table_checks.append(
                    {"name": constraint_name, "sqltext": search_condition}
                )

        default = ReflectionDefaults.check_constraints

        return (
            (
                key,
                (
                    check_constraints[key]
                    if key in check_constraints
                    else default()
                ),
            )
            for key in (
                (schema, self.normalize_name(obj_name))
                for obj_name in all_objects
            )
        )
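The ``not_null`` pattern above is what lets the dialect skip, by default, the implicit check conditions Oracle Database generates for NOT NULL columns, while keeping user-defined CHECK expressions. A small standalone demonstration of what the pattern does and does not match (the sample search conditions are illustrative):

```python
import re

# Same pattern as in get_multi_check_constraints: any leading character,
# at least one character, any trailing character (covering a quoted
# identifier such as "AGE"), followed by IS NOT NULL at end of string.
not_null = re.compile(r"..+?. IS NOT NULL$")

# Implicit NOT NULL conditions match and are therefore filtered out
# (unless include_all=True is passed)...
print(bool(not_null.match('"AGE" IS NOT NULL')))  # True

# ...while real CHECK expressions do not match and are reported.
print(bool(not_null.match("age > 0")))  # False
```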

    def _list_dblinks(self, connection, dblink=None):
        query = select(dictionary.all_db_links.c.db_link)
        links = self._execute_reflection(
            connection, query, dblink, returns_long=False
        ).scalars()
        return [self.normalize_name(link) for link in links]


class _OuterJoinColumn(sql.ClauseElement):
    __visit_name__ = "outer_join_column"

    def __init__(self, column):
        self.column = column