1# dialects/oracle/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r"""
11.. dialect:: oracle
12 :name: Oracle Database
13 :normal_support: 11+
14 :best_effort: 9+
15
16
17Auto Increment Behavior
18-----------------------
19
20SQLAlchemy Table objects which include integer primary keys are usually assumed
21to have "autoincrementing" behavior, meaning they can generate their own
22primary key values upon INSERT. For use within Oracle Database, two options are
23available, which are the use of IDENTITY columns (Oracle Database 12 and above
24only) or the association of a SEQUENCE with the column.
25
26Specifying GENERATED AS IDENTITY (Oracle Database 12 and above)
27~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
28
29Starting from version 12, Oracle Database can make use of identity columns
30using the :class:`_sql.Identity` to specify the autoincrementing behavior::
31
32 t = Table(
33 "mytable",
34 metadata,
35 Column("id", Integer, Identity(start=3), primary_key=True),
36 Column(...),
37 ...,
38 )
39
40The CREATE TABLE for the above :class:`_schema.Table` object would be:
41
42.. sourcecode:: sql
43
44 CREATE TABLE mytable (
45 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
46 ...,
47 PRIMARY KEY (id)
48 )
49
50The :class:`_schema.Identity` object support many options to control the
51"autoincrementing" behavior of the column, like the starting value, the
52incrementing value, etc. In addition to the standard options, Oracle Database
53supports setting :paramref:`_schema.Identity.always` to ``None`` to use the
54default generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
55setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
56in conjunction with a 'BY DEFAULT' identity column.
57
58Using a SEQUENCE (all Oracle Database versions)
59~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
60
61Older version of Oracle Database had no "autoincrement" feature: SQLAlchemy
62relies upon sequences to produce these values. With the older Oracle Database
63versions, *a sequence must always be explicitly specified to enable
64autoincrement*. This is divergent with the majority of documentation examples
65which assume the usage of an autoincrement-capable database. To specify
66sequences, use the sqlalchemy.schema.Sequence object which is passed to a
67Column construct::
68
69 t = Table(
70 "mytable",
71 metadata,
72 Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
73 Column(...),
74 ...,
75 )
76
77This step is also required when using table reflection, i.e. autoload_with=engine::
78
79 t = Table(
80 "mytable",
81 metadata,
82 Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
83 autoload_with=engine,
84 )
85
86.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
87 in a :class:`_schema.Column` to specify the option of an autoincrementing
88 column.
89
90.. _oracle_isolation_level:
91
92Transaction Isolation Level / Autocommit
93----------------------------------------
94
95Oracle Database supports "READ COMMITTED" and "SERIALIZABLE" modes of
96isolation. The AUTOCOMMIT isolation level is also supported by the
97python-oracledb and cx_Oracle dialects.
98
99To set using per-connection execution options::
100
101 connection = engine.connect()
102 connection = connection.execution_options(isolation_level="AUTOCOMMIT")
103
104For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle Database dialects sets
105the level at the session level using ``ALTER SESSION``, which is reverted back
106to its default setting when the connection is returned to the connection pool.
107
108Valid values for ``isolation_level`` include:
109
110* ``READ COMMITTED``
111* ``AUTOCOMMIT``
112* ``SERIALIZABLE``
113
114.. note:: The implementation for the
115 :meth:`_engine.Connection.get_isolation_level` method as implemented by the
116 Oracle Database dialects necessarily force the start of a transaction using the
117 Oracle Database DBMS_TRANSACTION.LOCAL_TRANSACTION_ID function; otherwise no
118 level is normally readable.
119
120 Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
121 raise an exception if the ``v$transaction`` view is not available due to
122 permissions or other reasons, which is a common occurrence in Oracle Database
123 installations.
124
125 The python-oracledb and cx_Oracle dialects attempt to call the
126 :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
127 its first connection to the database in order to acquire the
128 "default"isolation level. This default level is necessary so that the level
129 can be reset on a connection after it has been temporarily modified using
130 :meth:`_engine.Connection.execution_options` method. In the common event
131 that the :meth:`_engine.Connection.get_isolation_level` method raises an
132 exception due to ``v$transaction`` not being readable as well as any other
133 database-related failure, the level is assumed to be "READ COMMITTED". No
134 warning is emitted for this initial first-connect condition as it is
135 expected to be a common restriction on Oracle databases.
136
137.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_Oracle dialect
138 as well as the notion of a default isolation level
139
140.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
141 reading of the isolation level.
142
143.. versionchanged:: 1.3.22 In the event that the default isolation
144 level cannot be read due to permissions on the v$transaction view as
145 is common in Oracle installations, the default isolation level is hardcoded
146 to "READ COMMITTED" which was the behavior prior to 1.3.21.
147
148.. seealso::
149
150 :ref:`dbapi_autocommit`
151
152Identifier Casing
153-----------------
154
155In Oracle Database, the data dictionary represents all case insensitive
156identifier names using UPPERCASE text. This is in contradiction to the
157expectations of SQLAlchemy, which assume a case insensitive name is represented
158as lowercase text.
159
160As an example of case insensitive identifier names, consider the following table:
161
162.. sourcecode:: sql
163
164 CREATE TABLE MyTable (Identifier INTEGER PRIMARY KEY)
165
166If you were to ask Oracle Database for information about this table, the
167table name would be reported as ``MYTABLE`` and the column name would
168be reported as ``IDENTIFIER``. Compare to most other databases such as
169PostgreSQL and MySQL which would report these names as ``mytable`` and
170``identifier``. The names are **not quoted, therefore are case insensitive**.
171The special casing of ``MyTable`` and ``Identifier`` would only be maintained
172if they were quoted in the table definition:
173
174.. sourcecode:: sql
175
176 CREATE TABLE "MyTable" ("Identifier" INTEGER PRIMARY KEY)
177
178When constructing a SQLAlchemy :class:`.Table` object, **an all lowercase name
179is considered to be case insensitive**. So the following table assumes
180case insensitive names::
181
182 Table("mytable", metadata, Column("identifier", Integer, primary_key=True))
183
184Whereas when mixed case or UPPERCASE names are used, case sensitivity is
185assumed::
186
187 Table("MyTable", metadata, Column("Identifier", Integer, primary_key=True))
188
189A similar situation occurs at the database driver level when emitting a
190textual SQL SELECT statement and looking at column names in the DBAPI
191``cursor.description`` attribute. A database like PostgreSQL will normalize
192case insensitive names to be lowercase::
193
194 >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test")
195 >>> pg_connection = pg_engine.connect()
196 >>> result = pg_connection.exec_driver_sql("SELECT 1 AS SomeName")
197 >>> result.cursor.description
198 (Column(name='somename', type_code=23),)
199
200Whereas Oracle normalizes them to UPPERCASE::
201
202 >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
203 >>> oracle_connection = oracle_engine.connect()
204 >>> result = oracle_connection.exec_driver_sql(
205 ... "SELECT 1 AS SomeName FROM DUAL"
206 ... )
207 >>> result.cursor.description
208 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
209
210In order to achieve cross-database parity for the two cases of a. table
211reflection and b. textual-only SQL statement round trips, SQLAlchemy performs a step
212called **name normalization** when using the Oracle dialect. This process may
213also apply to other third party dialects that have similar UPPERCASE handling
214of case insensitive names.
215
216When using name normalization, SQLAlchemy attempts to detect if a name is
217case insensitive by checking if all characters are UPPERCASE letters only;
218if so, then it assumes this is a case insensitive name and is delivered as
219a lowercase name.
220
221For table reflection, a tablename that is seen represented as all UPPERCASE
222in Oracle Database's catalog tables will be assumed to have a case insensitive
223name. This is what allows the ``Table`` definition to use lower case names
224and be equally compatible from a reflection point of view on Oracle Database
225and all other databases such as PostgreSQL and MySQL::
226
227 # matches a table created with CREATE TABLE mytable
228 Table("mytable", metadata, autoload_with=some_engine)
229
230Above, the all lowercase name ``"mytable"`` is case insensitive; it will match
231a table reported by PostgreSQL as ``"mytable"`` and a table reported by
232Oracle as ``"MYTABLE"``. If name normalization were not present, it would
233not be possible for the above :class:`.Table` definition to be introspectable
234in a cross-database way, since we are dealing with a case insensitive name
235that is not reported by each database in the same way.
236
237Case sensitivity can be forced on in this case, such as if we wanted to represent
238the quoted tablename ``"MYTABLE"`` with that exact casing, most simply by using
239that casing directly, which will be seen as a case sensitive name::
240
241 # matches a table created with CREATE TABLE "MYTABLE"
242 Table("MYTABLE", metadata, autoload_with=some_engine)
243
244For the unusual case of a quoted all-lowercase name, the :class:`.quoted_name`
245construct may be used::
246
247 from sqlalchemy import quoted_name
248
249 # matches a table created with CREATE TABLE "mytable"
250 Table(
251 quoted_name("mytable", quote=True), metadata, autoload_with=some_engine
252 )
253
254Name normalization also takes place when handling result sets from **purely
255textual SQL strings**, that have no other :class:`.Table` or :class:`.Column`
256metadata associated with them. This includes SQL strings executed using
257:meth:`.Connection.exec_driver_sql` and SQL strings executed using the
258:func:`.text` construct which do not include :class:`.Column` metadata.
259
260Returning to the Oracle Database SELECT statement, we see that even though
261``cursor.description`` reports the column name as ``SOMENAME``, SQLAlchemy
262name normalizes this to ``somename``::
263
264 >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
265 >>> oracle_connection = oracle_engine.connect()
266 >>> result = oracle_connection.exec_driver_sql(
267 ... "SELECT 1 AS SomeName FROM DUAL"
268 ... )
269 >>> result.cursor.description
270 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
271 >>> result.keys()
272 RMKeyView(['somename'])
273
274The single scenario where the above behavior produces inaccurate results
275is when using an all-uppercase, quoted name. SQLAlchemy has no way to determine
276that a particular name in ``cursor.description`` was quoted, and is therefore
277case sensitive, or was not quoted, and should be name normalized::
278
279 >>> result = oracle_connection.exec_driver_sql(
280 ... 'SELECT 1 AS "SOMENAME" FROM DUAL'
281 ... )
282 >>> result.cursor.description
283 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
284 >>> result.keys()
285 RMKeyView(['somename'])
286
287For this case, a new feature will be available in SQLAlchemy 2.1 to disable
288the name normalization behavior in specific cases.
289
290
291.. _oracle_max_identifier_lengths:
292
293Maximum Identifier Lengths
294--------------------------
295
296SQLAlchemy is sensitive to the maximum identifier length supported by Oracle
297Database. This affects generated SQL label names as well as the generation of
298constraint names, particularly in the case where the constraint naming
299convention feature described at :ref:`constraint_naming_conventions` is being
300used.
301
302Oracle Database 12.2 increased the default maximum identifier length from 30 to
303128. As of SQLAlchemy 1.4, the default maximum identifier length for the Oracle
304dialects is 128 characters. Upon first connection, the maximum length actually
305supported by the database is obtained. In all cases, setting the
306:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
307change and the value given will be used as is::
308
309 engine = create_engine(
310 "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1",
311 max_identifier_length=30,
312 )
313
314If :paramref:`_sa.create_engine.max_identifier_length` is not set, the oracledb
315dialect internally uses the ``max_identifier_length`` attribute available on
316driver connections since python-oracledb version 2.5. When using an older
317driver version, or using the cx_Oracle dialect, SQLAlchemy will instead attempt
318to use the query ``SELECT value FROM v$parameter WHERE name = 'compatible'``
319upon first connect in order to determine the effective compatibility version of
320the database. The "compatibility" version is a version number that is
321independent of the actual database version. It is used to assist database
322migration. It is configured by an Oracle Database initialization parameter. The
323compatibility version then determines the maximum allowed identifier length for
324the database. If the V$ view is not available, the database version information
325is used instead.
326
327The maximum identifier length comes into play both when generating anonymized
328SQL labels in SELECT statements, but more crucially when generating constraint
329names from a naming convention. It is this area that has created the need for
330SQLAlchemy to change this default conservatively. For example, the following
331naming convention produces two very different constraint names based on the
332identifier length::
333
334 from sqlalchemy import Column
335 from sqlalchemy import Index
336 from sqlalchemy import Integer
337 from sqlalchemy import MetaData
338 from sqlalchemy import Table
339 from sqlalchemy.dialects import oracle
340 from sqlalchemy.schema import CreateIndex
341
342 m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
343
344 t = Table(
345 "t",
346 m,
347 Column("some_column_name_1", Integer),
348 Column("some_column_name_2", Integer),
349 Column("some_column_name_3", Integer),
350 )
351
352 ix = Index(
353 None,
354 t.c.some_column_name_1,
355 t.c.some_column_name_2,
356 t.c.some_column_name_3,
357 )
358
359 oracle_dialect = oracle.dialect(max_identifier_length=30)
360 print(CreateIndex(ix).compile(dialect=oracle_dialect))
361
362With an identifier length of 30, the above CREATE INDEX looks like:
363
364.. sourcecode:: sql
365
366 CREATE INDEX ix_some_column_name_1s_70cd ON t
367 (some_column_name_1, some_column_name_2, some_column_name_3)
368
369However with length of 128, it becomes::
370
371.. sourcecode:: sql
372
373 CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
374 (some_column_name_1, some_column_name_2, some_column_name_3)
375
376Applications which have run versions of SQLAlchemy prior to 1.4 on Oracle
377Database version 12.2 or greater are therefore subject to the scenario of a
378database migration that wishes to "DROP CONSTRAINT" on a name that was
379previously generated with the shorter length. This migration will fail when
380the identifier length is changed without the name of the index or constraint
381first being adjusted. Such applications are strongly advised to make use of
382:paramref:`_sa.create_engine.max_identifier_length` in order to maintain
383control of the generation of truncated names, and to fully review and test all
384database migrations in a staging environment when changing this value to ensure
385that the impact of this change has been mitigated.
386
387.. versionchanged:: 1.4 the default max_identifier_length for Oracle Database
388 is 128 characters, which is adjusted down to 30 upon first connect if the
389 Oracle Database, or its compatibility setting, are lower than version 12.2.
390
391
392LIMIT/OFFSET/FETCH Support
393--------------------------
394
395Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make use
396of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming Oracle Database 12c or
397above, and assuming the SELECT statement is not embedded within a compound
398statement like UNION. This syntax is also available directly by using the
399:meth:`_sql.Select.fetch` method.
400
401.. versionchanged:: 2.0 the Oracle Database dialects now use ``FETCH FIRST N
402 ROW / OFFSET N ROWS`` for all :meth:`_sql.Select.limit` and
403 :meth:`_sql.Select.offset` usage including within the ORM and legacy
404 :class:`_orm.Query`. To force the legacy behavior using window functions,
405 specify the ``enable_offset_fetch=False`` dialect parameter to
406 :func:`_sa.create_engine`.
407
408The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle Database
409version by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`,
410which will force the use of "legacy" mode that makes use of window functions.
411This mode is also selected automatically when using a version of Oracle
412Database prior to 12c.
413
414When using legacy mode, or when a :class:`.Select` statement with limit/offset
415is embedded in a compound statement, an emulated approach for LIMIT / OFFSET
416based on window functions is used, which involves creation of a subquery using
417``ROW_NUMBER`` that is prone to performance issues as well as SQL construction
418issues for complex statements. However, this approach is supported by all
419Oracle Database versions. See notes below.
420
421Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
422~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
423
424If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the
425ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an
426Oracle Database version prior to 12c, the following notes apply:
427
428* SQLAlchemy currently makes use of ROWNUM to achieve
429 LIMIT/OFFSET; the exact methodology is taken from
430 https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
431
432* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
433 the usage of this optimization directive, specify ``optimize_limits=True``
434 to :func:`_sa.create_engine`.
435
436 .. versionchanged:: 1.4
437
438 The Oracle Database dialect renders limit/offset integer values using a
439 "post compile" scheme which renders the integer directly before passing
440 the statement to the cursor for execution. The ``use_binds_for_limits``
441 flag no longer has an effect.
442
443 .. seealso::
444
445 :ref:`change_4808`.
446
447.. _oracle_returning:
448
449RETURNING Support
450-----------------
451
452Oracle Database supports RETURNING fully for INSERT, UPDATE and DELETE
453statements that are invoked with a single collection of bound parameters (that
454is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally
455support RETURNING with :term:`executemany` statements). Multiple rows may be
456returned as well.
457
458.. versionchanged:: 2.0 the Oracle Database backend has full support for
459 RETURNING on parity with other backends.
460
461
462ON UPDATE CASCADE
463-----------------
464
465Oracle Database doesn't have native ON UPDATE CASCADE functionality. A trigger
466based solution is available at
467https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
468
469When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
470cascading updates - specify ForeignKey objects using the
471"deferrable=True, initially='deferred'" keyword arguments,
472and specify "passive_updates=False" on each relationship().
473
474Oracle Database 8 Compatibility
475-------------------------------
476
477.. warning:: The status of Oracle Database 8 compatibility is not known for
478 SQLAlchemy 2.0.
479
480When Oracle Database 8 is detected, the dialect internally configures itself to
481the following behaviors:
482
483* the use_ansi flag is set to False. This has the effect of converting all
484 JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
485 makes use of Oracle's (+) operator.
486
487* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
488 the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued
489 instead. This because these types don't seem to work correctly on Oracle 8
490 even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and
491 :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
492 NVARCHAR2 and NCLOB.
493
494
495Synonym/DBLINK Reflection
496-------------------------
497
498When using reflection with Table objects, the dialect can optionally search
499for tables indicated by synonyms, either in local or remote schemas or
500accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
501a keyword argument to the :class:`_schema.Table` construct::
502
503 some_table = Table(
504 "some_table", autoload_with=some_engine, oracle_resolve_synonyms=True
505 )
506
507When this flag is set, the given name (such as ``some_table`` above) will be
508searched not just in the ``ALL_TABLES`` view, but also within the
509``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
510name. If the synonym is located and refers to a DBLINK, the Oracle Database
511dialects know how to locate the table's information using DBLINK syntax(e.g.
512``@dblink``).
513
514``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
515accepted, including methods such as :meth:`_schema.MetaData.reflect` and
516:meth:`_reflection.Inspector.get_columns`.
517
518If synonyms are not in use, this flag should be left disabled.
519
520.. _oracle_constraint_reflection:
521
522Constraint Reflection
523---------------------
524
525The Oracle Database dialects can return information about foreign key, unique,
526and CHECK constraints, as well as indexes on tables.
527
528Raw information regarding these constraints can be acquired using
529:meth:`_reflection.Inspector.get_foreign_keys`,
530:meth:`_reflection.Inspector.get_unique_constraints`,
531:meth:`_reflection.Inspector.get_check_constraints`, and
532:meth:`_reflection.Inspector.get_indexes`.
533
534.. versionchanged:: 1.2 The Oracle Database dialect can now reflect UNIQUE and
535 CHECK constraints.
536
537When using reflection at the :class:`_schema.Table` level, the
538:class:`_schema.Table`
539will also include these constraints.
540
541Note the following caveats:
542
543* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
544 Oracle Database builds a special "IS NOT NULL" constraint for columns that
545 specify "NOT NULL". This constraint is **not** returned by default; to
546 include the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
547
548 from sqlalchemy import create_engine, inspect
549
550 engine = create_engine(
551 "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
552 )
553 inspector = inspect(engine)
554 all_check_constraints = inspector.get_check_constraints(
555 "some_table", include_all=True
556 )
557
558* in most cases, when reflecting a :class:`_schema.Table`, a UNIQUE constraint
559 will **not** be available as a :class:`.UniqueConstraint` object, as Oracle
560 Database mirrors unique constraints with a UNIQUE index in most cases (the
561 exception seems to be when two or more unique constraints represent the same
562 columns); the :class:`_schema.Table` will instead represent these using
563 :class:`.Index` with the ``unique=True`` flag set.
564
565* Oracle Database creates an implicit index for the primary key of a table;
566 this index is **excluded** from all index results.
567
568* the list of columns reflected for an index will not include column names
569 that start with SYS_NC.
570
571Table names with SYSTEM/SYSAUX tablespaces
572-------------------------------------------
573
574The :meth:`_reflection.Inspector.get_table_names` and
575:meth:`_reflection.Inspector.get_temp_table_names`
576methods each return a list of table names for the current engine. These methods
577are also part of the reflection which occurs within an operation such as
578:meth:`_schema.MetaData.reflect`. By default,
579these operations exclude the ``SYSTEM``
580and ``SYSAUX`` tablespaces from the operation. In order to change this, the
581default list of tablespaces excluded can be changed at the engine level using
582the ``exclude_tablespaces`` parameter::
583
584 # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
585 e = create_engine(
586 "oracle+oracledb://scott:tiger@localhost:1521/?service_name=freepdb1",
587 exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"],
588 )
589
590.. _oracle_float_support:
591
592FLOAT / DOUBLE Support and Behaviors
593------------------------------------
594
595The SQLAlchemy :class:`.Float` and :class:`.Double` datatypes are generic
596datatypes that resolve to the "least surprising" datatype for a given backend.
597For Oracle Database, this means they resolve to the ``FLOAT`` and ``DOUBLE``
598types::
599
600 >>> from sqlalchemy import cast, literal, Float
601 >>> from sqlalchemy.dialects import oracle
602 >>> float_datatype = Float()
603 >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
604 CAST(:param_1 AS FLOAT)
605
606Oracle's ``FLOAT`` / ``DOUBLE`` datatypes are aliases for ``NUMBER``. Oracle
607Database stores ``NUMBER`` values with full precision, not floating point
608precision, which means that ``FLOAT`` / ``DOUBLE`` do not actually behave like
609native FP values. Oracle Database instead offers special datatypes
610``BINARY_FLOAT`` and ``BINARY_DOUBLE`` to deliver real 4- and 8- byte FP
611values.
612
613SQLAlchemy supports these datatypes directly using :class:`.BINARY_FLOAT` and
614:class:`.BINARY_DOUBLE`. To use the :class:`.Float` or :class:`.Double`
615datatypes in a database agnostic way, while allowing Oracle backends to utilize
616one of these types, use the :meth:`.TypeEngine.with_variant` method to set up a
617variant::
618
619 >>> from sqlalchemy import cast, literal, Float
620 >>> from sqlalchemy.dialects import oracle
621 >>> float_datatype = Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
622 >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
623 CAST(:param_1 AS BINARY_FLOAT)
624
625E.g. to use this datatype in a :class:`.Table` definition::
626
627 my_table = Table(
628 "my_table",
629 metadata,
630 Column(
631 "fp_data", Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
632 ),
633 )
634
635DateTime Compatibility
636----------------------
637
638Oracle Database has no datatype known as ``DATETIME``, it instead has only
639``DATE``, which can actually store a date and time value. For this reason, the
640Oracle Database dialects provide a type :class:`_oracle.DATE` which is a
641subclass of :class:`.DateTime`. This type has no special behavior, and is only
642present as a "marker" for this type; additionally, when a database column is
643reflected and the type is reported as ``DATE``, the time-supporting
644:class:`_oracle.DATE` type is used.
645
646.. _oracle_table_options:
647
648Oracle Database Table Options
649-----------------------------
650
651The CREATE TABLE phrase supports the following options with Oracle Database
652dialects in conjunction with the :class:`_schema.Table` construct:
653
654
655* ``ON COMMIT``::
656
657 Table(
658 "some_table",
659 metadata,
660 ...,
661 prefixes=["GLOBAL TEMPORARY"],
662 oracle_on_commit="PRESERVE ROWS",
663 )
664
665*
666 ``COMPRESS``::
667
668 Table(
669 "mytable", metadata, Column("data", String(32)), oracle_compress=True
670 )
671
672 Table("mytable", metadata, Column("data", String(32)), oracle_compress=6)
673
674 The ``oracle_compress`` parameter accepts either an integer compression
675 level, or ``True`` to use the default compression level.
676
677*
678 ``TABLESPACE``::
679
680 Table("mytable", metadata, ..., oracle_tablespace="EXAMPLE_TABLESPACE")
681
682 The ``oracle_tablespace`` parameter specifies the tablespace in which the
683 table is to be created. This is useful when you want to create a table in a
684 tablespace other than the default tablespace of the user.
685
686 .. versionadded:: 2.0.37
687
688.. _oracle_index_options:
689
690Oracle Database Specific Index Options
691--------------------------------------
692
693Bitmap Indexes
694~~~~~~~~~~~~~~
695
696You can specify the ``oracle_bitmap`` parameter to create a bitmap index
697instead of a B-tree index::
698
699 Index("my_index", my_table.c.data, oracle_bitmap=True)
700
701Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
702check for such limitations, only the database will.
703
704Index compression
705~~~~~~~~~~~~~~~~~
706
707Oracle Database has a more efficient storage mode for indexes containing lots
708of repeated values. Use the ``oracle_compress`` parameter to turn on key
709compression::
710
711 Index("my_index", my_table.c.data, oracle_compress=True)
712
713 Index(
714 "my_index",
715 my_table.c.data1,
716 my_table.c.data2,
717 unique=True,
718 oracle_compress=1,
719 )
720
721The ``oracle_compress`` parameter accepts either an integer specifying the
722number of prefix columns to compress, or ``True`` to use the default (all
723columns for non-unique indexes, all but the last column for unique indexes).
724
725.. _oracle_vector_datatype:
726
727VECTOR Datatype
728---------------
729
730Oracle Database 23ai introduced a new VECTOR datatype for artificial intelligence
731and machine learning search operations. The VECTOR datatype is a homogeneous array
732of 8-bit signed integers, 8-bit unsigned integers (binary), 32-bit floating-point
733numbers, or 64-bit floating-point numbers.
734
735A vector's storage type can be either DENSE or SPARSE. A dense vector contains
736meaningful values in most or all of its dimensions. In contrast, a sparse vector
737has non-zero values in only a few dimensions, with the majority being zero.
738
739Sparse vectors are represented by the total number of vector dimensions, an array
740of indices, and an array of values where each value’s location in the vector is
741indicated by the corresponding indices array position. All other vector values are
742treated as zero.
743
744The storage formats that can be used with sparse vectors are float32, float64, and
745int8. Note that the binary storage format cannot be used with sparse vectors.
746
747Sparse vectors are supported when you are using Oracle Database 23.7 or later.
748
749.. seealso::
750
751 `Using VECTOR Data
752 <https://python-oracledb.readthedocs.io/en/latest/user_guide/vector_data_type.html>`_ - in the documentation
753 for the :ref:`oracledb` driver.
754
755.. versionadded:: 2.0.41 - Added VECTOR datatype
756
757.. versionadded:: 2.0.43 - Added DENSE/SPARSE support
758
759CREATE TABLE support for VECTOR
760~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
761
762With the :class:`.VECTOR` datatype, you can specify the number of dimensions,
763the storage format, and the storage type for the data. Valid values for the
764storage format are enum members of :class:`.VectorStorageFormat`. Valid values
765for the storage type are enum members of :class:`.VectorStorageType`. If
766storage type is not specified, a DENSE vector is created by default.
767
768To create a table that includes a :class:`.VECTOR` column::
769
770 from sqlalchemy.dialects.oracle import (
771 VECTOR,
772 VectorStorageFormat,
773 VectorStorageType,
774 )
775
776 t = Table(
777 "t1",
778 metadata,
779 Column("id", Integer, primary_key=True),
780 Column(
781 "embedding",
782 VECTOR(
783 dim=3,
784 storage_format=VectorStorageFormat.FLOAT32,
785 storage_type=VectorStorageType.SPARSE,
786 ),
787 ),
788 Column(...),
789 ...,
790 )
791
792Vectors can also be defined with an arbitrary number of dimensions and formats.
793This allows you to specify vectors of different dimensions with the various
794storage formats mentioned below.
795
796**Examples**
797
798* In this case, the storage format is flexible, allowing any vector type data to be
799 inserted, such as INT8 or BINARY etc::
800
801 vector_col: Mapped[array.array] = mapped_column(VECTOR(dim=3))
802
803* The dimension is flexible in this case, meaning that any dimension vector can
804 be used::
805
806 vector_col: Mapped[array.array] = mapped_column(
807 VECTOR(storage_format=VectorStorageType.INT8)
808 )
809
810* Both the dimensions and the storage format are flexible. It creates a DENSE vector::
811
812 vector_col: Mapped[array.array] = mapped_column(VECTOR)
813
814* To create a SPARSE vector with both dimensions and the storage format as flexible,
815 use the :attr:`.VectorStorageType.SPARSE` storage type::
816
817 vector_col: Mapped[array.array] = mapped_column(
818 VECTOR(storage_type=VectorStorageType.SPARSE)
819 )
820
821Python Datatypes for VECTOR
822~~~~~~~~~~~~~~~~~~~~~~~~~~~
823
824VECTOR data can be inserted using Python list or Python ``array.array()`` objects.
825Python arrays of type FLOAT (32-bit), DOUBLE (64-bit), INT (8-bit signed integers),
826or BINARY (8-bit unsigned integers) are used as bind values when inserting
827VECTOR columns::
828
829 from sqlalchemy import insert, select
830
831 with engine.begin() as conn:
832 conn.execute(
833 insert(t1),
834 {"id": 1, "embedding": [1, 2, 3]},
835 )
836
837Data can be inserted into a sparse vector using the :class:`_oracle.SparseVector`
838class, creating an object consisting of the number of dimensions, an array of indices, and a
839corresponding array of values::
840
841 from sqlalchemy import insert, select
842 from sqlalchemy.dialects.oracle import SparseVector
843
844 sparse_val = SparseVector(10, [1, 2], array.array("d", [23.45, 221.22]))
845
846 with engine.begin() as conn:
847 conn.execute(
848 insert(t1),
849 {"id": 1, "embedding": sparse_val},
850 )
851
852VECTOR Indexes
853~~~~~~~~~~~~~~
854
855The VECTOR feature supports an Oracle-specific parameter ``oracle_vector``
856on the :class:`.Index` construct, which allows the construction of VECTOR
857indexes.
858
859SPARSE vectors cannot be used in the creation of vector indexes.
860
861To utilize VECTOR indexing, set the ``oracle_vector`` parameter to True to use
862the default values provided by Oracle. HNSW is the default indexing method::
863
864 from sqlalchemy import Index
865
866 Index(
867 "vector_index",
868 t1.c.embedding,
869 oracle_vector=True,
870 )
871
872The full range of parameters for vector indexes are available by using the
873:class:`.VectorIndexConfig` dataclass in place of a boolean; this dataclass
874allows full configuration of the index::
875
876 Index(
877 "hnsw_vector_index",
878 t1.c.embedding,
879 oracle_vector=VectorIndexConfig(
880 index_type=VectorIndexType.HNSW,
881 distance=VectorDistanceType.COSINE,
882 accuracy=90,
883 hnsw_neighbors=5,
884 hnsw_efconstruction=20,
885 parallel=10,
886 ),
887 )
888
889 Index(
890 "ivf_vector_index",
891 t1.c.embedding,
892 oracle_vector=VectorIndexConfig(
893 index_type=VectorIndexType.IVF,
894 distance=VectorDistanceType.DOT,
895 accuracy=90,
896 ivf_neighbor_partitions=5,
897 ),
898 )
899
900For complete explanation of these parameters, see the Oracle documentation linked
901below.
902
903.. seealso::
904
905 `CREATE VECTOR INDEX <https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B396C369-54BB-4098-A0DD-7C54B3A0D66F>`_ - in the Oracle documentation
906
907
908
909Similarity Searching
910~~~~~~~~~~~~~~~~~~~~
911
912When using the :class:`_oracle.VECTOR` datatype with a :class:`.Column` or similar
913ORM mapped construct, additional comparison functions are available, including:
914
915* ``l2_distance``
916* ``cosine_distance``
917* ``inner_product``
918
919Example Usage::
920
921 result_vector = connection.scalars(
922 select(t1).order_by(t1.embedding.l2_distance([2, 3, 4])).limit(3)
923 )
924
925 for user in vector:
926 print(user.id, user.embedding)
927
928FETCH APPROXIMATE support
929~~~~~~~~~~~~~~~~~~~~~~~~~
930
931Approximate vector search can only be performed when all syntax and semantic
932rules are satisfied, the corresponding vector index is available, and the
933query optimizer determines to perform it. If any of these conditions are
934unmet, then an approximate search is not performed. In this case the query
935returns exact results.
936
937To enable approximate searching during similarity searches on VECTORS, the
938``oracle_fetch_approximate`` parameter may be used with the :meth:`.Select.fetch`
939clause to add ``FETCH APPROX`` to the SELECT statement::
940
941 select(users_table).fetch(5, oracle_fetch_approximate=True)
942
943""" # noqa
944
945from __future__ import annotations
946
947from collections import defaultdict
948from dataclasses import fields
949from functools import lru_cache
950from functools import wraps
951import re
952
953from . import dictionary
954from .types import _OracleBoolean
955from .types import _OracleDate
956from .types import BFILE
957from .types import BINARY_DOUBLE
958from .types import BINARY_FLOAT
959from .types import DATE
960from .types import FLOAT
961from .types import INTERVAL
962from .types import LONG
963from .types import NCLOB
964from .types import NUMBER
965from .types import NVARCHAR2 # noqa
966from .types import OracleRaw # noqa
967from .types import RAW
968from .types import ROWID # noqa
969from .types import TIMESTAMP
970from .types import VARCHAR2 # noqa
971from .vector import VECTOR
972from .vector import VectorIndexConfig
973from .vector import VectorIndexType
974from ... import Computed
975from ... import exc
976from ... import schema as sa_schema
977from ... import sql
978from ... import util
979from ...engine import default
980from ...engine import ObjectKind
981from ...engine import ObjectScope
982from ...engine import reflection
983from ...engine.reflection import ReflectionDefaults
984from ...sql import and_
985from ...sql import bindparam
986from ...sql import compiler
987from ...sql import expression
988from ...sql import func
989from ...sql import null
990from ...sql import or_
991from ...sql import select
992from ...sql import selectable as sa_selectable
993from ...sql import sqltypes
994from ...sql import util as sql_util
995from ...sql import visitors
996from ...sql.visitors import InternalTraversal
997from ...types import BLOB
998from ...types import CHAR
999from ...types import CLOB
1000from ...types import DOUBLE_PRECISION
1001from ...types import INTEGER
1002from ...types import NCHAR
1003from ...types import NVARCHAR
1004from ...types import REAL
1005from ...types import VARCHAR
1006
1007RESERVED_WORDS = set(
1008 "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
1009 "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
1010 "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
1011 "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
1012 "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
1013 "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
1014 "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
1015 "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
1016 "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
1017)
1018
1019NO_ARG_FNS = set(
1020 "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split()
1021)
1022
1023
1024colspecs = {
1025 sqltypes.Boolean: _OracleBoolean,
1026 sqltypes.Interval: INTERVAL,
1027 sqltypes.DateTime: DATE,
1028 sqltypes.Date: _OracleDate,
1029}
1030
1031ischema_names = {
1032 "VARCHAR2": VARCHAR,
1033 "NVARCHAR2": NVARCHAR,
1034 "CHAR": CHAR,
1035 "NCHAR": NCHAR,
1036 "DATE": DATE,
1037 "NUMBER": NUMBER,
1038 "BLOB": BLOB,
1039 "BFILE": BFILE,
1040 "CLOB": CLOB,
1041 "NCLOB": NCLOB,
1042 "TIMESTAMP": TIMESTAMP,
1043 "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
1044 "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP,
1045 "INTERVAL DAY TO SECOND": INTERVAL,
1046 "RAW": RAW,
1047 "FLOAT": FLOAT,
1048 "DOUBLE PRECISION": DOUBLE_PRECISION,
1049 "REAL": REAL,
1050 "LONG": LONG,
1051 "BINARY_DOUBLE": BINARY_DOUBLE,
1052 "BINARY_FLOAT": BINARY_FLOAT,
1053 "ROWID": ROWID,
1054 "VECTOR": VECTOR,
1055}
1056
1057
1058class OracleTypeCompiler(compiler.GenericTypeCompiler):
1059 # Note:
1060 # Oracle DATE == DATETIME
1061 # Oracle does not allow milliseconds in DATE
1062 # Oracle does not support TIME columns
1063
1064 def visit_datetime(self, type_, **kw):
1065 return self.visit_DATE(type_, **kw)
1066
1067 def visit_float(self, type_, **kw):
1068 return self.visit_FLOAT(type_, **kw)
1069
1070 def visit_double(self, type_, **kw):
1071 return self.visit_DOUBLE_PRECISION(type_, **kw)
1072
1073 def visit_unicode(self, type_, **kw):
1074 if self.dialect._use_nchar_for_unicode:
1075 return self.visit_NVARCHAR2(type_, **kw)
1076 else:
1077 return self.visit_VARCHAR2(type_, **kw)
1078
1079 def visit_INTERVAL(self, type_, **kw):
1080 return "INTERVAL DAY%s TO SECOND%s" % (
1081 type_.day_precision is not None
1082 and "(%d)" % type_.day_precision
1083 or "",
1084 type_.second_precision is not None
1085 and "(%d)" % type_.second_precision
1086 or "",
1087 )
1088
1089 def visit_LONG(self, type_, **kw):
1090 return "LONG"
1091
1092 def visit_TIMESTAMP(self, type_, **kw):
1093 if getattr(type_, "local_timezone", False):
1094 return "TIMESTAMP WITH LOCAL TIME ZONE"
1095 elif type_.timezone:
1096 return "TIMESTAMP WITH TIME ZONE"
1097 else:
1098 return "TIMESTAMP"
1099
1100 def visit_DOUBLE_PRECISION(self, type_, **kw):
1101 return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
1102
1103 def visit_BINARY_DOUBLE(self, type_, **kw):
1104 return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
1105
1106 def visit_BINARY_FLOAT(self, type_, **kw):
1107 return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
1108
1109 def visit_FLOAT(self, type_, **kw):
1110 kw["_requires_binary_precision"] = True
1111 return self._generate_numeric(type_, "FLOAT", **kw)
1112
1113 def visit_NUMBER(self, type_, **kw):
1114 return self._generate_numeric(type_, "NUMBER", **kw)
1115
1116 def _generate_numeric(
1117 self,
1118 type_,
1119 name,
1120 precision=None,
1121 scale=None,
1122 _requires_binary_precision=False,
1123 **kw,
1124 ):
1125 if precision is None:
1126 precision = getattr(type_, "precision", None)
1127
1128 if _requires_binary_precision:
1129 binary_precision = getattr(type_, "binary_precision", None)
1130
1131 if precision and binary_precision is None:
1132 # https://www.oracletutorial.com/oracle-basics/oracle-float/
1133 estimated_binary_precision = int(precision / 0.30103)
1134 raise exc.ArgumentError(
1135 "Oracle Database FLOAT types use 'binary precision', "
1136 "which does not convert cleanly from decimal "
1137 "'precision'. Please specify "
1138 "this type with a separate Oracle Database variant, such "
1139 f"as {type_.__class__.__name__}(precision={precision})."
1140 f"with_variant(oracle.FLOAT"
1141 f"(binary_precision="
1142 f"{estimated_binary_precision}), 'oracle'), so that the "
1143 "Oracle Database specific 'binary_precision' may be "
1144 "specified accurately."
1145 )
1146 else:
1147 precision = binary_precision
1148
1149 if scale is None:
1150 scale = getattr(type_, "scale", None)
1151
1152 if precision is None:
1153 return name
1154 elif scale is None:
1155 n = "%(name)s(%(precision)s)"
1156 return n % {"name": name, "precision": precision}
1157 else:
1158 n = "%(name)s(%(precision)s, %(scale)s)"
1159 return n % {"name": name, "precision": precision, "scale": scale}
1160
1161 def visit_string(self, type_, **kw):
1162 return self.visit_VARCHAR2(type_, **kw)
1163
1164 def visit_VARCHAR2(self, type_, **kw):
1165 return self._visit_varchar(type_, "", "2")
1166
1167 def visit_NVARCHAR2(self, type_, **kw):
1168 return self._visit_varchar(type_, "N", "2")
1169
1170 visit_NVARCHAR = visit_NVARCHAR2
1171
1172 def visit_VARCHAR(self, type_, **kw):
1173 return self._visit_varchar(type_, "", "")
1174
1175 def _visit_varchar(self, type_, n, num):
1176 if not type_.length:
1177 return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
1178 elif not n and self.dialect._supports_char_length:
1179 varchar = "VARCHAR%(two)s(%(length)s CHAR)"
1180 return varchar % {"length": type_.length, "two": num}
1181 else:
1182 varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
1183 return varchar % {"length": type_.length, "two": num, "n": n}
1184
1185 def visit_text(self, type_, **kw):
1186 return self.visit_CLOB(type_, **kw)
1187
1188 def visit_unicode_text(self, type_, **kw):
1189 if self.dialect._use_nchar_for_unicode:
1190 return self.visit_NCLOB(type_, **kw)
1191 else:
1192 return self.visit_CLOB(type_, **kw)
1193
1194 def visit_large_binary(self, type_, **kw):
1195 return self.visit_BLOB(type_, **kw)
1196
1197 def visit_big_integer(self, type_, **kw):
1198 return self.visit_NUMBER(type_, precision=19, **kw)
1199
1200 def visit_boolean(self, type_, **kw):
1201 return self.visit_SMALLINT(type_, **kw)
1202
1203 def visit_RAW(self, type_, **kw):
1204 if type_.length:
1205 return "RAW(%(length)s)" % {"length": type_.length}
1206 else:
1207 return "RAW"
1208
1209 def visit_ROWID(self, type_, **kw):
1210 return "ROWID"
1211
1212 def visit_VECTOR(self, type_, **kw):
1213 dim = type_.dim if type_.dim is not None else "*"
1214 storage_format = (
1215 type_.storage_format.value
1216 if type_.storage_format is not None
1217 else "*"
1218 )
1219 storage_type = (
1220 type_.storage_type.value if type_.storage_type is not None else "*"
1221 )
1222 return f"VECTOR({dim},{storage_format},{storage_type})"
1223
1224
1225class OracleCompiler(compiler.SQLCompiler):
1226 """Oracle compiler modifies the lexical structure of Select
1227 statements to work under non-ANSI configured Oracle databases, if
1228 the use_ansi flag is False.
1229 """
1230
1231 compound_keywords = util.update_copy(
1232 compiler.SQLCompiler.compound_keywords,
1233 {expression.CompoundSelect.EXCEPT: "MINUS"},
1234 )
1235
1236 def __init__(self, *args, **kwargs):
1237 self.__wheres = {}
1238 super().__init__(*args, **kwargs)
1239
1240 def visit_mod_binary(self, binary, operator, **kw):
1241 return "mod(%s, %s)" % (
1242 self.process(binary.left, **kw),
1243 self.process(binary.right, **kw),
1244 )
1245
1246 def visit_now_func(self, fn, **kw):
1247 return "CURRENT_TIMESTAMP"
1248
1249 def visit_char_length_func(self, fn, **kw):
1250 return "LENGTH" + self.function_argspec(fn, **kw)
1251
1252 def visit_match_op_binary(self, binary, operator, **kw):
1253 return "CONTAINS (%s, %s)" % (
1254 self.process(binary.left),
1255 self.process(binary.right),
1256 )
1257
1258 def visit_true(self, expr, **kw):
1259 return "1"
1260
1261 def visit_false(self, expr, **kw):
1262 return "0"
1263
1264 def get_cte_preamble(self, recursive):
1265 return "WITH"
1266
1267 def get_select_hint_text(self, byfroms):
1268 return " ".join("/*+ %s */" % text for table, text in byfroms.items())
1269
1270 def function_argspec(self, fn, **kw):
1271 if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
1272 return compiler.SQLCompiler.function_argspec(self, fn, **kw)
1273 else:
1274 return ""
1275
1276 def visit_function(self, func, **kw):
1277 text = super().visit_function(func, **kw)
1278 if kw.get("asfrom", False) and func.name.lower() != "table":
1279 text = "TABLE (%s)" % text
1280 return text
1281
1282 def visit_table_valued_column(self, element, **kw):
1283 text = super().visit_table_valued_column(element, **kw)
1284 text = text + ".COLUMN_VALUE"
1285 return text
1286
1287 def default_from(self):
1288 """Called when a ``SELECT`` statement has no froms,
1289 and no ``FROM`` clause is to be appended.
1290
1291 The Oracle compiler tacks a "FROM DUAL" to the statement.
1292 """
1293
1294 return " FROM DUAL"
1295
1296 def visit_join(self, join, from_linter=None, **kwargs):
1297 if self.dialect.use_ansi:
1298 return compiler.SQLCompiler.visit_join(
1299 self, join, from_linter=from_linter, **kwargs
1300 )
1301 else:
1302 if from_linter:
1303 from_linter.edges.add((join.left, join.right))
1304
1305 kwargs["asfrom"] = True
1306 if isinstance(join.right, expression.FromGrouping):
1307 right = join.right.element
1308 else:
1309 right = join.right
1310 return (
1311 self.process(join.left, from_linter=from_linter, **kwargs)
1312 + ", "
1313 + self.process(right, from_linter=from_linter, **kwargs)
1314 )
1315
1316 def _get_nonansi_join_whereclause(self, froms):
1317 clauses = []
1318
1319 def visit_join(join):
1320 if join.isouter:
1321 # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
1322 # "apply the outer join operator (+) to all columns of B in
1323 # the join condition in the WHERE clause" - that is,
1324 # unconditionally regardless of operator or the other side
1325 def visit_binary(binary):
1326 if isinstance(
1327 binary.left, expression.ColumnClause
1328 ) and join.right.is_derived_from(binary.left.table):
1329 binary.left = _OuterJoinColumn(binary.left)
1330 elif isinstance(
1331 binary.right, expression.ColumnClause
1332 ) and join.right.is_derived_from(binary.right.table):
1333 binary.right = _OuterJoinColumn(binary.right)
1334
1335 clauses.append(
1336 visitors.cloned_traverse(
1337 join.onclause, {}, {"binary": visit_binary}
1338 )
1339 )
1340 else:
1341 clauses.append(join.onclause)
1342
1343 for j in join.left, join.right:
1344 if isinstance(j, expression.Join):
1345 visit_join(j)
1346 elif isinstance(j, expression.FromGrouping):
1347 visit_join(j.element)
1348
1349 for f in froms:
1350 if isinstance(f, expression.Join):
1351 visit_join(f)
1352
1353 if not clauses:
1354 return None
1355 else:
1356 return sql.and_(*clauses)
1357
1358 def visit_outer_join_column(self, vc, **kw):
1359 return self.process(vc.column, **kw) + "(+)"
1360
1361 def visit_sequence(self, seq, **kw):
1362 return self.preparer.format_sequence(seq) + ".nextval"
1363
1364 def get_render_as_alias_suffix(self, alias_name_text):
1365 """Oracle doesn't like ``FROM table AS alias``"""
1366
1367 return " " + alias_name_text
1368
1369 def returning_clause(
1370 self, stmt, returning_cols, *, populate_result_map, **kw
1371 ):
1372 columns = []
1373 binds = []
1374
1375 for i, column in enumerate(
1376 expression._select_iterables(returning_cols)
1377 ):
1378 if (
1379 self.isupdate
1380 and isinstance(column, sa_schema.Column)
1381 and isinstance(column.server_default, Computed)
1382 and not self.dialect._supports_update_returning_computed_cols
1383 ):
1384 util.warn(
1385 "Computed columns don't work with Oracle Database UPDATE "
1386 "statements that use RETURNING; the value of the column "
1387 "*before* the UPDATE takes place is returned. It is "
1388 "advised to not use RETURNING with an Oracle Database "
1389 "computed column. Consider setting implicit_returning "
1390 "to False on the Table object in order to avoid implicit "
1391 "RETURNING clauses from being generated for this Table."
1392 )
1393 if column.type._has_column_expression:
1394 col_expr = column.type.column_expression(column)
1395 else:
1396 col_expr = column
1397
1398 outparam = sql.outparam("ret_%d" % i, type_=column.type)
1399 self.binds[outparam.key] = outparam
1400 binds.append(
1401 self.bindparam_string(self._truncate_bindparam(outparam))
1402 )
1403
1404 # has_out_parameters would in a normal case be set to True
1405 # as a result of the compiler visiting an outparam() object.
1406 # in this case, the above outparam() objects are not being
1407 # visited. Ensure the statement itself didn't have other
1408 # outparam() objects independently.
1409 # technically, this could be supported, but as it would be
1410 # a very strange use case without a clear rationale, disallow it
1411 if self.has_out_parameters:
1412 raise exc.InvalidRequestError(
1413 "Using explicit outparam() objects with "
1414 "UpdateBase.returning() in the same Core DML statement "
1415 "is not supported in the Oracle Database dialects."
1416 )
1417
1418 self._oracle_returning = True
1419
1420 columns.append(self.process(col_expr, within_columns_clause=False))
1421 if populate_result_map:
1422 self._add_to_result_map(
1423 getattr(col_expr, "name", col_expr._anon_name_label),
1424 getattr(col_expr, "name", col_expr._anon_name_label),
1425 (
1426 column,
1427 getattr(column, "name", None),
1428 getattr(column, "key", None),
1429 ),
1430 column.type,
1431 )
1432
1433 return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
1434
1435 def _row_limit_clause(self, select, **kw):
1436 """Oracle Database 12c supports OFFSET/FETCH operators
1437 Use it instead subquery with row_number
1438
1439 """
1440
1441 if (
1442 select._fetch_clause is not None
1443 or not self.dialect._supports_offset_fetch
1444 ):
1445 return super()._row_limit_clause(
1446 select, use_literal_execute_for_simple_int=True, **kw
1447 )
1448 else:
1449 return self.fetch_clause(
1450 select,
1451 fetch_clause=self._get_limit_or_fetch(select),
1452 use_literal_execute_for_simple_int=True,
1453 **kw,
1454 )
1455
1456 def _get_limit_or_fetch(self, select):
1457 if select._fetch_clause is None:
1458 return select._limit_clause
1459 else:
1460 return select._fetch_clause
1461
1462 def fetch_clause(
1463 self,
1464 select,
1465 fetch_clause=None,
1466 require_offset=False,
1467 use_literal_execute_for_simple_int=False,
1468 **kw,
1469 ):
1470 text = super().fetch_clause(
1471 select,
1472 fetch_clause=fetch_clause,
1473 require_offset=require_offset,
1474 use_literal_execute_for_simple_int=(
1475 use_literal_execute_for_simple_int
1476 ),
1477 **kw,
1478 )
1479
1480 if select.dialect_options["oracle"]["fetch_approximate"]:
1481 text = re.sub("FETCH FIRST", "FETCH APPROX FIRST", text)
1482
1483 return text
1484
1485 def translate_select_structure(self, select_stmt, **kwargs):
1486 select = select_stmt
1487
1488 if not getattr(select, "_oracle_visit", None):
1489 if not self.dialect.use_ansi:
1490 froms = self._display_froms_for_select(
1491 select, kwargs.get("asfrom", False)
1492 )
1493 whereclause = self._get_nonansi_join_whereclause(froms)
1494 if whereclause is not None:
1495 select = select.where(whereclause)
1496 select._oracle_visit = True
1497
1498 # if fetch is used this is not needed
1499 if (
1500 select._has_row_limiting_clause
1501 and not self.dialect._supports_offset_fetch
1502 and select._fetch_clause is None
1503 ):
1504 limit_clause = select._limit_clause
1505 offset_clause = select._offset_clause
1506
1507 if select._simple_int_clause(limit_clause):
1508 limit_clause = limit_clause.render_literal_execute()
1509
1510 if select._simple_int_clause(offset_clause):
1511 offset_clause = offset_clause.render_literal_execute()
1512
1513 # currently using form at:
1514 # https://blogs.oracle.com/oraclemagazine/\
1515 # on-rownum-and-limiting-results
1516
1517 orig_select = select
1518 select = select._generate()
1519 select._oracle_visit = True
1520
1521 # add expressions to accommodate FOR UPDATE OF
1522 for_update = select._for_update_arg
1523 if for_update is not None and for_update.of:
1524 for_update = for_update._clone()
1525 for_update._copy_internals()
1526
1527 for elem in for_update.of:
1528 if not select.selected_columns.contains_column(elem):
1529 select = select.add_columns(elem)
1530
1531 # Wrap the middle select and add the hint
1532 inner_subquery = select.alias()
1533 limitselect = sql.select(
1534 *[
1535 c
1536 for c in inner_subquery.c
1537 if orig_select.selected_columns.corresponding_column(c)
1538 is not None
1539 ]
1540 )
1541
1542 if (
1543 limit_clause is not None
1544 and self.dialect.optimize_limits
1545 and select._simple_int_clause(limit_clause)
1546 ):
1547 limitselect = limitselect.prefix_with(
1548 expression.text(
1549 "/*+ FIRST_ROWS(%s) */"
1550 % self.process(limit_clause, **kwargs)
1551 )
1552 )
1553
1554 limitselect._oracle_visit = True
1555 limitselect._is_wrapper = True
1556
1557 # add expressions to accommodate FOR UPDATE OF
1558 if for_update is not None and for_update.of:
1559 adapter = sql_util.ClauseAdapter(inner_subquery)
1560 for_update.of = [
1561 adapter.traverse(elem) for elem in for_update.of
1562 ]
1563
1564 # If needed, add the limiting clause
1565 if limit_clause is not None:
1566 if select._simple_int_clause(limit_clause) and (
1567 offset_clause is None
1568 or select._simple_int_clause(offset_clause)
1569 ):
1570 max_row = limit_clause
1571
1572 if offset_clause is not None:
1573 max_row = max_row + offset_clause
1574
1575 else:
1576 max_row = limit_clause
1577
1578 if offset_clause is not None:
1579 max_row = max_row + offset_clause
1580 limitselect = limitselect.where(
1581 sql.literal_column("ROWNUM") <= max_row
1582 )
1583
1584 # If needed, add the ora_rn, and wrap again with offset.
1585 if offset_clause is None:
1586 limitselect._for_update_arg = for_update
1587 select = limitselect
1588 else:
1589 limitselect = limitselect.add_columns(
1590 sql.literal_column("ROWNUM").label("ora_rn")
1591 )
1592 limitselect._oracle_visit = True
1593 limitselect._is_wrapper = True
1594
1595 if for_update is not None and for_update.of:
1596 limitselect_cols = limitselect.selected_columns
1597 for elem in for_update.of:
1598 if (
1599 limitselect_cols.corresponding_column(elem)
1600 is None
1601 ):
1602 limitselect = limitselect.add_columns(elem)
1603
1604 limit_subquery = limitselect.alias()
1605 origselect_cols = orig_select.selected_columns
1606 offsetselect = sql.select(
1607 *[
1608 c
1609 for c in limit_subquery.c
1610 if origselect_cols.corresponding_column(c)
1611 is not None
1612 ]
1613 )
1614
1615 offsetselect._oracle_visit = True
1616 offsetselect._is_wrapper = True
1617
1618 if for_update is not None and for_update.of:
1619 adapter = sql_util.ClauseAdapter(limit_subquery)
1620 for_update.of = [
1621 adapter.traverse(elem) for elem in for_update.of
1622 ]
1623
1624 offsetselect = offsetselect.where(
1625 sql.literal_column("ora_rn") > offset_clause
1626 )
1627
1628 offsetselect._for_update_arg = for_update
1629 select = offsetselect
1630
1631 return select
1632
1633 def limit_clause(self, select, **kw):
1634 return ""
1635
1636 def visit_empty_set_expr(self, type_, **kw):
1637 return "SELECT 1 FROM DUAL WHERE 1!=1"
1638
1639 def for_update_clause(self, select, **kw):
1640 if self.is_subquery():
1641 return ""
1642
1643 tmp = " FOR UPDATE"
1644
1645 if select._for_update_arg.of:
1646 tmp += " OF " + ", ".join(
1647 self.process(elem, **kw) for elem in select._for_update_arg.of
1648 )
1649
1650 if select._for_update_arg.nowait:
1651 tmp += " NOWAIT"
1652 if select._for_update_arg.skip_locked:
1653 tmp += " SKIP LOCKED"
1654
1655 return tmp
1656
1657 def visit_is_distinct_from_binary(self, binary, operator, **kw):
1658 return "DECODE(%s, %s, 0, 1) = 1" % (
1659 self.process(binary.left),
1660 self.process(binary.right),
1661 )
1662
1663 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1664 return "DECODE(%s, %s, 0, 1) = 0" % (
1665 self.process(binary.left),
1666 self.process(binary.right),
1667 )
1668
1669 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1670 string = self.process(binary.left, **kw)
1671 pattern = self.process(binary.right, **kw)
1672 flags = binary.modifiers["flags"]
1673 if flags is None:
1674 return "REGEXP_LIKE(%s, %s)" % (string, pattern)
1675 else:
1676 return "REGEXP_LIKE(%s, %s, %s)" % (
1677 string,
1678 pattern,
1679 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1680 )
1681
1682 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1683 return "NOT %s" % self.visit_regexp_match_op_binary(
1684 binary, operator, **kw
1685 )
1686
1687 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
1688 string = self.process(binary.left, **kw)
1689 pattern_replace = self.process(binary.right, **kw)
1690 flags = binary.modifiers["flags"]
1691 if flags is None:
1692 return "REGEXP_REPLACE(%s, %s)" % (
1693 string,
1694 pattern_replace,
1695 )
1696 else:
1697 return "REGEXP_REPLACE(%s, %s, %s)" % (
1698 string,
1699 pattern_replace,
1700 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1701 )
1702
1703 def visit_aggregate_strings_func(self, fn, **kw):
1704 return "LISTAGG%s" % self.function_argspec(fn, **kw)
1705
1706 def _visit_bitwise(self, binary, fn_name, custom_right=None, **kw):
1707 left = self.process(binary.left, **kw)
1708 right = self.process(
1709 custom_right if custom_right is not None else binary.right, **kw
1710 )
1711 return f"{fn_name}({left}, {right})"
1712
1713 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1714 return self._visit_bitwise(binary, "BITXOR", **kw)
1715
1716 def visit_bitwise_or_op_binary(self, binary, operator, **kw):
1717 return self._visit_bitwise(binary, "BITOR", **kw)
1718
1719 def visit_bitwise_and_op_binary(self, binary, operator, **kw):
1720 return self._visit_bitwise(binary, "BITAND", **kw)
1721
1722 def visit_bitwise_rshift_op_binary(self, binary, operator, **kw):
1723 raise exc.CompileError("Cannot compile bitwise_rshift in oracle")
1724
1725 def visit_bitwise_lshift_op_binary(self, binary, operator, **kw):
1726 raise exc.CompileError("Cannot compile bitwise_lshift in oracle")
1727
1728 def visit_bitwise_not_op_unary_operator(self, element, operator, **kw):
1729 raise exc.CompileError("Cannot compile bitwise_not in oracle")
1730
1731
1732class OracleDDLCompiler(compiler.DDLCompiler):
1733
1734 def _build_vector_index_config(
1735 self, vector_index_config: VectorIndexConfig
1736 ) -> str:
1737 parts = []
1738 sql_param_name = {
1739 "hnsw_neighbors": "neighbors",
1740 "hnsw_efconstruction": "efconstruction",
1741 "ivf_neighbor_partitions": "neighbor partitions",
1742 "ivf_sample_per_partition": "sample_per_partition",
1743 "ivf_min_vectors_per_partition": "min_vectors_per_partition",
1744 }
1745 if vector_index_config.index_type == VectorIndexType.HNSW:
1746 parts.append("ORGANIZATION INMEMORY NEIGHBOR GRAPH")
1747 elif vector_index_config.index_type == VectorIndexType.IVF:
1748 parts.append("ORGANIZATION NEIGHBOR PARTITIONS")
1749 if vector_index_config.distance is not None:
1750 parts.append(f"DISTANCE {vector_index_config.distance.value}")
1751
1752 if vector_index_config.accuracy is not None:
1753 parts.append(
1754 f"WITH TARGET ACCURACY {vector_index_config.accuracy}"
1755 )
1756
1757 parameters_str = [f"type {vector_index_config.index_type.name}"]
1758 prefix = vector_index_config.index_type.name.lower() + "_"
1759
1760 for field in fields(vector_index_config):
1761 if field.name.startswith(prefix):
1762 key = sql_param_name.get(field.name)
1763 value = getattr(vector_index_config, field.name)
1764 if value is not None:
1765 parameters_str.append(f"{key} {value}")
1766
1767 parameters_str = ", ".join(parameters_str)
1768 parts.append(f"PARAMETERS ({parameters_str})")
1769
1770 if vector_index_config.parallel is not None:
1771 parts.append(f"PARALLEL {vector_index_config.parallel}")
1772
1773 return " ".join(parts)
1774
1775 def define_constraint_cascades(self, constraint):
1776 text = ""
1777 if constraint.ondelete is not None:
1778 text += " ON DELETE %s" % constraint.ondelete
1779
1780 # oracle has no ON UPDATE CASCADE -
1781 # its only available via triggers
1782 # https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
1783 if constraint.onupdate is not None:
1784 util.warn(
1785 "Oracle Database does not contain native UPDATE CASCADE "
1786 "functionality - onupdates will not be rendered for foreign "
1787 "keys. Consider using deferrable=True, initially='deferred' "
1788 "or triggers."
1789 )
1790
1791 return text
1792
1793 def visit_drop_table_comment(self, drop, **kw):
1794 return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
1795 drop.element
1796 )
1797
1798 def visit_create_index(self, create, **kw):
1799 index = create.element
1800 self._verify_index_table(index)
1801 preparer = self.preparer
1802 text = "CREATE "
1803 if index.unique:
1804 text += "UNIQUE "
1805 if index.dialect_options["oracle"]["bitmap"]:
1806 text += "BITMAP "
1807 vector_options = index.dialect_options["oracle"]["vector"]
1808 if vector_options:
1809 text += "VECTOR "
1810 text += "INDEX %s ON %s (%s)" % (
1811 self._prepared_index_name(index, include_schema=True),
1812 preparer.format_table(index.table, use_schema=True),
1813 ", ".join(
1814 self.sql_compiler.process(
1815 expr, include_table=False, literal_binds=True
1816 )
1817 for expr in index.expressions
1818 ),
1819 )
1820 if index.dialect_options["oracle"]["compress"] is not False:
1821 if index.dialect_options["oracle"]["compress"] is True:
1822 text += " COMPRESS"
1823 else:
1824 text += " COMPRESS %d" % (
1825 index.dialect_options["oracle"]["compress"]
1826 )
1827 if vector_options:
1828 if vector_options is True:
1829 vector_options = VectorIndexConfig()
1830
1831 text += " " + self._build_vector_index_config(vector_options)
1832 return text
1833
1834 def post_create_table(self, table):
1835 table_opts = []
1836 opts = table.dialect_options["oracle"]
1837
1838 if opts["on_commit"]:
1839 on_commit_options = opts["on_commit"].replace("_", " ").upper()
1840 table_opts.append("\n ON COMMIT %s" % on_commit_options)
1841
1842 if opts["compress"]:
1843 if opts["compress"] is True:
1844 table_opts.append("\n COMPRESS")
1845 else:
1846 table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
1847 if opts["tablespace"]:
1848 table_opts.append(
1849 "\n TABLESPACE %s" % self.preparer.quote(opts["tablespace"])
1850 )
1851 return "".join(table_opts)
1852
1853 def get_identity_options(self, identity_options):
1854 text = super().get_identity_options(identity_options)
1855 text = text.replace("NO MINVALUE", "NOMINVALUE")
1856 text = text.replace("NO MAXVALUE", "NOMAXVALUE")
1857 text = text.replace("NO CYCLE", "NOCYCLE")
1858 if identity_options.order is not None:
1859 text += " ORDER" if identity_options.order else " NOORDER"
1860 return text.strip()
1861
1862 def visit_computed_column(self, generated, **kw):
1863 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
1864 generated.sqltext, include_table=False, literal_binds=True
1865 )
1866 if generated.persisted is True:
1867 raise exc.CompileError(
1868 "Oracle Database computed columns do not support 'stored' "
1869 "persistence; set the 'persisted' flag to None or False for "
1870 "Oracle Database support."
1871 )
1872 elif generated.persisted is False:
1873 text += " VIRTUAL"
1874 return text
1875
1876 def visit_identity_column(self, identity, **kw):
1877 if identity.always is None:
1878 kind = ""
1879 else:
1880 kind = "ALWAYS" if identity.always else "BY DEFAULT"
1881 text = "GENERATED %s" % kind
1882 if identity.on_null:
1883 text += " ON NULL"
1884 text += " AS IDENTITY"
1885 options = self.get_identity_options(identity)
1886 if options:
1887 text += " (%s)" % options
1888 return text
1889
1890
1891class OracleIdentifierPreparer(compiler.IdentifierPreparer):
1892 reserved_words = {x.lower() for x in RESERVED_WORDS}
1893 illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
1894 ["_", "$"]
1895 )
1896
1897 def _bindparam_requires_quotes(self, value):
1898 """Return True if the given identifier requires quoting."""
1899 lc_value = value.lower()
1900 return (
1901 lc_value in self.reserved_words
1902 or value[0] in self.illegal_initial_characters
1903 or not self.legal_characters.match(str(value))
1904 )
1905
1906 def format_savepoint(self, savepoint):
1907 name = savepoint.ident.lstrip("_")
1908 return super().format_savepoint(savepoint, name)
1909
1910
1911class OracleExecutionContext(default.DefaultExecutionContext):
1912 def fire_sequence(self, seq, type_):
1913 return self._execute_scalar(
1914 "SELECT "
1915 + self.identifier_preparer.format_sequence(seq)
1916 + ".nextval FROM DUAL",
1917 type_,
1918 )
1919
1920 def pre_exec(self):
1921 if self.statement and "_oracle_dblink" in self.execution_options:
1922 self.statement = self.statement.replace(
1923 dictionary.DB_LINK_PLACEHOLDER,
1924 self.execution_options["_oracle_dblink"],
1925 )
1926
1927
1928class OracleDialect(default.DefaultDialect):
1929 name = "oracle"
1930 supports_statement_cache = True
1931 supports_alter = True
1932 max_identifier_length = 128
1933
1934 _supports_offset_fetch = True
1935
1936 insert_returning = True
1937 update_returning = True
1938 delete_returning = True
1939
1940 div_is_floordiv = False
1941
1942 supports_simple_order_by_label = False
1943 cte_follows_insert = True
1944 returns_native_bytes = True
1945
1946 supports_sequences = True
1947 sequences_optional = False
1948 postfetch_lastrowid = False
1949
1950 default_paramstyle = "named"
1951 colspecs = colspecs
1952 ischema_names = ischema_names
1953 requires_name_normalize = True
1954
1955 supports_comments = True
1956
1957 supports_default_values = False
1958 supports_default_metavalue = True
1959 supports_empty_insert = False
1960 supports_identity_columns = True
1961
1962 statement_compiler = OracleCompiler
1963 ddl_compiler = OracleDDLCompiler
1964 type_compiler_cls = OracleTypeCompiler
1965 preparer = OracleIdentifierPreparer
1966 execution_ctx_cls = OracleExecutionContext
1967
1968 reflection_options = ("oracle_resolve_synonyms",)
1969
1970 _use_nchar_for_unicode = False
1971
1972 construct_arguments = [
1973 (
1974 sa_schema.Table,
1975 {
1976 "resolve_synonyms": False,
1977 "on_commit": None,
1978 "compress": False,
1979 "tablespace": None,
1980 },
1981 ),
1982 (
1983 sa_schema.Index,
1984 {
1985 "bitmap": False,
1986 "compress": False,
1987 "vector": False,
1988 },
1989 ),
1990 (sa_selectable.Select, {"fetch_approximate": False}),
1991 (sa_selectable.CompoundSelect, {"fetch_approximate": False}),
1992 ]
1993
1994 @util.deprecated_params(
1995 use_binds_for_limits=(
1996 "1.4",
1997 "The ``use_binds_for_limits`` Oracle Database dialect parameter "
1998 "is deprecated. The dialect now renders LIMIT / OFFSET integers "
1999 "inline in all cases using a post-compilation hook, so that the "
2000 "value is still represented by a 'bound parameter' on the Core "
2001 "Expression side.",
2002 )
2003 )
2004 def __init__(
2005 self,
2006 use_ansi=True,
2007 optimize_limits=False,
2008 use_binds_for_limits=None,
2009 use_nchar_for_unicode=False,
2010 exclude_tablespaces=("SYSTEM", "SYSAUX"),
2011 enable_offset_fetch=True,
2012 **kwargs,
2013 ):
2014 default.DefaultDialect.__init__(self, **kwargs)
2015 self._use_nchar_for_unicode = use_nchar_for_unicode
2016 self.use_ansi = use_ansi
2017 self.optimize_limits = optimize_limits
2018 self.exclude_tablespaces = exclude_tablespaces
2019 self.enable_offset_fetch = self._supports_offset_fetch = (
2020 enable_offset_fetch
2021 )
2022
2023 def initialize(self, connection):
2024 super().initialize(connection)
2025
2026 # Oracle 8i has RETURNING:
2027 # https://docs.oracle.com/cd/A87860_01/doc/index.htm
2028
2029 # so does Oracle8:
2030 # https://docs.oracle.com/cd/A64702_01/doc/index.htm
2031
2032 if self._is_oracle_8:
2033 self.colspecs = self.colspecs.copy()
2034 self.colspecs.pop(sqltypes.Interval)
2035 self.use_ansi = False
2036
2037 self.supports_identity_columns = self.server_version_info >= (12,)
2038 self._supports_offset_fetch = (
2039 self.enable_offset_fetch and self.server_version_info >= (12,)
2040 )
2041
2042 def _get_effective_compat_server_version_info(self, connection):
2043 # dialect does not need compat levels below 12.2, so don't query
2044 # in those cases
2045
2046 if self.server_version_info < (12, 2):
2047 return self.server_version_info
2048 try:
2049 compat = connection.exec_driver_sql(
2050 "SELECT value FROM v$parameter WHERE name = 'compatible'"
2051 ).scalar()
2052 except exc.DBAPIError:
2053 compat = None
2054
2055 if compat:
2056 try:
2057 return tuple(int(x) for x in compat.split("."))
2058 except:
2059 return self.server_version_info
2060 else:
2061 return self.server_version_info
2062
2063 @property
2064 def _is_oracle_8(self):
2065 return self.server_version_info and self.server_version_info < (9,)
2066
2067 @property
2068 def _supports_table_compression(self):
2069 return self.server_version_info and self.server_version_info >= (10, 1)
2070
2071 @property
2072 def _supports_table_compress_for(self):
2073 return self.server_version_info and self.server_version_info >= (11,)
2074
2075 @property
2076 def _supports_char_length(self):
2077 return not self._is_oracle_8
2078
2079 @property
2080 def _supports_update_returning_computed_cols(self):
2081 # on version 18 this error is no longet present while it happens on 11
2082 # it may work also on versions before the 18
2083 return self.server_version_info and self.server_version_info >= (18,)
2084
2085 @property
2086 def _supports_except_all(self):
2087 return self.server_version_info and self.server_version_info >= (21,)
2088
2089 def do_release_savepoint(self, connection, name):
2090 # Oracle does not support RELEASE SAVEPOINT
2091 pass
2092
2093 def _check_max_identifier_length(self, connection):
2094 if self._get_effective_compat_server_version_info(connection) < (
2095 12,
2096 2,
2097 ):
2098 return 30
2099 else:
2100 # use the default
2101 return None
2102
2103 def get_isolation_level_values(self, dbapi_connection):
2104 return ["READ COMMITTED", "SERIALIZABLE"]
2105
2106 def get_default_isolation_level(self, dbapi_conn):
2107 try:
2108 return self.get_isolation_level(dbapi_conn)
2109 except NotImplementedError:
2110 raise
2111 except:
2112 return "READ COMMITTED"
2113
2114 def _execute_reflection(
2115 self, connection, query, dblink, returns_long, params=None
2116 ):
2117 if dblink and not dblink.startswith("@"):
2118 dblink = f"@{dblink}"
2119 execution_options = {
2120 # handle db links
2121 "_oracle_dblink": dblink or "",
2122 # override any schema translate map
2123 "schema_translate_map": None,
2124 }
2125
2126 if dblink and returns_long:
2127 # Oracle seems to error with
2128 # "ORA-00997: illegal use of LONG datatype" when returning
2129 # LONG columns via a dblink in a query with bind params
2130 # This type seems to be very hard to cast into something else
2131 # so it seems easier to just use bind param in this case
2132 def visit_bindparam(bindparam):
2133 bindparam.literal_execute = True
2134
2135 query = visitors.cloned_traverse(
2136 query, {}, {"bindparam": visit_bindparam}
2137 )
2138 return connection.execute(
2139 query, params, execution_options=execution_options
2140 )
2141
2142 @util.memoized_property
2143 def _has_table_query(self):
2144 # materialized views are returned by all_tables
2145 tables = (
2146 select(
2147 dictionary.all_tables.c.table_name,
2148 dictionary.all_tables.c.owner,
2149 )
2150 .union_all(
2151 select(
2152 dictionary.all_views.c.view_name.label("table_name"),
2153 dictionary.all_views.c.owner,
2154 )
2155 )
2156 .subquery("tables_and_views")
2157 )
2158
2159 query = select(tables.c.table_name).where(
2160 tables.c.table_name == bindparam("table_name"),
2161 tables.c.owner == bindparam("owner"),
2162 )
2163 return query
2164
2165 @reflection.cache
2166 def has_table(
2167 self, connection, table_name, schema=None, dblink=None, **kw
2168 ):
2169 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2170 self._ensure_has_table_connection(connection)
2171
2172 if not schema:
2173 schema = self.default_schema_name
2174
2175 params = {
2176 "table_name": self.denormalize_name(table_name),
2177 "owner": self.denormalize_schema_name(schema),
2178 }
2179 cursor = self._execute_reflection(
2180 connection,
2181 self._has_table_query,
2182 dblink,
2183 returns_long=False,
2184 params=params,
2185 )
2186 return bool(cursor.scalar())
2187
2188 @reflection.cache
2189 def has_sequence(
2190 self, connection, sequence_name, schema=None, dblink=None, **kw
2191 ):
2192 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2193 if not schema:
2194 schema = self.default_schema_name
2195
2196 query = select(dictionary.all_sequences.c.sequence_name).where(
2197 dictionary.all_sequences.c.sequence_name
2198 == self.denormalize_schema_name(sequence_name),
2199 dictionary.all_sequences.c.sequence_owner
2200 == self.denormalize_schema_name(schema),
2201 )
2202
2203 cursor = self._execute_reflection(
2204 connection, query, dblink, returns_long=False
2205 )
2206 return bool(cursor.scalar())
2207
2208 def _get_default_schema_name(self, connection):
2209 return self.normalize_name(
2210 connection.exec_driver_sql(
2211 "select sys_context( 'userenv', 'current_schema' ) from dual"
2212 ).scalar()
2213 )
2214
2215 def denormalize_schema_name(self, name):
2216 # look for quoted_name
2217 force = getattr(name, "quote", None)
2218 if force is None and name == "public":
2219 # look for case insensitive, no quoting specified, "public"
2220 return "PUBLIC"
2221 return super().denormalize_name(name)
2222
2223 @reflection.flexi_cache(
2224 ("schema", InternalTraversal.dp_string),
2225 ("filter_names", InternalTraversal.dp_string_list),
2226 ("dblink", InternalTraversal.dp_string),
2227 )
2228 def _get_synonyms(self, connection, schema, filter_names, dblink, **kw):
2229 owner = self.denormalize_schema_name(
2230 schema or self.default_schema_name
2231 )
2232
2233 has_filter_names, params = self._prepare_filter_names(filter_names)
2234 query = select(
2235 dictionary.all_synonyms.c.synonym_name,
2236 dictionary.all_synonyms.c.table_name,
2237 dictionary.all_synonyms.c.table_owner,
2238 dictionary.all_synonyms.c.db_link,
2239 ).where(dictionary.all_synonyms.c.owner == owner)
2240 if has_filter_names:
2241 query = query.where(
2242 dictionary.all_synonyms.c.synonym_name.in_(
2243 params["filter_names"]
2244 )
2245 )
2246 result = self._execute_reflection(
2247 connection, query, dblink, returns_long=False
2248 ).mappings()
2249 return result.all()
2250
2251 @lru_cache()
2252 def _all_objects_query(
2253 self, owner, scope, kind, has_filter_names, has_mat_views
2254 ):
2255 query = (
2256 select(dictionary.all_objects.c.object_name)
2257 .select_from(dictionary.all_objects)
2258 .where(dictionary.all_objects.c.owner == owner)
2259 )
2260
2261 # NOTE: materialized views are listed in all_objects twice;
2262 # once as MATERIALIZE VIEW and once as TABLE
2263 if kind is ObjectKind.ANY:
2264 # materilaized view are listed also as tables so there is no
2265 # need to add them to the in_.
2266 query = query.where(
2267 dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW"))
2268 )
2269 else:
2270 object_type = []
2271 if ObjectKind.VIEW in kind:
2272 object_type.append("VIEW")
2273 if (
2274 ObjectKind.MATERIALIZED_VIEW in kind
2275 and ObjectKind.TABLE not in kind
2276 ):
2277 # materilaized view are listed also as tables so there is no
2278 # need to add them to the in_ if also selecting tables.
2279 object_type.append("MATERIALIZED VIEW")
2280 if ObjectKind.TABLE in kind:
2281 object_type.append("TABLE")
2282 if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind:
2283 # materialized view are listed also as tables,
2284 # so they need to be filtered out
2285 # EXCEPT ALL / MINUS profiles as faster than using
2286 # NOT EXISTS or NOT IN with a subquery, but it's in
2287 # general faster to get the mat view names and exclude
2288 # them only when needed
2289 query = query.where(
2290 dictionary.all_objects.c.object_name.not_in(
2291 bindparam("mat_views")
2292 )
2293 )
2294 query = query.where(
2295 dictionary.all_objects.c.object_type.in_(object_type)
2296 )
2297
2298 # handles scope
2299 if scope is ObjectScope.DEFAULT:
2300 query = query.where(dictionary.all_objects.c.temporary == "N")
2301 elif scope is ObjectScope.TEMPORARY:
2302 query = query.where(dictionary.all_objects.c.temporary == "Y")
2303
2304 if has_filter_names:
2305 query = query.where(
2306 dictionary.all_objects.c.object_name.in_(
2307 bindparam("filter_names")
2308 )
2309 )
2310 return query
2311
2312 @reflection.flexi_cache(
2313 ("schema", InternalTraversal.dp_string),
2314 ("scope", InternalTraversal.dp_plain_obj),
2315 ("kind", InternalTraversal.dp_plain_obj),
2316 ("filter_names", InternalTraversal.dp_string_list),
2317 ("dblink", InternalTraversal.dp_string),
2318 )
2319 def _get_all_objects(
2320 self, connection, schema, scope, kind, filter_names, dblink, **kw
2321 ):
2322 owner = self.denormalize_schema_name(
2323 schema or self.default_schema_name
2324 )
2325
2326 has_filter_names, params = self._prepare_filter_names(filter_names)
2327 has_mat_views = False
2328 if (
2329 ObjectKind.TABLE in kind
2330 and ObjectKind.MATERIALIZED_VIEW not in kind
2331 ):
2332 # see note in _all_objects_query
2333 mat_views = self.get_materialized_view_names(
2334 connection, schema, dblink, _normalize=False, **kw
2335 )
2336 if mat_views:
2337 params["mat_views"] = mat_views
2338 has_mat_views = True
2339
2340 query = self._all_objects_query(
2341 owner, scope, kind, has_filter_names, has_mat_views
2342 )
2343
2344 result = self._execute_reflection(
2345 connection, query, dblink, returns_long=False, params=params
2346 ).scalars()
2347
2348 return result.all()
2349
2350 def _handle_synonyms_decorator(fn):
2351 @wraps(fn)
2352 def wrapper(self, *args, **kwargs):
2353 return self._handle_synonyms(fn, *args, **kwargs)
2354
2355 return wrapper
2356
2357 def _handle_synonyms(self, fn, connection, *args, **kwargs):
2358 if not kwargs.get("oracle_resolve_synonyms", False):
2359 return fn(self, connection, *args, **kwargs)
2360
2361 original_kw = kwargs.copy()
2362 schema = kwargs.pop("schema", None)
2363 result = self._get_synonyms(
2364 connection,
2365 schema=schema,
2366 filter_names=kwargs.pop("filter_names", None),
2367 dblink=kwargs.pop("dblink", None),
2368 info_cache=kwargs.get("info_cache", None),
2369 )
2370
2371 dblinks_owners = defaultdict(dict)
2372 for row in result:
2373 key = row["db_link"], row["table_owner"]
2374 tn = self.normalize_name(row["table_name"])
2375 dblinks_owners[key][tn] = row["synonym_name"]
2376
2377 if not dblinks_owners:
2378 # No synonym, do the plain thing
2379 return fn(self, connection, *args, **original_kw)
2380
2381 data = {}
2382 for (dblink, table_owner), mapping in dblinks_owners.items():
2383 call_kw = {
2384 **original_kw,
2385 "schema": table_owner,
2386 "dblink": self.normalize_name(dblink),
2387 "filter_names": mapping.keys(),
2388 }
2389 call_result = fn(self, connection, *args, **call_kw)
2390 for (_, tn), value in call_result:
2391 synonym_name = self.normalize_name(mapping[tn])
2392 data[(schema, synonym_name)] = value
2393 return data.items()
2394
2395 @reflection.cache
2396 def get_schema_names(self, connection, dblink=None, **kw):
2397 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2398 query = select(dictionary.all_users.c.username).order_by(
2399 dictionary.all_users.c.username
2400 )
2401 result = self._execute_reflection(
2402 connection, query, dblink, returns_long=False
2403 ).scalars()
2404 return [self.normalize_name(row) for row in result]
2405
2406 @reflection.cache
2407 def get_table_names(self, connection, schema=None, dblink=None, **kw):
2408 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2409 # note that table_names() isn't loading DBLINKed or synonym'ed tables
2410 if schema is None:
2411 schema = self.default_schema_name
2412
2413 den_schema = self.denormalize_schema_name(schema)
2414 if kw.get("oracle_resolve_synonyms", False):
2415 tables = (
2416 select(
2417 dictionary.all_tables.c.table_name,
2418 dictionary.all_tables.c.owner,
2419 dictionary.all_tables.c.iot_name,
2420 dictionary.all_tables.c.duration,
2421 dictionary.all_tables.c.tablespace_name,
2422 )
2423 .union_all(
2424 select(
2425 dictionary.all_synonyms.c.synonym_name.label(
2426 "table_name"
2427 ),
2428 dictionary.all_synonyms.c.owner,
2429 dictionary.all_tables.c.iot_name,
2430 dictionary.all_tables.c.duration,
2431 dictionary.all_tables.c.tablespace_name,
2432 )
2433 .select_from(dictionary.all_tables)
2434 .join(
2435 dictionary.all_synonyms,
2436 and_(
2437 dictionary.all_tables.c.table_name
2438 == dictionary.all_synonyms.c.table_name,
2439 dictionary.all_tables.c.owner
2440 == func.coalesce(
2441 dictionary.all_synonyms.c.table_owner,
2442 dictionary.all_synonyms.c.owner,
2443 ),
2444 ),
2445 )
2446 )
2447 .subquery("available_tables")
2448 )
2449 else:
2450 tables = dictionary.all_tables
2451
2452 query = select(tables.c.table_name)
2453 if self.exclude_tablespaces:
2454 query = query.where(
2455 func.coalesce(
2456 tables.c.tablespace_name, "no tablespace"
2457 ).not_in(self.exclude_tablespaces)
2458 )
2459 query = query.where(
2460 tables.c.owner == den_schema,
2461 tables.c.iot_name.is_(null()),
2462 tables.c.duration.is_(null()),
2463 )
2464
2465 # remove materialized views
2466 mat_query = select(
2467 dictionary.all_mviews.c.mview_name.label("table_name")
2468 ).where(dictionary.all_mviews.c.owner == den_schema)
2469
2470 query = (
2471 query.except_all(mat_query)
2472 if self._supports_except_all
2473 else query.except_(mat_query)
2474 )
2475
2476 result = self._execute_reflection(
2477 connection, query, dblink, returns_long=False
2478 ).scalars()
2479 return [self.normalize_name(row) for row in result]
2480
2481 @reflection.cache
2482 def get_temp_table_names(self, connection, dblink=None, **kw):
2483 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2484 schema = self.denormalize_schema_name(self.default_schema_name)
2485
2486 query = select(dictionary.all_tables.c.table_name)
2487 if self.exclude_tablespaces:
2488 query = query.where(
2489 func.coalesce(
2490 dictionary.all_tables.c.tablespace_name, "no tablespace"
2491 ).not_in(self.exclude_tablespaces)
2492 )
2493 query = query.where(
2494 dictionary.all_tables.c.owner == schema,
2495 dictionary.all_tables.c.iot_name.is_(null()),
2496 dictionary.all_tables.c.duration.is_not(null()),
2497 )
2498
2499 result = self._execute_reflection(
2500 connection, query, dblink, returns_long=False
2501 ).scalars()
2502 return [self.normalize_name(row) for row in result]
2503
2504 @reflection.cache
2505 def get_materialized_view_names(
2506 self, connection, schema=None, dblink=None, _normalize=True, **kw
2507 ):
2508 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2509 if not schema:
2510 schema = self.default_schema_name
2511
2512 query = select(dictionary.all_mviews.c.mview_name).where(
2513 dictionary.all_mviews.c.owner
2514 == self.denormalize_schema_name(schema)
2515 )
2516 result = self._execute_reflection(
2517 connection, query, dblink, returns_long=False
2518 ).scalars()
2519 if _normalize:
2520 return [self.normalize_name(row) for row in result]
2521 else:
2522 return result.all()
2523
2524 @reflection.cache
2525 def get_view_names(self, connection, schema=None, dblink=None, **kw):
2526 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2527 if not schema:
2528 schema = self.default_schema_name
2529
2530 query = select(dictionary.all_views.c.view_name).where(
2531 dictionary.all_views.c.owner
2532 == self.denormalize_schema_name(schema)
2533 )
2534 result = self._execute_reflection(
2535 connection, query, dblink, returns_long=False
2536 ).scalars()
2537 return [self.normalize_name(row) for row in result]
2538
2539 @reflection.cache
2540 def get_sequence_names(self, connection, schema=None, dblink=None, **kw):
2541 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2542 if not schema:
2543 schema = self.default_schema_name
2544 query = select(dictionary.all_sequences.c.sequence_name).where(
2545 dictionary.all_sequences.c.sequence_owner
2546 == self.denormalize_schema_name(schema)
2547 )
2548
2549 result = self._execute_reflection(
2550 connection, query, dblink, returns_long=False
2551 ).scalars()
2552 return [self.normalize_name(row) for row in result]
2553
2554 def _value_or_raise(self, data, table, schema):
2555 table = self.normalize_name(str(table))
2556 try:
2557 return dict(data)[(schema, table)]
2558 except KeyError:
2559 raise exc.NoSuchTableError(
2560 f"{schema}.{table}" if schema else table
2561 ) from None
2562
2563 def _prepare_filter_names(self, filter_names):
2564 if filter_names:
2565 fn = [self.denormalize_name(name) for name in filter_names]
2566 return True, {"filter_names": fn}
2567 else:
2568 return False, {}
2569
2570 @reflection.cache
2571 def get_table_options(self, connection, table_name, schema=None, **kw):
2572 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2573 ``oracle_resolve_synonyms`` to resolve names to synonyms
2574 """
2575 data = self.get_multi_table_options(
2576 connection,
2577 schema=schema,
2578 filter_names=[table_name],
2579 scope=ObjectScope.ANY,
2580 kind=ObjectKind.ANY,
2581 **kw,
2582 )
2583 return self._value_or_raise(data, table_name, schema)
2584
2585 @lru_cache()
2586 def _table_options_query(
2587 self, owner, scope, kind, has_filter_names, has_mat_views
2588 ):
2589 query = select(
2590 dictionary.all_tables.c.table_name,
2591 (
2592 dictionary.all_tables.c.compression
2593 if self._supports_table_compression
2594 else sql.null().label("compression")
2595 ),
2596 (
2597 dictionary.all_tables.c.compress_for
2598 if self._supports_table_compress_for
2599 else sql.null().label("compress_for")
2600 ),
2601 dictionary.all_tables.c.tablespace_name,
2602 ).where(dictionary.all_tables.c.owner == owner)
2603 if has_filter_names:
2604 query = query.where(
2605 dictionary.all_tables.c.table_name.in_(
2606 bindparam("filter_names")
2607 )
2608 )
2609 if scope is ObjectScope.DEFAULT:
2610 query = query.where(dictionary.all_tables.c.duration.is_(null()))
2611 elif scope is ObjectScope.TEMPORARY:
2612 query = query.where(
2613 dictionary.all_tables.c.duration.is_not(null())
2614 )
2615
2616 if (
2617 has_mat_views
2618 and ObjectKind.TABLE in kind
2619 and ObjectKind.MATERIALIZED_VIEW not in kind
2620 ):
2621 # can't use EXCEPT ALL / MINUS here because we don't have an
2622 # excludable row vs. the query above
2623 # outerjoin + where null works better on oracle 21 but 11 does
2624 # not like it at all. this is the next best thing
2625
2626 query = query.where(
2627 dictionary.all_tables.c.table_name.not_in(
2628 bindparam("mat_views")
2629 )
2630 )
2631 elif (
2632 ObjectKind.TABLE not in kind
2633 and ObjectKind.MATERIALIZED_VIEW in kind
2634 ):
2635 query = query.where(
2636 dictionary.all_tables.c.table_name.in_(bindparam("mat_views"))
2637 )
2638 return query
2639
2640 @_handle_synonyms_decorator
2641 def get_multi_table_options(
2642 self,
2643 connection,
2644 *,
2645 schema,
2646 filter_names,
2647 scope,
2648 kind,
2649 dblink=None,
2650 **kw,
2651 ):
2652 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2653 ``oracle_resolve_synonyms`` to resolve names to synonyms
2654 """
2655 owner = self.denormalize_schema_name(
2656 schema or self.default_schema_name
2657 )
2658
2659 has_filter_names, params = self._prepare_filter_names(filter_names)
2660 has_mat_views = False
2661
2662 if (
2663 ObjectKind.TABLE in kind
2664 and ObjectKind.MATERIALIZED_VIEW not in kind
2665 ):
2666 # see note in _table_options_query
2667 mat_views = self.get_materialized_view_names(
2668 connection, schema, dblink, _normalize=False, **kw
2669 )
2670 if mat_views:
2671 params["mat_views"] = mat_views
2672 has_mat_views = True
2673 elif (
2674 ObjectKind.TABLE not in kind
2675 and ObjectKind.MATERIALIZED_VIEW in kind
2676 ):
2677 mat_views = self.get_materialized_view_names(
2678 connection, schema, dblink, _normalize=False, **kw
2679 )
2680 params["mat_views"] = mat_views
2681
2682 options = {}
2683 default = ReflectionDefaults.table_options
2684
2685 if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind:
2686 query = self._table_options_query(
2687 owner, scope, kind, has_filter_names, has_mat_views
2688 )
2689 result = self._execute_reflection(
2690 connection, query, dblink, returns_long=False, params=params
2691 )
2692
2693 for table, compression, compress_for, tablespace in result:
2694 data = default()
2695 if compression == "ENABLED":
2696 data["oracle_compress"] = compress_for
2697 if tablespace:
2698 data["oracle_tablespace"] = tablespace
2699 options[(schema, self.normalize_name(table))] = data
2700 if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
2701 # add the views (no temporary views)
2702 for view in self.get_view_names(connection, schema, dblink, **kw):
2703 if not filter_names or view in filter_names:
2704 options[(schema, view)] = default()
2705
2706 return options.items()
2707
2708 @reflection.cache
2709 def get_columns(self, connection, table_name, schema=None, **kw):
2710 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2711 ``oracle_resolve_synonyms`` to resolve names to synonyms
2712 """
2713
2714 data = self.get_multi_columns(
2715 connection,
2716 schema=schema,
2717 filter_names=[table_name],
2718 scope=ObjectScope.ANY,
2719 kind=ObjectKind.ANY,
2720 **kw,
2721 )
2722 return self._value_or_raise(data, table_name, schema)
2723
2724 def _run_batches(
2725 self, connection, query, dblink, returns_long, mappings, all_objects
2726 ):
2727 each_batch = 500
2728 batches = list(all_objects)
2729 while batches:
2730 batch = batches[0:each_batch]
2731 batches[0:each_batch] = []
2732
2733 result = self._execute_reflection(
2734 connection,
2735 query,
2736 dblink,
2737 returns_long=returns_long,
2738 params={"all_objects": batch},
2739 )
2740 if mappings:
2741 yield from result.mappings()
2742 else:
2743 yield from result
2744
2745 @lru_cache()
2746 def _column_query(self, owner):
2747 all_cols = dictionary.all_tab_cols
2748 all_comments = dictionary.all_col_comments
2749 all_ids = dictionary.all_tab_identity_cols
2750
2751 if self.server_version_info >= (12,):
2752 add_cols = (
2753 all_cols.c.default_on_null,
2754 sql.case(
2755 (all_ids.c.table_name.is_(None), sql.null()),
2756 else_=all_ids.c.generation_type
2757 + ","
2758 + all_ids.c.identity_options,
2759 ).label("identity_options"),
2760 )
2761 join_identity_cols = True
2762 else:
2763 add_cols = (
2764 sql.null().label("default_on_null"),
2765 sql.null().label("identity_options"),
2766 )
2767 join_identity_cols = False
2768
2769 # NOTE: on oracle cannot create tables/views without columns and
2770 # a table cannot have all column hidden:
2771 # ORA-54039: table must have at least one column that is not invisible
2772 # all_tab_cols returns data for tables/views/mat-views.
2773 # all_tab_cols does not return recycled tables
2774
2775 query = (
2776 select(
2777 all_cols.c.table_name,
2778 all_cols.c.column_name,
2779 all_cols.c.data_type,
2780 all_cols.c.char_length,
2781 all_cols.c.data_length,
2782 all_cols.c.data_precision,
2783 all_cols.c.data_scale,
2784 all_cols.c.nullable,
2785 all_cols.c.data_default,
2786 all_comments.c.comments,
2787 all_cols.c.virtual_column,
2788 *add_cols,
2789 ).select_from(all_cols)
2790 # NOTE: all_col_comments has a row for each column even if no
2791 # comment is present, so a join could be performed, but there
2792 # seems to be no difference compared to an outer join
2793 .outerjoin(
2794 all_comments,
2795 and_(
2796 all_cols.c.table_name == all_comments.c.table_name,
2797 all_cols.c.column_name == all_comments.c.column_name,
2798 all_cols.c.owner == all_comments.c.owner,
2799 ),
2800 )
2801 )
2802 if join_identity_cols:
2803 query = query.outerjoin(
2804 all_ids,
2805 and_(
2806 all_cols.c.table_name == all_ids.c.table_name,
2807 all_cols.c.column_name == all_ids.c.column_name,
2808 all_cols.c.owner == all_ids.c.owner,
2809 ),
2810 )
2811
2812 query = query.where(
2813 all_cols.c.table_name.in_(bindparam("all_objects")),
2814 all_cols.c.hidden_column == "NO",
2815 all_cols.c.owner == owner,
2816 ).order_by(all_cols.c.table_name, all_cols.c.column_id)
2817 return query
2818
2819 @_handle_synonyms_decorator
2820 def get_multi_columns(
2821 self,
2822 connection,
2823 *,
2824 schema,
2825 filter_names,
2826 scope,
2827 kind,
2828 dblink=None,
2829 **kw,
2830 ):
2831 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2832 ``oracle_resolve_synonyms`` to resolve names to synonyms
2833 """
2834 owner = self.denormalize_schema_name(
2835 schema or self.default_schema_name
2836 )
2837 query = self._column_query(owner)
2838
2839 if (
2840 filter_names
2841 and kind is ObjectKind.ANY
2842 and scope is ObjectScope.ANY
2843 ):
2844 all_objects = [self.denormalize_name(n) for n in filter_names]
2845 else:
2846 all_objects = self._get_all_objects(
2847 connection, schema, scope, kind, filter_names, dblink, **kw
2848 )
2849
2850 columns = defaultdict(list)
2851
2852 # all_tab_cols.data_default is LONG
2853 result = self._run_batches(
2854 connection,
2855 query,
2856 dblink,
2857 returns_long=True,
2858 mappings=True,
2859 all_objects=all_objects,
2860 )
2861
2862 def maybe_int(value):
2863 if isinstance(value, float) and value.is_integer():
2864 return int(value)
2865 else:
2866 return value
2867
2868 remove_size = re.compile(r"\(\d+\)")
2869
2870 for row_dict in result:
2871 table_name = self.normalize_name(row_dict["table_name"])
2872 orig_colname = row_dict["column_name"]
2873 colname = self.normalize_name(orig_colname)
2874 coltype = row_dict["data_type"]
2875 precision = maybe_int(row_dict["data_precision"])
2876
2877 if coltype == "NUMBER":
2878 scale = maybe_int(row_dict["data_scale"])
2879 if precision is None and scale == 0:
2880 coltype = INTEGER()
2881 else:
2882 coltype = NUMBER(precision, scale)
2883 elif coltype == "FLOAT":
2884 # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
2885 if precision == 126:
2886 # The DOUBLE PRECISION datatype is a floating-point
2887 # number with binary precision 126.
2888 coltype = DOUBLE_PRECISION()
2889 elif precision == 63:
2890 # The REAL datatype is a floating-point number with a
2891 # binary precision of 63, or 18 decimal.
2892 coltype = REAL()
2893 else:
2894 # non standard precision
2895 coltype = FLOAT(binary_precision=precision)
2896
2897 elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
2898 char_length = maybe_int(row_dict["char_length"])
2899 coltype = self.ischema_names.get(coltype)(char_length)
2900 elif coltype == "RAW":
2901 data_length = maybe_int(row_dict["data_length"])
2902 coltype = RAW(data_length)
2903 elif "WITH TIME ZONE" in coltype:
2904 coltype = TIMESTAMP(timezone=True)
2905 elif "WITH LOCAL TIME ZONE" in coltype:
2906 coltype = TIMESTAMP(local_timezone=True)
2907 else:
2908 coltype = re.sub(remove_size, "", coltype)
2909 try:
2910 coltype = self.ischema_names[coltype]
2911 except KeyError:
2912 util.warn(
2913 "Did not recognize type '%s' of column '%s'"
2914 % (coltype, colname)
2915 )
2916 coltype = sqltypes.NULLTYPE
2917
2918 default = row_dict["data_default"]
2919 if row_dict["virtual_column"] == "YES":
2920 computed = dict(sqltext=default)
2921 default = None
2922 else:
2923 computed = None
2924
2925 identity_options = row_dict["identity_options"]
2926 if identity_options is not None:
2927 identity = self._parse_identity_options(
2928 identity_options, row_dict["default_on_null"]
2929 )
2930 default = None
2931 else:
2932 identity = None
2933
2934 cdict = {
2935 "name": colname,
2936 "type": coltype,
2937 "nullable": row_dict["nullable"] == "Y",
2938 "default": default,
2939 "comment": row_dict["comments"],
2940 }
2941 if orig_colname.lower() == orig_colname:
2942 cdict["quote"] = True
2943 if computed is not None:
2944 cdict["computed"] = computed
2945 if identity is not None:
2946 cdict["identity"] = identity
2947
2948 columns[(schema, table_name)].append(cdict)
2949
2950 # NOTE: default not needed since all tables have columns
2951 # default = ReflectionDefaults.columns
2952 # return (
2953 # (key, value if value else default())
2954 # for key, value in columns.items()
2955 # )
2956 return columns.items()
2957
2958 def _parse_identity_options(self, identity_options, default_on_null):
2959 # identity_options is a string that starts with 'ALWAYS,' or
2960 # 'BY DEFAULT,' and continues with
2961 # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
2962 # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
2963 # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
2964 parts = [p.strip() for p in identity_options.split(",")]
2965 identity = {
2966 "always": parts[0] == "ALWAYS",
2967 "on_null": default_on_null == "YES",
2968 }
2969
2970 for part in parts[1:]:
2971 option, value = part.split(":")
2972 value = value.strip()
2973
2974 if "START WITH" in option:
2975 identity["start"] = int(value)
2976 elif "INCREMENT BY" in option:
2977 identity["increment"] = int(value)
2978 elif "MAX_VALUE" in option:
2979 identity["maxvalue"] = int(value)
2980 elif "MIN_VALUE" in option:
2981 identity["minvalue"] = int(value)
2982 elif "CYCLE_FLAG" in option:
2983 identity["cycle"] = value == "Y"
2984 elif "CACHE_SIZE" in option:
2985 identity["cache"] = int(value)
2986 elif "ORDER_FLAG" in option:
2987 identity["order"] = value == "Y"
2988 return identity
2989
2990 @reflection.cache
2991 def get_table_comment(self, connection, table_name, schema=None, **kw):
2992 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2993 ``oracle_resolve_synonyms`` to resolve names to synonyms
2994 """
2995 data = self.get_multi_table_comment(
2996 connection,
2997 schema=schema,
2998 filter_names=[table_name],
2999 scope=ObjectScope.ANY,
3000 kind=ObjectKind.ANY,
3001 **kw,
3002 )
3003 return self._value_or_raise(data, table_name, schema)
3004
3005 @lru_cache()
3006 def _comment_query(self, owner, scope, kind, has_filter_names):
3007 # NOTE: all_tab_comments / all_mview_comments have a row for all
3008 # object even if they don't have comments
3009 queries = []
3010 if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind:
3011 # all_tab_comments returns also plain views
3012 tbl_view = select(
3013 dictionary.all_tab_comments.c.table_name,
3014 dictionary.all_tab_comments.c.comments,
3015 ).where(
3016 dictionary.all_tab_comments.c.owner == owner,
3017 dictionary.all_tab_comments.c.table_name.not_like("BIN$%"),
3018 )
3019 if ObjectKind.VIEW not in kind:
3020 tbl_view = tbl_view.where(
3021 dictionary.all_tab_comments.c.table_type == "TABLE"
3022 )
3023 elif ObjectKind.TABLE not in kind:
3024 tbl_view = tbl_view.where(
3025 dictionary.all_tab_comments.c.table_type == "VIEW"
3026 )
3027 queries.append(tbl_view)
3028 if ObjectKind.MATERIALIZED_VIEW in kind:
3029 mat_view = select(
3030 dictionary.all_mview_comments.c.mview_name.label("table_name"),
3031 dictionary.all_mview_comments.c.comments,
3032 ).where(
3033 dictionary.all_mview_comments.c.owner == owner,
3034 dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"),
3035 )
3036 queries.append(mat_view)
3037 if len(queries) == 1:
3038 query = queries[0]
3039 else:
3040 union = sql.union_all(*queries).subquery("tables_and_views")
3041 query = select(union.c.table_name, union.c.comments)
3042
3043 name_col = query.selected_columns.table_name
3044
3045 if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
3046 temp = "Y" if scope is ObjectScope.TEMPORARY else "N"
3047 # need distinct since materialized view are listed also
3048 # as tables in all_objects
3049 query = query.distinct().join(
3050 dictionary.all_objects,
3051 and_(
3052 dictionary.all_objects.c.owner == owner,
3053 dictionary.all_objects.c.object_name == name_col,
3054 dictionary.all_objects.c.temporary == temp,
3055 ),
3056 )
3057 if has_filter_names:
3058 query = query.where(name_col.in_(bindparam("filter_names")))
3059 return query
3060
3061 @_handle_synonyms_decorator
3062 def get_multi_table_comment(
3063 self,
3064 connection,
3065 *,
3066 schema,
3067 filter_names,
3068 scope,
3069 kind,
3070 dblink=None,
3071 **kw,
3072 ):
3073 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3074 ``oracle_resolve_synonyms`` to resolve names to synonyms
3075 """
3076 owner = self.denormalize_schema_name(
3077 schema or self.default_schema_name
3078 )
3079 has_filter_names, params = self._prepare_filter_names(filter_names)
3080 query = self._comment_query(owner, scope, kind, has_filter_names)
3081
3082 result = self._execute_reflection(
3083 connection, query, dblink, returns_long=False, params=params
3084 )
3085 default = ReflectionDefaults.table_comment
3086 # materialized views by default seem to have a comment like
3087 # "snapshot table for snapshot owner.mat_view_name"
3088 ignore_mat_view = "snapshot table for snapshot "
3089 return (
3090 (
3091 (schema, self.normalize_name(table)),
3092 (
3093 {"text": comment}
3094 if comment is not None
3095 and not comment.startswith(ignore_mat_view)
3096 else default()
3097 ),
3098 )
3099 for table, comment in result
3100 )
3101
3102 @reflection.cache
3103 def get_indexes(self, connection, table_name, schema=None, **kw):
3104 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3105 ``oracle_resolve_synonyms`` to resolve names to synonyms
3106 """
3107 data = self.get_multi_indexes(
3108 connection,
3109 schema=schema,
3110 filter_names=[table_name],
3111 scope=ObjectScope.ANY,
3112 kind=ObjectKind.ANY,
3113 **kw,
3114 )
3115 return self._value_or_raise(data, table_name, schema)
3116
3117 @lru_cache()
3118 def _index_query(self, owner):
3119 return (
3120 select(
3121 dictionary.all_ind_columns.c.table_name,
3122 dictionary.all_ind_columns.c.index_name,
3123 dictionary.all_ind_columns.c.column_name,
3124 dictionary.all_indexes.c.index_type,
3125 dictionary.all_indexes.c.uniqueness,
3126 dictionary.all_indexes.c.compression,
3127 dictionary.all_indexes.c.prefix_length,
3128 dictionary.all_ind_columns.c.descend,
3129 dictionary.all_ind_expressions.c.column_expression,
3130 )
3131 .select_from(dictionary.all_ind_columns)
3132 .join(
3133 dictionary.all_indexes,
3134 sql.and_(
3135 dictionary.all_ind_columns.c.index_name
3136 == dictionary.all_indexes.c.index_name,
3137 dictionary.all_ind_columns.c.index_owner
3138 == dictionary.all_indexes.c.owner,
3139 ),
3140 )
3141 .outerjoin(
3142 # NOTE: this adds about 20% to the query time. Using a
3143 # case expression with a scalar subquery only when needed
3144 # with the assumption that most indexes are not expression
3145 # would be faster but oracle does not like that with
3146 # LONG datatype. It errors with:
3147 # ORA-00997: illegal use of LONG datatype
3148 dictionary.all_ind_expressions,
3149 sql.and_(
3150 dictionary.all_ind_expressions.c.index_name
3151 == dictionary.all_ind_columns.c.index_name,
3152 dictionary.all_ind_expressions.c.index_owner
3153 == dictionary.all_ind_columns.c.index_owner,
3154 dictionary.all_ind_expressions.c.column_position
3155 == dictionary.all_ind_columns.c.column_position,
3156 ),
3157 )
3158 .where(
3159 dictionary.all_indexes.c.table_owner == owner,
3160 dictionary.all_indexes.c.table_name.in_(
3161 bindparam("all_objects")
3162 ),
3163 )
3164 .order_by(
3165 dictionary.all_ind_columns.c.index_name,
3166 dictionary.all_ind_columns.c.column_position,
3167 )
3168 )
3169
3170 @reflection.flexi_cache(
3171 ("schema", InternalTraversal.dp_string),
3172 ("dblink", InternalTraversal.dp_string),
3173 ("all_objects", InternalTraversal.dp_string_list),
3174 )
3175 def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
3176 owner = self.denormalize_schema_name(
3177 schema or self.default_schema_name
3178 )
3179
3180 query = self._index_query(owner)
3181
3182 pks = {
3183 row_dict["constraint_name"]
3184 for row_dict in self._get_all_constraint_rows(
3185 connection, schema, dblink, all_objects, **kw
3186 )
3187 if row_dict["constraint_type"] == "P"
3188 }
3189
3190 # all_ind_expressions.column_expression is LONG
3191 result = self._run_batches(
3192 connection,
3193 query,
3194 dblink,
3195 returns_long=True,
3196 mappings=True,
3197 all_objects=all_objects,
3198 )
3199
3200 return [
3201 row_dict
3202 for row_dict in result
3203 if row_dict["index_name"] not in pks
3204 ]
3205
3206 @_handle_synonyms_decorator
3207 def get_multi_indexes(
3208 self,
3209 connection,
3210 *,
3211 schema,
3212 filter_names,
3213 scope,
3214 kind,
3215 dblink=None,
3216 **kw,
3217 ):
3218 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3219 ``oracle_resolve_synonyms`` to resolve names to synonyms
3220 """
3221 all_objects = self._get_all_objects(
3222 connection, schema, scope, kind, filter_names, dblink, **kw
3223 )
3224
3225 uniqueness = {"NONUNIQUE": False, "UNIQUE": True}
3226 enabled = {"DISABLED": False, "ENABLED": True}
3227 is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"}
3228
3229 indexes = defaultdict(dict)
3230
3231 for row_dict in self._get_indexes_rows(
3232 connection, schema, dblink, all_objects, **kw
3233 ):
3234 index_name = self.normalize_name(row_dict["index_name"])
3235 table_name = self.normalize_name(row_dict["table_name"])
3236 table_indexes = indexes[(schema, table_name)]
3237
3238 if index_name not in table_indexes:
3239 table_indexes[index_name] = index_dict = {
3240 "name": index_name,
3241 "column_names": [],
3242 "dialect_options": {},
3243 "unique": uniqueness.get(row_dict["uniqueness"], False),
3244 }
3245 do = index_dict["dialect_options"]
3246 if row_dict["index_type"] in is_bitmap:
3247 do["oracle_bitmap"] = True
3248 if enabled.get(row_dict["compression"], False):
3249 do["oracle_compress"] = row_dict["prefix_length"]
3250
3251 else:
3252 index_dict = table_indexes[index_name]
3253
3254 expr = row_dict["column_expression"]
3255 if expr is not None:
3256 index_dict["column_names"].append(None)
3257 if "expressions" in index_dict:
3258 index_dict["expressions"].append(expr)
3259 else:
3260 index_dict["expressions"] = index_dict["column_names"][:-1]
3261 index_dict["expressions"].append(expr)
3262
3263 if row_dict["descend"].lower() != "asc":
3264 assert row_dict["descend"].lower() == "desc"
3265 cs = index_dict.setdefault("column_sorting", {})
3266 cs[expr] = ("desc",)
3267 else:
3268 assert row_dict["descend"].lower() == "asc"
3269 cn = self.normalize_name(row_dict["column_name"])
3270 index_dict["column_names"].append(cn)
3271 if "expressions" in index_dict:
3272 index_dict["expressions"].append(cn)
3273
3274 default = ReflectionDefaults.indexes
3275
3276 return (
3277 (key, list(indexes[key].values()) if key in indexes else default())
3278 for key in (
3279 (schema, self.normalize_name(obj_name))
3280 for obj_name in all_objects
3281 )
3282 )
3283
3284 @reflection.cache
3285 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
3286 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3287 ``oracle_resolve_synonyms`` to resolve names to synonyms
3288 """
3289 data = self.get_multi_pk_constraint(
3290 connection,
3291 schema=schema,
3292 filter_names=[table_name],
3293 scope=ObjectScope.ANY,
3294 kind=ObjectKind.ANY,
3295 **kw,
3296 )
3297 return self._value_or_raise(data, table_name, schema)
3298
3299 @lru_cache()
3300 def _constraint_query(self, owner):
3301 local = dictionary.all_cons_columns.alias("local")
3302 remote = dictionary.all_cons_columns.alias("remote")
3303 return (
3304 select(
3305 dictionary.all_constraints.c.table_name,
3306 dictionary.all_constraints.c.constraint_type,
3307 dictionary.all_constraints.c.constraint_name,
3308 local.c.column_name.label("local_column"),
3309 remote.c.table_name.label("remote_table"),
3310 remote.c.column_name.label("remote_column"),
3311 remote.c.owner.label("remote_owner"),
3312 dictionary.all_constraints.c.search_condition,
3313 dictionary.all_constraints.c.delete_rule,
3314 )
3315 .select_from(dictionary.all_constraints)
3316 .join(
3317 local,
3318 and_(
3319 local.c.owner == dictionary.all_constraints.c.owner,
3320 dictionary.all_constraints.c.constraint_name
3321 == local.c.constraint_name,
3322 ),
3323 )
3324 .outerjoin(
3325 remote,
3326 and_(
3327 dictionary.all_constraints.c.r_owner == remote.c.owner,
3328 dictionary.all_constraints.c.r_constraint_name
3329 == remote.c.constraint_name,
3330 or_(
3331 remote.c.position.is_(sql.null()),
3332 local.c.position == remote.c.position,
3333 ),
3334 ),
3335 )
3336 .where(
3337 dictionary.all_constraints.c.owner == owner,
3338 dictionary.all_constraints.c.table_name.in_(
3339 bindparam("all_objects")
3340 ),
3341 dictionary.all_constraints.c.constraint_type.in_(
3342 ("R", "P", "U", "C")
3343 ),
3344 )
3345 .order_by(
3346 dictionary.all_constraints.c.constraint_name, local.c.position
3347 )
3348 )
3349
3350 @reflection.flexi_cache(
3351 ("schema", InternalTraversal.dp_string),
3352 ("dblink", InternalTraversal.dp_string),
3353 ("all_objects", InternalTraversal.dp_string_list),
3354 )
3355 def _get_all_constraint_rows(
3356 self, connection, schema, dblink, all_objects, **kw
3357 ):
3358 owner = self.denormalize_schema_name(
3359 schema or self.default_schema_name
3360 )
3361 query = self._constraint_query(owner)
3362
3363 # since the result is cached a list must be created
3364 values = list(
3365 self._run_batches(
3366 connection,
3367 query,
3368 dblink,
3369 returns_long=False,
3370 mappings=True,
3371 all_objects=all_objects,
3372 )
3373 )
3374 return values
3375
3376 @_handle_synonyms_decorator
3377 def get_multi_pk_constraint(
3378 self,
3379 connection,
3380 *,
3381 scope,
3382 schema,
3383 filter_names,
3384 kind,
3385 dblink=None,
3386 **kw,
3387 ):
3388 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3389 ``oracle_resolve_synonyms`` to resolve names to synonyms
3390 """
3391 all_objects = self._get_all_objects(
3392 connection, schema, scope, kind, filter_names, dblink, **kw
3393 )
3394
3395 primary_keys = defaultdict(dict)
3396 default = ReflectionDefaults.pk_constraint
3397
3398 for row_dict in self._get_all_constraint_rows(
3399 connection, schema, dblink, all_objects, **kw
3400 ):
3401 if row_dict["constraint_type"] != "P":
3402 continue
3403 table_name = self.normalize_name(row_dict["table_name"])
3404 constraint_name = self.normalize_name(row_dict["constraint_name"])
3405 column_name = self.normalize_name(row_dict["local_column"])
3406
3407 table_pk = primary_keys[(schema, table_name)]
3408 if not table_pk:
3409 table_pk["name"] = constraint_name
3410 table_pk["constrained_columns"] = [column_name]
3411 else:
3412 table_pk["constrained_columns"].append(column_name)
3413
3414 return (
3415 (key, primary_keys[key] if key in primary_keys else default())
3416 for key in (
3417 (schema, self.normalize_name(obj_name))
3418 for obj_name in all_objects
3419 )
3420 )
3421
3422 @reflection.cache
3423 def get_foreign_keys(
3424 self,
3425 connection,
3426 table_name,
3427 schema=None,
3428 **kw,
3429 ):
3430 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3431 ``oracle_resolve_synonyms`` to resolve names to synonyms
3432 """
3433 data = self.get_multi_foreign_keys(
3434 connection,
3435 schema=schema,
3436 filter_names=[table_name],
3437 scope=ObjectScope.ANY,
3438 kind=ObjectKind.ANY,
3439 **kw,
3440 )
3441 return self._value_or_raise(data, table_name, schema)
3442
3443 @_handle_synonyms_decorator
3444 def get_multi_foreign_keys(
3445 self,
3446 connection,
3447 *,
3448 scope,
3449 schema,
3450 filter_names,
3451 kind,
3452 dblink=None,
3453 **kw,
3454 ):
3455 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3456 ``oracle_resolve_synonyms`` to resolve names to synonyms
3457 """
3458 all_objects = self._get_all_objects(
3459 connection, schema, scope, kind, filter_names, dblink, **kw
3460 )
3461
3462 resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
3463
3464 owner = self.denormalize_schema_name(
3465 schema or self.default_schema_name
3466 )
3467
3468 all_remote_owners = set()
3469 fkeys = defaultdict(dict)
3470
3471 for row_dict in self._get_all_constraint_rows(
3472 connection, schema, dblink, all_objects, **kw
3473 ):
3474 if row_dict["constraint_type"] != "R":
3475 continue
3476
3477 table_name = self.normalize_name(row_dict["table_name"])
3478 constraint_name = self.normalize_name(row_dict["constraint_name"])
3479 table_fkey = fkeys[(schema, table_name)]
3480
3481 assert constraint_name is not None
3482
3483 local_column = self.normalize_name(row_dict["local_column"])
3484 remote_table = self.normalize_name(row_dict["remote_table"])
3485 remote_column = self.normalize_name(row_dict["remote_column"])
3486 remote_owner_orig = row_dict["remote_owner"]
3487 remote_owner = self.normalize_name(remote_owner_orig)
3488 if remote_owner_orig is not None:
3489 all_remote_owners.add(remote_owner_orig)
3490
3491 if remote_table is None:
3492 # ticket 363
3493 if dblink and not dblink.startswith("@"):
3494 dblink = f"@{dblink}"
3495 util.warn(
3496 "Got 'None' querying 'table_name' from "
3497 f"all_cons_columns{dblink or ''} - does the user have "
3498 "proper rights to the table?"
3499 )
3500 continue
3501
3502 if constraint_name not in table_fkey:
3503 table_fkey[constraint_name] = fkey = {
3504 "name": constraint_name,
3505 "constrained_columns": [],
3506 "referred_schema": None,
3507 "referred_table": remote_table,
3508 "referred_columns": [],
3509 "options": {},
3510 }
3511
3512 if resolve_synonyms:
3513 # will be removed below
3514 fkey["_ref_schema"] = remote_owner
3515
3516 if schema is not None or remote_owner_orig != owner:
3517 fkey["referred_schema"] = remote_owner
3518
3519 delete_rule = row_dict["delete_rule"]
3520 if delete_rule != "NO ACTION":
3521 fkey["options"]["ondelete"] = delete_rule
3522
3523 else:
3524 fkey = table_fkey[constraint_name]
3525
3526 fkey["constrained_columns"].append(local_column)
3527 fkey["referred_columns"].append(remote_column)
3528
3529 if resolve_synonyms and all_remote_owners:
3530 query = select(
3531 dictionary.all_synonyms.c.owner,
3532 dictionary.all_synonyms.c.table_name,
3533 dictionary.all_synonyms.c.table_owner,
3534 dictionary.all_synonyms.c.synonym_name,
3535 ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners))
3536
3537 result = self._execute_reflection(
3538 connection, query, dblink, returns_long=False
3539 ).mappings()
3540
3541 remote_owners_lut = {}
3542 for row in result:
3543 synonym_owner = self.normalize_name(row["owner"])
3544 table_name = self.normalize_name(row["table_name"])
3545
3546 remote_owners_lut[(synonym_owner, table_name)] = (
3547 row["table_owner"],
3548 row["synonym_name"],
3549 )
3550
3551 empty = (None, None)
3552 for table_fkeys in fkeys.values():
3553 for table_fkey in table_fkeys.values():
3554 key = (
3555 table_fkey.pop("_ref_schema"),
3556 table_fkey["referred_table"],
3557 )
3558 remote_owner, syn_name = remote_owners_lut.get(key, empty)
3559 if syn_name:
3560 sn = self.normalize_name(syn_name)
3561 table_fkey["referred_table"] = sn
3562 if schema is not None or remote_owner != owner:
3563 ro = self.normalize_name(remote_owner)
3564 table_fkey["referred_schema"] = ro
3565 else:
3566 table_fkey["referred_schema"] = None
3567 default = ReflectionDefaults.foreign_keys
3568
3569 return (
3570 (key, list(fkeys[key].values()) if key in fkeys else default())
3571 for key in (
3572 (schema, self.normalize_name(obj_name))
3573 for obj_name in all_objects
3574 )
3575 )
3576
3577 @reflection.cache
3578 def get_unique_constraints(
3579 self, connection, table_name, schema=None, **kw
3580 ):
3581 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3582 ``oracle_resolve_synonyms`` to resolve names to synonyms
3583 """
3584 data = self.get_multi_unique_constraints(
3585 connection,
3586 schema=schema,
3587 filter_names=[table_name],
3588 scope=ObjectScope.ANY,
3589 kind=ObjectKind.ANY,
3590 **kw,
3591 )
3592 return self._value_or_raise(data, table_name, schema)
3593
3594 @_handle_synonyms_decorator
3595 def get_multi_unique_constraints(
3596 self,
3597 connection,
3598 *,
3599 scope,
3600 schema,
3601 filter_names,
3602 kind,
3603 dblink=None,
3604 **kw,
3605 ):
3606 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3607 ``oracle_resolve_synonyms`` to resolve names to synonyms
3608 """
3609 all_objects = self._get_all_objects(
3610 connection, schema, scope, kind, filter_names, dblink, **kw
3611 )
3612
3613 unique_cons = defaultdict(dict)
3614
3615 index_names = {
3616 row_dict["index_name"]
3617 for row_dict in self._get_indexes_rows(
3618 connection, schema, dblink, all_objects, **kw
3619 )
3620 }
3621
3622 for row_dict in self._get_all_constraint_rows(
3623 connection, schema, dblink, all_objects, **kw
3624 ):
3625 if row_dict["constraint_type"] != "U":
3626 continue
3627 table_name = self.normalize_name(row_dict["table_name"])
3628 constraint_name_orig = row_dict["constraint_name"]
3629 constraint_name = self.normalize_name(constraint_name_orig)
3630 column_name = self.normalize_name(row_dict["local_column"])
3631 table_uc = unique_cons[(schema, table_name)]
3632
3633 assert constraint_name is not None
3634
3635 if constraint_name not in table_uc:
3636 table_uc[constraint_name] = uc = {
3637 "name": constraint_name,
3638 "column_names": [],
3639 "duplicates_index": (
3640 constraint_name
3641 if constraint_name_orig in index_names
3642 else None
3643 ),
3644 }
3645 else:
3646 uc = table_uc[constraint_name]
3647
3648 uc["column_names"].append(column_name)
3649
3650 default = ReflectionDefaults.unique_constraints
3651
3652 return (
3653 (
3654 key,
3655 (
3656 list(unique_cons[key].values())
3657 if key in unique_cons
3658 else default()
3659 ),
3660 )
3661 for key in (
3662 (schema, self.normalize_name(obj_name))
3663 for obj_name in all_objects
3664 )
3665 )
3666
3667 @reflection.cache
3668 def get_view_definition(
3669 self,
3670 connection,
3671 view_name,
3672 schema=None,
3673 dblink=None,
3674 **kw,
3675 ):
3676 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3677 ``oracle_resolve_synonyms`` to resolve names to synonyms
3678 """
3679 if kw.get("oracle_resolve_synonyms", False):
3680 synonyms = self._get_synonyms(
3681 connection, schema, filter_names=[view_name], dblink=dblink
3682 )
3683 if synonyms:
3684 assert len(synonyms) == 1
3685 row_dict = synonyms[0]
3686 dblink = self.normalize_name(row_dict["db_link"])
3687 schema = row_dict["table_owner"]
3688 view_name = row_dict["table_name"]
3689
3690 name = self.denormalize_name(view_name)
3691 owner = self.denormalize_schema_name(
3692 schema or self.default_schema_name
3693 )
3694 query = (
3695 select(dictionary.all_views.c.text)
3696 .where(
3697 dictionary.all_views.c.view_name == name,
3698 dictionary.all_views.c.owner == owner,
3699 )
3700 .union_all(
3701 select(dictionary.all_mviews.c.query).where(
3702 dictionary.all_mviews.c.mview_name == name,
3703 dictionary.all_mviews.c.owner == owner,
3704 )
3705 )
3706 )
3707
3708 rp = self._execute_reflection(
3709 connection, query, dblink, returns_long=False
3710 ).scalar()
3711 if rp is None:
3712 raise exc.NoSuchTableError(
3713 f"{schema}.{view_name}" if schema else view_name
3714 )
3715 else:
3716 return rp
3717
3718 @reflection.cache
3719 def get_check_constraints(
3720 self, connection, table_name, schema=None, include_all=False, **kw
3721 ):
3722 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3723 ``oracle_resolve_synonyms`` to resolve names to synonyms
3724 """
3725 data = self.get_multi_check_constraints(
3726 connection,
3727 schema=schema,
3728 filter_names=[table_name],
3729 scope=ObjectScope.ANY,
3730 include_all=include_all,
3731 kind=ObjectKind.ANY,
3732 **kw,
3733 )
3734 return self._value_or_raise(data, table_name, schema)
3735
3736 @_handle_synonyms_decorator
3737 def get_multi_check_constraints(
3738 self,
3739 connection,
3740 *,
3741 schema,
3742 filter_names,
3743 dblink=None,
3744 scope,
3745 kind,
3746 include_all=False,
3747 **kw,
3748 ):
3749 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3750 ``oracle_resolve_synonyms`` to resolve names to synonyms
3751 """
3752 all_objects = self._get_all_objects(
3753 connection, schema, scope, kind, filter_names, dblink, **kw
3754 )
3755
3756 not_null = re.compile(r"..+?. IS NOT NULL$")
3757
3758 check_constraints = defaultdict(list)
3759
3760 for row_dict in self._get_all_constraint_rows(
3761 connection, schema, dblink, all_objects, **kw
3762 ):
3763 if row_dict["constraint_type"] != "C":
3764 continue
3765 table_name = self.normalize_name(row_dict["table_name"])
3766 constraint_name = self.normalize_name(row_dict["constraint_name"])
3767 search_condition = row_dict["search_condition"]
3768
3769 table_checks = check_constraints[(schema, table_name)]
3770 if constraint_name is not None and (
3771 include_all or not not_null.match(search_condition)
3772 ):
3773 table_checks.append(
3774 {"name": constraint_name, "sqltext": search_condition}
3775 )
3776
3777 default = ReflectionDefaults.check_constraints
3778
3779 return (
3780 (
3781 key,
3782 (
3783 check_constraints[key]
3784 if key in check_constraints
3785 else default()
3786 ),
3787 )
3788 for key in (
3789 (schema, self.normalize_name(obj_name))
3790 for obj_name in all_objects
3791 )
3792 )
3793
3794 def _list_dblinks(self, connection, dblink=None):
3795 query = select(dictionary.all_db_links.c.db_link)
3796 links = self._execute_reflection(
3797 connection, query, dblink, returns_long=False
3798 ).scalars()
3799 return [self.normalize_name(link) for link in links]
3800
3801
3802class _OuterJoinColumn(sql.ClauseElement):
3803 __visit_name__ = "outer_join_column"
3804
3805 def __init__(self, column):
3806 self.column = column