1# dialects/oracle/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10r"""
11.. dialect:: oracle
12 :name: Oracle Database
13 :normal_support: 11+
14 :best_effort: 9+
15
16
17Auto Increment Behavior
18-----------------------
19
20SQLAlchemy Table objects which include integer primary keys are usually assumed
21to have "autoincrementing" behavior, meaning they can generate their own
22primary key values upon INSERT. For use within Oracle Database, two options are
23available, which are the use of IDENTITY columns (Oracle Database 12 and above
24only) or the association of a SEQUENCE with the column.
25
26Specifying GENERATED AS IDENTITY (Oracle Database 12 and above)
27~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
28
29Starting from version 12, Oracle Database can make use of identity columns
30using the :class:`_sql.Identity` to specify the autoincrementing behavior::
31
32 t = Table(
33 "mytable",
34 metadata,
35 Column("id", Integer, Identity(start=3), primary_key=True),
36 Column(...),
37 ...,
38 )
39
40The CREATE TABLE for the above :class:`_schema.Table` object would be:
41
42.. sourcecode:: sql
43
44 CREATE TABLE mytable (
45 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
46 ...,
47 PRIMARY KEY (id)
48 )
49
50The :class:`_schema.Identity` object support many options to control the
51"autoincrementing" behavior of the column, like the starting value, the
52incrementing value, etc. In addition to the standard options, Oracle Database
53supports setting :paramref:`_schema.Identity.always` to ``None`` to use the
54default generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
55setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
56in conjunction with a 'BY DEFAULT' identity column.
57
58Using a SEQUENCE (all Oracle Database versions)
59~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
60
61Older version of Oracle Database had no "autoincrement" feature: SQLAlchemy
62relies upon sequences to produce these values. With the older Oracle Database
63versions, *a sequence must always be explicitly specified to enable
64autoincrement*. This is divergent with the majority of documentation examples
65which assume the usage of an autoincrement-capable database. To specify
66sequences, use the sqlalchemy.schema.Sequence object which is passed to a
67Column construct::
68
69 t = Table(
70 "mytable",
71 metadata,
72 Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
73 Column(...),
74 ...,
75 )
76
77This step is also required when using table reflection, i.e. autoload_with=engine::
78
79 t = Table(
80 "mytable",
81 metadata,
82 Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
83 autoload_with=engine,
84 )
85
86.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
87 in a :class:`_schema.Column` to specify the option of an autoincrementing
88 column.
89
90.. _oracle_isolation_level:
91
92Transaction Isolation Level / Autocommit
93----------------------------------------
94
95Oracle Database supports "READ COMMITTED" and "SERIALIZABLE" modes of
96isolation. The AUTOCOMMIT isolation level is also supported by the
97python-oracledb and cx_Oracle dialects.
98
99To set using per-connection execution options::
100
101 connection = engine.connect()
102 connection = connection.execution_options(isolation_level="AUTOCOMMIT")
103
104For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle Database dialects sets
105the level at the session level using ``ALTER SESSION``, which is reverted back
106to its default setting when the connection is returned to the connection pool.
107
108Valid values for ``isolation_level`` include:
109
110* ``READ COMMITTED``
111* ``AUTOCOMMIT``
112* ``SERIALIZABLE``
113
114.. note:: The implementation for the
115 :meth:`_engine.Connection.get_isolation_level` method as implemented by the
116 Oracle Database dialects necessarily force the start of a transaction using the
117 Oracle Database DBMS_TRANSACTION.LOCAL_TRANSACTION_ID function; otherwise no
118 level is normally readable.
119
120 Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
121 raise an exception if the ``v$transaction`` view is not available due to
122 permissions or other reasons, which is a common occurrence in Oracle Database
123 installations.
124
125 The python-oracledb and cx_Oracle dialects attempt to call the
126 :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
127 its first connection to the database in order to acquire the
128 "default"isolation level. This default level is necessary so that the level
129 can be reset on a connection after it has been temporarily modified using
130 :meth:`_engine.Connection.execution_options` method. In the common event
131 that the :meth:`_engine.Connection.get_isolation_level` method raises an
132 exception due to ``v$transaction`` not being readable as well as any other
133 database-related failure, the level is assumed to be "READ COMMITTED". No
134 warning is emitted for this initial first-connect condition as it is
135 expected to be a common restriction on Oracle databases.
136
137.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_Oracle dialect
138 as well as the notion of a default isolation level
139
140.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
141 reading of the isolation level.
142
143.. versionchanged:: 1.3.22 In the event that the default isolation
144 level cannot be read due to permissions on the v$transaction view as
145 is common in Oracle installations, the default isolation level is hardcoded
146 to "READ COMMITTED" which was the behavior prior to 1.3.21.
147
148.. seealso::
149
150 :ref:`dbapi_autocommit`
151
152Identifier Casing
153-----------------
154
155In Oracle Database, the data dictionary represents all case insensitive
156identifier names using UPPERCASE text. This is in contradiction to the
157expectations of SQLAlchemy, which assume a case insensitive name is represented
158as lowercase text.
159
160As an example of case insensitive identifier names, consider the following table:
161
162.. sourcecode:: sql
163
164 CREATE TABLE MyTable (Identifier INTEGER PRIMARY KEY)
165
166If you were to ask Oracle Database for information about this table, the
167table name would be reported as ``MYTABLE`` and the column name would
168be reported as ``IDENTIFIER``. Compare to most other databases such as
169PostgreSQL and MySQL which would report these names as ``mytable`` and
170``identifier``. The names are **not quoted, therefore are case insensitive**.
171The special casing of ``MyTable`` and ``Identifier`` would only be maintained
172if they were quoted in the table definition:
173
174.. sourcecode:: sql
175
176 CREATE TABLE "MyTable" ("Identifier" INTEGER PRIMARY KEY)
177
178When constructing a SQLAlchemy :class:`.Table` object, **an all lowercase name
179is considered to be case insensitive**. So the following table assumes
180case insensitive names::
181
182 Table("mytable", metadata, Column("identifier", Integer, primary_key=True))
183
184Whereas when mixed case or UPPERCASE names are used, case sensitivity is
185assumed::
186
187 Table("MyTable", metadata, Column("Identifier", Integer, primary_key=True))
188
189A similar situation occurs at the database driver level when emitting a
190textual SQL SELECT statement and looking at column names in the DBAPI
191``cursor.description`` attribute. A database like PostgreSQL will normalize
192case insensitive names to be lowercase::
193
194 >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test")
195 >>> pg_connection = pg_engine.connect()
196 >>> result = pg_connection.exec_driver_sql("SELECT 1 AS SomeName")
197 >>> result.cursor.description
198 (Column(name='somename', type_code=23),)
199
200Whereas Oracle normalizes them to UPPERCASE::
201
202 >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
203 >>> oracle_connection = oracle_engine.connect()
204 >>> result = oracle_connection.exec_driver_sql(
205 ... "SELECT 1 AS SomeName FROM DUAL"
206 ... )
207 >>> result.cursor.description
208 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
209
210In order to achieve cross-database parity for the two cases of a. table
211reflection and b. textual-only SQL statement round trips, SQLAlchemy performs a step
212called **name normalization** when using the Oracle dialect. This process may
213also apply to other third party dialects that have similar UPPERCASE handling
214of case insensitive names.
215
216When using name normalization, SQLAlchemy attempts to detect if a name is
217case insensitive by checking if all characters are UPPERCASE letters only;
218if so, then it assumes this is a case insensitive name and is delivered as
219a lowercase name.
220
221For table reflection, a tablename that is seen represented as all UPPERCASE
222in Oracle Database's catalog tables will be assumed to have a case insensitive
223name. This is what allows the ``Table`` definition to use lower case names
224and be equally compatible from a reflection point of view on Oracle Database
225and all other databases such as PostgreSQL and MySQL::
226
227 # matches a table created with CREATE TABLE mytable
228 Table("mytable", metadata, autoload_with=some_engine)
229
230Above, the all lowercase name ``"mytable"`` is case insensitive; it will match
231a table reported by PostgreSQL as ``"mytable"`` and a table reported by
232Oracle as ``"MYTABLE"``. If name normalization were not present, it would
233not be possible for the above :class:`.Table` definition to be introspectable
234in a cross-database way, since we are dealing with a case insensitive name
235that is not reported by each database in the same way.
236
237Case sensitivity can be forced on in this case, such as if we wanted to represent
238the quoted tablename ``"MYTABLE"`` with that exact casing, most simply by using
239that casing directly, which will be seen as a case sensitive name::
240
241 # matches a table created with CREATE TABLE "MYTABLE"
242 Table("MYTABLE", metadata, autoload_with=some_engine)
243
244For the unusual case of a quoted all-lowercase name, the :class:`.quoted_name`
245construct may be used::
246
247 from sqlalchemy import quoted_name
248
249 # matches a table created with CREATE TABLE "mytable"
250 Table(
251 quoted_name("mytable", quote=True), metadata, autoload_with=some_engine
252 )
253
254Name normalization also takes place when handling result sets from **purely
255textual SQL strings**, that have no other :class:`.Table` or :class:`.Column`
256metadata associated with them. This includes SQL strings executed using
257:meth:`.Connection.exec_driver_sql` and SQL strings executed using the
258:func:`.text` construct which do not include :class:`.Column` metadata.
259
260Returning to the Oracle Database SELECT statement, we see that even though
261``cursor.description`` reports the column name as ``SOMENAME``, SQLAlchemy
262name normalizes this to ``somename``::
263
264 >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
265 >>> oracle_connection = oracle_engine.connect()
266 >>> result = oracle_connection.exec_driver_sql(
267 ... "SELECT 1 AS SomeName FROM DUAL"
268 ... )
269 >>> result.cursor.description
270 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
271 >>> result.keys()
272 RMKeyView(['somename'])
273
274The single scenario where the above behavior produces inaccurate results
275is when using an all-uppercase, quoted name. SQLAlchemy has no way to determine
276that a particular name in ``cursor.description`` was quoted, and is therefore
277case sensitive, or was not quoted, and should be name normalized::
278
279 >>> result = oracle_connection.exec_driver_sql(
280 ... 'SELECT 1 AS "SOMENAME" FROM DUAL'
281 ... )
282 >>> result.cursor.description
283 [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
284 >>> result.keys()
285 RMKeyView(['somename'])
286
287For this case, a new feature will be available in SQLAlchemy 2.1 to disable
288the name normalization behavior in specific cases.
289
290
291.. _oracle_max_identifier_lengths:
292
293Maximum Identifier Lengths
294--------------------------
295
296SQLAlchemy is sensitive to the maximum identifier length supported by Oracle
297Database. This affects generated SQL label names as well as the generation of
298constraint names, particularly in the case where the constraint naming
299convention feature described at :ref:`constraint_naming_conventions` is being
300used.
301
302Oracle Database 12.2 increased the default maximum identifier length from 30 to
303128. As of SQLAlchemy 1.4, the default maximum identifier length for the Oracle
304dialects is 128 characters. Upon first connection, the maximum length actually
305supported by the database is obtained. In all cases, setting the
306:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
307change and the value given will be used as is::
308
309 engine = create_engine(
310 "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1",
311 max_identifier_length=30,
312 )
313
314If :paramref:`_sa.create_engine.max_identifier_length` is not set, the oracledb
315dialect internally uses the ``max_identifier_length`` attribute available on
316driver connections since python-oracledb version 2.5. When using an older
317driver version, or using the cx_Oracle dialect, SQLAlchemy will instead attempt
318to use the query ``SELECT value FROM v$parameter WHERE name = 'compatible'``
319upon first connect in order to determine the effective compatibility version of
320the database. The "compatibility" version is a version number that is
321independent of the actual database version. It is used to assist database
322migration. It is configured by an Oracle Database initialization parameter. The
323compatibility version then determines the maximum allowed identifier length for
324the database. If the V$ view is not available, the database version information
325is used instead.
326
327The maximum identifier length comes into play both when generating anonymized
328SQL labels in SELECT statements, but more crucially when generating constraint
329names from a naming convention. It is this area that has created the need for
330SQLAlchemy to change this default conservatively. For example, the following
331naming convention produces two very different constraint names based on the
332identifier length::
333
334 from sqlalchemy import Column
335 from sqlalchemy import Index
336 from sqlalchemy import Integer
337 from sqlalchemy import MetaData
338 from sqlalchemy import Table
339 from sqlalchemy.dialects import oracle
340 from sqlalchemy.schema import CreateIndex
341
342 m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
343
344 t = Table(
345 "t",
346 m,
347 Column("some_column_name_1", Integer),
348 Column("some_column_name_2", Integer),
349 Column("some_column_name_3", Integer),
350 )
351
352 ix = Index(
353 None,
354 t.c.some_column_name_1,
355 t.c.some_column_name_2,
356 t.c.some_column_name_3,
357 )
358
359 oracle_dialect = oracle.dialect(max_identifier_length=30)
360 print(CreateIndex(ix).compile(dialect=oracle_dialect))
361
362With an identifier length of 30, the above CREATE INDEX looks like:
363
364.. sourcecode:: sql
365
366 CREATE INDEX ix_some_column_name_1s_70cd ON t
367 (some_column_name_1, some_column_name_2, some_column_name_3)
368
369However with length of 128, it becomes::
370
371.. sourcecode:: sql
372
373 CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
374 (some_column_name_1, some_column_name_2, some_column_name_3)
375
376Applications which have run versions of SQLAlchemy prior to 1.4 on Oracle
377Database version 12.2 or greater are therefore subject to the scenario of a
378database migration that wishes to "DROP CONSTRAINT" on a name that was
379previously generated with the shorter length. This migration will fail when
380the identifier length is changed without the name of the index or constraint
381first being adjusted. Such applications are strongly advised to make use of
382:paramref:`_sa.create_engine.max_identifier_length` in order to maintain
383control of the generation of truncated names, and to fully review and test all
384database migrations in a staging environment when changing this value to ensure
385that the impact of this change has been mitigated.
386
387.. versionchanged:: 1.4 the default max_identifier_length for Oracle Database
388 is 128 characters, which is adjusted down to 30 upon first connect if the
389 Oracle Database, or its compatibility setting, are lower than version 12.2.
390
391
392LIMIT/OFFSET/FETCH Support
393--------------------------
394
395Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make use
396of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming Oracle Database 12c or
397above, and assuming the SELECT statement is not embedded within a compound
398statement like UNION. This syntax is also available directly by using the
399:meth:`_sql.Select.fetch` method.
400
401.. versionchanged:: 2.0 the Oracle Database dialects now use ``FETCH FIRST N
402 ROW / OFFSET N ROWS`` for all :meth:`_sql.Select.limit` and
403 :meth:`_sql.Select.offset` usage including within the ORM and legacy
404 :class:`_orm.Query`. To force the legacy behavior using window functions,
405 specify the ``enable_offset_fetch=False`` dialect parameter to
406 :func:`_sa.create_engine`.
407
408The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle Database
409version by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`,
410which will force the use of "legacy" mode that makes use of window functions.
411This mode is also selected automatically when using a version of Oracle
412Database prior to 12c.
413
414When using legacy mode, or when a :class:`.Select` statement with limit/offset
415is embedded in a compound statement, an emulated approach for LIMIT / OFFSET
416based on window functions is used, which involves creation of a subquery using
417``ROW_NUMBER`` that is prone to performance issues as well as SQL construction
418issues for complex statements. However, this approach is supported by all
419Oracle Database versions. See notes below.
420
421Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
422~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
423
424If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the
425ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an
426Oracle Database version prior to 12c, the following notes apply:
427
428* SQLAlchemy currently makes use of ROWNUM to achieve
429 LIMIT/OFFSET; the exact methodology is taken from
430 https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
431
432* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
433 the usage of this optimization directive, specify ``optimize_limits=True``
434 to :func:`_sa.create_engine`.
435
436 .. versionchanged:: 1.4
437
438 The Oracle Database dialect renders limit/offset integer values using a
439 "post compile" scheme which renders the integer directly before passing
440 the statement to the cursor for execution. The ``use_binds_for_limits``
441 flag no longer has an effect.
442
443 .. seealso::
444
445 :ref:`change_4808`.
446
447.. _oracle_returning:
448
449RETURNING Support
450-----------------
451
452Oracle Database supports RETURNING fully for INSERT, UPDATE and DELETE
453statements that are invoked with a single collection of bound parameters (that
454is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally
455support RETURNING with :term:`executemany` statements). Multiple rows may be
456returned as well.
457
458.. versionchanged:: 2.0 the Oracle Database backend has full support for
459 RETURNING on parity with other backends.
460
461
462ON UPDATE CASCADE
463-----------------
464
465Oracle Database doesn't have native ON UPDATE CASCADE functionality. A trigger
466based solution is available at
467https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
468
469When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
470cascading updates - specify ForeignKey objects using the
471"deferrable=True, initially='deferred'" keyword arguments,
472and specify "passive_updates=False" on each relationship().
473
474Oracle Database 8 Compatibility
475-------------------------------
476
477.. warning:: The status of Oracle Database 8 compatibility is not known for
478 SQLAlchemy 2.0.
479
480When Oracle Database 8 is detected, the dialect internally configures itself to
481the following behaviors:
482
483* the use_ansi flag is set to False. This has the effect of converting all
484 JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
485 makes use of Oracle's (+) operator.
486
487* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
488 the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued
489 instead. This because these types don't seem to work correctly on Oracle 8
490 even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and
491 :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
492 NVARCHAR2 and NCLOB.
493
494
495Synonym/DBLINK Reflection
496-------------------------
497
498When using reflection with Table objects, the dialect can optionally search
499for tables indicated by synonyms, either in local or remote schemas or
500accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
501a keyword argument to the :class:`_schema.Table` construct::
502
503 some_table = Table(
504 "some_table", autoload_with=some_engine, oracle_resolve_synonyms=True
505 )
506
507When this flag is set, the given name (such as ``some_table`` above) will be
508searched not just in the ``ALL_TABLES`` view, but also within the
509``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
510name. If the synonym is located and refers to a DBLINK, the Oracle Database
511dialects know how to locate the table's information using DBLINK syntax(e.g.
512``@dblink``).
513
514``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
515accepted, including methods such as :meth:`_schema.MetaData.reflect` and
516:meth:`_reflection.Inspector.get_columns`.
517
518If synonyms are not in use, this flag should be left disabled.
519
520.. _oracle_constraint_reflection:
521
522Constraint Reflection
523---------------------
524
525The Oracle Database dialects can return information about foreign key, unique,
526and CHECK constraints, as well as indexes on tables.
527
528Raw information regarding these constraints can be acquired using
529:meth:`_reflection.Inspector.get_foreign_keys`,
530:meth:`_reflection.Inspector.get_unique_constraints`,
531:meth:`_reflection.Inspector.get_check_constraints`, and
532:meth:`_reflection.Inspector.get_indexes`.
533
534.. versionchanged:: 1.2 The Oracle Database dialect can now reflect UNIQUE and
535 CHECK constraints.
536
537When using reflection at the :class:`_schema.Table` level, the
538:class:`_schema.Table`
539will also include these constraints.
540
541Note the following caveats:
542
543* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
544 Oracle Database builds a special "IS NOT NULL" constraint for columns that
545 specify "NOT NULL". This constraint is **not** returned by default; to
546 include the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
547
548 from sqlalchemy import create_engine, inspect
549
550 engine = create_engine(
551 "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
552 )
553 inspector = inspect(engine)
554 all_check_constraints = inspector.get_check_constraints(
555 "some_table", include_all=True
556 )
557
558* in most cases, when reflecting a :class:`_schema.Table`, a UNIQUE constraint
559 will **not** be available as a :class:`.UniqueConstraint` object, as Oracle
560 Database mirrors unique constraints with a UNIQUE index in most cases (the
561 exception seems to be when two or more unique constraints represent the same
562 columns); the :class:`_schema.Table` will instead represent these using
563 :class:`.Index` with the ``unique=True`` flag set.
564
565* Oracle Database creates an implicit index for the primary key of a table;
566 this index is **excluded** from all index results.
567
568* the list of columns reflected for an index will not include column names
569 that start with SYS_NC.
570
571Table names with SYSTEM/SYSAUX tablespaces
572-------------------------------------------
573
574The :meth:`_reflection.Inspector.get_table_names` and
575:meth:`_reflection.Inspector.get_temp_table_names`
576methods each return a list of table names for the current engine. These methods
577are also part of the reflection which occurs within an operation such as
578:meth:`_schema.MetaData.reflect`. By default,
579these operations exclude the ``SYSTEM``
580and ``SYSAUX`` tablespaces from the operation. In order to change this, the
581default list of tablespaces excluded can be changed at the engine level using
582the ``exclude_tablespaces`` parameter::
583
584 # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
585 e = create_engine(
586 "oracle+oracledb://scott:tiger@localhost:1521/?service_name=freepdb1",
587 exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"],
588 )
589
590.. _oracle_float_support:
591
592FLOAT / DOUBLE Support and Behaviors
593------------------------------------
594
595The SQLAlchemy :class:`.Float` and :class:`.Double` datatypes are generic
596datatypes that resolve to the "least surprising" datatype for a given backend.
597For Oracle Database, this means they resolve to the ``FLOAT`` and ``DOUBLE``
598types::
599
600 >>> from sqlalchemy import cast, literal, Float
601 >>> from sqlalchemy.dialects import oracle
602 >>> float_datatype = Float()
603 >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
604 CAST(:param_1 AS FLOAT)
605
606Oracle's ``FLOAT`` / ``DOUBLE`` datatypes are aliases for ``NUMBER``. Oracle
607Database stores ``NUMBER`` values with full precision, not floating point
608precision, which means that ``FLOAT`` / ``DOUBLE`` do not actually behave like
609native FP values. Oracle Database instead offers special datatypes
610``BINARY_FLOAT`` and ``BINARY_DOUBLE`` to deliver real 4- and 8- byte FP
611values.
612
613SQLAlchemy supports these datatypes directly using :class:`.BINARY_FLOAT` and
614:class:`.BINARY_DOUBLE`. To use the :class:`.Float` or :class:`.Double`
615datatypes in a database agnostic way, while allowing Oracle backends to utilize
616one of these types, use the :meth:`.TypeEngine.with_variant` method to set up a
617variant::
618
619 >>> from sqlalchemy import cast, literal, Float
620 >>> from sqlalchemy.dialects import oracle
621 >>> float_datatype = Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
622 >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
623 CAST(:param_1 AS BINARY_FLOAT)
624
625E.g. to use this datatype in a :class:`.Table` definition::
626
627 my_table = Table(
628 "my_table",
629 metadata,
630 Column(
631 "fp_data", Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
632 ),
633 )
634
635DateTime Compatibility
636----------------------
637
638Oracle Database has no datatype known as ``DATETIME``, it instead has only
639``DATE``, which can actually store a date and time value. For this reason, the
640Oracle Database dialects provide a type :class:`_oracle.DATE` which is a
641subclass of :class:`.DateTime`. This type has no special behavior, and is only
642present as a "marker" for this type; additionally, when a database column is
643reflected and the type is reported as ``DATE``, the time-supporting
644:class:`_oracle.DATE` type is used.
645
646.. _oracle_table_options:
647
648Oracle Database Table Options
649-----------------------------
650
651The CREATE TABLE phrase supports the following options with Oracle Database
652dialects in conjunction with the :class:`_schema.Table` construct:
653
654
655* ``ON COMMIT``::
656
657 Table(
658 "some_table",
659 metadata,
660 ...,
661 prefixes=["GLOBAL TEMPORARY"],
662 oracle_on_commit="PRESERVE ROWS",
663 )
664
665*
666 ``COMPRESS``::
667
668 Table(
669 "mytable", metadata, Column("data", String(32)), oracle_compress=True
670 )
671
672 Table("mytable", metadata, Column("data", String(32)), oracle_compress=6)
673
674 The ``oracle_compress`` parameter accepts either an integer compression
675 level, or ``True`` to use the default compression level.
676
677*
678 ``TABLESPACE``::
679
680 Table("mytable", metadata, ..., oracle_tablespace="EXAMPLE_TABLESPACE")
681
682 The ``oracle_tablespace`` parameter specifies the tablespace in which the
683 table is to be created. This is useful when you want to create a table in a
684 tablespace other than the default tablespace of the user.
685
686 .. versionadded:: 2.0.37
687
688.. _oracle_index_options:
689
690Oracle Database Specific Index Options
691--------------------------------------
692
693Bitmap Indexes
694~~~~~~~~~~~~~~
695
696You can specify the ``oracle_bitmap`` parameter to create a bitmap index
697instead of a B-tree index::
698
699 Index("my_index", my_table.c.data, oracle_bitmap=True)
700
701Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
702check for such limitations, only the database will.
703
704Index compression
705~~~~~~~~~~~~~~~~~
706
707Oracle Database has a more efficient storage mode for indexes containing lots
708of repeated values. Use the ``oracle_compress`` parameter to turn on key
709compression::
710
711 Index("my_index", my_table.c.data, oracle_compress=True)
712
713 Index(
714 "my_index",
715 my_table.c.data1,
716 my_table.c.data2,
717 unique=True,
718 oracle_compress=1,
719 )
720
721The ``oracle_compress`` parameter accepts either an integer specifying the
722number of prefix columns to compress, or ``True`` to use the default (all
723columns for non-unique indexes, all but the last column for unique indexes).
724
725.. _oracle_vector_datatype:
726
727VECTOR Datatype
728---------------
729
730Oracle Database 23ai introduced a new VECTOR datatype for artificial intelligence
731and machine learning search operations. The VECTOR datatype is a homogeneous array
732of 8-bit signed integers, 8-bit unsigned integers (binary), 32-bit floating-point
733numbers, or 64-bit floating-point numbers.
734
735A vector's storage type can be either DENSE or SPARSE. A dense vector contains
736meaningful values in most or all of its dimensions. In contrast, a sparse vector
737has non-zero values in only a few dimensions, with the majority being zero.
738
739Sparse vectors are represented by the total number of vector dimensions, an array
740of indices, and an array of values where each value’s location in the vector is
741indicated by the corresponding indices array position. All other vector values are
742treated as zero.
743
744The storage formats that can be used with sparse vectors are float32, float64, and
745int8. Note that the binary storage format cannot be used with sparse vectors.
746
747Sparse vectors are supported when you are using Oracle Database 23.7 or later.
748
749.. seealso::
750
751 `Using VECTOR Data
752 <https://python-oracledb.readthedocs.io/en/latest/user_guide/vector_data_type.html>`_ - in the documentation
753 for the :ref:`oracledb` driver.
754
755.. versionadded:: 2.0.41 - Added VECTOR datatype
756
757.. versionadded:: 2.0.43 - Added DENSE/SPARSE support
758
759CREATE TABLE support for VECTOR
760~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
761
762With the :class:`.VECTOR` datatype, you can specify the number of dimensions,
763the storage format, and the storage type for the data. Valid values for the
764storage format are enum members of :class:`.VectorStorageFormat`. Valid values
765for the storage type are enum members of :class:`.VectorStorageType`. If
766storage type is not specified, a DENSE vector is created by default.
767
768To create a table that includes a :class:`.VECTOR` column::
769
770 from sqlalchemy.dialects.oracle import (
771 VECTOR,
772 VectorStorageFormat,
773 VectorStorageType,
774 )
775
776 t = Table(
777 "t1",
778 metadata,
779 Column("id", Integer, primary_key=True),
780 Column(
781 "embedding",
782 VECTOR(
783 dim=3,
784 storage_format=VectorStorageFormat.FLOAT32,
785 storage_type=VectorStorageType.SPARSE,
786 ),
787 ),
788 Column(...),
789 ...,
790 )
791
792Vectors can also be defined with an arbitrary number of dimensions and formats.
793This allows you to specify vectors of different dimensions with the various
794storage formats mentioned below.
795
796**Examples**
797
798* In this case, the storage format is flexible, allowing any vector type data to be
799 inserted, such as INT8 or BINARY etc::
800
801 vector_col: Mapped[array.array] = mapped_column(VECTOR(dim=3))
802
803* The dimension is flexible in this case, meaning that any dimension vector can
804 be used::
805
806 vector_col: Mapped[array.array] = mapped_column(
807 VECTOR(storage_format=VectorStorageType.INT8)
808 )
809
810* Both the dimensions and the storage format are flexible. It creates a DENSE vector::
811
812 vector_col: Mapped[array.array] = mapped_column(VECTOR)
813
814* To create a SPARSE vector with both dimensions and the storage format as flexible,
815 use the :attr:`.VectorStorageType.SPARSE` storage type::
816
817 vector_col: Mapped[array.array] = mapped_column(
818 VECTOR(storage_type=VectorStorageType.SPARSE)
819 )
820
821Python Datatypes for VECTOR
822~~~~~~~~~~~~~~~~~~~~~~~~~~~
823
824VECTOR data can be inserted using Python list or Python ``array.array()`` objects.
825Python arrays of type FLOAT (32-bit), DOUBLE (64-bit), INT (8-bit signed integers),
826or BINARY (8-bit unsigned integers) are used as bind values when inserting
827VECTOR columns::
828
829 from sqlalchemy import insert, select
830
831 with engine.begin() as conn:
832 conn.execute(
833 insert(t1),
834 {"id": 1, "embedding": [1, 2, 3]},
835 )
836
837Data can be inserted into a sparse vector using the :class:`_oracle.SparseVector`
838class, creating an object consisting of the number of dimensions, an array of indices, and a
839corresponding array of values::
840
841 from sqlalchemy import insert, select
842 from sqlalchemy.dialects.oracle import SparseVector
843
844 sparse_val = SparseVector(10, [1, 2], array.array("d", [23.45, 221.22]))
845
846 with engine.begin() as conn:
847 conn.execute(
848 insert(t1),
849 {"id": 1, "embedding": sparse_val},
850 )
851
852VECTOR Indexes
853~~~~~~~~~~~~~~
854
855The VECTOR feature supports an Oracle-specific parameter ``oracle_vector``
856on the :class:`.Index` construct, which allows the construction of VECTOR
857indexes.
858
859SPARSE vectors cannot be used in the creation of vector indexes.
860
861To utilize VECTOR indexing, set the ``oracle_vector`` parameter to True to use
862the default values provided by Oracle. HNSW is the default indexing method::
863
864 from sqlalchemy import Index
865
866 Index(
867 "vector_index",
868 t1.c.embedding,
869 oracle_vector=True,
870 )
871
872The full range of parameters for vector indexes are available by using the
873:class:`.VectorIndexConfig` dataclass in place of a boolean; this dataclass
874allows full configuration of the index::
875
876 Index(
877 "hnsw_vector_index",
878 t1.c.embedding,
879 oracle_vector=VectorIndexConfig(
880 index_type=VectorIndexType.HNSW,
881 distance=VectorDistanceType.COSINE,
882 accuracy=90,
883 hnsw_neighbors=5,
884 hnsw_efconstruction=20,
885 parallel=10,
886 ),
887 )
888
889 Index(
890 "ivf_vector_index",
891 t1.c.embedding,
892 oracle_vector=VectorIndexConfig(
893 index_type=VectorIndexType.IVF,
894 distance=VectorDistanceType.DOT,
895 accuracy=90,
896 ivf_neighbor_partitions=5,
897 ),
898 )
899
900For complete explanation of these parameters, see the Oracle documentation linked
901below.
902
903.. seealso::
904
905 `CREATE VECTOR INDEX <https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B396C369-54BB-4098-A0DD-7C54B3A0D66F>`_ - in the Oracle documentation
906
907
908
909Similarity Searching
910~~~~~~~~~~~~~~~~~~~~
911
912When using the :class:`_oracle.VECTOR` datatype with a :class:`.Column` or similar
913ORM mapped construct, additional comparison functions are available, including:
914
915* ``l2_distance``
916* ``cosine_distance``
917* ``inner_product``
918
919Example Usage::
920
921 result_vector = connection.scalars(
922 select(t1).order_by(t1.embedding.l2_distance([2, 3, 4])).limit(3)
923 )
924
925 for user in vector:
926 print(user.id, user.embedding)
927
928FETCH APPROXIMATE support
929~~~~~~~~~~~~~~~~~~~~~~~~~
930
931Approximate vector search can only be performed when all syntax and semantic
932rules are satisfied, the corresponding vector index is available, and the
933query optimizer determines to perform it. If any of these conditions are
934unmet, then an approximate search is not performed. In this case the query
935returns exact results.
936
937To enable approximate searching during similarity searches on VECTORS, the
938``oracle_fetch_approximate`` parameter may be used with the :meth:`.Select.fetch`
939clause to add ``FETCH APPROX`` to the SELECT statement::
940
941 select(users_table).fetch(5, oracle_fetch_approximate=True)
942
943""" # noqa
944
945from __future__ import annotations
946
947from collections import defaultdict
948from dataclasses import fields
949from functools import lru_cache
950from functools import wraps
951import re
952
953from . import dictionary
954from .types import _OracleBoolean
955from .types import _OracleDate
956from .types import BFILE
957from .types import BINARY_DOUBLE
958from .types import BINARY_FLOAT
959from .types import DATE
960from .types import FLOAT
961from .types import INTERVAL
962from .types import LONG
963from .types import NCLOB
964from .types import NUMBER
965from .types import NVARCHAR2 # noqa
966from .types import OracleRaw # noqa
967from .types import RAW
968from .types import ROWID # noqa
969from .types import TIMESTAMP
970from .types import VARCHAR2 # noqa
971from .vector import VECTOR
972from .vector import VectorIndexConfig
973from .vector import VectorIndexType
974from ... import Computed
975from ... import exc
976from ... import schema as sa_schema
977from ... import sql
978from ... import util
979from ...engine import default
980from ...engine import ObjectKind
981from ...engine import ObjectScope
982from ...engine import reflection
983from ...engine.reflection import ReflectionDefaults
984from ...sql import and_
985from ...sql import bindparam
986from ...sql import compiler
987from ...sql import expression
988from ...sql import func
989from ...sql import null
990from ...sql import or_
991from ...sql import select
992from ...sql import selectable as sa_selectable
993from ...sql import sqltypes
994from ...sql import util as sql_util
995from ...sql import visitors
996from ...sql.visitors import InternalTraversal
997from ...types import BLOB
998from ...types import CHAR
999from ...types import CLOB
1000from ...types import DOUBLE_PRECISION
1001from ...types import INTEGER
1002from ...types import NCHAR
1003from ...types import NVARCHAR
1004from ...types import REAL
1005from ...types import VARCHAR
1006
1007RESERVED_WORDS = set(
1008 "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
1009 "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
1010 "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
1011 "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
1012 "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
1013 "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
1014 "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
1015 "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
1016 "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
1017)
1018
1019NO_ARG_FNS = set(
1020 "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split()
1021)
1022
1023
1024colspecs = {
1025 sqltypes.Boolean: _OracleBoolean,
1026 sqltypes.Interval: INTERVAL,
1027 sqltypes.DateTime: DATE,
1028 sqltypes.Date: _OracleDate,
1029}
1030
1031ischema_names = {
1032 "VARCHAR2": VARCHAR,
1033 "NVARCHAR2": NVARCHAR,
1034 "CHAR": CHAR,
1035 "NCHAR": NCHAR,
1036 "DATE": DATE,
1037 "NUMBER": NUMBER,
1038 "BLOB": BLOB,
1039 "BFILE": BFILE,
1040 "CLOB": CLOB,
1041 "NCLOB": NCLOB,
1042 "TIMESTAMP": TIMESTAMP,
1043 "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
1044 "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP,
1045 "INTERVAL DAY TO SECOND": INTERVAL,
1046 "RAW": RAW,
1047 "FLOAT": FLOAT,
1048 "DOUBLE PRECISION": DOUBLE_PRECISION,
1049 "REAL": REAL,
1050 "LONG": LONG,
1051 "BINARY_DOUBLE": BINARY_DOUBLE,
1052 "BINARY_FLOAT": BINARY_FLOAT,
1053 "ROWID": ROWID,
1054 "VECTOR": VECTOR,
1055}
1056
1057
1058class OracleTypeCompiler(compiler.GenericTypeCompiler):
1059 # Note:
1060 # Oracle DATE == DATETIME
1061 # Oracle does not allow milliseconds in DATE
1062 # Oracle does not support TIME columns
1063
1064 def visit_datetime(self, type_, **kw):
1065 return self.visit_DATE(type_, **kw)
1066
1067 def visit_float(self, type_, **kw):
1068 return self.visit_FLOAT(type_, **kw)
1069
1070 def visit_double(self, type_, **kw):
1071 return self.visit_DOUBLE_PRECISION(type_, **kw)
1072
1073 def visit_unicode(self, type_, **kw):
1074 if self.dialect._use_nchar_for_unicode:
1075 return self.visit_NVARCHAR2(type_, **kw)
1076 else:
1077 return self.visit_VARCHAR2(type_, **kw)
1078
1079 def visit_INTERVAL(self, type_, **kw):
1080 return "INTERVAL DAY%s TO SECOND%s" % (
1081 type_.day_precision is not None
1082 and "(%d)" % type_.day_precision
1083 or "",
1084 type_.second_precision is not None
1085 and "(%d)" % type_.second_precision
1086 or "",
1087 )
1088
1089 def visit_LONG(self, type_, **kw):
1090 return "LONG"
1091
1092 def visit_TIMESTAMP(self, type_, **kw):
1093 if getattr(type_, "local_timezone", False):
1094 return "TIMESTAMP WITH LOCAL TIME ZONE"
1095 elif type_.timezone:
1096 return "TIMESTAMP WITH TIME ZONE"
1097 else:
1098 return "TIMESTAMP"
1099
1100 def visit_DOUBLE_PRECISION(self, type_, **kw):
1101 return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
1102
1103 def visit_BINARY_DOUBLE(self, type_, **kw):
1104 return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
1105
1106 def visit_BINARY_FLOAT(self, type_, **kw):
1107 return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
1108
1109 def visit_FLOAT(self, type_, **kw):
1110 kw["_requires_binary_precision"] = True
1111 return self._generate_numeric(type_, "FLOAT", **kw)
1112
1113 def visit_NUMBER(self, type_, **kw):
1114 return self._generate_numeric(type_, "NUMBER", **kw)
1115
1116 def _generate_numeric(
1117 self,
1118 type_,
1119 name,
1120 precision=None,
1121 scale=None,
1122 _requires_binary_precision=False,
1123 **kw,
1124 ):
1125 if precision is None:
1126 precision = getattr(type_, "precision", None)
1127
1128 if _requires_binary_precision:
1129 binary_precision = getattr(type_, "binary_precision", None)
1130
1131 if precision and binary_precision is None:
1132 # https://www.oracletutorial.com/oracle-basics/oracle-float/
1133 estimated_binary_precision = int(precision / 0.30103)
1134 raise exc.ArgumentError(
1135 "Oracle Database FLOAT types use 'binary precision', "
1136 "which does not convert cleanly from decimal "
1137 "'precision'. Please specify "
1138 "this type with a separate Oracle Database variant, such "
1139 f"as {type_.__class__.__name__}(precision={precision})."
1140 f"with_variant(oracle.FLOAT"
1141 f"(binary_precision="
1142 f"{estimated_binary_precision}), 'oracle'), so that the "
1143 "Oracle Database specific 'binary_precision' may be "
1144 "specified accurately."
1145 )
1146 else:
1147 precision = binary_precision
1148
1149 if scale is None:
1150 scale = getattr(type_, "scale", None)
1151
1152 if precision is None:
1153 return name
1154 elif scale is None:
1155 n = "%(name)s(%(precision)s)"
1156 return n % {"name": name, "precision": precision}
1157 else:
1158 n = "%(name)s(%(precision)s, %(scale)s)"
1159 return n % {"name": name, "precision": precision, "scale": scale}
1160
1161 def visit_string(self, type_, **kw):
1162 return self.visit_VARCHAR2(type_, **kw)
1163
1164 def visit_VARCHAR2(self, type_, **kw):
1165 return self._visit_varchar(type_, "", "2")
1166
1167 def visit_NVARCHAR2(self, type_, **kw):
1168 return self._visit_varchar(type_, "N", "2")
1169
1170 visit_NVARCHAR = visit_NVARCHAR2
1171
1172 def visit_VARCHAR(self, type_, **kw):
1173 return self._visit_varchar(type_, "", "")
1174
1175 def _visit_varchar(self, type_, n, num):
1176 if not type_.length:
1177 return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
1178 elif not n and self.dialect._supports_char_length:
1179 varchar = "VARCHAR%(two)s(%(length)s CHAR)"
1180 return varchar % {"length": type_.length, "two": num}
1181 else:
1182 varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
1183 return varchar % {"length": type_.length, "two": num, "n": n}
1184
1185 def visit_text(self, type_, **kw):
1186 return self.visit_CLOB(type_, **kw)
1187
1188 def visit_unicode_text(self, type_, **kw):
1189 if self.dialect._use_nchar_for_unicode:
1190 return self.visit_NCLOB(type_, **kw)
1191 else:
1192 return self.visit_CLOB(type_, **kw)
1193
1194 def visit_large_binary(self, type_, **kw):
1195 return self.visit_BLOB(type_, **kw)
1196
1197 def visit_big_integer(self, type_, **kw):
1198 return self.visit_NUMBER(type_, precision=19, **kw)
1199
1200 def visit_boolean(self, type_, **kw):
1201 return self.visit_SMALLINT(type_, **kw)
1202
1203 def visit_RAW(self, type_, **kw):
1204 if type_.length:
1205 return "RAW(%(length)s)" % {"length": type_.length}
1206 else:
1207 return "RAW"
1208
1209 def visit_ROWID(self, type_, **kw):
1210 return "ROWID"
1211
1212 def visit_VECTOR(self, type_, **kw):
1213 dim = type_.dim if type_.dim is not None else "*"
1214 storage_format = (
1215 type_.storage_format.value
1216 if type_.storage_format is not None
1217 else "*"
1218 )
1219 storage_type = (
1220 type_.storage_type.value if type_.storage_type is not None else "*"
1221 )
1222 return f"VECTOR({dim},{storage_format},{storage_type})"
1223
1224
1225class OracleCompiler(compiler.SQLCompiler):
1226 """Oracle compiler modifies the lexical structure of Select
1227 statements to work under non-ANSI configured Oracle databases, if
1228 the use_ansi flag is False.
1229 """
1230
1231 compound_keywords = util.update_copy(
1232 compiler.SQLCompiler.compound_keywords,
1233 {expression.CompoundSelect.EXCEPT: "MINUS"},
1234 )
1235
1236 def __init__(self, *args, **kwargs):
1237 self.__wheres = {}
1238 super().__init__(*args, **kwargs)
1239
1240 def visit_mod_binary(self, binary, operator, **kw):
1241 return "mod(%s, %s)" % (
1242 self.process(binary.left, **kw),
1243 self.process(binary.right, **kw),
1244 )
1245
1246 def visit_now_func(self, fn, **kw):
1247 return "CURRENT_TIMESTAMP"
1248
1249 def visit_char_length_func(self, fn, **kw):
1250 return "LENGTH" + self.function_argspec(fn, **kw)
1251
1252 def visit_match_op_binary(self, binary, operator, **kw):
1253 return "CONTAINS (%s, %s)" % (
1254 self.process(binary.left),
1255 self.process(binary.right),
1256 )
1257
1258 def visit_true(self, expr, **kw):
1259 return "1"
1260
1261 def visit_false(self, expr, **kw):
1262 return "0"
1263
1264 def get_cte_preamble(self, recursive):
1265 return "WITH"
1266
1267 def get_select_hint_text(self, byfroms):
1268 return " ".join("/*+ %s */" % text for table, text in byfroms.items())
1269
1270 def function_argspec(self, fn, **kw):
1271 if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
1272 return compiler.SQLCompiler.function_argspec(self, fn, **kw)
1273 else:
1274 return ""
1275
1276 def visit_function(self, func, **kw):
1277 text = super().visit_function(func, **kw)
1278 if kw.get("asfrom", False) and func.name.lower() != "table":
1279 text = "TABLE (%s)" % text
1280 return text
1281
1282 def visit_table_valued_column(self, element, **kw):
1283 text = super().visit_table_valued_column(element, **kw)
1284 text = text + ".COLUMN_VALUE"
1285 return text
1286
1287 def default_from(self):
1288 """Called when a ``SELECT`` statement has no froms,
1289 and no ``FROM`` clause is to be appended.
1290
1291 The Oracle compiler tacks a "FROM DUAL" to the statement.
1292 """
1293
1294 return " FROM DUAL"
1295
1296 def visit_join(self, join, from_linter=None, **kwargs):
1297 if self.dialect.use_ansi:
1298 return compiler.SQLCompiler.visit_join(
1299 self, join, from_linter=from_linter, **kwargs
1300 )
1301 else:
1302 if from_linter:
1303 from_linter.edges.add((join.left, join.right))
1304
1305 kwargs["asfrom"] = True
1306 if isinstance(join.right, expression.FromGrouping):
1307 right = join.right.element
1308 else:
1309 right = join.right
1310 return (
1311 self.process(join.left, from_linter=from_linter, **kwargs)
1312 + ", "
1313 + self.process(right, from_linter=from_linter, **kwargs)
1314 )
1315
1316 def _get_nonansi_join_whereclause(self, froms):
1317 clauses = []
1318
1319 def visit_join(join):
1320 if join.isouter:
1321 # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
1322 # "apply the outer join operator (+) to all columns of B in
1323 # the join condition in the WHERE clause" - that is,
1324 # unconditionally regardless of operator or the other side
1325 def visit_binary(binary):
1326 if isinstance(
1327 binary.left, expression.ColumnClause
1328 ) and join.right.is_derived_from(binary.left.table):
1329 binary.left = _OuterJoinColumn(binary.left)
1330 elif isinstance(
1331 binary.right, expression.ColumnClause
1332 ) and join.right.is_derived_from(binary.right.table):
1333 binary.right = _OuterJoinColumn(binary.right)
1334
1335 clauses.append(
1336 visitors.cloned_traverse(
1337 join.onclause, {}, {"binary": visit_binary}
1338 )
1339 )
1340 else:
1341 clauses.append(join.onclause)
1342
1343 for j in join.left, join.right:
1344 if isinstance(j, expression.Join):
1345 visit_join(j)
1346 elif isinstance(j, expression.FromGrouping):
1347 visit_join(j.element)
1348
1349 for f in froms:
1350 if isinstance(f, expression.Join):
1351 visit_join(f)
1352
1353 if not clauses:
1354 return None
1355 else:
1356 return sql.and_(*clauses)
1357
1358 def visit_outer_join_column(self, vc, **kw):
1359 return self.process(vc.column, **kw) + "(+)"
1360
1361 def visit_sequence(self, seq, **kw):
1362 return self.preparer.format_sequence(seq) + ".nextval"
1363
1364 def get_render_as_alias_suffix(self, alias_name_text):
1365 """Oracle doesn't like ``FROM table AS alias``"""
1366
1367 return " " + alias_name_text
1368
1369 def returning_clause(
1370 self, stmt, returning_cols, *, populate_result_map, **kw
1371 ):
1372 columns = []
1373 binds = []
1374
1375 for i, column in enumerate(
1376 expression._select_iterables(returning_cols)
1377 ):
1378 if (
1379 self.isupdate
1380 and isinstance(column, sa_schema.Column)
1381 and isinstance(column.server_default, Computed)
1382 and not self.dialect._supports_update_returning_computed_cols
1383 ):
1384 util.warn(
1385 "Computed columns don't work with Oracle Database UPDATE "
1386 "statements that use RETURNING; the value of the column "
1387 "*before* the UPDATE takes place is returned. It is "
1388 "advised to not use RETURNING with an Oracle Database "
1389 "computed column. Consider setting implicit_returning "
1390 "to False on the Table object in order to avoid implicit "
1391 "RETURNING clauses from being generated for this Table."
1392 )
1393 if column.type._has_column_expression:
1394 col_expr = column.type.column_expression(column)
1395 else:
1396 col_expr = column
1397
1398 outparam = sql.outparam("ret_%d" % i, type_=column.type)
1399 self.binds[outparam.key] = outparam
1400 binds.append(
1401 self.bindparam_string(self._truncate_bindparam(outparam))
1402 )
1403
1404 # has_out_parameters would in a normal case be set to True
1405 # as a result of the compiler visiting an outparam() object.
1406 # in this case, the above outparam() objects are not being
1407 # visited. Ensure the statement itself didn't have other
1408 # outparam() objects independently.
1409 # technically, this could be supported, but as it would be
1410 # a very strange use case without a clear rationale, disallow it
1411 if self.has_out_parameters:
1412 raise exc.InvalidRequestError(
1413 "Using explicit outparam() objects with "
1414 "UpdateBase.returning() in the same Core DML statement "
1415 "is not supported in the Oracle Database dialects."
1416 )
1417
1418 self._oracle_returning = True
1419
1420 columns.append(self.process(col_expr, within_columns_clause=False))
1421 if populate_result_map:
1422 self._add_to_result_map(
1423 getattr(col_expr, "name", col_expr._anon_name_label),
1424 getattr(col_expr, "name", col_expr._anon_name_label),
1425 (
1426 column,
1427 getattr(column, "name", None),
1428 getattr(column, "key", None),
1429 ),
1430 column.type,
1431 )
1432
1433 return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
1434
1435 def _row_limit_clause(self, select, **kw):
1436 """Oracle Database 12c supports OFFSET/FETCH operators
1437 Use it instead subquery with row_number
1438
1439 """
1440
1441 if (
1442 select._fetch_clause is not None
1443 or not self.dialect._supports_offset_fetch
1444 ):
1445 return super()._row_limit_clause(
1446 select, use_literal_execute_for_simple_int=True, **kw
1447 )
1448 else:
1449 return self.fetch_clause(
1450 select,
1451 fetch_clause=self._get_limit_or_fetch(select),
1452 use_literal_execute_for_simple_int=True,
1453 **kw,
1454 )
1455
1456 def _get_limit_or_fetch(self, select):
1457 if select._fetch_clause is None:
1458 return select._limit_clause
1459 else:
1460 return select._fetch_clause
1461
1462 def fetch_clause(
1463 self,
1464 select,
1465 fetch_clause=None,
1466 require_offset=False,
1467 use_literal_execute_for_simple_int=False,
1468 **kw,
1469 ):
1470 text = super().fetch_clause(
1471 select,
1472 fetch_clause=fetch_clause,
1473 require_offset=require_offset,
1474 use_literal_execute_for_simple_int=(
1475 use_literal_execute_for_simple_int
1476 ),
1477 **kw,
1478 )
1479
1480 if select.dialect_options["oracle"]["fetch_approximate"]:
1481 text = re.sub("FETCH FIRST", "FETCH APPROX FIRST", text)
1482
1483 return text
1484
1485 def translate_select_structure(self, select_stmt, **kwargs):
1486 select = select_stmt
1487
1488 if not getattr(select, "_oracle_visit", None):
1489 if not self.dialect.use_ansi:
1490 froms = self._display_froms_for_select(
1491 select, kwargs.get("asfrom", False)
1492 )
1493 whereclause = self._get_nonansi_join_whereclause(froms)
1494 if whereclause is not None:
1495 select = select.where(whereclause)
1496 select._oracle_visit = True
1497
1498 # if fetch is used this is not needed
1499 if (
1500 select._has_row_limiting_clause
1501 and not self.dialect._supports_offset_fetch
1502 and select._fetch_clause is None
1503 ):
1504 limit_clause = select._limit_clause
1505 offset_clause = select._offset_clause
1506
1507 if select._simple_int_clause(limit_clause):
1508 limit_clause = limit_clause.render_literal_execute()
1509
1510 if select._simple_int_clause(offset_clause):
1511 offset_clause = offset_clause.render_literal_execute()
1512
1513 # currently using form at:
1514 # https://blogs.oracle.com/oraclemagazine/\
1515 # on-rownum-and-limiting-results
1516
1517 orig_select = select
1518 select = select._generate()
1519 select._oracle_visit = True
1520
1521 # add expressions to accommodate FOR UPDATE OF
1522 for_update = select._for_update_arg
1523 if for_update is not None and for_update.of:
1524 for_update = for_update._clone()
1525 for_update._copy_internals()
1526
1527 for elem in for_update.of:
1528 if not select.selected_columns.contains_column(elem):
1529 select = select.add_columns(elem)
1530
1531 # Wrap the middle select and add the hint
1532 inner_subquery = select.alias()
1533 limitselect = sql.select(
1534 *[
1535 c
1536 for c in inner_subquery.c
1537 if orig_select.selected_columns.corresponding_column(c)
1538 is not None
1539 ]
1540 )
1541
1542 if (
1543 limit_clause is not None
1544 and self.dialect.optimize_limits
1545 and select._simple_int_clause(limit_clause)
1546 ):
1547 limitselect = limitselect.prefix_with(
1548 expression.text(
1549 "/*+ FIRST_ROWS(%s) */"
1550 % self.process(limit_clause, **kwargs)
1551 )
1552 )
1553
1554 limitselect._oracle_visit = True
1555 limitselect._is_wrapper = True
1556
1557 # add expressions to accommodate FOR UPDATE OF
1558 if for_update is not None and for_update.of:
1559 adapter = sql_util.ClauseAdapter(inner_subquery)
1560 for_update.of = [
1561 adapter.traverse(elem) for elem in for_update.of
1562 ]
1563
1564 # If needed, add the limiting clause
1565 if limit_clause is not None:
1566 if select._simple_int_clause(limit_clause) and (
1567 offset_clause is None
1568 or select._simple_int_clause(offset_clause)
1569 ):
1570 max_row = limit_clause
1571
1572 if offset_clause is not None:
1573 max_row = max_row + offset_clause
1574
1575 else:
1576 max_row = limit_clause
1577
1578 if offset_clause is not None:
1579 max_row = max_row + offset_clause
1580 limitselect = limitselect.where(
1581 sql.literal_column("ROWNUM") <= max_row
1582 )
1583
1584 # If needed, add the ora_rn, and wrap again with offset.
1585 if offset_clause is None:
1586 limitselect._for_update_arg = for_update
1587 select = limitselect
1588 else:
1589 limitselect = limitselect.add_columns(
1590 sql.literal_column("ROWNUM").label("ora_rn")
1591 )
1592 limitselect._oracle_visit = True
1593 limitselect._is_wrapper = True
1594
1595 if for_update is not None and for_update.of:
1596 limitselect_cols = limitselect.selected_columns
1597 for elem in for_update.of:
1598 if (
1599 limitselect_cols.corresponding_column(elem)
1600 is None
1601 ):
1602 limitselect = limitselect.add_columns(elem)
1603
1604 limit_subquery = limitselect.alias()
1605 origselect_cols = orig_select.selected_columns
1606 offsetselect = sql.select(
1607 *[
1608 c
1609 for c in limit_subquery.c
1610 if origselect_cols.corresponding_column(c)
1611 is not None
1612 ]
1613 )
1614
1615 offsetselect._oracle_visit = True
1616 offsetselect._is_wrapper = True
1617
1618 if for_update is not None and for_update.of:
1619 adapter = sql_util.ClauseAdapter(limit_subquery)
1620 for_update.of = [
1621 adapter.traverse(elem) for elem in for_update.of
1622 ]
1623
1624 offsetselect = offsetselect.where(
1625 sql.literal_column("ora_rn") > offset_clause
1626 )
1627
1628 offsetselect._for_update_arg = for_update
1629 select = offsetselect
1630
1631 return select
1632
1633 def limit_clause(self, select, **kw):
1634 return ""
1635
1636 def visit_empty_set_expr(self, type_, **kw):
1637 return "SELECT 1 FROM DUAL WHERE 1!=1"
1638
1639 def for_update_clause(self, select, **kw):
1640 if self.is_subquery():
1641 return ""
1642
1643 tmp = " FOR UPDATE"
1644
1645 if select._for_update_arg.of:
1646 tmp += " OF " + ", ".join(
1647 self.process(elem, **kw) for elem in select._for_update_arg.of
1648 )
1649
1650 if select._for_update_arg.nowait:
1651 tmp += " NOWAIT"
1652 if select._for_update_arg.skip_locked:
1653 tmp += " SKIP LOCKED"
1654
1655 return tmp
1656
1657 def visit_is_distinct_from_binary(self, binary, operator, **kw):
1658 return "DECODE(%s, %s, 0, 1) = 1" % (
1659 self.process(binary.left),
1660 self.process(binary.right),
1661 )
1662
1663 def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
1664 return "DECODE(%s, %s, 0, 1) = 0" % (
1665 self.process(binary.left),
1666 self.process(binary.right),
1667 )
1668
1669 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1670 string = self.process(binary.left, **kw)
1671 pattern = self.process(binary.right, **kw)
1672 flags = binary.modifiers["flags"]
1673 if flags is None:
1674 return "REGEXP_LIKE(%s, %s)" % (string, pattern)
1675 else:
1676 return "REGEXP_LIKE(%s, %s, %s)" % (
1677 string,
1678 pattern,
1679 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1680 )
1681
1682 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1683 return "NOT %s" % self.visit_regexp_match_op_binary(
1684 binary, operator, **kw
1685 )
1686
1687 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
1688 string = self.process(binary.left, **kw)
1689 pattern_replace = self.process(binary.right, **kw)
1690 flags = binary.modifiers["flags"]
1691 if flags is None:
1692 return "REGEXP_REPLACE(%s, %s)" % (
1693 string,
1694 pattern_replace,
1695 )
1696 else:
1697 return "REGEXP_REPLACE(%s, %s, %s)" % (
1698 string,
1699 pattern_replace,
1700 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1701 )
1702
1703 def visit_aggregate_strings_func(self, fn, **kw):
1704 return "LISTAGG%s" % self.function_argspec(fn, **kw)
1705
1706 def _visit_bitwise(self, binary, fn_name, custom_right=None, **kw):
1707 left = self.process(binary.left, **kw)
1708 right = self.process(
1709 custom_right if custom_right is not None else binary.right, **kw
1710 )
1711 return f"{fn_name}({left}, {right})"
1712
1713 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1714 return self._visit_bitwise(binary, "BITXOR", **kw)
1715
1716 def visit_bitwise_or_op_binary(self, binary, operator, **kw):
1717 return self._visit_bitwise(binary, "BITOR", **kw)
1718
1719 def visit_bitwise_and_op_binary(self, binary, operator, **kw):
1720 return self._visit_bitwise(binary, "BITAND", **kw)
1721
1722 def visit_bitwise_rshift_op_binary(self, binary, operator, **kw):
1723 raise exc.CompileError("Cannot compile bitwise_rshift in oracle")
1724
1725 def visit_bitwise_lshift_op_binary(self, binary, operator, **kw):
1726 raise exc.CompileError("Cannot compile bitwise_lshift in oracle")
1727
1728 def visit_bitwise_not_op_unary_operator(self, element, operator, **kw):
1729 raise exc.CompileError("Cannot compile bitwise_not in oracle")
1730
1731
1732class OracleDDLCompiler(compiler.DDLCompiler):
1733
1734 def _build_vector_index_config(
1735 self, vector_index_config: VectorIndexConfig
1736 ) -> str:
1737 parts = []
1738 sql_param_name = {
1739 "hnsw_neighbors": "neighbors",
1740 "hnsw_efconstruction": "efconstruction",
1741 "ivf_neighbor_partitions": "neighbor partitions",
1742 "ivf_sample_per_partition": "sample_per_partition",
1743 "ivf_min_vectors_per_partition": "min_vectors_per_partition",
1744 }
1745 if vector_index_config.index_type == VectorIndexType.HNSW:
1746 parts.append("ORGANIZATION INMEMORY NEIGHBOR GRAPH")
1747 elif vector_index_config.index_type == VectorIndexType.IVF:
1748 parts.append("ORGANIZATION NEIGHBOR PARTITIONS")
1749 if vector_index_config.distance is not None:
1750 parts.append(f"DISTANCE {vector_index_config.distance.value}")
1751
1752 if vector_index_config.accuracy is not None:
1753 parts.append(
1754 f"WITH TARGET ACCURACY {vector_index_config.accuracy}"
1755 )
1756
1757 parameters_str = [f"type {vector_index_config.index_type.name}"]
1758 prefix = vector_index_config.index_type.name.lower() + "_"
1759
1760 for field in fields(vector_index_config):
1761 if field.name.startswith(prefix):
1762 key = sql_param_name.get(field.name)
1763 value = getattr(vector_index_config, field.name)
1764 if value is not None:
1765 parameters_str.append(f"{key} {value}")
1766
1767 parameters_str = ", ".join(parameters_str)
1768 parts.append(f"PARAMETERS ({parameters_str})")
1769
1770 if vector_index_config.parallel is not None:
1771 parts.append(f"PARALLEL {vector_index_config.parallel}")
1772
1773 return " ".join(parts)
1774
1775 def define_constraint_cascades(self, constraint):
1776 text = ""
1777 if constraint.ondelete is not None:
1778 text += " ON DELETE %s" % constraint.ondelete
1779
1780 # oracle has no ON UPDATE CASCADE -
1781 # its only available via triggers
1782 # https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
1783 if constraint.onupdate is not None:
1784 util.warn(
1785 "Oracle Database does not contain native UPDATE CASCADE "
1786 "functionality - onupdates will not be rendered for foreign "
1787 "keys. Consider using deferrable=True, initially='deferred' "
1788 "or triggers."
1789 )
1790
1791 return text
1792
1793 def visit_drop_table_comment(self, drop, **kw):
1794 return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
1795 drop.element
1796 )
1797
1798 def visit_create_index(self, create, **kw):
1799 index = create.element
1800 self._verify_index_table(index)
1801 preparer = self.preparer
1802 text = "CREATE "
1803 if index.unique:
1804 text += "UNIQUE "
1805 if index.dialect_options["oracle"]["bitmap"]:
1806 text += "BITMAP "
1807 vector_options = index.dialect_options["oracle"]["vector"]
1808 if vector_options:
1809 text += "VECTOR "
1810 text += "INDEX %s ON %s (%s)" % (
1811 self._prepared_index_name(index, include_schema=True),
1812 preparer.format_table(index.table, use_schema=True),
1813 ", ".join(
1814 self.sql_compiler.process(
1815 expr, include_table=False, literal_binds=True
1816 )
1817 for expr in index.expressions
1818 ),
1819 )
1820 if index.dialect_options["oracle"]["compress"] is not False:
1821 if index.dialect_options["oracle"]["compress"] is True:
1822 text += " COMPRESS"
1823 else:
1824 text += " COMPRESS %d" % (
1825 index.dialect_options["oracle"]["compress"]
1826 )
1827 if vector_options:
1828 if vector_options is True:
1829 vector_options = VectorIndexConfig()
1830
1831 text += " " + self._build_vector_index_config(vector_options)
1832 return text
1833
1834 def post_create_table(self, table):
1835 table_opts = []
1836 opts = table.dialect_options["oracle"]
1837
1838 if opts["on_commit"]:
1839 on_commit_options = opts["on_commit"].replace("_", " ").upper()
1840 table_opts.append("\n ON COMMIT %s" % on_commit_options)
1841
1842 if opts["compress"]:
1843 if opts["compress"] is True:
1844 table_opts.append("\n COMPRESS")
1845 else:
1846 table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
1847 if opts["tablespace"]:
1848 table_opts.append(
1849 "\n TABLESPACE %s" % self.preparer.quote(opts["tablespace"])
1850 )
1851 return "".join(table_opts)
1852
1853 def get_identity_options(self, identity_options):
1854 text = super().get_identity_options(identity_options)
1855 text = text.replace("NO MINVALUE", "NOMINVALUE")
1856 text = text.replace("NO MAXVALUE", "NOMAXVALUE")
1857 text = text.replace("NO CYCLE", "NOCYCLE")
1858 if identity_options.order is not None:
1859 text += " ORDER" if identity_options.order else " NOORDER"
1860 return text.strip()
1861
1862 def visit_computed_column(self, generated, **kw):
1863 text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
1864 generated.sqltext, include_table=False, literal_binds=True
1865 )
1866 if generated.persisted is True:
1867 raise exc.CompileError(
1868 "Oracle Database computed columns do not support 'stored' "
1869 "persistence; set the 'persisted' flag to None or False for "
1870 "Oracle Database support."
1871 )
1872 elif generated.persisted is False:
1873 text += " VIRTUAL"
1874 return text
1875
1876 def visit_identity_column(self, identity, **kw):
1877 if identity.always is None:
1878 kind = ""
1879 else:
1880 kind = "ALWAYS" if identity.always else "BY DEFAULT"
1881 text = "GENERATED %s" % kind
1882 if identity.on_null:
1883 text += " ON NULL"
1884 text += " AS IDENTITY"
1885 options = self.get_identity_options(identity)
1886 if options:
1887 text += " (%s)" % options
1888 return text
1889
1890
1891class OracleIdentifierPreparer(compiler.IdentifierPreparer):
1892 reserved_words = {x.lower() for x in RESERVED_WORDS}
1893 illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
1894 ["_", "$"]
1895 )
1896
1897 def _bindparam_requires_quotes(self, value):
1898 """Return True if the given identifier requires quoting."""
1899 lc_value = value.lower()
1900 return (
1901 lc_value in self.reserved_words
1902 or value[0] in self.illegal_initial_characters
1903 or not self.legal_characters.match(str(value))
1904 )
1905
1906 def format_savepoint(self, savepoint):
1907 name = savepoint.ident.lstrip("_")
1908 return super().format_savepoint(savepoint, name)
1909
1910
1911class OracleExecutionContext(default.DefaultExecutionContext):
1912 def fire_sequence(self, seq, type_):
1913 return self._execute_scalar(
1914 "SELECT "
1915 + self.identifier_preparer.format_sequence(seq)
1916 + ".nextval FROM DUAL",
1917 type_,
1918 )
1919
1920 def pre_exec(self):
1921 if self.statement and "_oracle_dblink" in self.execution_options:
1922 self.statement = self.statement.replace(
1923 dictionary.DB_LINK_PLACEHOLDER,
1924 self.execution_options["_oracle_dblink"],
1925 )
1926
1927
1928class OracleDialect(default.DefaultDialect):
1929 name = "oracle"
1930 supports_statement_cache = True
1931 supports_alter = True
1932 max_identifier_length = 128
1933
1934 _supports_offset_fetch = True
1935
1936 insert_returning = True
1937 update_returning = True
1938 delete_returning = True
1939
1940 div_is_floordiv = False
1941
1942 supports_simple_order_by_label = False
1943 cte_follows_insert = True
1944 returns_native_bytes = True
1945
1946 supports_sequences = True
1947 sequences_optional = False
1948 postfetch_lastrowid = False
1949
1950 default_paramstyle = "named"
1951 colspecs = colspecs
1952 ischema_names = ischema_names
1953 requires_name_normalize = True
1954
1955 supports_comments = True
1956
1957 supports_default_values = False
1958 supports_default_metavalue = True
1959 supports_empty_insert = False
1960 supports_identity_columns = True
1961
1962 statement_compiler = OracleCompiler
1963 ddl_compiler = OracleDDLCompiler
1964 type_compiler_cls = OracleTypeCompiler
1965 preparer = OracleIdentifierPreparer
1966 execution_ctx_cls = OracleExecutionContext
1967
1968 reflection_options = ("oracle_resolve_synonyms",)
1969
1970 _use_nchar_for_unicode = False
1971
1972 construct_arguments = [
1973 (
1974 sa_schema.Table,
1975 {
1976 "resolve_synonyms": False,
1977 "on_commit": None,
1978 "compress": False,
1979 "tablespace": None,
1980 },
1981 ),
1982 (
1983 sa_schema.Index,
1984 {
1985 "bitmap": False,
1986 "compress": False,
1987 "vector": False,
1988 },
1989 ),
1990 (sa_selectable.Select, {"fetch_approximate": False}),
1991 (sa_selectable.CompoundSelect, {"fetch_approximate": False}),
1992 ]
1993
1994 @util.deprecated_params(
1995 use_binds_for_limits=(
1996 "1.4",
1997 "The ``use_binds_for_limits`` Oracle Database dialect parameter "
1998 "is deprecated. The dialect now renders LIMIT / OFFSET integers "
1999 "inline in all cases using a post-compilation hook, so that the "
2000 "value is still represented by a 'bound parameter' on the Core "
2001 "Expression side.",
2002 )
2003 )
2004 def __init__(
2005 self,
2006 use_ansi=True,
2007 optimize_limits=False,
2008 use_binds_for_limits=None,
2009 use_nchar_for_unicode=False,
2010 exclude_tablespaces=("SYSTEM", "SYSAUX"),
2011 enable_offset_fetch=True,
2012 **kwargs,
2013 ):
2014 default.DefaultDialect.__init__(self, **kwargs)
2015 self._use_nchar_for_unicode = use_nchar_for_unicode
2016 self.use_ansi = use_ansi
2017 self.optimize_limits = optimize_limits
2018 self.exclude_tablespaces = exclude_tablespaces
2019 self.enable_offset_fetch = self._supports_offset_fetch = (
2020 enable_offset_fetch
2021 )
2022
2023 def initialize(self, connection):
2024 super().initialize(connection)
2025
2026 # Oracle 8i has RETURNING:
2027 # https://docs.oracle.com/cd/A87860_01/doc/index.htm
2028
2029 # so does Oracle8:
2030 # https://docs.oracle.com/cd/A64702_01/doc/index.htm
2031
2032 if self._is_oracle_8:
2033 self.colspecs = self.colspecs.copy()
2034 self.colspecs.pop(sqltypes.Interval)
2035 self.use_ansi = False
2036
2037 self.supports_identity_columns = self.server_version_info >= (12,)
2038 self._supports_offset_fetch = (
2039 self.enable_offset_fetch and self.server_version_info >= (12,)
2040 )
2041
2042 def _get_effective_compat_server_version_info(self, connection):
2043 # dialect does not need compat levels below 12.2, so don't query
2044 # in those cases
2045
2046 if self.server_version_info < (12, 2):
2047 return self.server_version_info
2048 try:
2049 compat = connection.exec_driver_sql(
2050 "SELECT value FROM v$parameter WHERE name = 'compatible'"
2051 ).scalar()
2052 except exc.DBAPIError:
2053 compat = None
2054
2055 if compat:
2056 try:
2057 return tuple(int(x) for x in compat.split("."))
2058 except:
2059 return self.server_version_info
2060 else:
2061 return self.server_version_info
2062
2063 @property
2064 def _is_oracle_8(self):
2065 return self.server_version_info and self.server_version_info < (9,)
2066
2067 @property
2068 def _supports_table_compression(self):
2069 return self.server_version_info and self.server_version_info >= (10, 1)
2070
2071 @property
2072 def _supports_table_compress_for(self):
2073 return self.server_version_info and self.server_version_info >= (11,)
2074
2075 @property
2076 def _supports_char_length(self):
2077 return not self._is_oracle_8
2078
2079 @property
2080 def _supports_update_returning_computed_cols(self):
2081 # on version 18 this error is no longet present while it happens on 11
2082 # it may work also on versions before the 18
2083 return self.server_version_info and self.server_version_info >= (18,)
2084
2085 @property
2086 def _supports_except_all(self):
2087 return self.server_version_info and self.server_version_info >= (21,)
2088
2089 def do_release_savepoint(self, connection, name):
2090 # Oracle does not support RELEASE SAVEPOINT
2091 pass
2092
2093 def _check_max_identifier_length(self, connection):
2094 if self._get_effective_compat_server_version_info(connection) < (
2095 12,
2096 2,
2097 ):
2098 return 30
2099 else:
2100 # use the default
2101 return None
2102
2103 def get_isolation_level_values(self, dbapi_connection):
2104 return ["READ COMMITTED", "SERIALIZABLE"]
2105
2106 def get_default_isolation_level(self, dbapi_conn):
2107 try:
2108 return self.get_isolation_level(dbapi_conn)
2109 except NotImplementedError:
2110 raise
2111 except:
2112 return "READ COMMITTED"
2113
2114 def _execute_reflection(
2115 self, connection, query, dblink, returns_long, params=None
2116 ):
2117 if dblink and not dblink.startswith("@"):
2118 dblink = f"@{dblink}"
2119 execution_options = {
2120 # handle db links
2121 "_oracle_dblink": dblink or "",
2122 # override any schema translate map
2123 "schema_translate_map": None,
2124 }
2125
2126 if dblink and returns_long:
2127 # Oracle seems to error with
2128 # "ORA-00997: illegal use of LONG datatype" when returning
2129 # LONG columns via a dblink in a query with bind params
2130 # This type seems to be very hard to cast into something else
2131 # so it seems easier to just use bind param in this case
2132 def visit_bindparam(bindparam):
2133 bindparam.literal_execute = True
2134
2135 query = visitors.cloned_traverse(
2136 query, {}, {"bindparam": visit_bindparam}
2137 )
2138 return connection.execute(
2139 query, params, execution_options=execution_options
2140 )
2141
2142 @util.memoized_property
2143 def _has_table_query(self):
2144 # materialized views are returned by all_tables
2145 tables = (
2146 select(
2147 dictionary.all_tables.c.table_name,
2148 dictionary.all_tables.c.owner,
2149 )
2150 .union_all(
2151 select(
2152 dictionary.all_views.c.view_name.label("table_name"),
2153 dictionary.all_views.c.owner,
2154 )
2155 )
2156 .subquery("tables_and_views")
2157 )
2158
2159 query = select(tables.c.table_name).where(
2160 tables.c.table_name == bindparam("table_name"),
2161 tables.c.owner == bindparam("owner"),
2162 )
2163 return query
2164
2165 @reflection.cache
2166 def has_table(
2167 self, connection, table_name, schema=None, dblink=None, **kw
2168 ):
2169 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2170 self._ensure_has_table_connection(connection)
2171
2172 if not schema:
2173 schema = self.default_schema_name
2174
2175 params = {
2176 "table_name": self.denormalize_name(table_name),
2177 "owner": self.denormalize_schema_name(schema),
2178 }
2179 cursor = self._execute_reflection(
2180 connection,
2181 self._has_table_query,
2182 dblink,
2183 returns_long=False,
2184 params=params,
2185 )
2186 return bool(cursor.scalar())
2187
2188 @reflection.cache
2189 def has_sequence(
2190 self, connection, sequence_name, schema=None, dblink=None, **kw
2191 ):
2192 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2193 if not schema:
2194 schema = self.default_schema_name
2195
2196 query = select(dictionary.all_sequences.c.sequence_name).where(
2197 dictionary.all_sequences.c.sequence_name
2198 == self.denormalize_schema_name(sequence_name),
2199 dictionary.all_sequences.c.sequence_owner
2200 == self.denormalize_schema_name(schema),
2201 )
2202
2203 cursor = self._execute_reflection(
2204 connection, query, dblink, returns_long=False
2205 )
2206 return bool(cursor.scalar())
2207
2208 def _get_default_schema_name(self, connection):
2209 return self.normalize_name(
2210 connection.exec_driver_sql(
2211 "select sys_context( 'userenv', 'current_schema' ) from dual"
2212 ).scalar()
2213 )
2214
2215 def denormalize_schema_name(self, name):
2216 # look for quoted_name
2217 force = getattr(name, "quote", None)
2218 if force is None and name == "public":
2219 # look for case insensitive, no quoting specified, "public"
2220 return "PUBLIC"
2221 return super().denormalize_name(name)
2222
2223 @reflection.flexi_cache(
2224 ("schema", InternalTraversal.dp_string),
2225 ("filter_names", InternalTraversal.dp_string_list),
2226 ("dblink", InternalTraversal.dp_string),
2227 )
2228 def _get_synonyms(self, connection, schema, filter_names, dblink, **kw):
2229 owner = self.denormalize_schema_name(
2230 schema or self.default_schema_name
2231 )
2232
2233 has_filter_names, params = self._prepare_filter_names(filter_names)
2234 query = select(
2235 dictionary.all_synonyms.c.synonym_name,
2236 dictionary.all_synonyms.c.table_name,
2237 dictionary.all_synonyms.c.table_owner,
2238 dictionary.all_synonyms.c.db_link,
2239 ).where(dictionary.all_synonyms.c.owner == owner)
2240 if has_filter_names:
2241 query = query.where(
2242 dictionary.all_synonyms.c.synonym_name.in_(
2243 params["filter_names"]
2244 )
2245 )
2246 result = self._execute_reflection(
2247 connection, query, dblink, returns_long=False
2248 ).mappings()
2249 return result.all()
2250
2251 @lru_cache()
2252 def _all_objects_query(
2253 self, owner, scope, kind, has_filter_names, has_mat_views
2254 ):
2255 query = (
2256 select(dictionary.all_objects.c.object_name)
2257 .select_from(dictionary.all_objects)
2258 .where(dictionary.all_objects.c.owner == owner)
2259 )
2260
2261 # NOTE: materialized views are listed in all_objects twice;
2262 # once as MATERIALIZE VIEW and once as TABLE
2263 if kind is ObjectKind.ANY:
2264 # materilaized view are listed also as tables so there is no
2265 # need to add them to the in_.
2266 query = query.where(
2267 dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW"))
2268 )
2269 else:
2270 object_type = []
2271 if ObjectKind.VIEW in kind:
2272 object_type.append("VIEW")
2273 if (
2274 ObjectKind.MATERIALIZED_VIEW in kind
2275 and ObjectKind.TABLE not in kind
2276 ):
2277 # materilaized view are listed also as tables so there is no
2278 # need to add them to the in_ if also selecting tables.
2279 object_type.append("MATERIALIZED VIEW")
2280 if ObjectKind.TABLE in kind:
2281 object_type.append("TABLE")
2282 if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind:
2283 # materialized view are listed also as tables,
2284 # so they need to be filtered out
2285 # EXCEPT ALL / MINUS profiles as faster than using
2286 # NOT EXISTS or NOT IN with a subquery, but it's in
2287 # general faster to get the mat view names and exclude
2288 # them only when needed
2289 query = query.where(
2290 dictionary.all_objects.c.object_name.not_in(
2291 bindparam("mat_views")
2292 )
2293 )
2294 query = query.where(
2295 dictionary.all_objects.c.object_type.in_(object_type)
2296 )
2297
2298 # handles scope
2299 if scope is ObjectScope.DEFAULT:
2300 query = query.where(dictionary.all_objects.c.temporary == "N")
2301 elif scope is ObjectScope.TEMPORARY:
2302 query = query.where(dictionary.all_objects.c.temporary == "Y")
2303
2304 if has_filter_names:
2305 query = query.where(
2306 dictionary.all_objects.c.object_name.in_(
2307 bindparam("filter_names")
2308 )
2309 )
2310 return query
2311
2312 @reflection.flexi_cache(
2313 ("schema", InternalTraversal.dp_string),
2314 ("scope", InternalTraversal.dp_plain_obj),
2315 ("kind", InternalTraversal.dp_plain_obj),
2316 ("filter_names", InternalTraversal.dp_string_list),
2317 ("dblink", InternalTraversal.dp_string),
2318 )
2319 def _get_all_objects(
2320 self, connection, schema, scope, kind, filter_names, dblink, **kw
2321 ):
2322 owner = self.denormalize_schema_name(
2323 schema or self.default_schema_name
2324 )
2325
2326 has_filter_names, params = self._prepare_filter_names(filter_names)
2327 has_mat_views = False
2328 if (
2329 ObjectKind.TABLE in kind
2330 and ObjectKind.MATERIALIZED_VIEW not in kind
2331 ):
2332 # see note in _all_objects_query
2333 mat_views = self.get_materialized_view_names(
2334 connection, schema, dblink, _normalize=False, **kw
2335 )
2336 if mat_views:
2337 params["mat_views"] = mat_views
2338 has_mat_views = True
2339
2340 query = self._all_objects_query(
2341 owner, scope, kind, has_filter_names, has_mat_views
2342 )
2343
2344 result = self._execute_reflection(
2345 connection, query, dblink, returns_long=False, params=params
2346 ).scalars()
2347
2348 return result.all()
2349
2350 def _handle_synonyms_decorator(fn):
2351 @wraps(fn)
2352 def wrapper(self, *args, **kwargs):
2353 return self._handle_synonyms(fn, *args, **kwargs)
2354
2355 return wrapper
2356
2357 def _handle_synonyms(self, fn, connection, *args, **kwargs):
2358 if not kwargs.get("oracle_resolve_synonyms", False):
2359 return fn(self, connection, *args, **kwargs)
2360
2361 original_kw = kwargs.copy()
2362 schema = kwargs.pop("schema", None)
2363 result = self._get_synonyms(
2364 connection,
2365 schema=schema,
2366 filter_names=kwargs.pop("filter_names", None),
2367 dblink=kwargs.pop("dblink", None),
2368 info_cache=kwargs.get("info_cache", None),
2369 )
2370
2371 dblinks_owners = defaultdict(dict)
2372 for row in result:
2373 key = row["db_link"], row["table_owner"]
2374 tn = self.normalize_name(row["table_name"])
2375 dblinks_owners[key][tn] = row["synonym_name"]
2376
2377 if not dblinks_owners:
2378 # No synonym, do the plain thing
2379 return fn(self, connection, *args, **original_kw)
2380
2381 data = {}
2382 for (dblink, table_owner), mapping in dblinks_owners.items():
2383 call_kw = {
2384 **original_kw,
2385 "schema": table_owner,
2386 "dblink": self.normalize_name(dblink),
2387 "filter_names": mapping.keys(),
2388 }
2389 call_result = fn(self, connection, *args, **call_kw)
2390 for (_, tn), value in call_result:
2391 synonym_name = self.normalize_name(mapping[tn])
2392 data[(schema, synonym_name)] = value
2393 return data.items()
2394
2395 @reflection.cache
2396 def get_schema_names(self, connection, dblink=None, **kw):
2397 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2398 query = select(dictionary.all_users.c.username).order_by(
2399 dictionary.all_users.c.username
2400 )
2401 result = self._execute_reflection(
2402 connection, query, dblink, returns_long=False
2403 ).scalars()
2404 return [self.normalize_name(row) for row in result]
2405
2406 @reflection.cache
2407 def get_table_names(self, connection, schema=None, dblink=None, **kw):
2408 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2409 # note that table_names() isn't loading DBLINKed or synonym'ed tables
2410 if schema is None:
2411 schema = self.default_schema_name
2412
2413 den_schema = self.denormalize_schema_name(schema)
2414 if kw.get("oracle_resolve_synonyms", False):
2415 tables = (
2416 select(
2417 dictionary.all_tables.c.table_name,
2418 dictionary.all_tables.c.owner,
2419 dictionary.all_tables.c.iot_name,
2420 dictionary.all_tables.c.duration,
2421 dictionary.all_tables.c.tablespace_name,
2422 )
2423 .union_all(
2424 select(
2425 dictionary.all_synonyms.c.synonym_name.label(
2426 "table_name"
2427 ),
2428 dictionary.all_synonyms.c.owner,
2429 dictionary.all_tables.c.iot_name,
2430 dictionary.all_tables.c.duration,
2431 dictionary.all_tables.c.tablespace_name,
2432 )
2433 .select_from(dictionary.all_tables)
2434 .join(
2435 dictionary.all_synonyms,
2436 and_(
2437 dictionary.all_tables.c.table_name
2438 == dictionary.all_synonyms.c.table_name,
2439 dictionary.all_tables.c.owner
2440 == func.coalesce(
2441 dictionary.all_synonyms.c.table_owner,
2442 dictionary.all_synonyms.c.owner,
2443 ),
2444 ),
2445 )
2446 )
2447 .subquery("available_tables")
2448 )
2449 else:
2450 tables = dictionary.all_tables
2451
2452 query = select(tables.c.table_name)
2453 if self.exclude_tablespaces:
2454 query = query.where(
2455 func.coalesce(
2456 tables.c.tablespace_name, "no tablespace"
2457 ).not_in(self.exclude_tablespaces)
2458 )
2459 query = query.where(
2460 tables.c.owner == den_schema,
2461 tables.c.iot_name.is_(null()),
2462 tables.c.duration.is_(null()),
2463 )
2464
2465 # remove materialized views
2466 mat_query = select(
2467 dictionary.all_mviews.c.mview_name.label("table_name")
2468 ).where(dictionary.all_mviews.c.owner == den_schema)
2469
2470 query = (
2471 query.except_all(mat_query)
2472 if self._supports_except_all
2473 else query.except_(mat_query)
2474 )
2475
2476 result = self._execute_reflection(
2477 connection, query, dblink, returns_long=False
2478 ).scalars()
2479 return [self.normalize_name(row) for row in result]
2480
2481 @reflection.cache
2482 def get_temp_table_names(self, connection, dblink=None, **kw):
2483 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2484 schema = self.denormalize_schema_name(self.default_schema_name)
2485
2486 query = select(dictionary.all_tables.c.table_name)
2487 if self.exclude_tablespaces:
2488 query = query.where(
2489 func.coalesce(
2490 dictionary.all_tables.c.tablespace_name, "no tablespace"
2491 ).not_in(self.exclude_tablespaces)
2492 )
2493 query = query.where(
2494 dictionary.all_tables.c.owner == schema,
2495 dictionary.all_tables.c.iot_name.is_(null()),
2496 dictionary.all_tables.c.duration.is_not(null()),
2497 )
2498
2499 result = self._execute_reflection(
2500 connection, query, dblink, returns_long=False
2501 ).scalars()
2502 return [self.normalize_name(row) for row in result]
2503
2504 @reflection.cache
2505 def get_materialized_view_names(
2506 self, connection, schema=None, dblink=None, _normalize=True, **kw
2507 ):
2508 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2509 if not schema:
2510 schema = self.default_schema_name
2511
2512 query = select(dictionary.all_mviews.c.mview_name).where(
2513 dictionary.all_mviews.c.owner
2514 == self.denormalize_schema_name(schema)
2515 )
2516 result = self._execute_reflection(
2517 connection, query, dblink, returns_long=False
2518 ).scalars()
2519 if _normalize:
2520 return [self.normalize_name(row) for row in result]
2521 else:
2522 return result.all()
2523
2524 @reflection.cache
2525 def get_view_names(self, connection, schema=None, dblink=None, **kw):
2526 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2527 if not schema:
2528 schema = self.default_schema_name
2529
2530 query = select(dictionary.all_views.c.view_name).where(
2531 dictionary.all_views.c.owner
2532 == self.denormalize_schema_name(schema)
2533 )
2534 result = self._execute_reflection(
2535 connection, query, dblink, returns_long=False
2536 ).scalars()
2537 return [self.normalize_name(row) for row in result]
2538
2539 @reflection.cache
2540 def get_sequence_names(self, connection, schema=None, dblink=None, **kw):
2541 """Supported kw arguments are: ``dblink`` to reflect via a db link."""
2542 if not schema:
2543 schema = self.default_schema_name
2544 query = select(dictionary.all_sequences.c.sequence_name).where(
2545 dictionary.all_sequences.c.sequence_owner
2546 == self.denormalize_schema_name(schema)
2547 )
2548
2549 result = self._execute_reflection(
2550 connection, query, dblink, returns_long=False
2551 ).scalars()
2552 return [self.normalize_name(row) for row in result]
2553
2554 def _value_or_raise(self, data, table, schema):
2555 table = self.normalize_name(str(table))
2556 try:
2557 return dict(data)[(schema, table)]
2558 except KeyError:
2559 raise exc.NoSuchTableError(
2560 f"{schema}.{table}" if schema else table
2561 ) from None
2562
2563 def _prepare_filter_names(self, filter_names):
2564 if filter_names:
2565 fn = [self.denormalize_name(name) for name in filter_names]
2566 return True, {"filter_names": fn}
2567 else:
2568 return False, {}
2569
2570 @reflection.cache
2571 def get_table_options(self, connection, table_name, schema=None, **kw):
2572 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2573 ``oracle_resolve_synonyms`` to resolve names to synonyms
2574 """
2575 data = self.get_multi_table_options(
2576 connection,
2577 schema=schema,
2578 filter_names=[table_name],
2579 scope=ObjectScope.ANY,
2580 kind=ObjectKind.ANY,
2581 **kw,
2582 )
2583 return self._value_or_raise(data, table_name, schema)
2584
2585 @lru_cache()
2586 def _table_options_query(
2587 self, owner, scope, kind, has_filter_names, has_mat_views
2588 ):
2589 query = select(
2590 dictionary.all_tables.c.table_name,
2591 (
2592 dictionary.all_tables.c.compression
2593 if self._supports_table_compression
2594 else sql.null().label("compression")
2595 ),
2596 (
2597 dictionary.all_tables.c.compress_for
2598 if self._supports_table_compress_for
2599 else sql.null().label("compress_for")
2600 ),
2601 dictionary.all_tables.c.tablespace_name,
2602 ).where(dictionary.all_tables.c.owner == owner)
2603 if has_filter_names:
2604 query = query.where(
2605 dictionary.all_tables.c.table_name.in_(
2606 bindparam("filter_names")
2607 )
2608 )
2609 if scope is ObjectScope.DEFAULT:
2610 query = query.where(dictionary.all_tables.c.duration.is_(null()))
2611 elif scope is ObjectScope.TEMPORARY:
2612 query = query.where(
2613 dictionary.all_tables.c.duration.is_not(null())
2614 )
2615
2616 if (
2617 has_mat_views
2618 and ObjectKind.TABLE in kind
2619 and ObjectKind.MATERIALIZED_VIEW not in kind
2620 ):
2621 # cant use EXCEPT ALL / MINUS here because we don't have an
2622 # excludable row vs. the query above
2623 # outerjoin + where null works better on oracle 21 but 11 does
2624 # not like it at all. this is the next best thing
2625
2626 query = query.where(
2627 dictionary.all_tables.c.table_name.not_in(
2628 bindparam("mat_views")
2629 )
2630 )
2631 elif (
2632 ObjectKind.TABLE not in kind
2633 and ObjectKind.MATERIALIZED_VIEW in kind
2634 ):
2635 query = query.where(
2636 dictionary.all_tables.c.table_name.in_(bindparam("mat_views"))
2637 )
2638 return query
2639
2640 @_handle_synonyms_decorator
2641 def get_multi_table_options(
2642 self,
2643 connection,
2644 *,
2645 schema,
2646 filter_names,
2647 scope,
2648 kind,
2649 dblink=None,
2650 **kw,
2651 ):
2652 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2653 ``oracle_resolve_synonyms`` to resolve names to synonyms
2654 """
2655 owner = self.denormalize_schema_name(
2656 schema or self.default_schema_name
2657 )
2658
2659 has_filter_names, params = self._prepare_filter_names(filter_names)
2660 has_mat_views = False
2661
2662 if (
2663 ObjectKind.TABLE in kind
2664 and ObjectKind.MATERIALIZED_VIEW not in kind
2665 ):
2666 # see note in _table_options_query
2667 mat_views = self.get_materialized_view_names(
2668 connection, schema, dblink, _normalize=False, **kw
2669 )
2670 if mat_views:
2671 params["mat_views"] = mat_views
2672 has_mat_views = True
2673 elif (
2674 ObjectKind.TABLE not in kind
2675 and ObjectKind.MATERIALIZED_VIEW in kind
2676 ):
2677 mat_views = self.get_materialized_view_names(
2678 connection, schema, dblink, _normalize=False, **kw
2679 )
2680 params["mat_views"] = mat_views
2681
2682 options = {}
2683 default = ReflectionDefaults.table_options
2684
2685 if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind:
2686 query = self._table_options_query(
2687 owner, scope, kind, has_filter_names, has_mat_views
2688 )
2689 result = self._execute_reflection(
2690 connection, query, dblink, returns_long=False, params=params
2691 )
2692
2693 for table, compression, compress_for, tablespace in result:
2694 data = default()
2695 if compression == "ENABLED":
2696 data["oracle_compress"] = compress_for
2697 if tablespace:
2698 data["oracle_tablespace"] = tablespace
2699 options[(schema, self.normalize_name(table))] = data
2700 if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
2701 # add the views (no temporary views)
2702 for view in self.get_view_names(connection, schema, dblink, **kw):
2703 if not filter_names or view in filter_names:
2704 options[(schema, view)] = default()
2705
2706 return options.items()
2707
2708 @reflection.cache
2709 def get_columns(self, connection, table_name, schema=None, **kw):
2710 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2711 ``oracle_resolve_synonyms`` to resolve names to synonyms
2712 """
2713
2714 data = self.get_multi_columns(
2715 connection,
2716 schema=schema,
2717 filter_names=[table_name],
2718 scope=ObjectScope.ANY,
2719 kind=ObjectKind.ANY,
2720 **kw,
2721 )
2722 return self._value_or_raise(data, table_name, schema)
2723
2724 def _run_batches(
2725 self, connection, query, dblink, returns_long, mappings, all_objects
2726 ):
2727 each_batch = 500
2728 batches = list(all_objects)
2729 while batches:
2730 batch = batches[0:each_batch]
2731 batches[0:each_batch] = []
2732
2733 result = self._execute_reflection(
2734 connection,
2735 query,
2736 dblink,
2737 returns_long=returns_long,
2738 params={"all_objects": batch},
2739 )
2740 if mappings:
2741 yield from result.mappings()
2742 else:
2743 yield from result
2744
2745 @lru_cache()
2746 def _column_query(self, owner):
2747 all_cols = dictionary.all_tab_cols
2748 all_comments = dictionary.all_col_comments
2749 all_ids = dictionary.all_tab_identity_cols
2750
2751 if self.server_version_info >= (12,):
2752 add_cols = (
2753 all_cols.c.default_on_null,
2754 sql.case(
2755 (all_ids.c.table_name.is_(None), sql.null()),
2756 else_=all_ids.c.generation_type
2757 + ","
2758 + all_ids.c.identity_options,
2759 ).label("identity_options"),
2760 )
2761 join_identity_cols = True
2762 else:
2763 add_cols = (
2764 sql.null().label("default_on_null"),
2765 sql.null().label("identity_options"),
2766 )
2767 join_identity_cols = False
2768
2769 # NOTE: on oracle cannot create tables/views without columns and
2770 # a table cannot have all column hidden:
2771 # ORA-54039: table must have at least one column that is not invisible
2772 # all_tab_cols returns data for tables/views/mat-views.
2773 # all_tab_cols does not return recycled tables
2774
2775 query = (
2776 select(
2777 all_cols.c.table_name,
2778 all_cols.c.column_name,
2779 all_cols.c.data_type,
2780 all_cols.c.char_length,
2781 all_cols.c.data_precision,
2782 all_cols.c.data_scale,
2783 all_cols.c.nullable,
2784 all_cols.c.data_default,
2785 all_comments.c.comments,
2786 all_cols.c.virtual_column,
2787 *add_cols,
2788 ).select_from(all_cols)
2789 # NOTE: all_col_comments has a row for each column even if no
2790 # comment is present, so a join could be performed, but there
2791 # seems to be no difference compared to an outer join
2792 .outerjoin(
2793 all_comments,
2794 and_(
2795 all_cols.c.table_name == all_comments.c.table_name,
2796 all_cols.c.column_name == all_comments.c.column_name,
2797 all_cols.c.owner == all_comments.c.owner,
2798 ),
2799 )
2800 )
2801 if join_identity_cols:
2802 query = query.outerjoin(
2803 all_ids,
2804 and_(
2805 all_cols.c.table_name == all_ids.c.table_name,
2806 all_cols.c.column_name == all_ids.c.column_name,
2807 all_cols.c.owner == all_ids.c.owner,
2808 ),
2809 )
2810
2811 query = query.where(
2812 all_cols.c.table_name.in_(bindparam("all_objects")),
2813 all_cols.c.hidden_column == "NO",
2814 all_cols.c.owner == owner,
2815 ).order_by(all_cols.c.table_name, all_cols.c.column_id)
2816 return query
2817
2818 @_handle_synonyms_decorator
2819 def get_multi_columns(
2820 self,
2821 connection,
2822 *,
2823 schema,
2824 filter_names,
2825 scope,
2826 kind,
2827 dblink=None,
2828 **kw,
2829 ):
2830 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2831 ``oracle_resolve_synonyms`` to resolve names to synonyms
2832 """
2833 owner = self.denormalize_schema_name(
2834 schema or self.default_schema_name
2835 )
2836 query = self._column_query(owner)
2837
2838 if (
2839 filter_names
2840 and kind is ObjectKind.ANY
2841 and scope is ObjectScope.ANY
2842 ):
2843 all_objects = [self.denormalize_name(n) for n in filter_names]
2844 else:
2845 all_objects = self._get_all_objects(
2846 connection, schema, scope, kind, filter_names, dblink, **kw
2847 )
2848
2849 columns = defaultdict(list)
2850
2851 # all_tab_cols.data_default is LONG
2852 result = self._run_batches(
2853 connection,
2854 query,
2855 dblink,
2856 returns_long=True,
2857 mappings=True,
2858 all_objects=all_objects,
2859 )
2860
2861 def maybe_int(value):
2862 if isinstance(value, float) and value.is_integer():
2863 return int(value)
2864 else:
2865 return value
2866
2867 remove_size = re.compile(r"\(\d+\)")
2868
2869 for row_dict in result:
2870 table_name = self.normalize_name(row_dict["table_name"])
2871 orig_colname = row_dict["column_name"]
2872 colname = self.normalize_name(orig_colname)
2873 coltype = row_dict["data_type"]
2874 precision = maybe_int(row_dict["data_precision"])
2875
2876 if coltype == "NUMBER":
2877 scale = maybe_int(row_dict["data_scale"])
2878 if precision is None and scale == 0:
2879 coltype = INTEGER()
2880 else:
2881 coltype = NUMBER(precision, scale)
2882 elif coltype == "FLOAT":
2883 # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
2884 if precision == 126:
2885 # The DOUBLE PRECISION datatype is a floating-point
2886 # number with binary precision 126.
2887 coltype = DOUBLE_PRECISION()
2888 elif precision == 63:
2889 # The REAL datatype is a floating-point number with a
2890 # binary precision of 63, or 18 decimal.
2891 coltype = REAL()
2892 else:
2893 # non standard precision
2894 coltype = FLOAT(binary_precision=precision)
2895
2896 elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
2897 char_length = maybe_int(row_dict["char_length"])
2898 coltype = self.ischema_names.get(coltype)(char_length)
2899 elif "WITH TIME ZONE" in coltype:
2900 coltype = TIMESTAMP(timezone=True)
2901 elif "WITH LOCAL TIME ZONE" in coltype:
2902 coltype = TIMESTAMP(local_timezone=True)
2903 else:
2904 coltype = re.sub(remove_size, "", coltype)
2905 try:
2906 coltype = self.ischema_names[coltype]
2907 except KeyError:
2908 util.warn(
2909 "Did not recognize type '%s' of column '%s'"
2910 % (coltype, colname)
2911 )
2912 coltype = sqltypes.NULLTYPE
2913
2914 default = row_dict["data_default"]
2915 if row_dict["virtual_column"] == "YES":
2916 computed = dict(sqltext=default)
2917 default = None
2918 else:
2919 computed = None
2920
2921 identity_options = row_dict["identity_options"]
2922 if identity_options is not None:
2923 identity = self._parse_identity_options(
2924 identity_options, row_dict["default_on_null"]
2925 )
2926 default = None
2927 else:
2928 identity = None
2929
2930 cdict = {
2931 "name": colname,
2932 "type": coltype,
2933 "nullable": row_dict["nullable"] == "Y",
2934 "default": default,
2935 "comment": row_dict["comments"],
2936 }
2937 if orig_colname.lower() == orig_colname:
2938 cdict["quote"] = True
2939 if computed is not None:
2940 cdict["computed"] = computed
2941 if identity is not None:
2942 cdict["identity"] = identity
2943
2944 columns[(schema, table_name)].append(cdict)
2945
2946 # NOTE: default not needed since all tables have columns
2947 # default = ReflectionDefaults.columns
2948 # return (
2949 # (key, value if value else default())
2950 # for key, value in columns.items()
2951 # )
2952 return columns.items()
2953
2954 def _parse_identity_options(self, identity_options, default_on_null):
2955 # identity_options is a string that starts with 'ALWAYS,' or
2956 # 'BY DEFAULT,' and continues with
2957 # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
2958 # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
2959 # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
2960 parts = [p.strip() for p in identity_options.split(",")]
2961 identity = {
2962 "always": parts[0] == "ALWAYS",
2963 "on_null": default_on_null == "YES",
2964 }
2965
2966 for part in parts[1:]:
2967 option, value = part.split(":")
2968 value = value.strip()
2969
2970 if "START WITH" in option:
2971 identity["start"] = int(value)
2972 elif "INCREMENT BY" in option:
2973 identity["increment"] = int(value)
2974 elif "MAX_VALUE" in option:
2975 identity["maxvalue"] = int(value)
2976 elif "MIN_VALUE" in option:
2977 identity["minvalue"] = int(value)
2978 elif "CYCLE_FLAG" in option:
2979 identity["cycle"] = value == "Y"
2980 elif "CACHE_SIZE" in option:
2981 identity["cache"] = int(value)
2982 elif "ORDER_FLAG" in option:
2983 identity["order"] = value == "Y"
2984 return identity
2985
2986 @reflection.cache
2987 def get_table_comment(self, connection, table_name, schema=None, **kw):
2988 """Supported kw arguments are: ``dblink`` to reflect via a db link;
2989 ``oracle_resolve_synonyms`` to resolve names to synonyms
2990 """
2991 data = self.get_multi_table_comment(
2992 connection,
2993 schema=schema,
2994 filter_names=[table_name],
2995 scope=ObjectScope.ANY,
2996 kind=ObjectKind.ANY,
2997 **kw,
2998 )
2999 return self._value_or_raise(data, table_name, schema)
3000
3001 @lru_cache()
3002 def _comment_query(self, owner, scope, kind, has_filter_names):
3003 # NOTE: all_tab_comments / all_mview_comments have a row for all
3004 # object even if they don't have comments
3005 queries = []
3006 if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind:
3007 # all_tab_comments returns also plain views
3008 tbl_view = select(
3009 dictionary.all_tab_comments.c.table_name,
3010 dictionary.all_tab_comments.c.comments,
3011 ).where(
3012 dictionary.all_tab_comments.c.owner == owner,
3013 dictionary.all_tab_comments.c.table_name.not_like("BIN$%"),
3014 )
3015 if ObjectKind.VIEW not in kind:
3016 tbl_view = tbl_view.where(
3017 dictionary.all_tab_comments.c.table_type == "TABLE"
3018 )
3019 elif ObjectKind.TABLE not in kind:
3020 tbl_view = tbl_view.where(
3021 dictionary.all_tab_comments.c.table_type == "VIEW"
3022 )
3023 queries.append(tbl_view)
3024 if ObjectKind.MATERIALIZED_VIEW in kind:
3025 mat_view = select(
3026 dictionary.all_mview_comments.c.mview_name.label("table_name"),
3027 dictionary.all_mview_comments.c.comments,
3028 ).where(
3029 dictionary.all_mview_comments.c.owner == owner,
3030 dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"),
3031 )
3032 queries.append(mat_view)
3033 if len(queries) == 1:
3034 query = queries[0]
3035 else:
3036 union = sql.union_all(*queries).subquery("tables_and_views")
3037 query = select(union.c.table_name, union.c.comments)
3038
3039 name_col = query.selected_columns.table_name
3040
3041 if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
3042 temp = "Y" if scope is ObjectScope.TEMPORARY else "N"
3043 # need distinct since materialized view are listed also
3044 # as tables in all_objects
3045 query = query.distinct().join(
3046 dictionary.all_objects,
3047 and_(
3048 dictionary.all_objects.c.owner == owner,
3049 dictionary.all_objects.c.object_name == name_col,
3050 dictionary.all_objects.c.temporary == temp,
3051 ),
3052 )
3053 if has_filter_names:
3054 query = query.where(name_col.in_(bindparam("filter_names")))
3055 return query
3056
3057 @_handle_synonyms_decorator
3058 def get_multi_table_comment(
3059 self,
3060 connection,
3061 *,
3062 schema,
3063 filter_names,
3064 scope,
3065 kind,
3066 dblink=None,
3067 **kw,
3068 ):
3069 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3070 ``oracle_resolve_synonyms`` to resolve names to synonyms
3071 """
3072 owner = self.denormalize_schema_name(
3073 schema or self.default_schema_name
3074 )
3075 has_filter_names, params = self._prepare_filter_names(filter_names)
3076 query = self._comment_query(owner, scope, kind, has_filter_names)
3077
3078 result = self._execute_reflection(
3079 connection, query, dblink, returns_long=False, params=params
3080 )
3081 default = ReflectionDefaults.table_comment
3082 # materialized views by default seem to have a comment like
3083 # "snapshot table for snapshot owner.mat_view_name"
3084 ignore_mat_view = "snapshot table for snapshot "
3085 return (
3086 (
3087 (schema, self.normalize_name(table)),
3088 (
3089 {"text": comment}
3090 if comment is not None
3091 and not comment.startswith(ignore_mat_view)
3092 else default()
3093 ),
3094 )
3095 for table, comment in result
3096 )
3097
3098 @reflection.cache
3099 def get_indexes(self, connection, table_name, schema=None, **kw):
3100 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3101 ``oracle_resolve_synonyms`` to resolve names to synonyms
3102 """
3103 data = self.get_multi_indexes(
3104 connection,
3105 schema=schema,
3106 filter_names=[table_name],
3107 scope=ObjectScope.ANY,
3108 kind=ObjectKind.ANY,
3109 **kw,
3110 )
3111 return self._value_or_raise(data, table_name, schema)
3112
3113 @lru_cache()
3114 def _index_query(self, owner):
3115 return (
3116 select(
3117 dictionary.all_ind_columns.c.table_name,
3118 dictionary.all_ind_columns.c.index_name,
3119 dictionary.all_ind_columns.c.column_name,
3120 dictionary.all_indexes.c.index_type,
3121 dictionary.all_indexes.c.uniqueness,
3122 dictionary.all_indexes.c.compression,
3123 dictionary.all_indexes.c.prefix_length,
3124 dictionary.all_ind_columns.c.descend,
3125 dictionary.all_ind_expressions.c.column_expression,
3126 )
3127 .select_from(dictionary.all_ind_columns)
3128 .join(
3129 dictionary.all_indexes,
3130 sql.and_(
3131 dictionary.all_ind_columns.c.index_name
3132 == dictionary.all_indexes.c.index_name,
3133 dictionary.all_ind_columns.c.index_owner
3134 == dictionary.all_indexes.c.owner,
3135 ),
3136 )
3137 .outerjoin(
3138 # NOTE: this adds about 20% to the query time. Using a
3139 # case expression with a scalar subquery only when needed
3140 # with the assumption that most indexes are not expression
3141 # would be faster but oracle does not like that with
3142 # LONG datatype. It errors with:
3143 # ORA-00997: illegal use of LONG datatype
3144 dictionary.all_ind_expressions,
3145 sql.and_(
3146 dictionary.all_ind_expressions.c.index_name
3147 == dictionary.all_ind_columns.c.index_name,
3148 dictionary.all_ind_expressions.c.index_owner
3149 == dictionary.all_ind_columns.c.index_owner,
3150 dictionary.all_ind_expressions.c.column_position
3151 == dictionary.all_ind_columns.c.column_position,
3152 ),
3153 )
3154 .where(
3155 dictionary.all_indexes.c.table_owner == owner,
3156 dictionary.all_indexes.c.table_name.in_(
3157 bindparam("all_objects")
3158 ),
3159 )
3160 .order_by(
3161 dictionary.all_ind_columns.c.index_name,
3162 dictionary.all_ind_columns.c.column_position,
3163 )
3164 )
3165
3166 @reflection.flexi_cache(
3167 ("schema", InternalTraversal.dp_string),
3168 ("dblink", InternalTraversal.dp_string),
3169 ("all_objects", InternalTraversal.dp_string_list),
3170 )
3171 def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
3172 owner = self.denormalize_schema_name(
3173 schema or self.default_schema_name
3174 )
3175
3176 query = self._index_query(owner)
3177
3178 pks = {
3179 row_dict["constraint_name"]
3180 for row_dict in self._get_all_constraint_rows(
3181 connection, schema, dblink, all_objects, **kw
3182 )
3183 if row_dict["constraint_type"] == "P"
3184 }
3185
3186 # all_ind_expressions.column_expression is LONG
3187 result = self._run_batches(
3188 connection,
3189 query,
3190 dblink,
3191 returns_long=True,
3192 mappings=True,
3193 all_objects=all_objects,
3194 )
3195
3196 return [
3197 row_dict
3198 for row_dict in result
3199 if row_dict["index_name"] not in pks
3200 ]
3201
3202 @_handle_synonyms_decorator
3203 def get_multi_indexes(
3204 self,
3205 connection,
3206 *,
3207 schema,
3208 filter_names,
3209 scope,
3210 kind,
3211 dblink=None,
3212 **kw,
3213 ):
3214 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3215 ``oracle_resolve_synonyms`` to resolve names to synonyms
3216 """
3217 all_objects = self._get_all_objects(
3218 connection, schema, scope, kind, filter_names, dblink, **kw
3219 )
3220
3221 uniqueness = {"NONUNIQUE": False, "UNIQUE": True}
3222 enabled = {"DISABLED": False, "ENABLED": True}
3223 is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"}
3224
3225 indexes = defaultdict(dict)
3226
3227 for row_dict in self._get_indexes_rows(
3228 connection, schema, dblink, all_objects, **kw
3229 ):
3230 index_name = self.normalize_name(row_dict["index_name"])
3231 table_name = self.normalize_name(row_dict["table_name"])
3232 table_indexes = indexes[(schema, table_name)]
3233
3234 if index_name not in table_indexes:
3235 table_indexes[index_name] = index_dict = {
3236 "name": index_name,
3237 "column_names": [],
3238 "dialect_options": {},
3239 "unique": uniqueness.get(row_dict["uniqueness"], False),
3240 }
3241 do = index_dict["dialect_options"]
3242 if row_dict["index_type"] in is_bitmap:
3243 do["oracle_bitmap"] = True
3244 if enabled.get(row_dict["compression"], False):
3245 do["oracle_compress"] = row_dict["prefix_length"]
3246
3247 else:
3248 index_dict = table_indexes[index_name]
3249
3250 expr = row_dict["column_expression"]
3251 if expr is not None:
3252 index_dict["column_names"].append(None)
3253 if "expressions" in index_dict:
3254 index_dict["expressions"].append(expr)
3255 else:
3256 index_dict["expressions"] = index_dict["column_names"][:-1]
3257 index_dict["expressions"].append(expr)
3258
3259 if row_dict["descend"].lower() != "asc":
3260 assert row_dict["descend"].lower() == "desc"
3261 cs = index_dict.setdefault("column_sorting", {})
3262 cs[expr] = ("desc",)
3263 else:
3264 assert row_dict["descend"].lower() == "asc"
3265 cn = self.normalize_name(row_dict["column_name"])
3266 index_dict["column_names"].append(cn)
3267 if "expressions" in index_dict:
3268 index_dict["expressions"].append(cn)
3269
3270 default = ReflectionDefaults.indexes
3271
3272 return (
3273 (key, list(indexes[key].values()) if key in indexes else default())
3274 for key in (
3275 (schema, self.normalize_name(obj_name))
3276 for obj_name in all_objects
3277 )
3278 )
3279
3280 @reflection.cache
3281 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
3282 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3283 ``oracle_resolve_synonyms`` to resolve names to synonyms
3284 """
3285 data = self.get_multi_pk_constraint(
3286 connection,
3287 schema=schema,
3288 filter_names=[table_name],
3289 scope=ObjectScope.ANY,
3290 kind=ObjectKind.ANY,
3291 **kw,
3292 )
3293 return self._value_or_raise(data, table_name, schema)
3294
3295 @lru_cache()
3296 def _constraint_query(self, owner):
3297 local = dictionary.all_cons_columns.alias("local")
3298 remote = dictionary.all_cons_columns.alias("remote")
3299 return (
3300 select(
3301 dictionary.all_constraints.c.table_name,
3302 dictionary.all_constraints.c.constraint_type,
3303 dictionary.all_constraints.c.constraint_name,
3304 local.c.column_name.label("local_column"),
3305 remote.c.table_name.label("remote_table"),
3306 remote.c.column_name.label("remote_column"),
3307 remote.c.owner.label("remote_owner"),
3308 dictionary.all_constraints.c.search_condition,
3309 dictionary.all_constraints.c.delete_rule,
3310 )
3311 .select_from(dictionary.all_constraints)
3312 .join(
3313 local,
3314 and_(
3315 local.c.owner == dictionary.all_constraints.c.owner,
3316 dictionary.all_constraints.c.constraint_name
3317 == local.c.constraint_name,
3318 ),
3319 )
3320 .outerjoin(
3321 remote,
3322 and_(
3323 dictionary.all_constraints.c.r_owner == remote.c.owner,
3324 dictionary.all_constraints.c.r_constraint_name
3325 == remote.c.constraint_name,
3326 or_(
3327 remote.c.position.is_(sql.null()),
3328 local.c.position == remote.c.position,
3329 ),
3330 ),
3331 )
3332 .where(
3333 dictionary.all_constraints.c.owner == owner,
3334 dictionary.all_constraints.c.table_name.in_(
3335 bindparam("all_objects")
3336 ),
3337 dictionary.all_constraints.c.constraint_type.in_(
3338 ("R", "P", "U", "C")
3339 ),
3340 )
3341 .order_by(
3342 dictionary.all_constraints.c.constraint_name, local.c.position
3343 )
3344 )
3345
3346 @reflection.flexi_cache(
3347 ("schema", InternalTraversal.dp_string),
3348 ("dblink", InternalTraversal.dp_string),
3349 ("all_objects", InternalTraversal.dp_string_list),
3350 )
3351 def _get_all_constraint_rows(
3352 self, connection, schema, dblink, all_objects, **kw
3353 ):
3354 owner = self.denormalize_schema_name(
3355 schema or self.default_schema_name
3356 )
3357 query = self._constraint_query(owner)
3358
3359 # since the result is cached a list must be created
3360 values = list(
3361 self._run_batches(
3362 connection,
3363 query,
3364 dblink,
3365 returns_long=False,
3366 mappings=True,
3367 all_objects=all_objects,
3368 )
3369 )
3370 return values
3371
3372 @_handle_synonyms_decorator
3373 def get_multi_pk_constraint(
3374 self,
3375 connection,
3376 *,
3377 scope,
3378 schema,
3379 filter_names,
3380 kind,
3381 dblink=None,
3382 **kw,
3383 ):
3384 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3385 ``oracle_resolve_synonyms`` to resolve names to synonyms
3386 """
3387 all_objects = self._get_all_objects(
3388 connection, schema, scope, kind, filter_names, dblink, **kw
3389 )
3390
3391 primary_keys = defaultdict(dict)
3392 default = ReflectionDefaults.pk_constraint
3393
3394 for row_dict in self._get_all_constraint_rows(
3395 connection, schema, dblink, all_objects, **kw
3396 ):
3397 if row_dict["constraint_type"] != "P":
3398 continue
3399 table_name = self.normalize_name(row_dict["table_name"])
3400 constraint_name = self.normalize_name(row_dict["constraint_name"])
3401 column_name = self.normalize_name(row_dict["local_column"])
3402
3403 table_pk = primary_keys[(schema, table_name)]
3404 if not table_pk:
3405 table_pk["name"] = constraint_name
3406 table_pk["constrained_columns"] = [column_name]
3407 else:
3408 table_pk["constrained_columns"].append(column_name)
3409
3410 return (
3411 (key, primary_keys[key] if key in primary_keys else default())
3412 for key in (
3413 (schema, self.normalize_name(obj_name))
3414 for obj_name in all_objects
3415 )
3416 )
3417
3418 @reflection.cache
3419 def get_foreign_keys(
3420 self,
3421 connection,
3422 table_name,
3423 schema=None,
3424 **kw,
3425 ):
3426 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3427 ``oracle_resolve_synonyms`` to resolve names to synonyms
3428 """
3429 data = self.get_multi_foreign_keys(
3430 connection,
3431 schema=schema,
3432 filter_names=[table_name],
3433 scope=ObjectScope.ANY,
3434 kind=ObjectKind.ANY,
3435 **kw,
3436 )
3437 return self._value_or_raise(data, table_name, schema)
3438
3439 @_handle_synonyms_decorator
3440 def get_multi_foreign_keys(
3441 self,
3442 connection,
3443 *,
3444 scope,
3445 schema,
3446 filter_names,
3447 kind,
3448 dblink=None,
3449 **kw,
3450 ):
3451 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3452 ``oracle_resolve_synonyms`` to resolve names to synonyms
3453 """
3454 all_objects = self._get_all_objects(
3455 connection, schema, scope, kind, filter_names, dblink, **kw
3456 )
3457
3458 resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
3459
3460 owner = self.denormalize_schema_name(
3461 schema or self.default_schema_name
3462 )
3463
3464 all_remote_owners = set()
3465 fkeys = defaultdict(dict)
3466
3467 for row_dict in self._get_all_constraint_rows(
3468 connection, schema, dblink, all_objects, **kw
3469 ):
3470 if row_dict["constraint_type"] != "R":
3471 continue
3472
3473 table_name = self.normalize_name(row_dict["table_name"])
3474 constraint_name = self.normalize_name(row_dict["constraint_name"])
3475 table_fkey = fkeys[(schema, table_name)]
3476
3477 assert constraint_name is not None
3478
3479 local_column = self.normalize_name(row_dict["local_column"])
3480 remote_table = self.normalize_name(row_dict["remote_table"])
3481 remote_column = self.normalize_name(row_dict["remote_column"])
3482 remote_owner_orig = row_dict["remote_owner"]
3483 remote_owner = self.normalize_name(remote_owner_orig)
3484 if remote_owner_orig is not None:
3485 all_remote_owners.add(remote_owner_orig)
3486
3487 if remote_table is None:
3488 # ticket 363
3489 if dblink and not dblink.startswith("@"):
3490 dblink = f"@{dblink}"
3491 util.warn(
3492 "Got 'None' querying 'table_name' from "
3493 f"all_cons_columns{dblink or ''} - does the user have "
3494 "proper rights to the table?"
3495 )
3496 continue
3497
3498 if constraint_name not in table_fkey:
3499 table_fkey[constraint_name] = fkey = {
3500 "name": constraint_name,
3501 "constrained_columns": [],
3502 "referred_schema": None,
3503 "referred_table": remote_table,
3504 "referred_columns": [],
3505 "options": {},
3506 }
3507
3508 if resolve_synonyms:
3509 # will be removed below
3510 fkey["_ref_schema"] = remote_owner
3511
3512 if schema is not None or remote_owner_orig != owner:
3513 fkey["referred_schema"] = remote_owner
3514
3515 delete_rule = row_dict["delete_rule"]
3516 if delete_rule != "NO ACTION":
3517 fkey["options"]["ondelete"] = delete_rule
3518
3519 else:
3520 fkey = table_fkey[constraint_name]
3521
3522 fkey["constrained_columns"].append(local_column)
3523 fkey["referred_columns"].append(remote_column)
3524
3525 if resolve_synonyms and all_remote_owners:
3526 query = select(
3527 dictionary.all_synonyms.c.owner,
3528 dictionary.all_synonyms.c.table_name,
3529 dictionary.all_synonyms.c.table_owner,
3530 dictionary.all_synonyms.c.synonym_name,
3531 ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners))
3532
3533 result = self._execute_reflection(
3534 connection, query, dblink, returns_long=False
3535 ).mappings()
3536
3537 remote_owners_lut = {}
3538 for row in result:
3539 synonym_owner = self.normalize_name(row["owner"])
3540 table_name = self.normalize_name(row["table_name"])
3541
3542 remote_owners_lut[(synonym_owner, table_name)] = (
3543 row["table_owner"],
3544 row["synonym_name"],
3545 )
3546
3547 empty = (None, None)
3548 for table_fkeys in fkeys.values():
3549 for table_fkey in table_fkeys.values():
3550 key = (
3551 table_fkey.pop("_ref_schema"),
3552 table_fkey["referred_table"],
3553 )
3554 remote_owner, syn_name = remote_owners_lut.get(key, empty)
3555 if syn_name:
3556 sn = self.normalize_name(syn_name)
3557 table_fkey["referred_table"] = sn
3558 if schema is not None or remote_owner != owner:
3559 ro = self.normalize_name(remote_owner)
3560 table_fkey["referred_schema"] = ro
3561 else:
3562 table_fkey["referred_schema"] = None
3563 default = ReflectionDefaults.foreign_keys
3564
3565 return (
3566 (key, list(fkeys[key].values()) if key in fkeys else default())
3567 for key in (
3568 (schema, self.normalize_name(obj_name))
3569 for obj_name in all_objects
3570 )
3571 )
3572
3573 @reflection.cache
3574 def get_unique_constraints(
3575 self, connection, table_name, schema=None, **kw
3576 ):
3577 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3578 ``oracle_resolve_synonyms`` to resolve names to synonyms
3579 """
3580 data = self.get_multi_unique_constraints(
3581 connection,
3582 schema=schema,
3583 filter_names=[table_name],
3584 scope=ObjectScope.ANY,
3585 kind=ObjectKind.ANY,
3586 **kw,
3587 )
3588 return self._value_or_raise(data, table_name, schema)
3589
3590 @_handle_synonyms_decorator
3591 def get_multi_unique_constraints(
3592 self,
3593 connection,
3594 *,
3595 scope,
3596 schema,
3597 filter_names,
3598 kind,
3599 dblink=None,
3600 **kw,
3601 ):
3602 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3603 ``oracle_resolve_synonyms`` to resolve names to synonyms
3604 """
3605 all_objects = self._get_all_objects(
3606 connection, schema, scope, kind, filter_names, dblink, **kw
3607 )
3608
3609 unique_cons = defaultdict(dict)
3610
3611 index_names = {
3612 row_dict["index_name"]
3613 for row_dict in self._get_indexes_rows(
3614 connection, schema, dblink, all_objects, **kw
3615 )
3616 }
3617
3618 for row_dict in self._get_all_constraint_rows(
3619 connection, schema, dblink, all_objects, **kw
3620 ):
3621 if row_dict["constraint_type"] != "U":
3622 continue
3623 table_name = self.normalize_name(row_dict["table_name"])
3624 constraint_name_orig = row_dict["constraint_name"]
3625 constraint_name = self.normalize_name(constraint_name_orig)
3626 column_name = self.normalize_name(row_dict["local_column"])
3627 table_uc = unique_cons[(schema, table_name)]
3628
3629 assert constraint_name is not None
3630
3631 if constraint_name not in table_uc:
3632 table_uc[constraint_name] = uc = {
3633 "name": constraint_name,
3634 "column_names": [],
3635 "duplicates_index": (
3636 constraint_name
3637 if constraint_name_orig in index_names
3638 else None
3639 ),
3640 }
3641 else:
3642 uc = table_uc[constraint_name]
3643
3644 uc["column_names"].append(column_name)
3645
3646 default = ReflectionDefaults.unique_constraints
3647
3648 return (
3649 (
3650 key,
3651 (
3652 list(unique_cons[key].values())
3653 if key in unique_cons
3654 else default()
3655 ),
3656 )
3657 for key in (
3658 (schema, self.normalize_name(obj_name))
3659 for obj_name in all_objects
3660 )
3661 )
3662
3663 @reflection.cache
3664 def get_view_definition(
3665 self,
3666 connection,
3667 view_name,
3668 schema=None,
3669 dblink=None,
3670 **kw,
3671 ):
3672 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3673 ``oracle_resolve_synonyms`` to resolve names to synonyms
3674 """
3675 if kw.get("oracle_resolve_synonyms", False):
3676 synonyms = self._get_synonyms(
3677 connection, schema, filter_names=[view_name], dblink=dblink
3678 )
3679 if synonyms:
3680 assert len(synonyms) == 1
3681 row_dict = synonyms[0]
3682 dblink = self.normalize_name(row_dict["db_link"])
3683 schema = row_dict["table_owner"]
3684 view_name = row_dict["table_name"]
3685
3686 name = self.denormalize_name(view_name)
3687 owner = self.denormalize_schema_name(
3688 schema or self.default_schema_name
3689 )
3690 query = (
3691 select(dictionary.all_views.c.text)
3692 .where(
3693 dictionary.all_views.c.view_name == name,
3694 dictionary.all_views.c.owner == owner,
3695 )
3696 .union_all(
3697 select(dictionary.all_mviews.c.query).where(
3698 dictionary.all_mviews.c.mview_name == name,
3699 dictionary.all_mviews.c.owner == owner,
3700 )
3701 )
3702 )
3703
3704 rp = self._execute_reflection(
3705 connection, query, dblink, returns_long=False
3706 ).scalar()
3707 if rp is None:
3708 raise exc.NoSuchTableError(
3709 f"{schema}.{view_name}" if schema else view_name
3710 )
3711 else:
3712 return rp
3713
3714 @reflection.cache
3715 def get_check_constraints(
3716 self, connection, table_name, schema=None, include_all=False, **kw
3717 ):
3718 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3719 ``oracle_resolve_synonyms`` to resolve names to synonyms
3720 """
3721 data = self.get_multi_check_constraints(
3722 connection,
3723 schema=schema,
3724 filter_names=[table_name],
3725 scope=ObjectScope.ANY,
3726 include_all=include_all,
3727 kind=ObjectKind.ANY,
3728 **kw,
3729 )
3730 return self._value_or_raise(data, table_name, schema)
3731
3732 @_handle_synonyms_decorator
3733 def get_multi_check_constraints(
3734 self,
3735 connection,
3736 *,
3737 schema,
3738 filter_names,
3739 dblink=None,
3740 scope,
3741 kind,
3742 include_all=False,
3743 **kw,
3744 ):
3745 """Supported kw arguments are: ``dblink`` to reflect via a db link;
3746 ``oracle_resolve_synonyms`` to resolve names to synonyms
3747 """
3748 all_objects = self._get_all_objects(
3749 connection, schema, scope, kind, filter_names, dblink, **kw
3750 )
3751
3752 not_null = re.compile(r"..+?. IS NOT NULL$")
3753
3754 check_constraints = defaultdict(list)
3755
3756 for row_dict in self._get_all_constraint_rows(
3757 connection, schema, dblink, all_objects, **kw
3758 ):
3759 if row_dict["constraint_type"] != "C":
3760 continue
3761 table_name = self.normalize_name(row_dict["table_name"])
3762 constraint_name = self.normalize_name(row_dict["constraint_name"])
3763 search_condition = row_dict["search_condition"]
3764
3765 table_checks = check_constraints[(schema, table_name)]
3766 if constraint_name is not None and (
3767 include_all or not not_null.match(search_condition)
3768 ):
3769 table_checks.append(
3770 {"name": constraint_name, "sqltext": search_condition}
3771 )
3772
3773 default = ReflectionDefaults.check_constraints
3774
3775 return (
3776 (
3777 key,
3778 (
3779 check_constraints[key]
3780 if key in check_constraints
3781 else default()
3782 ),
3783 )
3784 for key in (
3785 (schema, self.normalize_name(obj_name))
3786 for obj_name in all_objects
3787 )
3788 )
3789
3790 def _list_dblinks(self, connection, dblink=None):
3791 query = select(dictionary.all_db_links.c.db_link)
3792 links = self._execute_reflection(
3793 connection, query, dblink, returns_long=False
3794 ).scalars()
3795 return [self.normalize_name(link) for link in links]
3796
3797
3798class _OuterJoinColumn(sql.ClauseElement):
3799 __visit_name__ = "outer_join_column"
3800
3801 def __init__(self, column):
3802 self.column = column