Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py: 25%



# dialects/oracle/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


r"""

.. dialect:: oracle
    :name: Oracle Database
    :normal_support: 11+
    :best_effort: 9+


Auto Increment Behavior
-----------------------

SQLAlchemy :class:`_schema.Table` objects which include integer primary keys
are usually assumed to have "autoincrementing" behavior, meaning they can
generate their own primary key values upon INSERT. For use within Oracle
Database, two options are available: the use of IDENTITY columns (Oracle
Database 12 and above only) or the association of a SEQUENCE with the column.

Specifying GENERATED AS IDENTITY (Oracle Database 12 and above)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Starting from version 12, Oracle Database can make use of identity columns
using the :class:`_sql.Identity` construct to specify the autoincrementing
behavior::

    t = Table(
        "mytable",
        metadata,
        Column("id", Integer, Identity(start=3), primary_key=True),
        Column(...),
        ...,
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

    CREATE TABLE mytable (
        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
        ...,
        PRIMARY KEY (id)
    )

The :class:`_schema.Identity` object supports many options to control the
"autoincrementing" behavior of the column, such as the starting value and the
increment value. In addition to the standard options, Oracle Database
supports setting :paramref:`_schema.Identity.always` to ``None`` to use the
default generated mode, rendering GENERATED AS IDENTITY in the DDL. It also
supports setting :paramref:`_schema.Identity.on_null` to ``True`` to specify
ON NULL in conjunction with a "BY DEFAULT" identity column.

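As a sketch of how these options combine, the DDL for an ON NULL identity
column can be previewed by compiling a ``CreateTable`` construct against the
Oracle dialect without a live connection (table and column names here are
arbitrary):

```python
from sqlalchemy import Column, Identity, Integer, MetaData, Table
from sqlalchemy.dialects import oracle
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# a BY DEFAULT identity column that also generates a value when NULL
# is explicitly passed for the column on INSERT
t = Table(
    "t1",
    metadata,
    Column(
        "id",
        Integer,
        Identity(always=False, on_null=True, start=3),
        primary_key=True,
    ),
)

ddl = str(CreateTable(t).compile(dialect=oracle.dialect()))
print(ddl)
```

The rendered DDL includes ``GENERATED BY DEFAULT ON NULL AS IDENTITY``
along with the ``START WITH`` option.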

Using a SEQUENCE (all Oracle Database versions)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Older versions of Oracle Database had no "autoincrement" feature: SQLAlchemy
relies upon sequences to produce these values. With the older Oracle Database
versions, *a sequence must always be explicitly specified to enable
autoincrement*. This diverges from the majority of documentation examples,
which assume the usage of an autoincrement-capable database. To specify
sequences, use the :class:`~sqlalchemy.schema.Sequence` object, which is
passed to a :class:`_schema.Column` construct::

    t = Table(
        "mytable",
        metadata,
        Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
        Column(...),
        ...,
    )

This step is also required when using table reflection, i.e.
``autoload_with=engine``::

    t = Table(
        "mytable",
        metadata,
        Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
        autoload_with=engine,
    )

.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
   in a :class:`_schema.Column` to specify the option of an autoincrementing
   column.


.. _oracle_isolation_level:

Transaction Isolation Level / Autocommit
----------------------------------------

Oracle Database supports "READ COMMITTED" and "SERIALIZABLE" modes of
isolation. The AUTOCOMMIT isolation level is also supported by the
python-oracledb and cx_Oracle dialects.

To set using per-connection execution options::

    connection = engine.connect()
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")

For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle Database dialects set
the level at the session level using ``ALTER SESSION``, which is reverted
back to its default setting when the connection is returned to the connection
pool.

Valid values for ``isolation_level`` include:

* ``READ COMMITTED``
* ``AUTOCOMMIT``
* ``SERIALIZABLE``


.. note:: The implementation of the
   :meth:`_engine.Connection.get_isolation_level` method as provided by the
   Oracle Database dialects necessarily forces the start of a transaction
   using the Oracle Database DBMS_TRANSACTION.LOCAL_TRANSACTION_ID function;
   otherwise no level is normally readable.

   Additionally, the :meth:`_engine.Connection.get_isolation_level` method
   will raise an exception if the ``v$transaction`` view is not available due
   to permissions or other reasons, which is a common occurrence in Oracle
   Database installations.

   The python-oracledb and cx_Oracle dialects attempt to call the
   :meth:`_engine.Connection.get_isolation_level` method when the dialect
   makes its first connection to the database in order to acquire the
   "default" isolation level. This default level is necessary so that the
   level can be reset on a connection after it has been temporarily modified
   using the :meth:`_engine.Connection.execution_options` method. In the
   common event that the :meth:`_engine.Connection.get_isolation_level`
   method raises an exception due to ``v$transaction`` not being readable, or
   any other database-related failure, the level is assumed to be "READ
   COMMITTED". No warning is emitted for this initial first-connect condition
   as it is expected to be a common restriction on Oracle databases.

.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_Oracle
   dialect as well as the notion of a default isolation level

.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
   reading of the isolation level.

.. versionchanged:: 1.3.22 In the event that the default isolation
   level cannot be read due to permissions on the v$transaction view as
   is common in Oracle installations, the default isolation level is
   hardcoded to "READ COMMITTED" which was the behavior prior to 1.3.21.

.. seealso::

   :ref:`dbapi_autocommit`


Identifier Casing
-----------------

In Oracle Database, the data dictionary represents all case insensitive
identifier names using UPPERCASE text. This is in contradiction to the
expectations of SQLAlchemy, which assumes a case insensitive name is
represented as lowercase text.

As an example of a case insensitive identifier name, consider the following
table:

.. sourcecode:: sql

    CREATE TABLE MyTable (Identifier INTEGER PRIMARY KEY)

If you were to ask Oracle Database for information about this table, the
table name would be reported as ``MYTABLE`` and the column name would
be reported as ``IDENTIFIER``. Compare to most other databases such as
PostgreSQL and MySQL, which would report these names as ``mytable`` and
``identifier``. The names are **not quoted, and are therefore case
insensitive**. The special casing of ``MyTable`` and ``Identifier`` would
only be maintained if they were quoted in the table definition:

.. sourcecode:: sql

    CREATE TABLE "MyTable" ("Identifier" INTEGER PRIMARY KEY)

When constructing a SQLAlchemy :class:`.Table` object, **an all lowercase
name is considered to be case insensitive**. So the following table assumes
case insensitive names::

    Table("mytable", metadata, Column("identifier", Integer, primary_key=True))

Whereas when mixed case or UPPERCASE names are used, case sensitivity is
assumed::

    Table("MyTable", metadata, Column("Identifier", Integer, primary_key=True))

A similar situation occurs at the database driver level when emitting a
textual SQL SELECT statement and looking at column names in the DBAPI
``cursor.description`` attribute. A database like PostgreSQL will normalize
case insensitive names to be lowercase::

    >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test")
    >>> pg_connection = pg_engine.connect()
    >>> result = pg_connection.exec_driver_sql("SELECT 1 AS SomeName")
    >>> result.cursor.description
    (Column(name='somename', type_code=23),)

Whereas Oracle Database normalizes them to UPPERCASE::

    >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
    >>> oracle_connection = oracle_engine.connect()
    >>> result = oracle_connection.exec_driver_sql(
    ...     "SELECT 1 AS SomeName FROM DUAL"
    ... )
    >>> result.cursor.description
    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]


In order to achieve cross-database parity for the two cases of a. table
reflection and b. textual-only SQL statement round trips, SQLAlchemy performs
a step called **name normalization** when using the Oracle dialect. This
process may also apply to other third party dialects that have similar
UPPERCASE handling of case insensitive names.

When using name normalization, SQLAlchemy attempts to detect if a name is
case insensitive by checking if all characters are UPPERCASE letters only;
if so, then it assumes this is a case insensitive name and it is delivered as
a lowercase name.

For table reflection, a table name that is represented as all UPPERCASE in
Oracle Database's catalog tables will be assumed to have a case insensitive
name. This is what allows the ``Table`` definition to use lower case names
and be equally compatible from a reflection point of view on Oracle Database
and all other databases such as PostgreSQL and MySQL::

    # matches a table created with CREATE TABLE mytable
    Table("mytable", metadata, autoload_with=some_engine)

Above, the all lowercase name ``"mytable"`` is case insensitive; it will
match a table reported by PostgreSQL as ``"mytable"`` and a table reported by
Oracle Database as ``"MYTABLE"``. If name normalization were not present, it
would not be possible for the above :class:`.Table` definition to be
introspectable in a cross-database way, since we are dealing with a case
insensitive name that is not reported by each database in the same way.

Case sensitivity can be forced on in this case, such as if we wanted to
represent the quoted tablename ``"MYTABLE"`` with that exact casing, most
simply by using that casing directly, which will be seen as a case sensitive
name::

    # matches a table created with CREATE TABLE "MYTABLE"
    Table("MYTABLE", metadata, autoload_with=some_engine)

For the unusual case of a quoted all-lowercase name, the
:class:`.quoted_name` construct may be used::

    from sqlalchemy import quoted_name

    # matches a table created with CREATE TABLE "mytable"
    Table(
        quoted_name("mytable", quote=True), metadata, autoload_with=some_engine
    )


Name normalization also takes place when handling result sets from **purely
textual SQL strings** that have no other :class:`.Table` or :class:`.Column`
metadata associated with them. This includes SQL strings executed using
:meth:`.Connection.exec_driver_sql` and SQL strings executed using the
:func:`.text` construct which do not include :class:`.Column` metadata.

Returning to the Oracle Database SELECT statement, we see that even though
``cursor.description`` reports the column name as ``SOMENAME``, SQLAlchemy
name normalizes this to ``somename``::

    >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
    >>> oracle_connection = oracle_engine.connect()
    >>> result = oracle_connection.exec_driver_sql(
    ...     "SELECT 1 AS SomeName FROM DUAL"
    ... )
    >>> result.cursor.description
    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
    >>> result.keys()
    RMKeyView(['somename'])

The single scenario where the above behavior produces inaccurate results
is when using an all-uppercase, quoted name. SQLAlchemy has no way to
determine whether a particular name in ``cursor.description`` was quoted, and
is therefore case sensitive, or was not quoted, and should be name
normalized::

    >>> result = oracle_connection.exec_driver_sql(
    ...     'SELECT 1 AS "SOMENAME" FROM DUAL'
    ... )
    >>> result.cursor.description
    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
    >>> result.keys()
    RMKeyView(['somename'])

For this case, a new feature will be available in SQLAlchemy 2.1 to disable
the name normalization behavior in specific cases.


.. _oracle_max_identifier_lengths:

Maximum Identifier Lengths
--------------------------

SQLAlchemy is sensitive to the maximum identifier length supported by Oracle
Database. This affects generated SQL label names as well as the generation of
constraint names, particularly in the case where the constraint naming
convention feature described at :ref:`constraint_naming_conventions` is being
used.

Oracle Database 12.2 increased the default maximum identifier length from 30
to 128. As of SQLAlchemy 1.4, the default maximum identifier length for the
Oracle dialects is 128 characters. Upon first connection, the maximum length
actually supported by the database is obtained. In all cases, setting the
:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass
this change and the value given will be used as is::

    engine = create_engine(
        "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1",
        max_identifier_length=30,
    )

If :paramref:`_sa.create_engine.max_identifier_length` is not set, the
oracledb dialect internally uses the ``max_identifier_length`` attribute
available on driver connections since python-oracledb version 2.5. When using
an older driver version, or using the cx_Oracle dialect, SQLAlchemy will
instead attempt to use the query ``SELECT value FROM v$parameter WHERE name =
'compatible'`` upon first connect in order to determine the effective
compatibility version of the database. The "compatibility" version is a
version number that is independent of the actual database version. It is used
to assist database migration. It is configured by an Oracle Database
initialization parameter. The compatibility version then determines the
maximum allowed identifier length for the database. If the V$ view is not
available, the database version information is used instead.


The maximum identifier length comes into play both when generating anonymized
SQL labels in SELECT statements and, more crucially, when generating
constraint names from a naming convention. It is this area that has created
the need for SQLAlchemy to change this default conservatively. For example,
the following naming convention produces two very different constraint names
based on the identifier length::

    from sqlalchemy import Column
    from sqlalchemy import Index
    from sqlalchemy import Integer
    from sqlalchemy import MetaData
    from sqlalchemy import Table
    from sqlalchemy.dialects import oracle
    from sqlalchemy.schema import CreateIndex

    m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})

    t = Table(
        "t",
        m,
        Column("some_column_name_1", Integer),
        Column("some_column_name_2", Integer),
        Column("some_column_name_3", Integer),
    )

    ix = Index(
        None,
        t.c.some_column_name_1,
        t.c.some_column_name_2,
        t.c.some_column_name_3,
    )

    oracle_dialect = oracle.dialect(max_identifier_length=30)
    print(CreateIndex(ix).compile(dialect=oracle_dialect))

With an identifier length of 30, the above CREATE INDEX looks like:

.. sourcecode:: sql

    CREATE INDEX ix_some_column_name_1s_70cd ON t
    (some_column_name_1, some_column_name_2, some_column_name_3)

However with a length of 128, it becomes:

.. sourcecode:: sql

    CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
    (some_column_name_1, some_column_name_2, some_column_name_3)

Applications which have run versions of SQLAlchemy prior to 1.4 on Oracle
Database version 12.2 or greater are therefore subject to the scenario of a
database migration that wishes to "DROP CONSTRAINT" on a name that was
previously generated with the shorter length. This migration will fail when
the identifier length is changed without the name of the index or constraint
first being adjusted. Such applications are strongly advised to make use of
:paramref:`_sa.create_engine.max_identifier_length` in order to maintain
control of the generation of truncated names, and to fully review and test
all database migrations in a staging environment when changing this value to
ensure that the impact of this change has been mitigated.

.. versionchanged:: 1.4 the default max_identifier_length for Oracle Database
   is 128 characters, which is adjusted down to 30 upon first connect if the
   Oracle Database, or its compatibility setting, are lower than version 12.2.



LIMIT/OFFSET/FETCH Support
--------------------------

Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make
use of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming Oracle Database
12c or above, and assuming the SELECT statement is not embedded within a
compound statement like UNION. This syntax is also available directly by
using the :meth:`_sql.Select.fetch` method.

.. versionchanged:: 2.0 the Oracle Database dialects now use ``FETCH FIRST N
   ROW / OFFSET N ROWS`` for all :meth:`_sql.Select.limit` and
   :meth:`_sql.Select.offset` usage including within the ORM and legacy
   :class:`_orm.Query`. To force the legacy behavior using window functions,
   specify the ``enable_offset_fetch=False`` dialect parameter to
   :func:`_sa.create_engine`.

The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle Database
version by passing ``enable_offset_fetch=False`` to
:func:`_sa.create_engine`, which will force the use of "legacy" mode that
makes use of window functions. This mode is also selected automatically when
using a version of Oracle Database prior to 12c.

When using legacy mode, or when a :class:`.Select` statement with
limit/offset is embedded in a compound statement, an emulated approach for
LIMIT / OFFSET based on window functions is used, which involves creation of
a subquery using ``ROW_NUMBER`` that is prone to performance issues as well
as SQL construction issues for complex statements. However, this approach is
supported by all Oracle Database versions. See notes below.


Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with
the ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on
an Oracle Database version prior to 12c, the following notes apply:

* SQLAlchemy currently makes use of ROWNUM to achieve
  LIMIT/OFFSET; the exact methodology is taken from
  https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .

* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
  the usage of this optimization directive, specify ``optimize_limits=True``
  to :func:`_sa.create_engine`.

  .. versionchanged:: 1.4

     The Oracle Database dialect renders limit/offset integer values using a
     "post compile" scheme which renders the integer directly before passing
     the statement to the cursor for execution. The ``use_binds_for_limits``
     flag no longer has an effect.

     .. seealso::

        :ref:`change_4808`.


.. _oracle_returning:

RETURNING Support
-----------------

Oracle Database supports RETURNING fully for INSERT, UPDATE and DELETE
statements that are invoked with a single collection of bound parameters
(that is, a ``cursor.execute()`` style statement; SQLAlchemy does not
generally support RETURNING with :term:`executemany` statements). Multiple
rows may be returned as well.

.. versionchanged:: 2.0 the Oracle Database backend has full support for
   RETURNING on parity with other backends.

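As a sketch of the SQL produced, an INSERT with RETURNING can be compiled
against the Oracle dialect without a database connection (table and column
names are arbitrary):

```python
from sqlalchemy import column, insert, table
from sqlalchemy.dialects import oracle

t = table("mytable", column("id"), column("data"))

# RETURNING renders using Oracle's RETURNING ... INTO ... bind syntax
stmt = insert(t).values(data="hello").returning(t.c.id)

sql = str(stmt.compile(dialect=oracle.dialect()))
print(sql)
```

The compiled statement ends with a ``RETURNING mytable.id INTO`` clause
bound to an OUT parameter, which the driver populates at execution time.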

ON UPDATE CASCADE
-----------------

Oracle Database doesn't have native ON UPDATE CASCADE functionality. A
trigger based solution is available at
https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html

When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
cascading updates - specify :class:`.ForeignKey` objects using the
``deferrable=True, initially="deferred"`` keyword arguments,
and specify ``passive_updates=False`` on each :func:`_orm.relationship`.

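A minimal declarative sketch of this configuration (model names and columns
here are hypothetical, not part of the source above):

```python
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Parent(Base):
    __tablename__ = "parent"

    # a natural primary key whose value may be updated in place
    name = Column(String(50), primary_key=True)

    # passive_updates=False has the ORM load and update the child rows
    # itself, rather than relying on database-level ON UPDATE CASCADE
    children = relationship("Child", passive_updates=False)


class Child(Base):
    __tablename__ = "child"

    id = Column(Integer, primary_key=True)

    # deferred constraint checking allows parent and child rows to be
    # updated within the same flush
    parent_name = Column(
        String(50),
        ForeignKey("parent.name", deferrable=True, initially="deferred"),
    )
```

With this in place, changing ``Parent.name`` within a session will also emit
UPDATE statements for the referencing ``Child`` rows.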

Oracle Database 8 Compatibility
-------------------------------

.. warning:: The status of Oracle Database 8 compatibility is not known for
   SQLAlchemy 2.0.

When Oracle Database 8 is detected, the dialect internally configures itself
to the following behaviors:

* the use_ansi flag is set to False. This has the effect of converting all
  JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
  makes use of Oracle's (+) operator.

* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
  the :class:`~sqlalchemy.types.Unicode` type is used - VARCHAR2 and CLOB are
  issued instead. This is because these types don't seem to work correctly on
  Oracle 8 even though they are available. The
  :class:`~sqlalchemy.types.NVARCHAR` and
  :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
  NVARCHAR2 and NCLOB.



Synonym/DBLINK Reflection
-------------------------

When using reflection with Table objects, the dialect can optionally search
for tables indicated by synonyms, either in local or remote schemas or
accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
a keyword argument to the :class:`_schema.Table` construct::

    some_table = Table(
        "some_table", autoload_with=some_engine, oracle_resolve_synonyms=True
    )

When this flag is set, the given name (such as ``some_table`` above) will be
searched not just in the ``ALL_TABLES`` view, but also within the
``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
name. If the synonym is located and refers to a DBLINK, the Oracle Database
dialects know how to locate the table's information using DBLINK syntax
(e.g. ``@dblink``).

``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
accepted, including methods such as :meth:`_schema.MetaData.reflect` and
:meth:`_reflection.Inspector.get_columns`.

If synonyms are not in use, this flag should be left disabled.


.. _oracle_constraint_reflection:

Constraint Reflection
---------------------

The Oracle Database dialects can return information about foreign key,
unique, and CHECK constraints, as well as indexes on tables.

Raw information regarding these constraints can be acquired using
:meth:`_reflection.Inspector.get_foreign_keys`,
:meth:`_reflection.Inspector.get_unique_constraints`,
:meth:`_reflection.Inspector.get_check_constraints`, and
:meth:`_reflection.Inspector.get_indexes`.

.. versionchanged:: 1.2 The Oracle Database dialect can now reflect UNIQUE
   and CHECK constraints.

When using reflection at the :class:`_schema.Table` level, the
:class:`_schema.Table` will also include these constraints.

Note the following caveats:

* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
  Oracle Database builds a special "IS NOT NULL" constraint for columns that
  specify "NOT NULL". This constraint is **not** returned by default; to
  include the "IS NOT NULL" constraints, pass the flag ``include_all=True``::

      from sqlalchemy import create_engine, inspect

      engine = create_engine(
          "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
      )
      inspector = inspect(engine)
      all_check_constraints = inspector.get_check_constraints(
          "some_table", include_all=True
      )

* in most cases, when reflecting a :class:`_schema.Table`, a UNIQUE
  constraint will **not** be available as a :class:`.UniqueConstraint`
  object, as Oracle Database mirrors unique constraints with a UNIQUE index
  in most cases (the exception seems to be when two or more unique
  constraints represent the same columns); the :class:`_schema.Table` will
  instead represent these using :class:`.Index` with the ``unique=True`` flag
  set.

* Oracle Database creates an implicit index for the primary key of a table;
  this index is **excluded** from all index results.

* the list of columns reflected for an index will not include column names
  that start with SYS_NC.


Table names with SYSTEM/SYSAUX tablespaces
-------------------------------------------

The :meth:`_reflection.Inspector.get_table_names` and
:meth:`_reflection.Inspector.get_temp_table_names`
methods each return a list of table names for the current engine. These
methods are also part of the reflection which occurs within an operation such
as :meth:`_schema.MetaData.reflect`. By default, these operations exclude the
``SYSTEM`` and ``SYSAUX`` tablespaces from the operation. In order to change
this, the default list of tablespaces excluded can be changed at the engine
level using the ``exclude_tablespaces`` parameter::

    # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
    e = create_engine(
        "oracle+oracledb://scott:tiger@localhost:1521/?service_name=freepdb1",
        exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"],
    )


.. _oracle_float_support:

FLOAT / DOUBLE Support and Behaviors
------------------------------------

The SQLAlchemy :class:`.Float` and :class:`.Double` datatypes are generic
datatypes that resolve to the "least surprising" datatype for a given
backend. For Oracle Database, this means they resolve to the ``FLOAT`` and
``DOUBLE`` types::

    >>> from sqlalchemy import cast, literal, Float
    >>> from sqlalchemy.dialects import oracle
    >>> float_datatype = Float()
    >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
    CAST(:param_1 AS FLOAT)

Oracle Database's ``FLOAT`` / ``DOUBLE`` datatypes are aliases for
``NUMBER``. Oracle Database stores ``NUMBER`` values with full precision, not
floating point precision, which means that ``FLOAT`` / ``DOUBLE`` do not
actually behave like native FP values. Oracle Database instead offers special
datatypes ``BINARY_FLOAT`` and ``BINARY_DOUBLE`` to deliver real 4- and
8-byte FP values.

SQLAlchemy supports these datatypes directly using :class:`.BINARY_FLOAT` and
:class:`.BINARY_DOUBLE`. To use the :class:`.Float` or :class:`.Double`
datatypes in a database agnostic way, while allowing Oracle Database backends
to utilize one of these types, use the :meth:`.TypeEngine.with_variant`
method to set up a variant::

    >>> from sqlalchemy import cast, literal, Float
    >>> from sqlalchemy.dialects import oracle
    >>> float_datatype = Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
    >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
    CAST(:param_1 AS BINARY_FLOAT)

E.g. to use this datatype in a :class:`.Table` definition::

    my_table = Table(
        "my_table",
        metadata,
        Column(
            "fp_data", Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
        ),
    )


DateTime Compatibility
----------------------

Oracle Database has no datatype known as ``DATETIME``; it instead has only
``DATE``, which can actually store a date and time value. For this reason,
the Oracle Database dialects provide a type :class:`_oracle.DATE` which is a
subclass of :class:`.DateTime`. This type has no special behavior, and is
only present as a "marker" for this type; additionally, when a database
column is reflected and the type is reported as ``DATE``, the
time-supporting :class:`_oracle.DATE` type is used.

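The subclass relationship can be verified directly; a small sketch:

```python
from sqlalchemy import DateTime
from sqlalchemy.dialects import oracle

# oracle.DATE is a DateTime subclass, so a reflected DATE column
# accepts and returns datetime values rather than date-only values
print(issubclass(oracle.DATE, DateTime))  # prints True
```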

.. _oracle_table_options:

Oracle Database Table Options
-----------------------------

The CREATE TABLE phrase supports the following options with Oracle Database
dialects in conjunction with the :class:`_schema.Table` construct:

* ``ON COMMIT``::

    Table(
        "some_table",
        metadata,
        ...,
        prefixes=["GLOBAL TEMPORARY"],
        oracle_on_commit="PRESERVE ROWS",
    )

* ``COMPRESS``::

    Table(
        "mytable", metadata, Column("data", String(32)), oracle_compress=True
    )

    Table("mytable", metadata, Column("data", String(32)), oracle_compress=6)

  The ``oracle_compress`` parameter accepts either an integer compression
  level, or ``True`` to use the default compression level.

* ``TABLESPACE``::

    Table("mytable", metadata, ..., oracle_tablespace="EXAMPLE_TABLESPACE")

  The ``oracle_tablespace`` parameter specifies the tablespace in which the
  table is to be created. This is useful when you want to create a table in a
  tablespace other than the default tablespace of the user.

  .. versionadded:: 2.0.37


688.. _oracle_index_options: 

689 

690Oracle Database Specific Index Options 

691-------------------------------------- 

692 

693Bitmap Indexes 

694~~~~~~~~~~~~~~ 

695 

696You can specify the ``oracle_bitmap`` parameter to create a bitmap index 

697instead of a B-tree index:: 

698 

699 Index("my_index", my_table.c.data, oracle_bitmap=True) 

700 

701Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not 

702check for such limitations, only the database will. 

703 

704Index compression 

705~~~~~~~~~~~~~~~~~ 

706 

707Oracle Database has a more efficient storage mode for indexes containing lots 

708of repeated values. Use the ``oracle_compress`` parameter to turn on key 

709compression:: 

710 

711 Index("my_index", my_table.c.data, oracle_compress=True) 

712 

713 Index( 

714 "my_index", 

715 my_table.c.data1, 

716 my_table.c.data2, 

717 unique=True, 

718 oracle_compress=1, 

719 ) 

720 

721The ``oracle_compress`` parameter accepts either an integer specifying the 

722number of prefix columns to compress, or ``True`` to use the default (all 

723columns for non-unique indexes, all but the last column for unique indexes). 
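As a sketch (assuming SQLAlchemy is installed), both index options can be
verified by compiling the CREATE INDEX statements against the Oracle dialect,
again without connecting:

```python
from sqlalchemy import Column, Index, MetaData, String, Table
from sqlalchemy.dialects import oracle
from sqlalchemy.schema import CreateIndex

metadata = MetaData()
my_table = Table("mytable", metadata, Column("data", String(32)))

# a bitmap index and a key-compressed index on the same column
bitmap_ix = Index("my_bitmap_index", my_table.c.data, oracle_bitmap=True)
compressed_ix = Index("my_compressed_index", my_table.c.data, oracle_compress=True)

bitmap_ddl = str(CreateIndex(bitmap_ix).compile(dialect=oracle.dialect()))
compress_ddl = str(CreateIndex(compressed_ix).compile(dialect=oracle.dialect()))
print(bitmap_ddl)
print(compress_ddl)
```

``oracle_bitmap`` inserts the BITMAP keyword before INDEX, while
``oracle_compress`` appends a COMPRESS clause after the column list.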

724 

725.. _oracle_vector_datatype: 

726 

727VECTOR Datatype 

728--------------- 

729 

730Oracle Database 23ai introduced a new VECTOR datatype for artificial intelligence 

731and machine learning search operations. The VECTOR datatype is a homogeneous array 

732of 8-bit signed integers, 8-bit unsigned integers (binary), 32-bit floating-point numbers, 

733or 64-bit floating-point numbers. 

734 

735.. seealso:: 

736 

737 `Using VECTOR Data 

738 <https://python-oracledb.readthedocs.io/en/latest/user_guide/vector_data_type.html>`_ - in the documentation 

739 for the :ref:`oracledb` driver. 

740 

741.. versionadded:: 2.0.41 

742 

743CREATE TABLE support for VECTOR 

744~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

745 

746With the :class:`.VECTOR` datatype, you can specify the dimension for the data 

747and the storage format. Valid values for storage format are enum values from 

748:class:`.VectorStorageFormat`. To create a table that includes a 

749:class:`.VECTOR` column:: 

750 

751 from sqlalchemy.dialects.oracle import VECTOR, VectorStorageFormat 

752 

753 t = Table( 

754 "t1", 

755 metadata, 

756 Column("id", Integer, primary_key=True), 

757 Column( 

758 "embedding", 

759 VECTOR(dim=3, storage_format=VectorStorageFormat.FLOAT32), 

760 ), 

761 Column(...), 

762 ..., 

763 ) 

764 

765Vectors can also be defined without a fixed dimension or storage format. 

766This allows a single column to accept vectors of different dimensions or of 

767the various storage formats mentioned above. 

768 

769**Examples** 

770 

771* In this case the storage format is flexible, allowing data of any vector type, 

772 such as INT8 or BINARY, to be inserted:: 

773 

774 vector_col: Mapped[array.array] = mapped_column(VECTOR(dim=3)) 

775 

776* The dimension is flexible in this case, meaning that vectors of any dimension can be used:: 

777 

778 vector_col: Mapped[array.array] = mapped_column( 

779 VECTOR(storage_format=VectorStorageFormat.INT8) 

780 ) 

781 

782* Both the dimensions and the storage format are flexible:: 

783 

784 vector_col: Mapped[array.array] = mapped_column(VECTOR) 

785 

786Python Datatypes for VECTOR 

787~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

788 

789VECTOR data can be inserted using Python lists or ``array.array`` objects. 

790Python arrays of type FLOAT (32-bit), DOUBLE (64-bit), or INT (8-bit signed integer) 

791are used as bind values when inserting VECTOR columns:: 

792 

793 from sqlalchemy import insert, select 

794 

795 with engine.begin() as conn: 

796 conn.execute( 

797 insert(t1), 

798 {"id": 1, "embedding": [1, 2, 3]}, 

799 ) 

800 

801VECTOR Indexes 

802~~~~~~~~~~~~~~ 

803 

804The VECTOR feature supports an Oracle-specific parameter ``oracle_vector`` 

805on the :class:`.Index` construct, which allows the construction of VECTOR 

806indexes. 

807 

808To utilize VECTOR indexing, set the ``oracle_vector`` parameter to True to use 

809the default values provided by Oracle. HNSW is the default indexing method:: 

810 

811 from sqlalchemy import Index 

812 

813 Index( 

814 "vector_index", 

815 t1.c.embedding, 

816 oracle_vector=True, 

817 ) 

818 

819The full range of parameters for vector indexes is available by using the 

820:class:`.VectorIndexConfig` dataclass in place of a boolean; this dataclass 

821allows full configuration of the index:: 

822 

823 Index( 

824 "hnsw_vector_index", 

825 t1.c.embedding, 

826 oracle_vector=VectorIndexConfig( 

827 index_type=VectorIndexType.HNSW, 

828 distance=VectorDistanceType.COSINE, 

829 accuracy=90, 

830 hnsw_neighbors=5, 

831 hnsw_efconstruction=20, 

832 parallel=10, 

833 ), 

834 ) 

835 

836 Index( 

837 "ivf_vector_index", 

838 t1.c.embedding, 

839 oracle_vector=VectorIndexConfig( 

840 index_type=VectorIndexType.IVF, 

841 distance=VectorDistanceType.DOT, 

842 accuracy=90, 

843 ivf_neighbor_partitions=5, 

844 ), 

845 ) 

846 

847For a complete explanation of these parameters, see the Oracle documentation linked 

848below. 

849 

850.. seealso:: 

851 

852 `CREATE VECTOR INDEX <https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B396C369-54BB-4098-A0DD-7C54B3A0D66F>`_ - in the Oracle documentation 

853 

854 

855 

856Similarity Searching 

857~~~~~~~~~~~~~~~~~~~~ 

858 

859When using the :class:`_oracle.VECTOR` datatype with a :class:`.Column` or similar 

860ORM mapped construct, additional comparison functions are available, including: 

861 

862* ``l2_distance`` 

863* ``cosine_distance`` 

864* ``inner_product`` 

865 

866Example Usage:: 

867 

868 result = connection.execute( 

869 select(t1).order_by(t1.c.embedding.l2_distance([2, 3, 4])).limit(3) 

870 ) 

871 

872 for row in result: 

873 print(row.id, row.embedding) 

874 

875FETCH APPROXIMATE support 

876~~~~~~~~~~~~~~~~~~~~~~~~~ 

877 

878Approximate vector search can be performed only when all syntax and semantic 

879rules are satisfied, the corresponding vector index is available, and the 

880query optimizer decides to use it. If any of these conditions is not met, 

881an approximate search is not performed and the query instead 

882returns exact results. 

883 

884To enable approximate searching during similarity searches on VECTORS, the 

885``oracle_fetch_approximate`` parameter may be used with the :meth:`.Select.fetch` 

886clause to add ``FETCH APPROX`` to the SELECT statement:: 

887 

888 select(users_table).fetch(5, oracle_fetch_approximate=True) 

889 

890""" # noqa 

891 

892from __future__ import annotations 

893 

894from collections import defaultdict 

895from dataclasses import fields 

896from functools import lru_cache 

897from functools import wraps 

898import re 

899 

900from . import dictionary 

901from .types import _OracleBoolean 

902from .types import _OracleDate 

903from .types import BFILE 

904from .types import BINARY_DOUBLE 

905from .types import BINARY_FLOAT 

906from .types import DATE 

907from .types import FLOAT 

908from .types import INTERVAL 

909from .types import LONG 

910from .types import NCLOB 

911from .types import NUMBER 

912from .types import NVARCHAR2 # noqa 

913from .types import OracleRaw # noqa 

914from .types import RAW 

915from .types import ROWID # noqa 

916from .types import TIMESTAMP 

917from .types import VARCHAR2 # noqa 

918from .vector import VECTOR 

919from .vector import VectorIndexConfig 

920from .vector import VectorIndexType 

921from ... import Computed 

922from ... import exc 

923from ... import schema as sa_schema 

924from ... import sql 

925from ... import util 

926from ...engine import default 

927from ...engine import ObjectKind 

928from ...engine import ObjectScope 

929from ...engine import reflection 

930from ...engine.reflection import ReflectionDefaults 

931from ...sql import and_ 

932from ...sql import bindparam 

933from ...sql import compiler 

934from ...sql import expression 

935from ...sql import func 

936from ...sql import null 

937from ...sql import or_ 

938from ...sql import select 

939from ...sql import selectable as sa_selectable 

940from ...sql import sqltypes 

941from ...sql import util as sql_util 

942from ...sql import visitors 

943from ...sql.visitors import InternalTraversal 

944from ...types import BLOB 

945from ...types import CHAR 

946from ...types import CLOB 

947from ...types import DOUBLE_PRECISION 

948from ...types import INTEGER 

949from ...types import NCHAR 

950from ...types import NVARCHAR 

951from ...types import REAL 

952from ...types import VARCHAR 

953 

954RESERVED_WORDS = set( 

955 "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN " 

956 "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED " 

957 "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE " 

958 "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE " 

959 "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES " 

960 "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS " 

961 "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER " 

962 "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR " 

963 "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split() 

964) 

965 

966NO_ARG_FNS = set( 

967 "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split() 

968) 

969 

970 

971colspecs = { 

972 sqltypes.Boolean: _OracleBoolean, 

973 sqltypes.Interval: INTERVAL, 

974 sqltypes.DateTime: DATE, 

975 sqltypes.Date: _OracleDate, 

976} 

977 

978ischema_names = { 

979 "VARCHAR2": VARCHAR, 

980 "NVARCHAR2": NVARCHAR, 

981 "CHAR": CHAR, 

982 "NCHAR": NCHAR, 

983 "DATE": DATE, 

984 "NUMBER": NUMBER, 

985 "BLOB": BLOB, 

986 "BFILE": BFILE, 

987 "CLOB": CLOB, 

988 "NCLOB": NCLOB, 

989 "TIMESTAMP": TIMESTAMP, 

990 "TIMESTAMP WITH TIME ZONE": TIMESTAMP, 

991 "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP, 

992 "INTERVAL DAY TO SECOND": INTERVAL, 

993 "RAW": RAW, 

994 "FLOAT": FLOAT, 

995 "DOUBLE PRECISION": DOUBLE_PRECISION, 

996 "REAL": REAL, 

997 "LONG": LONG, 

998 "BINARY_DOUBLE": BINARY_DOUBLE, 

999 "BINARY_FLOAT": BINARY_FLOAT, 

1000 "ROWID": ROWID, 

1001 "VECTOR": VECTOR, 

1002} 

1003 

1004 

1005class OracleTypeCompiler(compiler.GenericTypeCompiler): 

1006 # Note: 

1007 # Oracle DATE == DATETIME 

1008 # Oracle does not allow milliseconds in DATE 

1009 # Oracle does not support TIME columns 

1010 

1011 def visit_datetime(self, type_, **kw): 

1012 return self.visit_DATE(type_, **kw) 

1013 

1014 def visit_float(self, type_, **kw): 

1015 return self.visit_FLOAT(type_, **kw) 

1016 

1017 def visit_double(self, type_, **kw): 

1018 return self.visit_DOUBLE_PRECISION(type_, **kw) 

1019 

1020 def visit_unicode(self, type_, **kw): 

1021 if self.dialect._use_nchar_for_unicode: 

1022 return self.visit_NVARCHAR2(type_, **kw) 

1023 else: 

1024 return self.visit_VARCHAR2(type_, **kw) 

1025 

1026 def visit_INTERVAL(self, type_, **kw): 

1027 return "INTERVAL DAY%s TO SECOND%s" % ( 

1028 type_.day_precision is not None 

1029 and "(%d)" % type_.day_precision 

1030 or "", 

1031 type_.second_precision is not None 

1032 and "(%d)" % type_.second_precision 

1033 or "", 

1034 ) 

1035 

1036 def visit_LONG(self, type_, **kw): 

1037 return "LONG" 

1038 

1039 def visit_TIMESTAMP(self, type_, **kw): 

1040 if getattr(type_, "local_timezone", False): 

1041 return "TIMESTAMP WITH LOCAL TIME ZONE" 

1042 elif type_.timezone: 

1043 return "TIMESTAMP WITH TIME ZONE" 

1044 else: 

1045 return "TIMESTAMP" 

1046 

1047 def visit_DOUBLE_PRECISION(self, type_, **kw): 

1048 return self._generate_numeric(type_, "DOUBLE PRECISION", **kw) 

1049 

1050 def visit_BINARY_DOUBLE(self, type_, **kw): 

1051 return self._generate_numeric(type_, "BINARY_DOUBLE", **kw) 

1052 

1053 def visit_BINARY_FLOAT(self, type_, **kw): 

1054 return self._generate_numeric(type_, "BINARY_FLOAT", **kw) 

1055 

1056 def visit_FLOAT(self, type_, **kw): 

1057 kw["_requires_binary_precision"] = True 

1058 return self._generate_numeric(type_, "FLOAT", **kw) 

1059 

1060 def visit_NUMBER(self, type_, **kw): 

1061 return self._generate_numeric(type_, "NUMBER", **kw) 

1062 

1063 def _generate_numeric( 

1064 self, 

1065 type_, 

1066 name, 

1067 precision=None, 

1068 scale=None, 

1069 _requires_binary_precision=False, 

1070 **kw, 

1071 ): 

1072 if precision is None: 

1073 precision = getattr(type_, "precision", None) 

1074 

1075 if _requires_binary_precision: 

1076 binary_precision = getattr(type_, "binary_precision", None) 

1077 

1078 if precision and binary_precision is None: 

1079 # https://www.oracletutorial.com/oracle-basics/oracle-float/ 

1080 estimated_binary_precision = int(precision / 0.30103) 

1081 raise exc.ArgumentError( 

1082 "Oracle Database FLOAT types use 'binary precision', " 

1083 "which does not convert cleanly from decimal " 

1084 "'precision'. Please specify " 

1085 "this type with a separate Oracle Database variant, such " 

1086 f"as {type_.__class__.__name__}(precision={precision})." 

1087 f"with_variant(oracle.FLOAT" 

1088 f"(binary_precision=" 

1089 f"{estimated_binary_precision}), 'oracle'), so that the " 

1090 "Oracle Database specific 'binary_precision' may be " 

1091 "specified accurately." 

1092 ) 

1093 else: 

1094 precision = binary_precision 

1095 

1096 if scale is None: 

1097 scale = getattr(type_, "scale", None) 

1098 

1099 if precision is None: 

1100 return name 

1101 elif scale is None: 

1102 n = "%(name)s(%(precision)s)" 

1103 return n % {"name": name, "precision": precision} 

1104 else: 

1105 n = "%(name)s(%(precision)s, %(scale)s)" 

1106 return n % {"name": name, "precision": precision, "scale": scale} 

1107 

1108 def visit_string(self, type_, **kw): 

1109 return self.visit_VARCHAR2(type_, **kw) 

1110 

1111 def visit_VARCHAR2(self, type_, **kw): 

1112 return self._visit_varchar(type_, "", "2") 

1113 

1114 def visit_NVARCHAR2(self, type_, **kw): 

1115 return self._visit_varchar(type_, "N", "2") 

1116 

1117 visit_NVARCHAR = visit_NVARCHAR2 

1118 

1119 def visit_VARCHAR(self, type_, **kw): 

1120 return self._visit_varchar(type_, "", "") 

1121 

1122 def _visit_varchar(self, type_, n, num): 

1123 if not type_.length: 

1124 return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n} 

1125 elif not n and self.dialect._supports_char_length: 

1126 varchar = "VARCHAR%(two)s(%(length)s CHAR)" 

1127 return varchar % {"length": type_.length, "two": num} 

1128 else: 

1129 varchar = "%(n)sVARCHAR%(two)s(%(length)s)" 

1130 return varchar % {"length": type_.length, "two": num, "n": n} 

1131 

1132 def visit_text(self, type_, **kw): 

1133 return self.visit_CLOB(type_, **kw) 

1134 

1135 def visit_unicode_text(self, type_, **kw): 

1136 if self.dialect._use_nchar_for_unicode: 

1137 return self.visit_NCLOB(type_, **kw) 

1138 else: 

1139 return self.visit_CLOB(type_, **kw) 

1140 

1141 def visit_large_binary(self, type_, **kw): 

1142 return self.visit_BLOB(type_, **kw) 

1143 

1144 def visit_big_integer(self, type_, **kw): 

1145 return self.visit_NUMBER(type_, precision=19, **kw) 

1146 

1147 def visit_boolean(self, type_, **kw): 

1148 return self.visit_SMALLINT(type_, **kw) 

1149 

1150 def visit_RAW(self, type_, **kw): 

1151 if type_.length: 

1152 return "RAW(%(length)s)" % {"length": type_.length} 

1153 else: 

1154 return "RAW" 

1155 

1156 def visit_ROWID(self, type_, **kw): 

1157 return "ROWID" 

1158 

1159 def visit_VECTOR(self, type_, **kw): 

1160 if type_.dim is None and type_.storage_format is None: 

1161 return "VECTOR(*,*)" 

1162 elif type_.storage_format is None: 

1163 return f"VECTOR({type_.dim},*)" 

1164 elif type_.dim is None: 

1165 return f"VECTOR(*,{type_.storage_format.value})" 

1166 else: 

1167 return f"VECTOR({type_.dim},{type_.storage_format.value})" 

1168 

1169 

1170class OracleCompiler(compiler.SQLCompiler): 

1171 """Oracle compiler modifies the lexical structure of Select 

1172 statements to work under non-ANSI configured Oracle databases, if 

1173 the use_ansi flag is False. 

1174 """ 

1175 

1176 compound_keywords = util.update_copy( 

1177 compiler.SQLCompiler.compound_keywords, 

1178 {expression.CompoundSelect.EXCEPT: "MINUS"}, 

1179 ) 

1180 

1181 def __init__(self, *args, **kwargs): 

1182 self.__wheres = {} 

1183 super().__init__(*args, **kwargs) 

1184 

1185 def visit_mod_binary(self, binary, operator, **kw): 

1186 return "mod(%s, %s)" % ( 

1187 self.process(binary.left, **kw), 

1188 self.process(binary.right, **kw), 

1189 ) 

1190 

1191 def visit_now_func(self, fn, **kw): 

1192 return "CURRENT_TIMESTAMP" 

1193 

1194 def visit_char_length_func(self, fn, **kw): 

1195 return "LENGTH" + self.function_argspec(fn, **kw) 

1196 

1197 def visit_match_op_binary(self, binary, operator, **kw): 

1198 return "CONTAINS (%s, %s)" % ( 

1199 self.process(binary.left), 

1200 self.process(binary.right), 

1201 ) 

1202 

1203 def visit_true(self, expr, **kw): 

1204 return "1" 

1205 

1206 def visit_false(self, expr, **kw): 

1207 return "0" 

1208 

1209 def get_cte_preamble(self, recursive): 

1210 return "WITH" 

1211 

1212 def get_select_hint_text(self, byfroms): 

1213 return " ".join("/*+ %s */" % text for table, text in byfroms.items()) 

1214 

1215 def function_argspec(self, fn, **kw): 

1216 if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS: 

1217 return compiler.SQLCompiler.function_argspec(self, fn, **kw) 

1218 else: 

1219 return "" 

1220 

1221 def visit_function(self, func, **kw): 

1222 text = super().visit_function(func, **kw) 

1223 if kw.get("asfrom", False) and func.name.lower() != "table": 

1224 text = "TABLE (%s)" % text 

1225 return text 

1226 

1227 def visit_table_valued_column(self, element, **kw): 

1228 text = super().visit_table_valued_column(element, **kw) 

1229 text = text + ".COLUMN_VALUE" 

1230 return text 

1231 

1232 def default_from(self): 

1233 """Called when a ``SELECT`` statement has no froms, 

1234 and no ``FROM`` clause is to be appended. 

1235 

1236 The Oracle compiler tacks "FROM DUAL" onto the statement. 

1237 """ 

1238 

1239 return " FROM DUAL" 

1240 

1241 def visit_join(self, join, from_linter=None, **kwargs): 

1242 if self.dialect.use_ansi: 

1243 return compiler.SQLCompiler.visit_join( 

1244 self, join, from_linter=from_linter, **kwargs 

1245 ) 

1246 else: 

1247 if from_linter: 

1248 from_linter.edges.add((join.left, join.right)) 

1249 

1250 kwargs["asfrom"] = True 

1251 if isinstance(join.right, expression.FromGrouping): 

1252 right = join.right.element 

1253 else: 

1254 right = join.right 

1255 return ( 

1256 self.process(join.left, from_linter=from_linter, **kwargs) 

1257 + ", " 

1258 + self.process(right, from_linter=from_linter, **kwargs) 

1259 ) 

1260 

1261 def _get_nonansi_join_whereclause(self, froms): 

1262 clauses = [] 

1263 

1264 def visit_join(join): 

1265 if join.isouter: 

1266 # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354 

1267 # "apply the outer join operator (+) to all columns of B in 

1268 # the join condition in the WHERE clause" - that is, 

1269 # unconditionally regardless of operator or the other side 

1270 def visit_binary(binary): 

1271 if isinstance( 

1272 binary.left, expression.ColumnClause 

1273 ) and join.right.is_derived_from(binary.left.table): 

1274 binary.left = _OuterJoinColumn(binary.left) 

1275 elif isinstance( 

1276 binary.right, expression.ColumnClause 

1277 ) and join.right.is_derived_from(binary.right.table): 

1278 binary.right = _OuterJoinColumn(binary.right) 

1279 

1280 clauses.append( 

1281 visitors.cloned_traverse( 

1282 join.onclause, {}, {"binary": visit_binary} 

1283 ) 

1284 ) 

1285 else: 

1286 clauses.append(join.onclause) 

1287 

1288 for j in join.left, join.right: 

1289 if isinstance(j, expression.Join): 

1290 visit_join(j) 

1291 elif isinstance(j, expression.FromGrouping): 

1292 visit_join(j.element) 

1293 

1294 for f in froms: 

1295 if isinstance(f, expression.Join): 

1296 visit_join(f) 

1297 

1298 if not clauses: 

1299 return None 

1300 else: 

1301 return sql.and_(*clauses) 

1302 

1303 def visit_outer_join_column(self, vc, **kw): 

1304 return self.process(vc.column, **kw) + "(+)" 

1305 

1306 def visit_sequence(self, seq, **kw): 

1307 return self.preparer.format_sequence(seq) + ".nextval" 

1308 

1309 def get_render_as_alias_suffix(self, alias_name_text): 

1310 """Oracle doesn't like ``FROM table AS alias``""" 

1311 

1312 return " " + alias_name_text 

1313 

1314 def returning_clause( 

1315 self, stmt, returning_cols, *, populate_result_map, **kw 

1316 ): 

1317 columns = [] 

1318 binds = [] 

1319 

1320 for i, column in enumerate( 

1321 expression._select_iterables(returning_cols) 

1322 ): 

1323 if ( 

1324 self.isupdate 

1325 and isinstance(column, sa_schema.Column) 

1326 and isinstance(column.server_default, Computed) 

1327 and not self.dialect._supports_update_returning_computed_cols 

1328 ): 

1329 util.warn( 

1330 "Computed columns don't work with Oracle Database UPDATE " 

1331 "statements that use RETURNING; the value of the column " 

1332 "*before* the UPDATE takes place is returned. It is " 

1333 "advised to not use RETURNING with an Oracle Database " 

1334 "computed column. Consider setting implicit_returning " 

1335 "to False on the Table object in order to avoid implicit " 

1336 "RETURNING clauses from being generated for this Table." 

1337 ) 

1338 if column.type._has_column_expression: 

1339 col_expr = column.type.column_expression(column) 

1340 else: 

1341 col_expr = column 

1342 

1343 outparam = sql.outparam("ret_%d" % i, type_=column.type) 

1344 self.binds[outparam.key] = outparam 

1345 binds.append( 

1346 self.bindparam_string(self._truncate_bindparam(outparam)) 

1347 ) 

1348 

1349 # has_out_parameters would in a normal case be set to True 

1350 # as a result of the compiler visiting an outparam() object. 

1351 # in this case, the above outparam() objects are not being 

1352 # visited. Ensure the statement itself didn't have other 

1353 # outparam() objects independently. 

1354 # technically, this could be supported, but as it would be 

1355 # a very strange use case without a clear rationale, disallow it 

1356 if self.has_out_parameters: 

1357 raise exc.InvalidRequestError( 

1358 "Using explicit outparam() objects with " 

1359 "UpdateBase.returning() in the same Core DML statement " 

1360 "is not supported in the Oracle Database dialects." 

1361 ) 

1362 

1363 self._oracle_returning = True 

1364 

1365 columns.append(self.process(col_expr, within_columns_clause=False)) 

1366 if populate_result_map: 

1367 self._add_to_result_map( 

1368 getattr(col_expr, "name", col_expr._anon_name_label), 

1369 getattr(col_expr, "name", col_expr._anon_name_label), 

1370 ( 

1371 column, 

1372 getattr(column, "name", None), 

1373 getattr(column, "key", None), 

1374 ), 

1375 column.type, 

1376 ) 

1377 

1378 return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds) 

1379 

1380 def _row_limit_clause(self, select, **kw): 

1381 """Oracle Database 12c supports the OFFSET / FETCH operators; 

1382 use them instead of a subquery with ROW_NUMBER. 

1383 

1384 """ 

1385 

1386 if ( 

1387 select._fetch_clause is not None 

1388 or not self.dialect._supports_offset_fetch 

1389 ): 

1390 return super()._row_limit_clause( 

1391 select, use_literal_execute_for_simple_int=True, **kw 

1392 ) 

1393 else: 

1394 return self.fetch_clause( 

1395 select, 

1396 fetch_clause=self._get_limit_or_fetch(select), 

1397 use_literal_execute_for_simple_int=True, 

1398 **kw, 

1399 ) 

1400 

1401 def _get_limit_or_fetch(self, select): 

1402 if select._fetch_clause is None: 

1403 return select._limit_clause 

1404 else: 

1405 return select._fetch_clause 

1406 

1407 def fetch_clause( 

1408 self, 

1409 select, 

1410 fetch_clause=None, 

1411 require_offset=False, 

1412 use_literal_execute_for_simple_int=False, 

1413 **kw, 

1414 ): 

1415 text = super().fetch_clause( 

1416 select, 

1417 fetch_clause=fetch_clause, 

1418 require_offset=require_offset, 

1419 use_literal_execute_for_simple_int=( 

1420 use_literal_execute_for_simple_int 

1421 ), 

1422 **kw, 

1423 ) 

1424 

1425 if select.dialect_options["oracle"]["fetch_approximate"]: 

1426 text = re.sub("FETCH FIRST", "FETCH APPROX FIRST", text) 

1427 

1428 return text 

1429 

1430 def translate_select_structure(self, select_stmt, **kwargs): 

1431 select = select_stmt 

1432 

1433 if not getattr(select, "_oracle_visit", None): 

1434 if not self.dialect.use_ansi: 

1435 froms = self._display_froms_for_select( 

1436 select, kwargs.get("asfrom", False) 

1437 ) 

1438 whereclause = self._get_nonansi_join_whereclause(froms) 

1439 if whereclause is not None: 

1440 select = select.where(whereclause) 

1441 select._oracle_visit = True 

1442 

1443 # if fetch is used this is not needed 

1444 if ( 

1445 select._has_row_limiting_clause 

1446 and not self.dialect._supports_offset_fetch 

1447 and select._fetch_clause is None 

1448 ): 

1449 limit_clause = select._limit_clause 

1450 offset_clause = select._offset_clause 

1451 

1452 if select._simple_int_clause(limit_clause): 

1453 limit_clause = limit_clause.render_literal_execute() 

1454 

1455 if select._simple_int_clause(offset_clause): 

1456 offset_clause = offset_clause.render_literal_execute() 

1457 

1458 # currently using form at: 

1459 # https://blogs.oracle.com/oraclemagazine/\ 

1460 # on-rownum-and-limiting-results 

1461 

1462 orig_select = select 

1463 select = select._generate() 

1464 select._oracle_visit = True 

1465 

1466 # add expressions to accommodate FOR UPDATE OF 

1467 for_update = select._for_update_arg 

1468 if for_update is not None and for_update.of: 

1469 for_update = for_update._clone() 

1470 for_update._copy_internals() 

1471 

1472 for elem in for_update.of: 

1473 if not select.selected_columns.contains_column(elem): 

1474 select = select.add_columns(elem) 

1475 

1476 # Wrap the middle select and add the hint 

1477 inner_subquery = select.alias() 

1478 limitselect = sql.select( 

1479 *[ 

1480 c 

1481 for c in inner_subquery.c 

1482 if orig_select.selected_columns.corresponding_column(c) 

1483 is not None 

1484 ] 

1485 ) 

1486 

1487 if ( 

1488 limit_clause is not None 

1489 and self.dialect.optimize_limits 

1490 and select._simple_int_clause(limit_clause) 

1491 ): 

1492 limitselect = limitselect.prefix_with( 

1493 expression.text( 

1494 "/*+ FIRST_ROWS(%s) */" 

1495 % self.process(limit_clause, **kwargs) 

1496 ) 

1497 ) 

1498 

1499 limitselect._oracle_visit = True 

1500 limitselect._is_wrapper = True 

1501 

1502 # add expressions to accommodate FOR UPDATE OF 

1503 if for_update is not None and for_update.of: 

1504 adapter = sql_util.ClauseAdapter(inner_subquery) 

1505 for_update.of = [ 

1506 adapter.traverse(elem) for elem in for_update.of 

1507 ] 

1508 

1509 # If needed, add the limiting clause 

1510 if limit_clause is not None: 

1511 if select._simple_int_clause(limit_clause) and ( 

1512 offset_clause is None 

1513 or select._simple_int_clause(offset_clause) 

1514 ): 

1515 max_row = limit_clause 

1516 

1517 if offset_clause is not None: 

1518 max_row = max_row + offset_clause 

1519 

1520 else: 

1521 max_row = limit_clause 

1522 

1523 if offset_clause is not None: 

1524 max_row = max_row + offset_clause 

1525 limitselect = limitselect.where( 

1526 sql.literal_column("ROWNUM") <= max_row 

1527 ) 

1528 

1529 # If needed, add the ora_rn, and wrap again with offset. 

1530 if offset_clause is None: 

1531 limitselect._for_update_arg = for_update 

1532 select = limitselect 

1533 else: 

1534 limitselect = limitselect.add_columns( 

1535 sql.literal_column("ROWNUM").label("ora_rn") 

1536 ) 

1537 limitselect._oracle_visit = True 

1538 limitselect._is_wrapper = True 

1539 

1540 if for_update is not None and for_update.of: 

1541 limitselect_cols = limitselect.selected_columns 

1542 for elem in for_update.of: 

1543 if ( 

1544 limitselect_cols.corresponding_column(elem) 

1545 is None 

1546 ): 

1547 limitselect = limitselect.add_columns(elem) 

1548 

1549 limit_subquery = limitselect.alias() 

1550 origselect_cols = orig_select.selected_columns 

1551 offsetselect = sql.select( 

1552 *[ 

1553 c 

1554 for c in limit_subquery.c 

1555 if origselect_cols.corresponding_column(c) 

1556 is not None 

1557 ] 

1558 ) 

1559 

1560 offsetselect._oracle_visit = True 

1561 offsetselect._is_wrapper = True 

1562 

1563 if for_update is not None and for_update.of: 

1564 adapter = sql_util.ClauseAdapter(limit_subquery) 

1565 for_update.of = [ 

1566 adapter.traverse(elem) for elem in for_update.of 

1567 ] 

1568 

1569 offsetselect = offsetselect.where( 

1570 sql.literal_column("ora_rn") > offset_clause 

1571 ) 

1572 

1573 offsetselect._for_update_arg = for_update 

1574 select = offsetselect 

1575 

1576 return select 

1577 

1578 def limit_clause(self, select, **kw): 

1579 return "" 

1580 

1581 def visit_empty_set_expr(self, type_, **kw): 

1582 return "SELECT 1 FROM DUAL WHERE 1!=1" 

1583 

1584 def for_update_clause(self, select, **kw): 

1585 if self.is_subquery(): 

1586 return "" 

1587 

1588 tmp = " FOR UPDATE" 

1589 

1590 if select._for_update_arg.of: 

1591 tmp += " OF " + ", ".join( 

1592 self.process(elem, **kw) for elem in select._for_update_arg.of 

1593 ) 

1594 

1595 if select._for_update_arg.nowait: 

1596 tmp += " NOWAIT" 

1597 if select._for_update_arg.skip_locked: 

1598 tmp += " SKIP LOCKED" 

1599 

1600 return tmp 

1601 

1602 def visit_is_distinct_from_binary(self, binary, operator, **kw): 

1603 return "DECODE(%s, %s, 0, 1) = 1" % ( 

1604 self.process(binary.left), 

1605 self.process(binary.right), 

1606 ) 

1607 

1608 def visit_is_not_distinct_from_binary(self, binary, operator, **kw): 

1609 return "DECODE(%s, %s, 0, 1) = 0" % ( 

1610 self.process(binary.left), 

1611 self.process(binary.right), 

1612 ) 

1613 

1614 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1615 string = self.process(binary.left, **kw) 

1616 pattern = self.process(binary.right, **kw) 

1617 flags = binary.modifiers["flags"] 

1618 if flags is None: 

1619 return "REGEXP_LIKE(%s, %s)" % (string, pattern) 

1620 else: 

1621 return "REGEXP_LIKE(%s, %s, %s)" % ( 

1622 string, 

1623 pattern, 

1624 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

1625 ) 

1626 

1627 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1628 return "NOT %s" % self.visit_regexp_match_op_binary( 

1629 binary, operator, **kw 

1630 ) 

1631 

1632 def visit_regexp_replace_op_binary(self, binary, operator, **kw): 

1633 string = self.process(binary.left, **kw) 

1634 pattern_replace = self.process(binary.right, **kw) 

1635 flags = binary.modifiers["flags"] 

1636 if flags is None: 

1637 return "REGEXP_REPLACE(%s, %s)" % ( 

1638 string, 

1639 pattern_replace, 

1640 ) 

1641 else: 

1642 return "REGEXP_REPLACE(%s, %s, %s)" % ( 

1643 string, 

1644 pattern_replace, 

1645 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

1646 ) 

1647 

1648 def visit_aggregate_strings_func(self, fn, **kw): 

1649 return "LISTAGG%s" % self.function_argspec(fn, **kw) 

1650 

1651 def _visit_bitwise(self, binary, fn_name, custom_right=None, **kw): 

1652 left = self.process(binary.left, **kw) 

1653 right = self.process( 

1654 custom_right if custom_right is not None else binary.right, **kw 

1655 ) 

1656 return f"{fn_name}({left}, {right})" 

1657 

1658 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1659 return self._visit_bitwise(binary, "BITXOR", **kw) 

1660 

1661 def visit_bitwise_or_op_binary(self, binary, operator, **kw): 

1662 return self._visit_bitwise(binary, "BITOR", **kw) 

1663 

1664 def visit_bitwise_and_op_binary(self, binary, operator, **kw): 

1665 return self._visit_bitwise(binary, "BITAND", **kw) 

1666 

1667 def visit_bitwise_rshift_op_binary(self, binary, operator, **kw): 

1668 raise exc.CompileError("Cannot compile bitwise_rshift in oracle") 

1669 

1670 def visit_bitwise_lshift_op_binary(self, binary, operator, **kw): 

1671 raise exc.CompileError("Cannot compile bitwise_lshift in oracle") 

1672 

1673 def visit_bitwise_not_op_unary_operator(self, element, operator, **kw): 

1674 raise exc.CompileError("Cannot compile bitwise_not in oracle") 

1675 

1676 

1677class OracleDDLCompiler(compiler.DDLCompiler): 

1678 

1679 def _build_vector_index_config( 

1680 self, vector_index_config: VectorIndexConfig 

1681 ) -> str: 

1682 parts = [] 

1683 sql_param_name = { 

1684 "hnsw_neighbors": "neighbors", 

1685 "hnsw_efconstruction": "efconstruction", 

1686 "ivf_neighbor_partitions": "neighbor partitions", 

1687 "ivf_sample_per_partition": "sample_per_partition", 

1688 "ivf_min_vectors_per_partition": "min_vectors_per_partition", 

1689 } 

1690 if vector_index_config.index_type == VectorIndexType.HNSW: 

1691 parts.append("ORGANIZATION INMEMORY NEIGHBOR GRAPH") 

1692 elif vector_index_config.index_type == VectorIndexType.IVF: 

1693 parts.append("ORGANIZATION NEIGHBOR PARTITIONS") 

1694 if vector_index_config.distance is not None: 

1695 parts.append(f"DISTANCE {vector_index_config.distance.value}") 

1696 

1697 if vector_index_config.accuracy is not None: 

1698 parts.append( 

1699 f"WITH TARGET ACCURACY {vector_index_config.accuracy}" 

1700 ) 

1701 

1702 parameters_str = [f"type {vector_index_config.index_type.name}"] 

1703 prefix = vector_index_config.index_type.name.lower() + "_" 

1704 

1705 for field in fields(vector_index_config): 

1706 if field.name.startswith(prefix): 

1707 key = sql_param_name.get(field.name) 

1708 value = getattr(vector_index_config, field.name) 

1709 if value is not None: 

1710 parameters_str.append(f"{key} {value}") 

1711 

1712 parameters_str = ", ".join(parameters_str) 

1713 parts.append(f"PARAMETERS ({parameters_str})") 

1714 

1715 if vector_index_config.parallel is not None: 

1716 parts.append(f"PARALLEL {vector_index_config.parallel}") 

1717 

1718 return " ".join(parts) 

1719 
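The PARAMETERS clause assembly above can be sketched standalone: fields whose names carry the index-type prefix are mapped through `sql_param_name` and joined. `HNSWConfig` and `build_parameters` below are simplified hypothetical stand-ins for `VectorIndexConfig` and the private builder, covering only the HNSW fields.

```python
from dataclasses import dataclass, fields
from typing import Optional

# Simplified stand-in for VectorIndexConfig: the index-type prefix on a
# field name ("hnsw_") selects which fields apply to this index type.
@dataclass
class HNSWConfig:
    hnsw_neighbors: Optional[int] = None
    hnsw_efconstruction: Optional[int] = None

def build_parameters(cfg, index_type="HNSW"):
    sql_param_name = {
        "hnsw_neighbors": "neighbors",
        "hnsw_efconstruction": "efconstruction",
    }
    parts = [f"type {index_type}"]
    prefix = index_type.lower() + "_"
    for f in fields(cfg):
        if f.name.startswith(prefix):
            value = getattr(cfg, f.name)
            if value is not None:
                parts.append(f"{sql_param_name[f.name]} {value}")
    return "PARAMETERS (" + ", ".join(parts) + ")"

print(build_parameters(HNSWConfig(hnsw_neighbors=32)))
```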

1720 def define_constraint_cascades(self, constraint): 

1721 text = "" 

1722 if constraint.ondelete is not None: 

1723 text += " ON DELETE %s" % constraint.ondelete 

1724 

1725 # oracle has no ON UPDATE CASCADE - 

1726 # it's only available via triggers 

1727 # https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html 

1728 if constraint.onupdate is not None: 

1729 util.warn( 

1730 "Oracle Database does not contain native UPDATE CASCADE " 

1731 "functionality - onupdates will not be rendered for foreign " 

1732 "keys. Consider using deferrable=True, initially='deferred' " 

1733 "or triggers." 

1734 ) 

1735 

1736 return text 

1737 

1738 def visit_drop_table_comment(self, drop, **kw): 

1739 return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table( 

1740 drop.element 

1741 ) 

1742 

1743 def visit_create_index(self, create, **kw): 

1744 index = create.element 

1745 self._verify_index_table(index) 

1746 preparer = self.preparer 

1747 text = "CREATE " 

1748 if index.unique: 

1749 text += "UNIQUE " 

1750 if index.dialect_options["oracle"]["bitmap"]: 

1751 text += "BITMAP " 

1752 vector_options = index.dialect_options["oracle"]["vector"] 

1753 if vector_options: 

1754 text += "VECTOR " 

1755 text += "INDEX %s ON %s (%s)" % ( 

1756 self._prepared_index_name(index, include_schema=True), 

1757 preparer.format_table(index.table, use_schema=True), 

1758 ", ".join( 

1759 self.sql_compiler.process( 

1760 expr, include_table=False, literal_binds=True 

1761 ) 

1762 for expr in index.expressions 

1763 ), 

1764 ) 

1765 if index.dialect_options["oracle"]["compress"] is not False: 

1766 if index.dialect_options["oracle"]["compress"] is True: 

1767 text += " COMPRESS" 

1768 else: 

1769 text += " COMPRESS %d" % ( 

1770 index.dialect_options["oracle"]["compress"] 

1771 ) 

1772 if vector_options: 

1773 if vector_options is True: 

1774 vector_options = VectorIndexConfig() 

1775 

1776 text += " " + self._build_vector_index_config(vector_options) 

1777 return text 

1778 

1779 def post_create_table(self, table): 

1780 table_opts = [] 

1781 opts = table.dialect_options["oracle"] 

1782 

1783 if opts["on_commit"]: 

1784 on_commit_options = opts["on_commit"].replace("_", " ").upper() 

1785 table_opts.append("\n ON COMMIT %s" % on_commit_options) 

1786 

1787 if opts["compress"]: 

1788 if opts["compress"] is True: 

1789 table_opts.append("\n COMPRESS") 

1790 else: 

1791 table_opts.append("\n COMPRESS FOR %s" % (opts["compress"])) 

1792 if opts["tablespace"]: 

1793 table_opts.append( 

1794 "\n TABLESPACE %s" % self.preparer.quote(opts["tablespace"]) 

1795 ) 

1796 return "".join(table_opts) 

1797 

1798 def get_identity_options(self, identity_options): 

1799 text = super().get_identity_options(identity_options) 

1800 text = text.replace("NO MINVALUE", "NOMINVALUE") 

1801 text = text.replace("NO MAXVALUE", "NOMAXVALUE") 

1802 text = text.replace("NO CYCLE", "NOCYCLE") 

1803 if identity_options.order is not None: 

1804 text += " ORDER" if identity_options.order else " NOORDER" 

1805 return text.strip() 

1806 
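The rewriting step above can be shown in isolation: Oracle Database spells these keywords without the space that the base renderer emits, and adds ORDER/NOORDER. `oracle_identity_options` is a hypothetical stand-in that takes the base-rendered text directly.

```python
def oracle_identity_options(text, order=None):
    # Oracle Database uses the single-word spellings NOMINVALUE,
    # NOMAXVALUE and NOCYCLE, plus an ORDER/NOORDER option.
    text = text.replace("NO MINVALUE", "NOMINVALUE")
    text = text.replace("NO MAXVALUE", "NOMAXVALUE")
    text = text.replace("NO CYCLE", "NOCYCLE")
    if order is not None:
        text += " ORDER" if order else " NOORDER"
    return text.strip()

print(oracle_identity_options("START WITH 3 NO MINVALUE NO CYCLE", order=True))
```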

1807 def visit_computed_column(self, generated, **kw): 

1808 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( 

1809 generated.sqltext, include_table=False, literal_binds=True 

1810 ) 

1811 if generated.persisted is True: 

1812 raise exc.CompileError( 

1813 "Oracle Database computed columns do not support 'stored' " 

1814 "persistence; set the 'persisted' flag to None or False for " 

1815 "Oracle Database support." 

1816 ) 

1817 elif generated.persisted is False: 

1818 text += " VIRTUAL" 

1819 return text 

1820 

1821 def visit_identity_column(self, identity, **kw): 

1822 if identity.always is None: 

1823 kind = "" 

1824 else: 

1825 kind = "ALWAYS" if identity.always else "BY DEFAULT" 

1826 text = "GENERATED %s" % kind 

1827 if identity.on_null: 

1828 text += " ON NULL" 

1829 text += " AS IDENTITY" 

1830 options = self.get_identity_options(identity) 

1831 if options: 

1832 text += " (%s)" % options 

1833 return text 

1834 

1835 

1836class OracleIdentifierPreparer(compiler.IdentifierPreparer): 

1837 reserved_words = {x.lower() for x in RESERVED_WORDS} 

1838 illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union( 

1839 ["_", "$"] 

1840 ) 

1841 

1842 def _bindparam_requires_quotes(self, value): 

1843 """Return True if the given identifier requires quoting.""" 

1844 lc_value = value.lower() 

1845 return ( 

1846 lc_value in self.reserved_words 

1847 or value[0] in self.illegal_initial_characters 

1848 or not self.legal_characters.match(str(value)) 

1849 ) 

1850 
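A minimal sketch of the quoting rule above, with tiny stand-in sets for `RESERVED_WORDS` and `legal_characters` (the real sets are much larger): an identifier needs quoting when it is reserved, starts with a digit, underscore or dollar sign, or contains characters outside the legal set.

```python
import re

RESERVED = {"select", "table"}                 # tiny stand-in for RESERVED_WORDS
ILLEGAL_INITIAL = {str(d) for d in range(10)} | {"_", "$"}
LEGAL = re.compile(r"^[A-Z0-9_$]+$", re.I)     # stand-in for legal_characters

def requires_quotes(value):
    # Quote when reserved, when the first character is illegal, or when
    # any character falls outside the legal set.
    return (
        value.lower() in RESERVED
        or value[0] in ILLEGAL_INITIAL
        or not LEGAL.match(value)
    )

print(requires_quotes("select"), requires_quotes("_hidden"), requires_quotes("ok_name"))
```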

1851 def format_savepoint(self, savepoint): 

1852 name = savepoint.ident.lstrip("_") 

1853 return super().format_savepoint(savepoint, name) 

1854 

1855 

1856class OracleExecutionContext(default.DefaultExecutionContext): 

1857 def fire_sequence(self, seq, type_): 

1858 return self._execute_scalar( 

1859 "SELECT " 

1860 + self.identifier_preparer.format_sequence(seq) 

1861 + ".nextval FROM DUAL", 

1862 type_, 

1863 ) 

1864 

1865 def pre_exec(self): 

1866 if self.statement and "_oracle_dblink" in self.execution_options: 

1867 self.statement = self.statement.replace( 

1868 dictionary.DB_LINK_PLACEHOLDER, 

1869 self.execution_options["_oracle_dblink"], 

1870 ) 

1871 

1872 

1873class OracleDialect(default.DefaultDialect): 

1874 name = "oracle" 

1875 supports_statement_cache = True 

1876 supports_alter = True 

1877 max_identifier_length = 128 

1878 

1879 _supports_offset_fetch = True 

1880 

1881 insert_returning = True 

1882 update_returning = True 

1883 delete_returning = True 

1884 

1885 div_is_floordiv = False 

1886 

1887 supports_simple_order_by_label = False 

1888 cte_follows_insert = True 

1889 returns_native_bytes = True 

1890 

1891 supports_sequences = True 

1892 sequences_optional = False 

1893 postfetch_lastrowid = False 

1894 

1895 default_paramstyle = "named" 

1896 colspecs = colspecs 

1897 ischema_names = ischema_names 

1898 requires_name_normalize = True 

1899 

1900 supports_comments = True 

1901 

1902 supports_default_values = False 

1903 supports_default_metavalue = True 

1904 supports_empty_insert = False 

1905 supports_identity_columns = True 

1906 

1907 statement_compiler = OracleCompiler 

1908 ddl_compiler = OracleDDLCompiler 

1909 type_compiler_cls = OracleTypeCompiler 

1910 preparer = OracleIdentifierPreparer 

1911 execution_ctx_cls = OracleExecutionContext 

1912 

1913 reflection_options = ("oracle_resolve_synonyms",) 

1914 

1915 _use_nchar_for_unicode = False 

1916 

1917 construct_arguments = [ 

1918 ( 

1919 sa_schema.Table, 

1920 { 

1921 "resolve_synonyms": False, 

1922 "on_commit": None, 

1923 "compress": False, 

1924 "tablespace": None, 

1925 }, 

1926 ), 

1927 ( 

1928 sa_schema.Index, 

1929 { 

1930 "bitmap": False, 

1931 "compress": False, 

1932 "vector": False, 

1933 }, 

1934 ), 

1935 (sa_selectable.Select, {"fetch_approximate": False}), 

1936 (sa_selectable.CompoundSelect, {"fetch_approximate": False}), 

1937 ] 

1938 

1939 @util.deprecated_params( 

1940 use_binds_for_limits=( 

1941 "1.4", 

1942 "The ``use_binds_for_limits`` Oracle Database dialect parameter " 

1943 "is deprecated. The dialect now renders LIMIT / OFFSET integers " 

1944 "inline in all cases using a post-compilation hook, so that the " 

1945 "value is still represented by a 'bound parameter' on the Core " 

1946 "Expression side.", 

1947 ) 

1948 ) 

1949 def __init__( 

1950 self, 

1951 use_ansi=True, 

1952 optimize_limits=False, 

1953 use_binds_for_limits=None, 

1954 use_nchar_for_unicode=False, 

1955 exclude_tablespaces=("SYSTEM", "SYSAUX"), 

1956 enable_offset_fetch=True, 

1957 **kwargs, 

1958 ): 

1959 default.DefaultDialect.__init__(self, **kwargs) 

1960 self._use_nchar_for_unicode = use_nchar_for_unicode 

1961 self.use_ansi = use_ansi 

1962 self.optimize_limits = optimize_limits 

1963 self.exclude_tablespaces = exclude_tablespaces 

1964 self.enable_offset_fetch = self._supports_offset_fetch = ( 

1965 enable_offset_fetch 

1966 ) 

1967 

1968 def initialize(self, connection): 

1969 super().initialize(connection) 

1970 

1971 # Oracle 8i has RETURNING: 

1972 # https://docs.oracle.com/cd/A87860_01/doc/index.htm 

1973 

1974 # so does Oracle8: 

1975 # https://docs.oracle.com/cd/A64702_01/doc/index.htm 

1976 

1977 if self._is_oracle_8: 

1978 self.colspecs = self.colspecs.copy() 

1979 self.colspecs.pop(sqltypes.Interval) 

1980 self.use_ansi = False 

1981 

1982 self.supports_identity_columns = self.server_version_info >= (12,) 

1983 self._supports_offset_fetch = ( 

1984 self.enable_offset_fetch and self.server_version_info >= (12,) 

1985 ) 

1986 

1987 def _get_effective_compat_server_version_info(self, connection): 

1988 # dialect does not need compat levels below 12.2, so don't query 

1989 # in those cases 

1990 

1991 if self.server_version_info < (12, 2): 

1992 return self.server_version_info 

1993 try: 

1994 compat = connection.exec_driver_sql( 

1995 "SELECT value FROM v$parameter WHERE name = 'compatible'" 

1996 ).scalar() 

1997 except exc.DBAPIError: 

1998 compat = None 

1999 

2000 if compat: 

2001 try: 

2002 return tuple(int(x) for x in compat.split(".")) 

2003 except: 

2004 return self.server_version_info 

2005 else: 

2006 return self.server_version_info 

2007 
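The version parsing above can be sketched on its own: the `compatible` parameter value (e.g. "12.2.0") is split into an integer tuple, falling back to the server version when the value is missing or not purely numeric. `parse_compat` is a hypothetical stand-in for the body of the method.

```python
def parse_compat(compat, fallback):
    # "12.2.0" -> (12, 2, 0); fall back to the server version info when
    # the value is missing or contains a non-numeric component.
    if compat:
        try:
            return tuple(int(x) for x in compat.split("."))
        except ValueError:
            return fallback
    return fallback

print(parse_compat("12.2.0", (19, 0)))
print(parse_compat(None, (19, 0)))
```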

2008 @property 

2009 def _is_oracle_8(self): 

2010 return self.server_version_info and self.server_version_info < (9,) 

2011 

2012 @property 

2013 def _supports_table_compression(self): 

2014 return self.server_version_info and self.server_version_info >= (10, 1) 

2015 

2016 @property 

2017 def _supports_table_compress_for(self): 

2018 return self.server_version_info and self.server_version_info >= (11,) 

2019 

2020 @property 

2021 def _supports_char_length(self): 

2022 return not self._is_oracle_8 

2023 

2024 @property 

2025 def _supports_update_returning_computed_cols(self): 

2026 # on version 18 this error is no longer present, while it happens on 11 

L4066 

2027 # it may also work on versions prior to 18 

2028 return self.server_version_info and self.server_version_info >= (18,) 

2029 

2030 @property 

2031 def _supports_except_all(self): 

2032 return self.server_version_info and self.server_version_info >= (21,) 

2033 

2034 def do_release_savepoint(self, connection, name): 

2035 # Oracle does not support RELEASE SAVEPOINT 

2036 pass 

2037 

2038 def _check_max_identifier_length(self, connection): 

2039 if self._get_effective_compat_server_version_info(connection) < ( 

2040 12, 

2041 2, 

2042 ): 

2043 return 30 

2044 else: 

2045 # use the default 

2046 return None 

2047 

2048 def get_isolation_level_values(self, dbapi_connection): 

2049 return ["READ COMMITTED", "SERIALIZABLE"] 

2050 

2051 def get_default_isolation_level(self, dbapi_conn): 

2052 try: 

2053 return self.get_isolation_level(dbapi_conn) 

2054 except NotImplementedError: 

2055 raise 

2056 except: 

2057 return "READ COMMITTED" 

2058 

2059 def _execute_reflection( 

2060 self, connection, query, dblink, returns_long, params=None 

2061 ): 

2062 if dblink and not dblink.startswith("@"): 

2063 dblink = f"@{dblink}" 

2064 execution_options = { 

2065 # handle db links 

2066 "_oracle_dblink": dblink or "", 

2067 # override any schema translate map 

2068 "schema_translate_map": None, 

2069 } 

2070 

2071 if dblink and returns_long: 

2072 # Oracle seems to error with 

2073 # "ORA-00997: illegal use of LONG datatype" when returning 

2074 # LONG columns via a dblink in a query with bind params 

2075 # This type seems to be very hard to cast into something else 

2076 # so it seems easier to just render the bind values inline here 

2077 def visit_bindparam(bindparam): 

2078 bindparam.literal_execute = True 

2079 

2080 query = visitors.cloned_traverse( 

2081 query, {}, {"bindparam": visit_bindparam} 

2082 ) 

2083 return connection.execute( 

2084 query, params, execution_options=execution_options 

2085 ) 

2086 

2087 @util.memoized_property 

2088 def _has_table_query(self): 

2089 # materialized views are returned by all_tables 

2090 tables = ( 

2091 select( 

2092 dictionary.all_tables.c.table_name, 

2093 dictionary.all_tables.c.owner, 

2094 ) 

2095 .union_all( 

2096 select( 

2097 dictionary.all_views.c.view_name.label("table_name"), 

2098 dictionary.all_views.c.owner, 

2099 ) 

2100 ) 

2101 .subquery("tables_and_views") 

2102 ) 

2103 

2104 query = select(tables.c.table_name).where( 

2105 tables.c.table_name == bindparam("table_name"), 

2106 tables.c.owner == bindparam("owner"), 

2107 ) 

2108 return query 

2109 

2110 @reflection.cache 

2111 def has_table( 

2112 self, connection, table_name, schema=None, dblink=None, **kw 

2113 ): 

2114 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2115 self._ensure_has_table_connection(connection) 

2116 

2117 if not schema: 

2118 schema = self.default_schema_name 

2119 

2120 params = { 

2121 "table_name": self.denormalize_name(table_name), 

2122 "owner": self.denormalize_schema_name(schema), 

2123 } 

2124 cursor = self._execute_reflection( 

2125 connection, 

2126 self._has_table_query, 

2127 dblink, 

2128 returns_long=False, 

2129 params=params, 

2130 ) 

2131 return bool(cursor.scalar()) 

2132 

2133 @reflection.cache 

2134 def has_sequence( 

2135 self, connection, sequence_name, schema=None, dblink=None, **kw 

2136 ): 

2137 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2138 if not schema: 

2139 schema = self.default_schema_name 

2140 

2141 query = select(dictionary.all_sequences.c.sequence_name).where( 

2142 dictionary.all_sequences.c.sequence_name 

2143 == self.denormalize_schema_name(sequence_name), 

2144 dictionary.all_sequences.c.sequence_owner 

2145 == self.denormalize_schema_name(schema), 

2146 ) 

2147 

2148 cursor = self._execute_reflection( 

2149 connection, query, dblink, returns_long=False 

2150 ) 

2151 return bool(cursor.scalar()) 

2152 

2153 def _get_default_schema_name(self, connection): 

2154 return self.normalize_name( 

2155 connection.exec_driver_sql( 

2156 "select sys_context( 'userenv', 'current_schema' ) from dual" 

2157 ).scalar() 

2158 ) 

2159 

2160 def denormalize_schema_name(self, name): 

2161 # look for quoted_name 

2162 force = getattr(name, "quote", None) 

2163 if force is None and name == "public": 

2164 # look for case insensitive, no quoting specified, "public" 

2165 return "PUBLIC" 

2166 return super().denormalize_name(name) 

2167 
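The special case above can be sketched standalone: a case-insensitive, unquoted "public" maps to the PUBLIC pseudo-schema, while other names follow the usual denormalization (approximated here as upper-casing all-lowercase names). `denormalize_schema_name` below is a simplified hypothetical stand-in; the real method inspects `quoted_name.quote` and delegates to `denormalize_name`.

```python
def denormalize_schema_name(name, quote=None):
    # Unquoted, case-insensitive "public" refers to Oracle Database's
    # PUBLIC pseudo-schema.
    if quote is None and name == "public":
        return "PUBLIC"
    # Crude stand-in for denormalize_name: case-insensitive names are
    # stored upper-case in the data dictionary.
    return name.upper() if name == name.lower() else name

print(denormalize_schema_name("public"), denormalize_schema_name("scott"))
```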

2168 @reflection.flexi_cache( 

2169 ("schema", InternalTraversal.dp_string), 

2170 ("filter_names", InternalTraversal.dp_string_list), 

2171 ("dblink", InternalTraversal.dp_string), 

2172 ) 

2173 def _get_synonyms(self, connection, schema, filter_names, dblink, **kw): 

2174 owner = self.denormalize_schema_name( 

2175 schema or self.default_schema_name 

2176 ) 

2177 

2178 has_filter_names, params = self._prepare_filter_names(filter_names) 

2179 query = select( 

2180 dictionary.all_synonyms.c.synonym_name, 

2181 dictionary.all_synonyms.c.table_name, 

2182 dictionary.all_synonyms.c.table_owner, 

2183 dictionary.all_synonyms.c.db_link, 

2184 ).where(dictionary.all_synonyms.c.owner == owner) 

2185 if has_filter_names: 

2186 query = query.where( 

2187 dictionary.all_synonyms.c.synonym_name.in_( 

2188 params["filter_names"] 

2189 ) 

2190 ) 

2191 result = self._execute_reflection( 

2192 connection, query, dblink, returns_long=False 

2193 ).mappings() 

2194 return result.all() 

2195 

2196 @lru_cache() 

2197 def _all_objects_query( 

2198 self, owner, scope, kind, has_filter_names, has_mat_views 

2199 ): 

2200 query = ( 

2201 select(dictionary.all_objects.c.object_name) 

2202 .select_from(dictionary.all_objects) 

2203 .where(dictionary.all_objects.c.owner == owner) 

2204 ) 

2205 

2206 # NOTE: materialized views are listed in all_objects twice; 

2207 # once as MATERIALIZED VIEW and once as TABLE 

2208 if kind is ObjectKind.ANY: 

2209 # materialized views are also listed as tables, so there is no 

2210 # need to add them to the in_. 

2211 query = query.where( 

2212 dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW")) 

2213 ) 

2214 else: 

2215 object_type = [] 

2216 if ObjectKind.VIEW in kind: 

2217 object_type.append("VIEW") 

2218 if ( 

2219 ObjectKind.MATERIALIZED_VIEW in kind 

2220 and ObjectKind.TABLE not in kind 

2221 ): 

2222 # materialized views are also listed as tables, so there is no 

2223 # need to add them to the in_ if also selecting tables. 

2224 object_type.append("MATERIALIZED VIEW") 

2225 if ObjectKind.TABLE in kind: 

2226 object_type.append("TABLE") 

2227 if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind: 

2228 # materialized view are listed also as tables, 

2229 # so they need to be filtered out 

2230 # EXCEPT ALL / MINUS profiles as faster than using 

2231 # NOT EXISTS or NOT IN with a subquery, but it's in 

2232 # general faster to get the mat view names and exclude 

2233 # them only when needed 

2234 query = query.where( 

2235 dictionary.all_objects.c.object_name.not_in( 

2236 bindparam("mat_views") 

2237 ) 

2238 ) 

2239 query = query.where( 

2240 dictionary.all_objects.c.object_type.in_(object_type) 

2241 ) 

2242 

2243 # handles scope 

2244 if scope is ObjectScope.DEFAULT: 

2245 query = query.where(dictionary.all_objects.c.temporary == "N") 

2246 elif scope is ObjectScope.TEMPORARY: 

2247 query = query.where(dictionary.all_objects.c.temporary == "Y") 

2248 

2249 if has_filter_names: 

2250 query = query.where( 

2251 dictionary.all_objects.c.object_name.in_( 

2252 bindparam("filter_names") 

2253 ) 

2254 ) 

2255 return query 

2256 

2257 @reflection.flexi_cache( 

2258 ("schema", InternalTraversal.dp_string), 

2259 ("scope", InternalTraversal.dp_plain_obj), 

2260 ("kind", InternalTraversal.dp_plain_obj), 

2261 ("filter_names", InternalTraversal.dp_string_list), 

2262 ("dblink", InternalTraversal.dp_string), 

2263 ) 

2264 def _get_all_objects( 

2265 self, connection, schema, scope, kind, filter_names, dblink, **kw 

2266 ): 

2267 owner = self.denormalize_schema_name( 

2268 schema or self.default_schema_name 

2269 ) 

2270 

2271 has_filter_names, params = self._prepare_filter_names(filter_names) 

2272 has_mat_views = False 

2273 if ( 

2274 ObjectKind.TABLE in kind 

2275 and ObjectKind.MATERIALIZED_VIEW not in kind 

2276 ): 

2277 # see note in _all_objects_query 

2278 mat_views = self.get_materialized_view_names( 

2279 connection, schema, dblink, _normalize=False, **kw 

2280 ) 

2281 if mat_views: 

2282 params["mat_views"] = mat_views 

2283 has_mat_views = True 

2284 

2285 query = self._all_objects_query( 

2286 owner, scope, kind, has_filter_names, has_mat_views 

2287 ) 

2288 

2289 result = self._execute_reflection( 

2290 connection, query, dblink, returns_long=False, params=params 

2291 ).scalars() 

2292 

2293 return result.all() 

2294 

2295 def _handle_synonyms_decorator(fn): 

2296 @wraps(fn) 

2297 def wrapper(self, *args, **kwargs): 

2298 return self._handle_synonyms(fn, *args, **kwargs) 

2299 

2300 return wrapper 

2301 

2302 def _handle_synonyms(self, fn, connection, *args, **kwargs): 

2303 if not kwargs.get("oracle_resolve_synonyms", False): 

2304 return fn(self, connection, *args, **kwargs) 

2305 

2306 original_kw = kwargs.copy() 

2307 schema = kwargs.pop("schema", None) 

2308 result = self._get_synonyms( 

2309 connection, 

2310 schema=schema, 

2311 filter_names=kwargs.pop("filter_names", None), 

2312 dblink=kwargs.pop("dblink", None), 

2313 info_cache=kwargs.get("info_cache", None), 

2314 ) 

2315 

2316 dblinks_owners = defaultdict(dict) 

2317 for row in result: 

2318 key = row["db_link"], row["table_owner"] 

2319 tn = self.normalize_name(row["table_name"]) 

2320 dblinks_owners[key][tn] = row["synonym_name"] 

2321 

2322 if not dblinks_owners: 

2323 # No synonym, do the plain thing 

2324 return fn(self, connection, *args, **original_kw) 

2325 

2326 data = {} 

2327 for (dblink, table_owner), mapping in dblinks_owners.items(): 

2328 call_kw = { 

2329 **original_kw, 

2330 "schema": table_owner, 

2331 "dblink": self.normalize_name(dblink), 

2332 "filter_names": mapping.keys(), 

2333 } 

2334 call_result = fn(self, connection, *args, **call_kw) 

2335 for (_, tn), value in call_result: 

2336 synonym_name = self.normalize_name(mapping[tn]) 

2337 data[(schema, synonym_name)] = value 

2338 return data.items() 

2339 

2340 @reflection.cache 

2341 def get_schema_names(self, connection, dblink=None, **kw): 

2342 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2343 query = select(dictionary.all_users.c.username).order_by( 

2344 dictionary.all_users.c.username 

2345 ) 

2346 result = self._execute_reflection( 

2347 connection, query, dblink, returns_long=False 

2348 ).scalars() 

2349 return [self.normalize_name(row) for row in result] 

2350 

2351 @reflection.cache 

2352 def get_table_names(self, connection, schema=None, dblink=None, **kw): 

2353 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2354 # note that table_names() isn't loading DBLINKed or synonym'ed tables 

2355 if schema is None: 

2356 schema = self.default_schema_name 

2357 

2358 den_schema = self.denormalize_schema_name(schema) 

2359 if kw.get("oracle_resolve_synonyms", False): 

2360 tables = ( 

2361 select( 

2362 dictionary.all_tables.c.table_name, 

2363 dictionary.all_tables.c.owner, 

2364 dictionary.all_tables.c.iot_name, 

2365 dictionary.all_tables.c.duration, 

2366 dictionary.all_tables.c.tablespace_name, 

2367 ) 

2368 .union_all( 

2369 select( 

2370 dictionary.all_synonyms.c.synonym_name.label( 

2371 "table_name" 

2372 ), 

2373 dictionary.all_synonyms.c.owner, 

2374 dictionary.all_tables.c.iot_name, 

2375 dictionary.all_tables.c.duration, 

2376 dictionary.all_tables.c.tablespace_name, 

2377 ) 

2378 .select_from(dictionary.all_tables) 

2379 .join( 

2380 dictionary.all_synonyms, 

2381 and_( 

2382 dictionary.all_tables.c.table_name 

2383 == dictionary.all_synonyms.c.table_name, 

2384 dictionary.all_tables.c.owner 

2385 == func.coalesce( 

2386 dictionary.all_synonyms.c.table_owner, 

2387 dictionary.all_synonyms.c.owner, 

2388 ), 

2389 ), 

2390 ) 

2391 ) 

2392 .subquery("available_tables") 

2393 ) 

2394 else: 

2395 tables = dictionary.all_tables 

2396 

2397 query = select(tables.c.table_name) 

2398 if self.exclude_tablespaces: 

2399 query = query.where( 

2400 func.coalesce( 

2401 tables.c.tablespace_name, "no tablespace" 

2402 ).not_in(self.exclude_tablespaces) 

2403 ) 

2404 query = query.where( 

2405 tables.c.owner == den_schema, 

2406 tables.c.iot_name.is_(null()), 

2407 tables.c.duration.is_(null()), 

2408 ) 

2409 

2410 # remove materialized views 

2411 mat_query = select( 

2412 dictionary.all_mviews.c.mview_name.label("table_name") 

2413 ).where(dictionary.all_mviews.c.owner == den_schema) 

2414 

2415 query = ( 

2416 query.except_all(mat_query) 

2417 if self._supports_except_all 

2418 else query.except_(mat_query) 

2419 ) 

2420 

2421 result = self._execute_reflection( 

2422 connection, query, dblink, returns_long=False 

2423 ).scalars() 

2424 return [self.normalize_name(row) for row in result] 

2425 

2426 @reflection.cache 

2427 def get_temp_table_names(self, connection, dblink=None, **kw): 

2428 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2429 schema = self.denormalize_schema_name(self.default_schema_name) 

2430 

2431 query = select(dictionary.all_tables.c.table_name) 

2432 if self.exclude_tablespaces: 

2433 query = query.where( 

2434 func.coalesce( 

2435 dictionary.all_tables.c.tablespace_name, "no tablespace" 

2436 ).not_in(self.exclude_tablespaces) 

2437 ) 

2438 query = query.where( 

2439 dictionary.all_tables.c.owner == schema, 

2440 dictionary.all_tables.c.iot_name.is_(null()), 

2441 dictionary.all_tables.c.duration.is_not(null()), 

2442 ) 

2443 

2444 result = self._execute_reflection( 

2445 connection, query, dblink, returns_long=False 

2446 ).scalars() 

2447 return [self.normalize_name(row) for row in result] 

2448 

2449 @reflection.cache 

2450 def get_materialized_view_names( 

2451 self, connection, schema=None, dblink=None, _normalize=True, **kw 

2452 ): 

2453 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2454 if not schema: 

2455 schema = self.default_schema_name 

2456 

2457 query = select(dictionary.all_mviews.c.mview_name).where( 

2458 dictionary.all_mviews.c.owner 

2459 == self.denormalize_schema_name(schema) 

2460 ) 

2461 result = self._execute_reflection( 

2462 connection, query, dblink, returns_long=False 

2463 ).scalars() 

2464 if _normalize: 

2465 return [self.normalize_name(row) for row in result] 

2466 else: 

2467 return result.all() 

2468 

2469 @reflection.cache 

2470 def get_view_names(self, connection, schema=None, dblink=None, **kw): 

2471 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2472 if not schema: 

2473 schema = self.default_schema_name 

2474 

2475 query = select(dictionary.all_views.c.view_name).where( 

2476 dictionary.all_views.c.owner 

2477 == self.denormalize_schema_name(schema) 

2478 ) 

2479 result = self._execute_reflection( 

2480 connection, query, dblink, returns_long=False 

2481 ).scalars() 

2482 return [self.normalize_name(row) for row in result] 

2483 

2484 @reflection.cache 

2485 def get_sequence_names(self, connection, schema=None, dblink=None, **kw): 

2486 """Supported kw arguments are: ``dblink`` to reflect via a db link.""" 

2487 if not schema: 

2488 schema = self.default_schema_name 

2489 query = select(dictionary.all_sequences.c.sequence_name).where( 

2490 dictionary.all_sequences.c.sequence_owner 

2491 == self.denormalize_schema_name(schema) 

2492 ) 

2493 

2494 result = self._execute_reflection( 

2495 connection, query, dblink, returns_long=False 

2496 ).scalars() 

2497 return [self.normalize_name(row) for row in result] 

2498 

2499 def _value_or_raise(self, data, table, schema): 

2500 table = self.normalize_name(str(table)) 

2501 try: 

2502 return dict(data)[(schema, table)] 

2503 except KeyError: 

2504 raise exc.NoSuchTableError( 

2505 f"{schema}.{table}" if schema else table 

2506 ) from None 

2507 

2508 def _prepare_filter_names(self, filter_names): 

2509 if filter_names: 

2510 fn = [self.denormalize_name(name) for name in filter_names] 

2511 return True, {"filter_names": fn} 

2512 else: 

2513 return False, {} 

2514 

2515 @reflection.cache 

2516 def get_table_options(self, connection, table_name, schema=None, **kw): 

2517 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

2518 ``oracle_resolve_synonyms`` to resolve names to synonyms 

2519 """ 

2520 data = self.get_multi_table_options( 

2521 connection, 

2522 schema=schema, 

2523 filter_names=[table_name], 

2524 scope=ObjectScope.ANY, 

2525 kind=ObjectKind.ANY, 

2526 **kw, 

2527 ) 

2528 return self._value_or_raise(data, table_name, schema) 

2529 

2530 @lru_cache() 

2531 def _table_options_query( 

2532 self, owner, scope, kind, has_filter_names, has_mat_views 

2533 ): 

2534 query = select( 

2535 dictionary.all_tables.c.table_name, 

2536 ( 

2537 dictionary.all_tables.c.compression 

2538 if self._supports_table_compression 

2539 else sql.null().label("compression") 

2540 ), 

2541 ( 

2542 dictionary.all_tables.c.compress_for 

2543 if self._supports_table_compress_for 

2544 else sql.null().label("compress_for") 

2545 ), 

2546 dictionary.all_tables.c.tablespace_name, 

2547 ).where(dictionary.all_tables.c.owner == owner) 

2548 if has_filter_names: 

2549 query = query.where( 

2550 dictionary.all_tables.c.table_name.in_( 

2551 bindparam("filter_names") 

2552 ) 

2553 ) 

2554 if scope is ObjectScope.DEFAULT: 

2555 query = query.where(dictionary.all_tables.c.duration.is_(null())) 

2556 elif scope is ObjectScope.TEMPORARY: 

2557 query = query.where( 

2558 dictionary.all_tables.c.duration.is_not(null()) 

2559 ) 

2560 

2561 if ( 

2562 has_mat_views 

2563 and ObjectKind.TABLE in kind 

2564 and ObjectKind.MATERIALIZED_VIEW not in kind 

2565 ): 

2566 # can't use EXCEPT ALL / MINUS here because we don't have an 

2567 # excludable row vs. the query above 

2568 # outerjoin + where null works better on oracle 21 but 11 does 

2569 # not like it at all. this is the next best thing 

2570 

2571 query = query.where( 

2572 dictionary.all_tables.c.table_name.not_in( 

2573 bindparam("mat_views") 

2574 ) 

2575 ) 

2576 elif ( 

2577 ObjectKind.TABLE not in kind 

2578 and ObjectKind.MATERIALIZED_VIEW in kind 

2579 ): 

2580 query = query.where( 

2581 dictionary.all_tables.c.table_name.in_(bindparam("mat_views")) 

2582 ) 

2583 return query 

2584 

2585 @_handle_synonyms_decorator 

2586 def get_multi_table_options( 

2587 self, 

2588 connection, 

2589 *, 

2590 schema, 

2591 filter_names, 

2592 scope, 

2593 kind, 

2594 dblink=None, 

2595 **kw, 

2596 ): 

2597 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

2598 ``oracle_resolve_synonyms`` to resolve names to synonyms 

2599 """ 

2600 owner = self.denormalize_schema_name( 

2601 schema or self.default_schema_name 

2602 ) 

2603 

2604 has_filter_names, params = self._prepare_filter_names(filter_names) 

2605 has_mat_views = False 

2606 

2607 if ( 

2608 ObjectKind.TABLE in kind 

2609 and ObjectKind.MATERIALIZED_VIEW not in kind 

2610 ): 

2611 # see note in _table_options_query 

2612 mat_views = self.get_materialized_view_names( 

2613 connection, schema, dblink, _normalize=False, **kw 

2614 ) 

2615 if mat_views: 

2616 params["mat_views"] = mat_views 

2617 has_mat_views = True 

2618 elif ( 

2619 ObjectKind.TABLE not in kind 

2620 and ObjectKind.MATERIALIZED_VIEW in kind 

2621 ): 

2622 mat_views = self.get_materialized_view_names( 

2623 connection, schema, dblink, _normalize=False, **kw 

2624 ) 

2625 params["mat_views"] = mat_views 

2626 

2627 options = {} 

2628 default = ReflectionDefaults.table_options 

2629 

2630 if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind: 

2631 query = self._table_options_query( 

2632 owner, scope, kind, has_filter_names, has_mat_views 

2633 ) 

2634 result = self._execute_reflection( 

2635 connection, query, dblink, returns_long=False, params=params 

2636 ) 

2637 

2638 for table, compression, compress_for, tablespace in result: 

2639 data = default() 

2640 if compression == "ENABLED": 

2641 data["oracle_compress"] = compress_for 

2642 if tablespace: 

2643 data["oracle_tablespace"] = tablespace 

2644 options[(schema, self.normalize_name(table))] = data 

2645 if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope: 

2646 # add the views (no temporary views) 

2647 for view in self.get_view_names(connection, schema, dblink, **kw): 

2648 if not filter_names or view in filter_names: 

2649 options[(schema, view)] = default() 

2650 

2651 return options.items() 

2652 

2653 @reflection.cache 

2654 def get_columns(self, connection, table_name, schema=None, **kw): 

2655 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

2656 ``oracle_resolve_synonyms`` to resolve names to synonyms 

2657 """ 

2658 

2659 data = self.get_multi_columns( 

2660 connection, 

2661 schema=schema, 

2662 filter_names=[table_name], 

2663 scope=ObjectScope.ANY, 

2664 kind=ObjectKind.ANY, 

2665 **kw, 

2666 ) 

2667 return self._value_or_raise(data, table_name, schema) 

2668 

2669 def _run_batches( 

2670 self, connection, query, dblink, returns_long, mappings, all_objects 

2671 ): 

2672 each_batch = 500 

2673 batches = list(all_objects) 

2674 while batches: 

2675 batch = batches[0:each_batch] 

2676 batches[0:each_batch] = [] 

2677 

2678 result = self._execute_reflection( 

2679 connection, 

2680 query, 

2681 dblink, 

2682 returns_long=returns_long, 

2683 params={"all_objects": batch}, 

2684 ) 

2685 if mappings: 

2686 yield from result.mappings() 

2687 else: 

2688 yield from result 

2689 
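The `_run_batches` generator above consumes the object-name list in fixed-size slices of 500, so that each reflection query's expanded `IN` clause stays a manageable size. A minimal standalone sketch of that chunking pattern, using illustrative names (`run_in_batches` is not part of SQLAlchemy's API):

```python
def run_in_batches(all_objects, each_batch=500):
    """Yield fixed-size slices of all_objects, consuming the list."""
    remaining = list(all_objects)
    while remaining:
        batch = remaining[0:each_batch]
        remaining[0:each_batch] = []
        # a real implementation would execute the reflection query here
        # with params={"all_objects": batch} and yield its rows instead
        yield batch

sizes = [len(b) for b in run_in_batches(range(1200), each_batch=500)]
# three slices: 500, 500, and the 200-item remainder
```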

2690 @lru_cache() 

2691 def _column_query(self, owner): 

2692 all_cols = dictionary.all_tab_cols 

2693 all_comments = dictionary.all_col_comments 

2694 all_ids = dictionary.all_tab_identity_cols 

2695 

2696 if self.server_version_info >= (12,): 

2697 add_cols = ( 

2698 all_cols.c.default_on_null, 

2699 sql.case( 

2700 (all_ids.c.table_name.is_(None), sql.null()), 

2701 else_=all_ids.c.generation_type 

2702 + "," 

2703 + all_ids.c.identity_options, 

2704 ).label("identity_options"), 

2705 ) 

2706 join_identity_cols = True 

2707 else: 

2708 add_cols = ( 

2709 sql.null().label("default_on_null"), 

2710 sql.null().label("identity_options"), 

2711 ) 

2712 join_identity_cols = False 

2713 

2714 # NOTE: on Oracle one cannot create tables/views without columns, 

2715 # and a table cannot have all columns hidden: 

2716 # ORA-54039: table must have at least one column that is not invisible 

2717 # all_tab_cols returns data for tables/views/mat-views. 

2718 # all_tab_cols does not return recycled tables 

2719 

2720 query = ( 

2721 select( 

2722 all_cols.c.table_name, 

2723 all_cols.c.column_name, 

2724 all_cols.c.data_type, 

2725 all_cols.c.char_length, 

2726 all_cols.c.data_precision, 

2727 all_cols.c.data_scale, 

2728 all_cols.c.nullable, 

2729 all_cols.c.data_default, 

2730 all_comments.c.comments, 

2731 all_cols.c.virtual_column, 

2732 *add_cols, 

2733 ).select_from(all_cols) 

2734 # NOTE: all_col_comments has a row for each column even if no 

2735 # comment is present, so a join could be performed, but there 

2736 # seems to be no difference compared to an outer join 

2737 .outerjoin( 

2738 all_comments, 

2739 and_( 

2740 all_cols.c.table_name == all_comments.c.table_name, 

2741 all_cols.c.column_name == all_comments.c.column_name, 

2742 all_cols.c.owner == all_comments.c.owner, 

2743 ), 

2744 ) 

2745 ) 

2746 if join_identity_cols: 

2747 query = query.outerjoin( 

2748 all_ids, 

2749 and_( 

2750 all_cols.c.table_name == all_ids.c.table_name, 

2751 all_cols.c.column_name == all_ids.c.column_name, 

2752 all_cols.c.owner == all_ids.c.owner, 

2753 ), 

2754 ) 

2755 

2756 query = query.where( 

2757 all_cols.c.table_name.in_(bindparam("all_objects")), 

2758 all_cols.c.hidden_column == "NO", 

2759 all_cols.c.owner == owner, 

2760 ).order_by(all_cols.c.table_name, all_cols.c.column_id) 

2761 return query 

2762 

2763 @_handle_synonyms_decorator 

2764 def get_multi_columns( 

2765 self, 

2766 connection, 

2767 *, 

2768 schema, 

2769 filter_names, 

2770 scope, 

2771 kind, 

2772 dblink=None, 

2773 **kw, 

2774 ): 

2775 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

2776 ``oracle_resolve_synonyms`` to resolve names to synonyms 

2777 """ 

2778 owner = self.denormalize_schema_name( 

2779 schema or self.default_schema_name 

2780 ) 

2781 query = self._column_query(owner) 

2782 

2783 if ( 

2784 filter_names 

2785 and kind is ObjectKind.ANY 

2786 and scope is ObjectScope.ANY 

2787 ): 

2788 all_objects = [self.denormalize_name(n) for n in filter_names] 

2789 else: 

2790 all_objects = self._get_all_objects( 

2791 connection, schema, scope, kind, filter_names, dblink, **kw 

2792 ) 

2793 

2794 columns = defaultdict(list) 

2795 

2796 # all_tab_cols.data_default is LONG 

2797 result = self._run_batches( 

2798 connection, 

2799 query, 

2800 dblink, 

2801 returns_long=True, 

2802 mappings=True, 

2803 all_objects=all_objects, 

2804 ) 

2805 

2806 def maybe_int(value): 

2807 if isinstance(value, float) and value.is_integer(): 

2808 return int(value) 

2809 else: 

2810 return value 

2811 

2812 remove_size = re.compile(r"\(\d+\)") 

2813 

2814 for row_dict in result: 

2815 table_name = self.normalize_name(row_dict["table_name"]) 

2816 orig_colname = row_dict["column_name"] 

2817 colname = self.normalize_name(orig_colname) 

2818 coltype = row_dict["data_type"] 

2819 precision = maybe_int(row_dict["data_precision"]) 

2820 

2821 if coltype == "NUMBER": 

2822 scale = maybe_int(row_dict["data_scale"]) 

2823 if precision is None and scale == 0: 

2824 coltype = INTEGER() 

2825 else: 

2826 coltype = NUMBER(precision, scale) 

2827 elif coltype == "FLOAT": 

2828 # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm 

2829 if precision == 126: 

2830 # The DOUBLE PRECISION datatype is a floating-point 

2831 # number with binary precision 126. 

2832 coltype = DOUBLE_PRECISION() 

2833 elif precision == 63: 

2834 # The REAL datatype is a floating-point number with a 

2835 # binary precision of 63, or 18 decimal. 

2836 coltype = REAL() 

2837 else: 

2838 # non-standard precision 

2839 coltype = FLOAT(binary_precision=precision) 

2840 

2841 elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): 

2842 char_length = maybe_int(row_dict["char_length"]) 

2843 coltype = self.ischema_names.get(coltype)(char_length) 

2844 elif "WITH TIME ZONE" in coltype: 

2845 coltype = TIMESTAMP(timezone=True) 

2846 elif "WITH LOCAL TIME ZONE" in coltype: 

2847 coltype = TIMESTAMP(local_timezone=True) 

2848 else: 

2849 coltype = re.sub(remove_size, "", coltype) 

2850 try: 

2851 coltype = self.ischema_names[coltype] 

2852 except KeyError: 

2853 util.warn( 

2854 "Did not recognize type '%s' of column '%s'" 

2855 % (coltype, colname) 

2856 ) 

2857 coltype = sqltypes.NULLTYPE 

2858 

2859 default = row_dict["data_default"] 

2860 if row_dict["virtual_column"] == "YES": 

2861 computed = dict(sqltext=default) 

2862 default = None 

2863 else: 

2864 computed = None 

2865 

2866 identity_options = row_dict["identity_options"] 

2867 if identity_options is not None: 

2868 identity = self._parse_identity_options( 

2869 identity_options, row_dict["default_on_null"] 

2870 ) 

2871 default = None 

2872 else: 

2873 identity = None 

2874 

2875 cdict = { 

2876 "name": colname, 

2877 "type": coltype, 

2878 "nullable": row_dict["nullable"] == "Y", 

2879 "default": default, 

2880 "comment": row_dict["comments"], 

2881 } 

2882 if orig_colname.lower() == orig_colname: 

2883 cdict["quote"] = True 

2884 if computed is not None: 

2885 cdict["computed"] = computed 

2886 if identity is not None: 

2887 cdict["identity"] = identity 

2888 

2889 columns[(schema, table_name)].append(cdict) 

2890 

2891 # NOTE: default not needed since all tables have columns 

2892 # default = ReflectionDefaults.columns 

2893 # return ( 

2894 # (key, value if value else default()) 

2895 # for key, value in columns.items() 

2896 # ) 

2897 return columns.items() 

2898 
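The loop in `get_multi_columns` above maps Oracle's dictionary metadata onto SQLAlchemy types: a `NUMBER` with NULL precision and scale 0 becomes `INTEGER`, while `FLOAT` is disambiguated by binary precision (126 is `DOUBLE PRECISION`, 63 is `REAL`). A simplified sketch of just that decision table, with `map_numeric` as a hypothetical helper returning type names as strings rather than SQLAlchemy type objects:

```python
def map_numeric(data_type, precision, scale):
    """Return a simplified type name for Oracle numeric metadata."""
    if data_type == "NUMBER":
        # NULL precision with scale 0 is how Oracle reports INTEGER
        if precision is None and scale == 0:
            return "INTEGER"
        return f"NUMBER({precision}, {scale})"
    if data_type == "FLOAT":
        if precision == 126:
            # binary precision 126 is DOUBLE PRECISION
            return "DOUBLE_PRECISION"
        if precision == 63:
            # binary precision 63 is REAL
            return "REAL"
        # non-standard binary precision
        return f"FLOAT({precision})"
    return data_type

map_numeric("NUMBER", None, 0)   # "INTEGER"
map_numeric("FLOAT", 126, None)  # "DOUBLE_PRECISION"
```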

2899 def _parse_identity_options(self, identity_options, default_on_null): 

2900 # identity_options is a string that starts with 'ALWAYS,' or 

2901 # 'BY DEFAULT,' and continues with 

2902 # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1, 

2903 # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N, 

2904 # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N 

2905 parts = [p.strip() for p in identity_options.split(",")] 

2906 identity = { 

2907 "always": parts[0] == "ALWAYS", 

2908 "on_null": default_on_null == "YES", 

2909 } 

2910 

2911 for part in parts[1:]: 

2912 option, value = part.split(":") 

2913 value = value.strip() 

2914 

2915 if "START WITH" in option: 

2916 identity["start"] = int(value) 

2917 elif "INCREMENT BY" in option: 

2918 identity["increment"] = int(value) 

2919 elif "MAX_VALUE" in option: 

2920 identity["maxvalue"] = int(value) 

2921 elif "MIN_VALUE" in option: 

2922 identity["minvalue"] = int(value) 

2923 elif "CYCLE_FLAG" in option: 

2924 identity["cycle"] = value == "Y" 

2925 elif "CACHE_SIZE" in option: 

2926 identity["cache"] = int(value) 

2927 elif "ORDER_FLAG" in option: 

2928 identity["order"] = value == "Y" 

2929 return identity 

2930 
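The `identity_options` string decomposed by `_parse_identity_options` above is a comma-separated list whose first token is the generation type (`ALWAYS` or `BY DEFAULT`), followed by `OPTION: value` pairs. A standalone sketch of the same split-and-dispatch approach, trimmed to a few options for brevity:

```python
def parse_identity_options(identity_options, default_on_null):
    """Parse an Oracle identity_options string into a dict (sketch)."""
    parts = [p.strip() for p in identity_options.split(",")]
    identity = {
        "always": parts[0] == "ALWAYS",
        "on_null": default_on_null == "YES",
    }
    for part in parts[1:]:
        option, value = part.split(":")
        value = value.strip()
        if "START WITH" in option:
            identity["start"] = int(value)
        elif "INCREMENT BY" in option:
            identity["increment"] = int(value)
        elif "CYCLE_FLAG" in option:
            identity["cycle"] = value == "Y"
    return identity

opts = "BY DEFAULT, START WITH: 3, INCREMENT BY: 1, CYCLE_FLAG: N"
parse_identity_options(opts, "YES")
# {'always': False, 'on_null': True, 'start': 3, 'increment': 1, 'cycle': False}
```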

2931 @reflection.cache 

2932 def get_table_comment(self, connection, table_name, schema=None, **kw): 

2933 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

2934 ``oracle_resolve_synonyms`` to resolve names to synonyms 

2935 """ 

2936 data = self.get_multi_table_comment( 

2937 connection, 

2938 schema=schema, 

2939 filter_names=[table_name], 

2940 scope=ObjectScope.ANY, 

2941 kind=ObjectKind.ANY, 

2942 **kw, 

2943 ) 

2944 return self._value_or_raise(data, table_name, schema) 

2945 

2946 @lru_cache() 

2947 def _comment_query(self, owner, scope, kind, has_filter_names): 

2948 # NOTE: all_tab_comments / all_mview_comments have a row for all 

2949 # objects even if they don't have comments 

2950 queries = [] 

2951 if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind: 

2952 # all_tab_comments also returns plain views 

2953 tbl_view = select( 

2954 dictionary.all_tab_comments.c.table_name, 

2955 dictionary.all_tab_comments.c.comments, 

2956 ).where( 

2957 dictionary.all_tab_comments.c.owner == owner, 

2958 dictionary.all_tab_comments.c.table_name.not_like("BIN$%"), 

2959 ) 

2960 if ObjectKind.VIEW not in kind: 

2961 tbl_view = tbl_view.where( 

2962 dictionary.all_tab_comments.c.table_type == "TABLE" 

2963 ) 

2964 elif ObjectKind.TABLE not in kind: 

2965 tbl_view = tbl_view.where( 

2966 dictionary.all_tab_comments.c.table_type == "VIEW" 

2967 ) 

2968 queries.append(tbl_view) 

2969 if ObjectKind.MATERIALIZED_VIEW in kind: 

2970 mat_view = select( 

2971 dictionary.all_mview_comments.c.mview_name.label("table_name"), 

2972 dictionary.all_mview_comments.c.comments, 

2973 ).where( 

2974 dictionary.all_mview_comments.c.owner == owner, 

2975 dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"), 

2976 ) 

2977 queries.append(mat_view) 

2978 if len(queries) == 1: 

2979 query = queries[0] 

2980 else: 

2981 union = sql.union_all(*queries).subquery("tables_and_views") 

2982 query = select(union.c.table_name, union.c.comments) 

2983 

2984 name_col = query.selected_columns.table_name 

2985 

2986 if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY): 

2987 temp = "Y" if scope is ObjectScope.TEMPORARY else "N" 

2988 # need distinct since materialized views are also listed 

2989 # as tables in all_objects 

2990 query = query.distinct().join( 

2991 dictionary.all_objects, 

2992 and_( 

2993 dictionary.all_objects.c.owner == owner, 

2994 dictionary.all_objects.c.object_name == name_col, 

2995 dictionary.all_objects.c.temporary == temp, 

2996 ), 

2997 ) 

2998 if has_filter_names: 

2999 query = query.where(name_col.in_(bindparam("filter_names"))) 

3000 return query 

3001 

3002 @_handle_synonyms_decorator 

3003 def get_multi_table_comment( 

3004 self, 

3005 connection, 

3006 *, 

3007 schema, 

3008 filter_names, 

3009 scope, 

3010 kind, 

3011 dblink=None, 

3012 **kw, 

3013 ): 

3014 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3015 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3016 """ 

3017 owner = self.denormalize_schema_name( 

3018 schema or self.default_schema_name 

3019 ) 

3020 has_filter_names, params = self._prepare_filter_names(filter_names) 

3021 query = self._comment_query(owner, scope, kind, has_filter_names) 

3022 

3023 result = self._execute_reflection( 

3024 connection, query, dblink, returns_long=False, params=params 

3025 ) 

3026 default = ReflectionDefaults.table_comment 

3027 # materialized views by default seem to have a comment like 

3028 # "snapshot table for snapshot owner.mat_view_name" 

3029 ignore_mat_view = "snapshot table for snapshot " 

3030 return ( 

3031 ( 

3032 (schema, self.normalize_name(table)), 

3033 ( 

3034 {"text": comment} 

3035 if comment is not None 

3036 and not comment.startswith(ignore_mat_view) 

3037 else default() 

3038 ), 

3039 ) 

3040 for table, comment in result 

3041 ) 

3042 

3043 @reflection.cache 

3044 def get_indexes(self, connection, table_name, schema=None, **kw): 

3045 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3046 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3047 """ 

3048 data = self.get_multi_indexes( 

3049 connection, 

3050 schema=schema, 

3051 filter_names=[table_name], 

3052 scope=ObjectScope.ANY, 

3053 kind=ObjectKind.ANY, 

3054 **kw, 

3055 ) 

3056 return self._value_or_raise(data, table_name, schema) 

3057 

3058 @lru_cache() 

3059 def _index_query(self, owner): 

3060 return ( 

3061 select( 

3062 dictionary.all_ind_columns.c.table_name, 

3063 dictionary.all_ind_columns.c.index_name, 

3064 dictionary.all_ind_columns.c.column_name, 

3065 dictionary.all_indexes.c.index_type, 

3066 dictionary.all_indexes.c.uniqueness, 

3067 dictionary.all_indexes.c.compression, 

3068 dictionary.all_indexes.c.prefix_length, 

3069 dictionary.all_ind_columns.c.descend, 

3070 dictionary.all_ind_expressions.c.column_expression, 

3071 ) 

3072 .select_from(dictionary.all_ind_columns) 

3073 .join( 

3074 dictionary.all_indexes, 

3075 sql.and_( 

3076 dictionary.all_ind_columns.c.index_name 

3077 == dictionary.all_indexes.c.index_name, 

3078 dictionary.all_ind_columns.c.index_owner 

3079 == dictionary.all_indexes.c.owner, 

3080 ), 

3081 ) 

3082 .outerjoin( 

3083 # NOTE: this adds about 20% to the query time. Using a 

3084 # case expression with a scalar subquery only when needed 

3085 # with the assumption that most indexes are not expression-based 

3086 # would be faster, but Oracle does not like that with the 

3087 # LONG datatype. It errors with: 

3088 # ORA-00997: illegal use of LONG datatype 

3089 dictionary.all_ind_expressions, 

3090 sql.and_( 

3091 dictionary.all_ind_expressions.c.index_name 

3092 == dictionary.all_ind_columns.c.index_name, 

3093 dictionary.all_ind_expressions.c.index_owner 

3094 == dictionary.all_ind_columns.c.index_owner, 

3095 dictionary.all_ind_expressions.c.column_position 

3096 == dictionary.all_ind_columns.c.column_position, 

3097 ), 

3098 ) 

3099 .where( 

3100 dictionary.all_indexes.c.table_owner == owner, 

3101 dictionary.all_indexes.c.table_name.in_( 

3102 bindparam("all_objects") 

3103 ), 

3104 ) 

3105 .order_by( 

3106 dictionary.all_ind_columns.c.index_name, 

3107 dictionary.all_ind_columns.c.column_position, 

3108 ) 

3109 ) 

3110 

3111 @reflection.flexi_cache( 

3112 ("schema", InternalTraversal.dp_string), 

3113 ("dblink", InternalTraversal.dp_string), 

3114 ("all_objects", InternalTraversal.dp_string_list), 

3115 ) 

3116 def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw): 

3117 owner = self.denormalize_schema_name( 

3118 schema or self.default_schema_name 

3119 ) 

3120 

3121 query = self._index_query(owner) 

3122 

3123 pks = { 

3124 row_dict["constraint_name"] 

3125 for row_dict in self._get_all_constraint_rows( 

3126 connection, schema, dblink, all_objects, **kw 

3127 ) 

3128 if row_dict["constraint_type"] == "P" 

3129 } 

3130 

3131 # all_ind_expressions.column_expression is LONG 

3132 result = self._run_batches( 

3133 connection, 

3134 query, 

3135 dblink, 

3136 returns_long=True, 

3137 mappings=True, 

3138 all_objects=all_objects, 

3139 ) 

3140 

3141 return [ 

3142 row_dict 

3143 for row_dict in result 

3144 if row_dict["index_name"] not in pks 

3145 ] 

3146 

3147 @_handle_synonyms_decorator 

3148 def get_multi_indexes( 

3149 self, 

3150 connection, 

3151 *, 

3152 schema, 

3153 filter_names, 

3154 scope, 

3155 kind, 

3156 dblink=None, 

3157 **kw, 

3158 ): 

3159 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3160 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3161 """ 

3162 all_objects = self._get_all_objects( 

3163 connection, schema, scope, kind, filter_names, dblink, **kw 

3164 ) 

3165 

3166 uniqueness = {"NONUNIQUE": False, "UNIQUE": True} 

3167 enabled = {"DISABLED": False, "ENABLED": True} 

3168 is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"} 

3169 

3170 indexes = defaultdict(dict) 

3171 

3172 for row_dict in self._get_indexes_rows( 

3173 connection, schema, dblink, all_objects, **kw 

3174 ): 

3175 index_name = self.normalize_name(row_dict["index_name"]) 

3176 table_name = self.normalize_name(row_dict["table_name"]) 

3177 table_indexes = indexes[(schema, table_name)] 

3178 

3179 if index_name not in table_indexes: 

3180 table_indexes[index_name] = index_dict = { 

3181 "name": index_name, 

3182 "column_names": [], 

3183 "dialect_options": {}, 

3184 "unique": uniqueness.get(row_dict["uniqueness"], False), 

3185 } 

3186 do = index_dict["dialect_options"] 

3187 if row_dict["index_type"] in is_bitmap: 

3188 do["oracle_bitmap"] = True 

3189 if enabled.get(row_dict["compression"], False): 

3190 do["oracle_compress"] = row_dict["prefix_length"] 

3191 

3192 else: 

3193 index_dict = table_indexes[index_name] 

3194 

3195 expr = row_dict["column_expression"] 

3196 if expr is not None: 

3197 index_dict["column_names"].append(None) 

3198 if "expressions" in index_dict: 

3199 index_dict["expressions"].append(expr) 

3200 else: 

3201 index_dict["expressions"] = index_dict["column_names"][:-1] 

3202 index_dict["expressions"].append(expr) 

3203 

3204 if row_dict["descend"].lower() != "asc": 

3205 assert row_dict["descend"].lower() == "desc" 

3206 cs = index_dict.setdefault("column_sorting", {}) 

3207 cs[expr] = ("desc",) 

3208 else: 

3209 assert row_dict["descend"].lower() == "asc" 

3210 cn = self.normalize_name(row_dict["column_name"]) 

3211 index_dict["column_names"].append(cn) 

3212 if "expressions" in index_dict: 

3213 index_dict["expressions"].append(cn) 

3214 

3215 default = ReflectionDefaults.indexes 

3216 

3217 return ( 

3218 (key, list(indexes[key].values()) if key in indexes else default()) 

3219 for key in ( 

3220 (schema, self.normalize_name(obj_name)) 

3221 for obj_name in all_objects 

3222 ) 

3223 ) 

3224 

3225 @reflection.cache 

3226 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

3227 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3228 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3229 """ 

3230 data = self.get_multi_pk_constraint( 

3231 connection, 

3232 schema=schema, 

3233 filter_names=[table_name], 

3234 scope=ObjectScope.ANY, 

3235 kind=ObjectKind.ANY, 

3236 **kw, 

3237 ) 

3238 return self._value_or_raise(data, table_name, schema) 

3239 

3240 @lru_cache() 

3241 def _constraint_query(self, owner): 

3242 local = dictionary.all_cons_columns.alias("local") 

3243 remote = dictionary.all_cons_columns.alias("remote") 

3244 return ( 

3245 select( 

3246 dictionary.all_constraints.c.table_name, 

3247 dictionary.all_constraints.c.constraint_type, 

3248 dictionary.all_constraints.c.constraint_name, 

3249 local.c.column_name.label("local_column"), 

3250 remote.c.table_name.label("remote_table"), 

3251 remote.c.column_name.label("remote_column"), 

3252 remote.c.owner.label("remote_owner"), 

3253 dictionary.all_constraints.c.search_condition, 

3254 dictionary.all_constraints.c.delete_rule, 

3255 ) 

3256 .select_from(dictionary.all_constraints) 

3257 .join( 

3258 local, 

3259 and_( 

3260 local.c.owner == dictionary.all_constraints.c.owner, 

3261 dictionary.all_constraints.c.constraint_name 

3262 == local.c.constraint_name, 

3263 ), 

3264 ) 

3265 .outerjoin( 

3266 remote, 

3267 and_( 

3268 dictionary.all_constraints.c.r_owner == remote.c.owner, 

3269 dictionary.all_constraints.c.r_constraint_name 

3270 == remote.c.constraint_name, 

3271 or_( 

3272 remote.c.position.is_(sql.null()), 

3273 local.c.position == remote.c.position, 

3274 ), 

3275 ), 

3276 ) 

3277 .where( 

3278 dictionary.all_constraints.c.owner == owner, 

3279 dictionary.all_constraints.c.table_name.in_( 

3280 bindparam("all_objects") 

3281 ), 

3282 dictionary.all_constraints.c.constraint_type.in_( 

3283 ("R", "P", "U", "C") 

3284 ), 

3285 ) 

3286 .order_by( 

3287 dictionary.all_constraints.c.constraint_name, local.c.position 

3288 ) 

3289 ) 

3290 

3291 @reflection.flexi_cache( 

3292 ("schema", InternalTraversal.dp_string), 

3293 ("dblink", InternalTraversal.dp_string), 

3294 ("all_objects", InternalTraversal.dp_string_list), 

3295 ) 

3296 def _get_all_constraint_rows( 

3297 self, connection, schema, dblink, all_objects, **kw 

3298 ): 

3299 owner = self.denormalize_schema_name( 

3300 schema or self.default_schema_name 

3301 ) 

3302 query = self._constraint_query(owner) 

3303 

3304 # since the result is cached a list must be created 

3305 values = list( 

3306 self._run_batches( 

3307 connection, 

3308 query, 

3309 dblink, 

3310 returns_long=False, 

3311 mappings=True, 

3312 all_objects=all_objects, 

3313 ) 

3314 ) 

3315 return values 

3316 

3317 @_handle_synonyms_decorator 

3318 def get_multi_pk_constraint( 

3319 self, 

3320 connection, 

3321 *, 

3322 scope, 

3323 schema, 

3324 filter_names, 

3325 kind, 

3326 dblink=None, 

3327 **kw, 

3328 ): 

3329 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3330 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3331 """ 

3332 all_objects = self._get_all_objects( 

3333 connection, schema, scope, kind, filter_names, dblink, **kw 

3334 ) 

3335 

3336 primary_keys = defaultdict(dict) 

3337 default = ReflectionDefaults.pk_constraint 

3338 

3339 for row_dict in self._get_all_constraint_rows( 

3340 connection, schema, dblink, all_objects, **kw 

3341 ): 

3342 if row_dict["constraint_type"] != "P": 

3343 continue 

3344 table_name = self.normalize_name(row_dict["table_name"]) 

3345 constraint_name = self.normalize_name(row_dict["constraint_name"]) 

3346 column_name = self.normalize_name(row_dict["local_column"]) 

3347 

3348 table_pk = primary_keys[(schema, table_name)] 

3349 if not table_pk: 

3350 table_pk["name"] = constraint_name 

3351 table_pk["constrained_columns"] = [column_name] 

3352 else: 

3353 table_pk["constrained_columns"].append(column_name) 

3354 

3355 return ( 

3356 (key, primary_keys[key] if key in primary_keys else default()) 

3357 for key in ( 

3358 (schema, self.normalize_name(obj_name)) 

3359 for obj_name in all_objects 

3360 ) 

3361 ) 

3362 

3363 @reflection.cache 

3364 def get_foreign_keys( 

3365 self, 

3366 connection, 

3367 table_name, 

3368 schema=None, 

3369 **kw, 

3370 ): 

3371 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3372 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3373 """ 

3374 data = self.get_multi_foreign_keys( 

3375 connection, 

3376 schema=schema, 

3377 filter_names=[table_name], 

3378 scope=ObjectScope.ANY, 

3379 kind=ObjectKind.ANY, 

3380 **kw, 

3381 ) 

3382 return self._value_or_raise(data, table_name, schema) 

3383 

3384 @_handle_synonyms_decorator 

3385 def get_multi_foreign_keys( 

3386 self, 

3387 connection, 

3388 *, 

3389 scope, 

3390 schema, 

3391 filter_names, 

3392 kind, 

3393 dblink=None, 

3394 **kw, 

3395 ): 

3396 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3397 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3398 """ 

3399 all_objects = self._get_all_objects( 

3400 connection, schema, scope, kind, filter_names, dblink, **kw 

3401 ) 

3402 

3403 resolve_synonyms = kw.get("oracle_resolve_synonyms", False) 

3404 

3405 owner = self.denormalize_schema_name( 

3406 schema or self.default_schema_name 

3407 ) 

3408 

3409 all_remote_owners = set() 

3410 fkeys = defaultdict(dict) 

3411 

3412 for row_dict in self._get_all_constraint_rows( 

3413 connection, schema, dblink, all_objects, **kw 

3414 ): 

3415 if row_dict["constraint_type"] != "R": 

3416 continue 

3417 

3418 table_name = self.normalize_name(row_dict["table_name"]) 

3419 constraint_name = self.normalize_name(row_dict["constraint_name"]) 

3420 table_fkey = fkeys[(schema, table_name)] 

3421 

3422 assert constraint_name is not None 

3423 

3424 local_column = self.normalize_name(row_dict["local_column"]) 

3425 remote_table = self.normalize_name(row_dict["remote_table"]) 

3426 remote_column = self.normalize_name(row_dict["remote_column"]) 

3427 remote_owner_orig = row_dict["remote_owner"] 

3428 remote_owner = self.normalize_name(remote_owner_orig) 

3429 if remote_owner_orig is not None: 

3430 all_remote_owners.add(remote_owner_orig) 

3431 

3432 if remote_table is None: 

3433 # ticket 363 

3434 if dblink and not dblink.startswith("@"): 

3435 dblink = f"@{dblink}" 

3436 util.warn( 

3437 "Got 'None' querying 'table_name' from " 

3438 f"all_cons_columns{dblink or ''} - does the user have " 

3439 "proper rights to the table?" 

3440 ) 

3441 continue 

3442 

3443 if constraint_name not in table_fkey: 

3444 table_fkey[constraint_name] = fkey = { 

3445 "name": constraint_name, 

3446 "constrained_columns": [], 

3447 "referred_schema": None, 

3448 "referred_table": remote_table, 

3449 "referred_columns": [], 

3450 "options": {}, 

3451 } 

3452 

3453 if resolve_synonyms: 

3454 # will be removed below 

3455 fkey["_ref_schema"] = remote_owner 

3456 

3457 if schema is not None or remote_owner_orig != owner: 

3458 fkey["referred_schema"] = remote_owner 

3459 

3460 delete_rule = row_dict["delete_rule"] 

3461 if delete_rule != "NO ACTION": 

3462 fkey["options"]["ondelete"] = delete_rule 

3463 

3464 else: 

3465 fkey = table_fkey[constraint_name] 

3466 

3467 fkey["constrained_columns"].append(local_column) 

3468 fkey["referred_columns"].append(remote_column) 

3469 

3470 if resolve_synonyms and all_remote_owners: 

3471 query = select( 

3472 dictionary.all_synonyms.c.owner, 

3473 dictionary.all_synonyms.c.table_name, 

3474 dictionary.all_synonyms.c.table_owner, 

3475 dictionary.all_synonyms.c.synonym_name, 

3476 ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners)) 

3477 

3478 result = self._execute_reflection( 

3479 connection, query, dblink, returns_long=False 

3480 ).mappings() 

3481 

3482 remote_owners_lut = {} 

3483 for row in result: 

3484 synonym_owner = self.normalize_name(row["owner"]) 

3485 table_name = self.normalize_name(row["table_name"]) 

3486 

3487 remote_owners_lut[(synonym_owner, table_name)] = ( 

3488 row["table_owner"], 

3489 row["synonym_name"], 

3490 ) 

3491 

3492 empty = (None, None) 

3493 for table_fkeys in fkeys.values(): 

3494 for table_fkey in table_fkeys.values(): 

3495 key = ( 

3496 table_fkey.pop("_ref_schema"), 

3497 table_fkey["referred_table"], 

3498 ) 

3499 remote_owner, syn_name = remote_owners_lut.get(key, empty) 

3500 if syn_name: 

3501 sn = self.normalize_name(syn_name) 

3502 table_fkey["referred_table"] = sn 

3503 if schema is not None or remote_owner != owner: 

3504 ro = self.normalize_name(remote_owner) 

3505 table_fkey["referred_schema"] = ro 

3506 else: 

3507 table_fkey["referred_schema"] = None 

3508 default = ReflectionDefaults.foreign_keys 

3509 

3510 return ( 

3511 (key, list(fkeys[key].values()) if key in fkeys else default()) 

3512 for key in ( 

3513 (schema, self.normalize_name(obj_name)) 

3514 for obj_name in all_objects 

3515 ) 

3516 ) 

3517 

3518 @reflection.cache 

3519 def get_unique_constraints( 

3520 self, connection, table_name, schema=None, **kw 

3521 ): 

3522 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3523 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3524 """ 

3525 data = self.get_multi_unique_constraints( 

3526 connection, 

3527 schema=schema, 

3528 filter_names=[table_name], 

3529 scope=ObjectScope.ANY, 

3530 kind=ObjectKind.ANY, 

3531 **kw, 

3532 ) 

3533 return self._value_or_raise(data, table_name, schema) 

3534 

3535 @_handle_synonyms_decorator 

3536 def get_multi_unique_constraints( 

3537 self, 

3538 connection, 

3539 *, 

3540 scope, 

3541 schema, 

3542 filter_names, 

3543 kind, 

3544 dblink=None, 

3545 **kw, 

3546 ): 

3547 """Supported kw arguments are: ``dblink`` to reflect via a db link; 

3548 ``oracle_resolve_synonyms`` to resolve names to synonyms 

3549 """ 

3550 all_objects = self._get_all_objects( 

3551 connection, schema, scope, kind, filter_names, dblink, **kw 

3552 ) 

3553 

3554 unique_cons = defaultdict(dict) 

3555 

3556 index_names = { 

3557 row_dict["index_name"] 

3558 for row_dict in self._get_indexes_rows( 

3559 connection, schema, dblink, all_objects, **kw 

3560 ) 

3561 } 

3562 

3563 for row_dict in self._get_all_constraint_rows( 

3564 connection, schema, dblink, all_objects, **kw 

3565 ): 

3566 if row_dict["constraint_type"] != "U": 

3567 continue 

3568 table_name = self.normalize_name(row_dict["table_name"]) 

3569 constraint_name_orig = row_dict["constraint_name"] 

3570 constraint_name = self.normalize_name(constraint_name_orig) 

3571 column_name = self.normalize_name(row_dict["local_column"]) 

3572 table_uc = unique_cons[(schema, table_name)] 

3573 

3574 assert constraint_name is not None 

3575 

3576 if constraint_name not in table_uc: 

3577 table_uc[constraint_name] = uc = { 

3578 "name": constraint_name, 

3579 "column_names": [], 

3580 "duplicates_index": ( 

3581 constraint_name 

3582 if constraint_name_orig in index_names 

3583 else None 

3584 ), 

3585 } 

3586 else: 

3587 uc = table_uc[constraint_name] 

3588 

3589 uc["column_names"].append(column_name) 

3590 

3591 default = ReflectionDefaults.unique_constraints 

3592 

3593 return ( 

3594 ( 

3595 key, 

3596 ( 

3597 list(unique_cons[key].values()) 

3598 if key in unique_cons 

3599 else default() 

3600 ), 

3601 ) 

3602 for key in ( 

3603 (schema, self.normalize_name(obj_name)) 

3604 for obj_name in all_objects 

3605 ) 

3606 ) 

3607 

    @reflection.cache
    def get_view_definition(
        self,
        connection,
        view_name,
        schema=None,
        dblink=None,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        if kw.get("oracle_resolve_synonyms", False):
            synonyms = self._get_synonyms(
                connection, schema, filter_names=[view_name], dblink=dblink
            )
            if synonyms:
                assert len(synonyms) == 1
                row_dict = synonyms[0]
                dblink = self.normalize_name(row_dict["db_link"])
                schema = row_dict["table_owner"]
                view_name = row_dict["table_name"]

        name = self.denormalize_name(view_name)
        owner = self.denormalize_schema_name(
            schema or self.default_schema_name
        )
        query = (
            select(dictionary.all_views.c.text)
            .where(
                dictionary.all_views.c.view_name == name,
                dictionary.all_views.c.owner == owner,
            )
            .union_all(
                select(dictionary.all_mviews.c.query).where(
                    dictionary.all_mviews.c.mview_name == name,
                    dictionary.all_mviews.c.owner == owner,
                )
            )
        )

        rp = self._execute_reflection(
            connection, query, dblink, returns_long=False
        ).scalar()
        if rp is None:
            raise exc.NoSuchTableError(
                f"{schema}.{view_name}" if schema else view_name
            )
        else:
            return rp
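The lookup strategy above (query the regular-view catalog UNION ALL the materialized-view catalog, first scalar wins, ``None`` meaning "no such view") can be sketched with stdlib SQLite standing in for Oracle's data dictionary. The table and column names below mimic ``ALL_VIEWS``/``ALL_MVIEWS`` but are local fixtures, not real catalog access.

```python
import sqlite3

# Local stand-ins for the Oracle data dictionary views queried above.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE all_views (view_name TEXT, text TEXT);
    CREATE TABLE all_mviews (mview_name TEXT, query TEXT);
    INSERT INTO all_mviews VALUES ('MV1', 'SELECT * FROM t');
    """
)


def view_definition(name):
    # Regular views first, materialized views as the UNION ALL fallback;
    # a single scalar row is expected, None signals "no such view".
    row = conn.execute(
        "SELECT text FROM all_views WHERE view_name = ? "
        "UNION ALL "
        "SELECT query FROM all_mviews WHERE mview_name = ?",
        (name, name),
    ).fetchone()
    return row[0] if row else None
```

In the real method, a ``None`` result is converted into ``exc.NoSuchTableError`` rather than returned.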

    @reflection.cache
    def get_check_constraints(
        self, connection, table_name, schema=None, include_all=False, **kw
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        data = self.get_multi_check_constraints(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            include_all=include_all,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @_handle_synonyms_decorator
    def get_multi_check_constraints(
        self,
        connection,
        *,
        schema,
        filter_names,
        dblink=None,
        scope,
        kind,
        include_all=False,
        **kw,
    ):
        """Supported kw arguments are: ``dblink`` to reflect via a db link;
        ``oracle_resolve_synonyms`` to resolve names to synonyms
        """
        all_objects = self._get_all_objects(
            connection, schema, scope, kind, filter_names, dblink, **kw
        )

        not_null = re.compile(r"..+?. IS NOT NULL$")

        check_constraints = defaultdict(list)

        for row_dict in self._get_all_constraint_rows(
            connection, schema, dblink, all_objects, **kw
        ):
            if row_dict["constraint_type"] != "C":
                continue
            table_name = self.normalize_name(row_dict["table_name"])
            constraint_name = self.normalize_name(row_dict["constraint_name"])
            search_condition = row_dict["search_condition"]

            table_checks = check_constraints[(schema, table_name)]
            if constraint_name is not None and (
                include_all or not not_null.match(search_condition)
            ):
                table_checks.append(
                    {"name": constraint_name, "sqltext": search_condition}
                )

        default = ReflectionDefaults.check_constraints

        return (
            (
                key,
                (
                    check_constraints[key]
                    if key in check_constraints
                    else default()
                ),
            )
            for key in (
                (schema, self.normalize_name(obj_name))
                for obj_name in all_objects
            )
        )
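The ``not_null`` filter above exists because Oracle Database reports implicit column NOT NULL constraints as CHECK constraints whose search condition looks like ``"COL" IS NOT NULL``; those are skipped unless ``include_all`` is set. A minimal sketch using the same regular expression, with hypothetical sample conditions:

```python
import re

# Same pattern as above: a quoted identifier followed by " IS NOT NULL"
# at the end of the search condition.
not_null = re.compile(r"..+?. IS NOT NULL$")

conditions = ['"PRICE" IS NOT NULL', "price > 0"]
# Default behavior (include_all=False): drop the implicit NOT NULL checks.
kept = [c for c in conditions if not not_null.match(c)]
```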

    def _list_dblinks(self, connection, dblink=None):
        query = select(dictionary.all_db_links.c.db_link)
        links = self._execute_reflection(
            connection, query, dblink, returns_long=False
        ).scalars()
        return [self.normalize_name(link) for link in links]


class _OuterJoinColumn(sql.ClauseElement):
    __visit_name__ = "outer_join_column"

    def __init__(self, column):
        self.column = column